This blog series is supplementary to my new live programming YouTube series. Watch me program this live
In the previous post, I wrote about using QUIC to write a message broker. It would be good if it behaved like a message broker. What I wanted to get done is quite simple: be able to send a message to an exchange with a routing key.
I started by abstracting the ideas I worked on prior. The client application shouldn't worry about how to create connections and streams,
they should just ask for a connection with Connction::new
and then they should be able to publish
messages without opening the streams.
Once I had created the Connection
struct, I started writing the publish method. It took in the exchange
, a routing_key
and a message
to send.
I wrote a small binary encoding protocol with length-prefixed byte arrays, and I gave the entire block a length prefix.
I later realised that I would also need to tag the stream requests with their intent, so I also prefixed the message with "SEND"
as a 4-byte header.
After writing the publish implementation, I needed to update the server to process the message. I first read 4 bytes from the RecvStream
, and matched on it.
If it matched the publish header magic bytes, I forwarded the stream to a new function. This started by reading the length prefix.
My first instinct was to then read the 8 bytes of the exchange string length prefix, and read the exchange, but that had some problems. QUIC has some built-in restrictions on payload length. If you try and send too much data, the other peer will close the entire connection immediately. By asking the QUIC stream to read only the exchange length number of bytes, it would immediately see the rest of the bytes after and cut the connection.
It turns out that I was lucky to add the total payload length. I instead asked the RecvStream for all of that payload length and then I parsed the exchange, routing key and message out from that. This worked very well.
A message broker like RabbitMQ also stores messages in queues, to support some amount of bursting. If the consumers are not ready, the messages can wait in the queue.
To get started on that, I naively used a Mutex<HashMap>
. I instantiated the state, put it into an Arc
and then shared that arc with connections
and then with the stream handlers. When a publish message came through, I stored the message in the hashmap. This will do the job for now but it is not at all my final intention.
You can follow along with the code in this post on GitHub