Conrad Ludgate

Broke But Quick 4

This blog series is supplementary to my new live programming YouTube series. Watch me program this live

In the previous post, I got basic message consumption working, even with some form of message acknowledgement, but I wasn't quite finished. I had a few things that I needed to fix before I could move forward.

Waiting for messages

It was good that I could consume a message, but I had no way to wait for a new one: the old code simply returned None if there were no available messages.

To address this, I added a tokio::sync::Notify to a new QueueState struct. If there was no message in the queue, the stream would wait on the Notify; when publishing a message, I would call notify_one() to wake any waiting consumer.
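The rough shape of that is something like the sketch below. The field names and payload type are my own stand-ins rather than what's in the repo, and the real code works on streams rather than a bare method like this, but it shows the wait/notify dance.

```rust
use std::collections::VecDeque;
use tokio::sync::{Mutex, Notify};

// Simplified sketch: consumers wait on the Notify when the queue is empty,
// and publishers signal it after pushing a message.
#[derive(Default)]
struct QueueState {
    messages: Mutex<VecDeque<Vec<u8>>>,
    notify: Notify,
}

impl QueueState {
    async fn publish(&self, payload: Vec<u8>) {
        self.messages.lock().await.push_back(payload);
        // Wake up one waiting consumer, if any.
        self.notify.notify_one();
    }

    async fn next_message(&self) -> Vec<u8> {
        loop {
            if let Some(payload) = self.messages.lock().await.pop_front() {
                return payload;
            }
            // Queue was empty: sleep until a publisher signals us, then retry.
            self.notify.notified().await;
        }
    }
}
```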

Automatic Negative Acknowledgements

If a connection acquired a message, it would not be handled again by another consumer unless it was "negatively acknowledged". Only then would the message go back to the available queue. However, if a connection exited without acknowledging the message, it would never become available again. In an "at least once" messaging system, we have no way of knowing whether the message was processed, so it's safer to assume it wasn't and re-queue it.

To do so, I created a ConnectionState struct that stores the current list of acquired messages. On acknowledgement, the message ID is removed from the list. On connection failure, the remaining message IDs are iterated over and placed back into the ready state.
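In spirit it looks something like this sketch. I'm treating message IDs as opaque 16-byte values and the "list" as a set purely for illustration; the actual types in the repo may differ.

```rust
use std::collections::HashSet;

// Placeholder ID type for the sketch.
type MessageId = [u8; 16];

#[derive(Default)]
struct ConnectionState {
    acquired: HashSet<MessageId>,
}

impl ConnectionState {
    /// A message was handed to this connection's consumer.
    fn on_acquire(&mut self, id: MessageId) {
        self.acquired.insert(id);
    }

    /// The consumer acknowledged the message, so we stop tracking it.
    fn on_ack(&mut self, id: &MessageId) {
        self.acquired.remove(id);
    }

    /// The connection closed or errored: everything still outstanding is
    /// treated as negatively acknowledged and handed back to the ready queue.
    fn requeue_outstanding(&mut self, ready: &mut Vec<MessageId>) {
        ready.extend(self.acquired.drain());
    }
}
```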

Durability

Currently, all state is stored in memory. Not great if we want to scale up and survive restarts. To solve this, I am introducing a sled database. Sled offers basic transactional key-value b-trees and nothing more.

I created a database and added it to the shared state of the app. For now, it has two trees: one for available messages and one for acquired messages.
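The wiring is minimal; something like the sketch below, where the database path and tree names are my own placeholders.

```rust
// Open the sled database and the two trees described above.
fn open_database() -> sled::Result<(sled::Db, sled::Tree, sled::Tree)> {
    let db = sled::open("bbq.db")?;
    let available = db.open_tree("available")?;
    let acquired = db.open_tree("acquired")?;
    Ok((db, available, acquired))
}
```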

I got messages being saved into the published tree, but I haven't got them being moved into the acquired tree just yet.
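As a hypothetical sketch of what that move could look like once it's wired up, sled's multi-tree transactions let us atomically take a message out of one tree and record it in the other. None of this is in the repo yet; the function and its signature are my own.

```rust
use sled::transaction::TransactionResult;
use sled::Transactional;

// Atomically move a message from the available tree to the acquired tree.
// Returns false if the message was already gone (e.g. grabbed by another consumer).
fn acquire_message(
    available: &sled::Tree,
    acquired: &sled::Tree,
    msg_id: &[u8],
) -> TransactionResult<bool> {
    (available, acquired).transaction(|(available, acquired)| {
        match available.remove(msg_id)? {
            Some(payload) => {
                acquired.insert(msg_id, payload)?;
                Ok(true)
            }
            None => Ok(false),
        }
    })
}
```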

What next

I need to fix the message acquisition so I can remove the previous queue code and only use the database. Afterwards, I want to design a few different and customisable queuing structures:

  • FIFO
  • Durable at least once
  • At most once

Eventually (as I wrote about on Twitter1,2), I want to be able to deploy this in a distributed configuration. This will add some more complications but make it more interesting! I think I need to update my thumbnails because this is no longer just "a message queue in Rust with QUIC". It's a "distributed message queue in Rust with QUIC and Raft" :D


You can follow along with the code in this post on GitHub
