In the previous post, I got basic message consumption working, even with some form of message acknowledgement, but I wasn't quite finished. I had a few things that I needed to fix before I could move forward.
I could consume a message, but I had no way to wait for a new one: the old code simply returned None if there were no available messages.
To address this, I added a tokio::sync::Notify to a new struct. Using this Notify, if there was no message in the queue, the stream would wait for the notify signal. When publishing messages, I would call notify_one() to alert any waiting consumers.
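The wait-and-notify pattern can be sketched without an async runtime. The example below is a minimal blocking analogue that swaps tokio::sync::Notify for std::sync::Condvar, so it is not the code from this project. The WaitQueue type and its publish and consume methods are hypothetical names chosen for illustration.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

/// Hypothetical queue type used only for this sketch.
pub struct WaitQueue {
    messages: Mutex<VecDeque<String>>,
    // Blocking stand-in for tokio::sync::Notify (an assumption of this sketch).
    notify: Condvar,
}

impl WaitQueue {
    pub fn new() -> Self {
        Self {
            messages: Mutex::new(VecDeque::new()),
            notify: Condvar::new(),
        }
    }

    /// Push a message, then wake one waiting consumer.
    pub fn publish(&self, msg: String) {
        self.messages.lock().unwrap().push_back(msg);
        self.notify.notify_one();
    }

    /// Block until a message is available instead of returning None.
    pub fn consume(&self) -> String {
        let mut queue = self.messages.lock().unwrap();
        loop {
            if let Some(msg) = queue.pop_front() {
                return msg;
            }
            // wait() releases the lock while parked and re-acquires it on wake-up.
            // The loop re-checks the queue to cope with spurious wake-ups.
            queue = self.notify.wait(queue).unwrap();
        }
    }
}
```

A consumer thread calling consume() on an empty queue parks until another thread calls publish(). The async version follows the same shape, with the consumer awaiting the Notify rather than blocking a thread.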
If a connection acquired a message, it would not be handled again by another consumer unless it was "negatively acknowledged". Only then would the message go back to the available queue. However, if a connection exited without acknowledging the message, it would never be available again. In an "at least once" messaging system, we have no way of knowing whether the message was processed, so it's safer to assume it wasn't and re-queue it.
To do so, I created a ConnectionState struct that stores the current list of acquired messages. On acknowledgement, the message ID is removed from the list. On connection failure, the message IDs are iterated over and put back into the ready state.
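Here is a minimal in-memory sketch of that bookkeeping. Only the ConnectionState name comes from the post. The Queue type, the MessageId alias, and the acquire, ack, and release_connection methods are assumptions made for illustration, and the real implementation may be structured quite differently.

```rust
use std::collections::{HashMap, VecDeque};

type MessageId = u64; // hypothetical ID type

/// Hypothetical in-memory queue: ready messages plus messages held by connections.
#[derive(Default)]
pub struct Queue {
    ready: VecDeque<(MessageId, String)>,
    acquired: HashMap<MessageId, String>,
}

/// Per-connection list of message IDs that have been acquired but not yet acknowledged.
#[derive(Default)]
pub struct ConnectionState {
    acquired: Vec<MessageId>,
}

impl Queue {
    pub fn publish(&mut self, id: MessageId, body: &str) {
        self.ready.push_back((id, body.to_string()));
    }

    /// Hand the next ready message to a connection and record that it holds it.
    pub fn acquire(&mut self, conn: &mut ConnectionState) -> Option<(MessageId, String)> {
        let (id, body) = self.ready.pop_front()?;
        self.acquired.insert(id, body.clone());
        conn.acquired.push(id);
        Some((id, body))
    }

    /// Acknowledge: the message is done, so forget it everywhere.
    pub fn ack(&mut self, conn: &mut ConnectionState, id: MessageId) {
        conn.acquired.retain(|held| *held != id);
        self.acquired.remove(&id);
    }

    /// Connection closed or failed: put every unacknowledged message back in the ready queue.
    pub fn release_connection(&mut self, conn: ConnectionState) {
        for id in conn.acquired {
            if let Some(body) = self.acquired.remove(&id) {
                self.ready.push_back((id, body));
            }
        }
    }
}
```

Taking ConnectionState by value in release_connection means a released connection cannot be reused by accident. In this sketch, re-queued messages go to the back of the ready queue, which is a design choice rather than something the post specifies.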
Currently, all state is stored in memory. Not great if we want to scale up and survive restarts. To solve this, I am introducing the sled database. Sled offers basic transactional key-value B-trees and nothing more.
I created a database and added it to the shared state of the app. This database will have two trees for now: a list of available messages and a list of acquired messages.
I got messages being saved into the published queue, but I didn't get them being moved into an acquired queue just yet.
I need to fix the message acquisition so I can remove the previous queue code and only use the database. Afterwards, I want to design a few different and customisable queuing structures:
Eventually, as I wrote about on Twitter [1, 2], I want to be able to deploy this in a distributed configuration. This will add some more complications but make it more interesting! I think I need to update my thumbnails because this is no longer just "a message queue in Rust with QUIC". It's a "distributed message queue in Rust with QUIC and Raft" :D
You can follow along with the code in this post on GitHub