Conrad Ludgate

Broke But Quick 3

This blog series is supplementary to my new live programming YouTube series. Watch me program this live.

In the previous post, I managed to publish a message to the server, such that it could save the message. But that's not so useful if we cannot consume the message back.

Exchanges and Queues

To start, I needed to fix up the work from last time. When a message is published to an exchange, it is not stored by that exchange, but instead routed through that exchange and stored on queues.

I quickly updated the shared state to have a concept of exchanges, routes, and queues and updated publishing to save messages to the queues.
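A minimal sketch of what such shared state could look like, assuming an in-memory map of exchanges to routes and a map of queue names to stored messages. The `State` type, its fields, and the `bind` and `publish` methods are hypothetical names chosen for illustration, not the code from the repository.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical in-memory broker state; every name here is illustrative.
#[derive(Default)]
struct State {
    /// exchange name -> (routing key -> names of the queues bound to it)
    exchanges: HashMap<String, HashMap<String, Vec<String>>>,
    /// queue name -> stored messages, oldest first
    queues: HashMap<String, VecDeque<Vec<u8>>>,
}

impl State {
    /// Bind `queue` to `exchange` under `routing_key`, creating both if needed.
    fn bind(&mut self, exchange: &str, routing_key: &str, queue: &str) {
        self.queues.entry(queue.to_owned()).or_default();
        self.exchanges
            .entry(exchange.to_owned())
            .or_default()
            .entry(routing_key.to_owned())
            .or_default()
            .push(queue.to_owned());
    }

    /// Route a message through an exchange. The exchange stores nothing itself;
    /// each bound queue gets its own copy. Returns how many queues stored it.
    fn publish(&mut self, exchange: &str, routing_key: &str, payload: &[u8]) -> usize {
        let targets = match self
            .exchanges
            .get(exchange)
            .and_then(|routes| routes.get(routing_key))
        {
            Some(targets) => targets,
            None => return 0,
        };
        let mut stored = 0;
        for name in targets {
            if let Some(queue) = self.queues.get_mut(name) {
                queue.push_back(payload.to_vec());
                stored += 1;
            }
        }
        stored
    }
}
```

With two queues bound to the same route, one `publish` call stores a copy on each queue, and a publish to an unknown route stores nothing.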

Consuming

When the client wants to consume a message, they open a new bi-directional stream, send the consume header tag and then send the queue they want to consume from.

The server then finds a relevant message and sends that back to the client.
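The request side of this exchange can be sketched as follows. The wire format is an assumption made for the example: the tag value `CONSUME_TAG`, the one-byte length prefix, and both function names are hypothetical, and the real protocol may frame the request differently. The functions are generic over `Read` and `Write`, so they work on any byte stream.

```rust
use std::io::{self, Read, Write};

/// Hypothetical tag byte marking a consume request; the real value is not given here.
const CONSUME_TAG: u8 = 1;

/// Client side: write the tag, then the queue name with a one-byte length prefix.
fn write_consume_request<W: Write>(stream: &mut W, queue: &str) -> io::Result<()> {
    if queue.len() > u8::MAX as usize {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "queue name too long",
        ));
    }
    stream.write_all(&[CONSUME_TAG, queue.len() as u8])?;
    stream.write_all(queue.as_bytes())
}

/// Server side: read the request back and return the requested queue name.
fn read_consume_request<R: Read>(stream: &mut R) -> io::Result<String> {
    let mut header = [0u8; 2];
    stream.read_exact(&mut header)?;
    if header[0] != CONSUME_TAG {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "not a consume request",
        ));
    }
    let mut name = vec![0u8; header[1] as usize];
    stream.read_exact(&mut name)?;
    String::from_utf8(name).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
```

Writing into a `Vec<u8>` and reading back from a byte slice is enough to exercise both halves without a network connection.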

Message Acknowledgements

When the client is happy with the message, they can acknowledge it. This will delete the message from the queue for good.

I realised that when I consume a message, I never mark the message as taken, which means another consumer would see the same message. This isn't super great, so I added an "in flight" field to the messages in the queue, which ensures that a message is not handed to a second consumer while the first one still holds it.

I then updated the client code to keep the send stream open, such that a user can send back an acknowledgement code. After sending the payload to the client, the server waits until an acknowledgement code is sent back. It then either deletes the message from the queue, or unsets the in-flight flag if the message was "not acknowledged".
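The in-flight bookkeeping described above can be sketched like this, assuming each queue is an in-memory list of messages with numeric IDs. The `Message` and `Queue` types and the `take`, `ack`, and `nack` methods are hypothetical names for illustration, not the actual implementation.

```rust
use std::collections::VecDeque;

/// Hypothetical queue entry; `in_flight` marks a message handed to a consumer.
struct Message {
    id: u64,
    payload: Vec<u8>,
    in_flight: bool,
}

#[derive(Default)]
struct Queue {
    messages: VecDeque<Message>,
    next_id: u64,
}

impl Queue {
    /// Store a new message at the back of the queue and return its ID.
    fn push(&mut self, payload: &[u8]) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.messages.push_back(Message {
            id,
            payload: payload.to_vec(),
            in_flight: false,
        });
        id
    }

    /// Hand out the oldest message that no consumer currently holds.
    fn take(&mut self) -> Option<(u64, Vec<u8>)> {
        let message = self.messages.iter_mut().find(|m| !m.in_flight)?;
        message.in_flight = true;
        Some((message.id, message.payload.clone()))
    }

    /// Acknowledged: delete the in-flight message from the queue for good.
    fn ack(&mut self, id: u64) -> bool {
        match self.messages.iter().position(|m| m.id == id && m.in_flight) {
            Some(index) => self.messages.remove(index).is_some(),
            None => false,
        }
    }

    /// Not acknowledged: unset the flag so another consumer can take the message.
    fn nack(&mut self, id: u64) -> bool {
        match self.messages.iter_mut().find(|m| m.id == id && m.in_flight) {
            Some(message) => {
                message.in_flight = false;
                true
            }
            None => false,
        }
    }
}
```

In this sketch a message stays in the queue while it is in flight, so a "not acknowledged" reply only has to flip the flag back, and the message keeps its original position.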

What next

  • I want to add dead-letter queues so that messages can be rejected properly.
  • I want to keep a list of in-flight message IDs associated with the connection/stream, such that if the connection or stream closes, those messages are returned to the queue.
  • I want to store published messages into a database, so they can be persisted through restarts.

You can follow along with the code in this post on GitHub.
