Conrad Ludgate

Let's talk about this async

This is a deep dive into the land of async rust. Get a drink, grab a snack, because this is going to be a long one.

What the Async?

Async code exists to allow a programming language to offer more control in the way that code can run concurrently. A famous example of an async programming language is JS, which can only execute code using 1 thread, but still needs to support allowing multiple streams of work to happen at the same time.

For instance, say you want to make a network request, but while you're communicating with the external service, you want to update the web page with a live updating progress bar.

Leaving the toilet seat up

One way we can accomplish this in a single threaded setup is by allowing multiplexing. That is, allowing one task to temporarily pause to allow another task to resume. Eventually, both tasks will be run to completion, but neither blocks the other.

I thought this blog post was about Rust...?

I've already hinted at another mechanism to allow concurrency, threads! Rust allows multithreading, so why do we need async code? Well, there's many reasons. Maybe you're writing code on an embedded device that only supports single threaded operations. Maybe you're working in the Linux kernel itself and don't have access to just create threads as you please. Maybe you just want more control about what tasks can run and in which order, and to not be at the mercy of your OS scheduler.

Whatever your reasoning, async rust is general enough to support your needs.

Back to the Future

Before we dive to deep into async runtimes and how the OS handles IO etc, we need to talk about the future (trait). If you're familiar with async/await in JS, you might have seen that it's syntax sugar for the Promise class. It's similar in Rust, but instead we have the Future trait.

Futures represent that the value it computes might not be ready immediately, but at a later point in time. Let's take a look at the definition:

rust
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

There's a lot going on here... I don't want to talk about some of this yet, so for now, let's give it a bit of a hair cut...

rust
pub trait Future2 /* electric boogaloo */ {
    type Output;
    fn poll(&mut self) -> Poll<Self::Output>;
}

This is actually quite similar to another trait that rust has - FnMut (which is the trait behind a lot of closures).

rust
pub trait FnMut<Args> {
    type Output;
    fn call_mut(&mut self, args: Args) -> Self::Output;
}

We're not defining a function, so we can get rid of the args, but it can return a value.

But since a Future's output might not be ready immediately, we can make it return an option-like value. One variant indicates that it's Ready, and one the other indicates the value is Pending.

rust
pub enum Poll<T> {
    Ready(T),
    Pending,
}

Much like functions can be made of more functions, our futures can be made of more futures:

rust
async fn foo() {
    bar().await
}
async fn bar() {}

When these are turned into futures, our foo future will call poll() on bar. If that returns Pending, then foo also returns Pending. If it returns Ready(()), then foo will attempt to make some progress.

You'll also notice the poll function takes in a mut reference to itself. This is how it keeps track of the progress it has made. Each call to poll will try to make some progress, and it'll be stored for next time.

Baby's first Future

Let's write a future by hand and see how they work

rust
#[derive(Debug)]
enum MyFuture {
    Init,
    Step1(usize),
    Done,
}

impl Future2 for MyFuture {
    type Output = usize;
    fn poll(&mut self) -> Poll<Self::Output> {
        // take ownership of the current state, we pinky-promise to put it back
        let this = std::mem::replace(self, Self::Done);     // (4)

        let new = match this {
            Self::Init => Self::Step1(6),                   // (1)
            Self::Step1(n) => return Poll::Ready(7 * n),    // (3)
            Self::Done => panic!("please stop polling me"), // (5)
        };

        *self = new;                                        // (2)
        Poll::Pending
    }
}

This is an implementation of our simple Future trait over an enum state machine. The state first computes the value of 6 (1), moving us into the Step1 state (2). On the next call, we'll compute the value of 7*6, where we return ready (3). The state is set to Done by the first line (4). Calling poll on a Done future should be a bug so we are free to panic (5).

Does it work? Let's try it!

rust
fn main() {
    // initialise the future
    let mut fut = MyFuture::Init;
    let n = loop {
        println!("polling - {fut:?}")
        // call poll
        match fut.poll() {
            // if ready, break the poll loop with our value
            Poll::Ready(n) => break n,
            // if pending, continue the loop
            Poll::Pending => println!("pending"),
        }
    };
    // Done!
    println!("ready!  - {n:?}");
}
rust
Compiling playground v0.0.1 (/playground)
    Finished dev [unoptimized + debuginfo] target(s) in 1.28s
     Running `target/debug/playground`

polling - Init
pending
polling - Step1(6)
ready!  - 42

Woo! We successfully made a future and polled it to completion.

Stick a Pin in it

Ok, so we've seen a little bit of async code and how to write futures by hand. Let's go a bit deeper.

Can we capture?

rust
let x = "foo";
async {
    println!("{x}");
};

Yes! Just like closures, async blocks can capture outside scope. Since it's borrowing x, we should probably expect the future to be bound by the lifetime of x

To see if we understand, let's write this out by hand:

rust
struct CaptureFuture<'x> {
    Init {
        x: &'x str
    },
    Done,
}

impl Future2 for CaptureFuture<'_> {
    type Output = ();

    fn poll(&mut self) -> Poll<Self::Output> {
        match self {
            Self::Init { x } => {
                println!("{x}");
                *self = Self::Done;
                Poll::Ready(())
            }
            Self::Done => panic!("please stop polling me"),
        }
    }
}

Note that we could also use async move {} (analogous to move || {}) which would not borrow but instead move the captures inside.

Ok, let's try something else. This time, maybe even a bit more realistic! Let's say we have a Vec<u8> and we use an AsyncRead (works the same as Read, but async!).

rust
async {
    let mut buf = Vec::new();

    let mut reader = TcpStream::new().await; // use your imagination

    let n = reader.read(&mut buf).await.unwrap();
    //             ^^^^           ^^^^^
    //               like read      but async :)

    println!("Read {:?}", &buf[..n]);
}

This seems simple enough. Let's apply our magic and try and build this by hand.

First, let's dissect this .read() future. It borrows both our buffer and the reader as mutable, so its declaration must look something like

rust
struct Read<'buf, 'read, R> {
    reader: &'read mut R,
    buf: &'buf mut [u8],
    // more stuff here...
}

Ok, now for our actual future

rust
enum TcpRead {
    // first, we create our buffer
    Init {
        buf: Vec<u8>
    },
    // then we create our reader to go along with it
    Reader {
        buf: Vec<u8>,
        reader: TcpStream,
    },
    // then we borrow buf and reader while we read...
    Reading {
        buf: Vec<u8>,
        reader: TcpStream,
        reading: Read<'buf, 'read, TcpStream>,
    },
    // then we print and finish
    Done,
}

Seems simple enough

rust
Compiling playground v0.0.1 (/playground)
error[E0261]: use of undeclared lifetime name `'buf`
  --> src/lib.rs:15:23
   |
1  | enum TcpRead {
   |             - help: consider introducing lifetime `'buf` here: `<'buf>`
...
15 |         reading: Read<'buf, 'read, TcpStream>,
   |                       ^^^^ undeclared lifetime

error[E0261]: use of undeclared lifetime name `'read`
  --> src/lib.rs:15:29
   |
1  | enum TcpRead {
   |             - help: consider introducing lifetime `'read` here: `<'read>`
...
15 |         reading: Read<'buf, 'read, TcpStream>,
   |                             ^^^^^ undeclared lifetime

For more information about this error, try `rustc --explain E0261`.
error: could not compile `playground` due to 2 previous errors

Oh, we didn't define these lifetimes!

Rust is suggesting we add these lifetimes to our enum... ok!

diff
- enum TcpRead {
+ enum TcpRead<'buf, 'read> {

Ok, cool, it compiles.

Let's go ahead and implement our future

rust
impl<'buf, 'read> Future2 for TcpRead<'buf, 'read> {
    type Output = ();
    fn poll(&mut self) -> Poll<Self::Output> {
        let this = std::mem::replace(self, Done);
        let new = match this {
            Init { buf } => {
                Reader { buf, reader: TcpStream }
            }
            Reader { mut buf, mut reader } => {
                let reading = reader.read(&mut buf);
                Reading { buf, reader, reading }
            }
            Reading { buf, reader, mut reading } => {
                match reading.poll() {
                    Poll::Pending => Reading { buf, reader, reading },
                    Poll::Ready(n) => {
                        let n = n.unwrap();
                        println!("Read {:?}", &buf[..n]);
                        return Poll::Ready(())
                    }
                }
            }
            Done => panic!("I'm done already!"),
        };
        *self = new;
        Poll::Pending
    }
}

Aaaanndddd

rust
error[E0597]: `buf` does not live long enough
  --> src/lib.rs:31:43
   |
22 | impl<'buf, 'read> Future2 for TcpRead<'buf, 'read> {
   |      ---- lifetime `'buf` defined here
...
31 |                 let reading = reader.read(&mut buf);
   |                                           ^^^^^^^^ borrowed value does not live long enough
32 |                 Reading { buf, reader, reading }
   |                 -------------------------------- assignment requires that `buf` is borrowed for `'buf`
33 |             }
   |             - `buf` dropped here while still borrowed

error[E0505]: cannot move out of `buf` because it is borrowed
  --> src/lib.rs:32:27
   |
22 | impl<'buf, 'read> Future2 for TcpRead<'buf, 'read> {
   |      ---- lifetime `'buf` defined here
...
31 |                 let reading = reader.read(&mut buf);
   |                                           -------- borrow of `buf` occurs here
32 |                 Reading { buf, reader, reading }
   |                 ----------^^^-------------------
   |                 |         |
   |                 |         move out of `buf` occurs here
   |                 assignment requires that `buf` is borrowed for `'buf`

Some errors have detailed explanations: E0505, E0597.
For more information about an error, try `rustc --explain E0505`.
error: could not compile `playground` due to 4 previous errors

Well, that's a lot of errors. Let's see what they are saying:

borrow of buf occurs here move out of buf occurs here

So we are trying to borrow buf from ourselves. This is classic self-referential data types. Self referential structs are hard.

So where do we go from here? We could use unsafe to break out of this problem. Let's try that!

Ok, maybe futures are just unsafe, and our async blocks will write the unsafe code for us, and our executors will have to deal with that unsafe too?

rust
enum TcpRead {
    // ...
    Reading {
        buf: Vec<u8>,
        reader: TcpStream,
        reading: Read<'static, 'static, TcpStream>, // static borrows!!!
    },
    // ...
}

impl Future2 for TcpRead {
    type Output = ();
    fn poll(&mut self) -> Poll<Self::Output> {
        let this = std::mem::replace(self, Done);
        let new = match this {
            // ..
            Reader { buf, reader } => {
                unsafe {
                    // lifetime extension
                    let buf2: &'static mut [u8] = &mut *(&mut *buf as *mut _);
                    let reader2: &'static mut TcpStream = &mut *(&mut reader as *mut _);
                    let reading = reader2.read(buf2);

                    Reading { buf, reader, reading }
                }
            }
        }
    }
}

Ok, This is probably a #BadIdea. But what safety issues might we have?

  1. So our lifetime extensions are fine as long as they point to valid memory
  2. Our memory is valid as long as buf/reader do not move
  3. buf/reader won't move if our TcpRead future doesn't move.

So our safety requirement is that TcpRead future doesn't move. Got it, cool. Let's slap unsafe in front of our poll method and call it a day?

Not so fast. Instead of making it unsafe to poll future, we just need to make it unsafe to move them. Well, all data types in rust can be moved, unless they are being borrowed. Borrows can be made at any point, which means they can be discarded while the type is moved, so that doesn't help us.

Maybe we can design a borrow that is unsafe to construct? Yeah!

That's where Pin comes in! It provides an unsafe Pin::new_unchecked(pointer) method that will borrow the owned future. It's unsafe to call this function, so any callers should be sure that they don't move it if they call it again.

There are safe variants of Pin creation, but they either involve special traits or heap allocations

Let's update our future trait again:

rust
pub trait Future3 /* Revenge of the Pin */ {
    type Output;
    fn poll(self: Pin<&mut Self>) -> Poll<Self::Output>;
    //            ^^^ pinned! our callers have promised not to move us
}

Executor Order 66

Ok, now that we have futures out of the way (sorry, that took a while) we can get to executors.

I showed off a simple executor earlier, now let's update it to work with our new future trait:

rust
fn main() {
    // Initialise our future
    let mut fut = TcpRead::Init { buf: vec![] };

    // Pin the future
    // Safety: we'll be good and not move it until we stop polling it!
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };

    let n = loop {
        // call poll (as_mut does a reborrow for us so we can poll it again next time)
        match fut.as_mut().poll() {
            Poll::Ready(n) => break n,
            Poll::Pending => println!("value was not ready"),
        }
    };
    // Done!
    println!("value is ready -> {n:?}");
}

Ok, so what about spawning more tasks to run along side this one?

Well, I gave a solution earlier! We just keep a list of tasks and loop through them!

rust
fn main() {
    // a ring buffer with efficient FIFO operations
    let mut tasks = VecDeque::new();

    // Pin the future (using box now because our tasks will be moving around :flushed:)
    let fut = Box::pin(/* fut */);

    // insert our task!
    tasks.push_back(fut);

    loop {
        let mut fut = tasks.pop_front().unwrap(); // take the first task
        match fut.as_mut().poll() {
            Poll::Ready(_) => break,
            Poll::Pending => {
                tasks.push_back(fut); // we need to re-queue our task now!
            },
        }
    };
}

Now, our TcpRead future just needs access to some spawn() function that pushes a second task into our tasks queue.

Ok, let's extract it into a struct and make it a thread local instead - that way we can easily call spawn() anywhere in our code stack!

rust
// Task alias
type Task = Pin<Box<dyn Future<Output = ()>>>;

// our executor struct owns our task queue
#[derive(Default)]
pub struct Executor {
    // Using a RefCell because we'll need interior mutability :O
    queue: RefCell<VecDeque<Task>>
}

thread_local! {
    // Our thread local executor. Will be initialised later.
    // Uses an Rc because it will need multiple owners
    static EXECUTOR: OnceCell<Rc<Executor>> = OnceCell::new();
}

/// Spawns a future in our thread-local executor
pub fn spawn(&self, fut: impl Future<Output = ()>) {
    EXECUTOR.with(|e| e.get().unwrap().spawn(fut));
}

impl Executor {
    /// registers this executor onto the current thread
    fn register(self: &Rc<Self>) {
        EXECUTOR.with(|e| e.set(self.clone));
    }

    pub fn spawn(&self, fut: impl Future<Output = ()>) {
        self.tasks.borrow_mut().push_back(Box::pin(fut));
    }

    /// Waits for the future to complete
    pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
        self.register(); // we're now in executor land

        // a way to store the output of the future, which will also signal
        // that we are done
        let output: Rc<RefCell<Option<F::Output>>> = Rc::new(RefCell::new(None));

        let output2 = Rc::clone(output);
        self.spawn(async move {
            let output = fut.await;
            *output2.borrow_mut() = Some(output); // set out output value
        });

        loop {
            let mut fut = self.tasks.borrow_mut().pop_front().unwrap(); // take the first task

            if fut.as_mut().poll().is_pending() {
                self.tasks.borrow_mut().push_back(fut); // we need to re-queue our task now!
            }

            // exit our loop if we have our final value :)
            if let Some(output) = output.borrow_mut().take() {
                break output
            }
        }
    }
}

And that's it! This is a functioning single threaded async executor. Let's try it out!

rust
fn main() {
    let executor = Executor::default();
    executor.block_on(start());
}

async fn start() {
    // our magic business logic goes here
    println!("start!");
    for i in 0..10 {
        spawn(async move {
            println!("hello from task {i}");
        });
    }
    println!("spawned 10 tasks!");
}
rust
Compiling playground v0.0.1 (/playground)
    Finished dev [unoptimized + debuginfo] target(s) in 1.28s
     Running `target/debug/playground`

start!
spawned 10 tasks!
hello from task 0
hello from task 1
hello from task 2
hello from task 3
hello from task 4
hello from task 5
hello from task 6
hello from task 7
hello from task 8
hello from task 9

Time to add some more Context

You might notice that our executor doesn't do anything interesting yet.

Let's say we want a way to sleep in our async tasks. We could just use std::thread::sleep() in our task, but that will block our whole thread loop. Ideally, our sleep feature would be async aware, and would just return Pending whenever it is not ready.

rust
struct Sleep {
    until: Instant,
}

impl Future for Sleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.until > Instant::now() {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

pub fn sleep(dur: Duration) -> Sleep {
    sleep_until(Instant::now() + dur)
}
pub fn sleep_until(until: Instant) -> Sleep {
    Sleep { until }
}

Cool, this works. It will allow other tasks to work while it's sleeping, but if you're paying attention, you'll notice that we've turned our efficient thread::sleep(dur) into

rust
let until = Instant::now() + dur
while self.until < Instant::now() { }

Busy loops like this are usually not a good idea...

Wouldn't it be nice if we could tell our executor when our tasks are ready to make progress?

This is where the Context type comes in. We're ready to see the full Future trait again

rust
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
    //                            ^^^^^^^^^^^^^^^^^^^^  New!
}

Let's see the docs for Context:

rust
/// The Context of an asynchronous task.
pub struct Context<'a> { /* private fields */ }

impl<'a> Context<'a> {
    /// Create a new Context from a &Waker.
    pub fn from_waker(waker: &'a Waker) -> Context<'a>;

    /// Returns a reference to the Waker for the current task.
    pub fn waker(&self) -> &'a Waker;
}

Ok, so Context is related to an async task, and is seemingly a light wrapper over Waker

rust
/// A Waker is a handle for waking up a task by notifying its executor that it is ready to be run.
pub struct Waker { /* private fields */ }

impl Waker {
    /// Wake up the task associated with this Waker.
    pub fn wake(self);
}

Right, so Waker has wake() method that lets us wake up a task.

Currently in our executor block_on function, we have

rust
if fut.as_mut().poll().is_pending() {
    self.tasks.borrow_mut().push_back(fut);
}

When our task returns pending, instead of giving it back to the executor queue, we can instead give it to a Waker. And the sleep future will put that Waker onto a scheduler that will call the wake method when it's ready, this will then place the task back onto the executor queue.

First, we need our waker:

rust
struct TaskWaker {
    task: RefCell<Option<Task>>,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        // if we have a task
        if let Some(task) = self.task.borrow_mut().take() {
            // place it into our executor queue
            EXECUTOR.with(|e| {
                let e = e.get().unwrap();
                e.tasks.borrow_mut().push_back(fut)
            });
        }
    }
}

Now, we will construct our task context

rust
let wake = Arc::new(TaskWaker { task: RefCell::new(None) });
let waker = Waker::from(Arc::clone(wake));
let mut cx = Context::from_waker(&waker);

Finally, we update the task polling to push the task here instead of the main queue

rust
if fut.as_mut().poll(&mut cx).is_pending() {
    // our task is now in charge of scheduling itself!
    wake.task.borrow_mut().push_back(fut);
}

Yay! So now we're not wasting cycles polling tasks that aren't ready, we should probably add a check in our main loop to not panic if the tasks list is empty now

diff
- let mut fut = self.tasks.borrow_mut().pop_front().unwrap(); // take the first task
+ let mut fut = match self.tasks.borrow_mut().pop_front() {
+     Some(fut) => fut,
+     None => continue,
+ }

Iron Man wants his Arc<Reactor> back

Currently, there's nothing for our sleep future to register to in order to wake up. Let's fix that.

Let's give our executor a 'reactor', which will be reacting to side-effects. In this case, our side effect will be time progressing

time doesn't progress as a result of our code, it's an intrinsic property of the computer, therefore it's an outside effect of the program

rust
pub struct TimeEntry {
    instant: Instant,
    waker: Waker,
}

pub struct Timers {
    // we'll be inserting the entries ordered by time
    // the earlier times will be at the end
    queue: Vec<TimeEntry>
}

impl Timers {
    pub fn insert(&mut self, instant: Instant, waker: Waker) {
        // reverse means it sorts with earlier times at the end (faster pop)
        let index = match self.queue.binary_search_by_key(&Reverse(instant), |e| Reverse(e.instant)) {
            Ok(index) | Err(index) => index,
        };
        let entry = TimeEntry { instant, waker };
        queue.insert(index, entry);
    }

    fn pop(&mut self) -> Option<Waker> {
        // get the next timer
        let TimeEntry { instant, waker } = self.queue.pop()?;
        if instant > Instant::now() {
            // push it back if it's not ready yet
            self.0.push(TimeEntry { instant, waker });
            None
        } else {
            Some(waker)
        }
    }
}

pub struct Reactor {
    timers: RefCell<Timers>
}

impl Reactor {
    pub fn insert_timer(&self, instant: Instant, waker: Waker) {
        self.timers.borrow_mut().insert(instant, waker);
    }

    pub fn tick(&self) {
        // try get a timer and wake it up
        if let Some(task) = self.timers.borrow_mut().pop() {
            task.wake()
        }
    }
}

Now let's give the reactor to our executor

diff
pub struct Executor {
    // Using a RefCell because we'll need interior mutability :O
    queue: RefCell<VecDeque<Task>>
+   reactor: Reactor,
}

And now let's update our sleep future to insert into the reactor

diff
impl Future for Sleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.until > Instant::now() {
+           EXECUTOR.with(|e| e.get().unwrap().reactor.insert_timer(self.until, cx.waker().clone()));
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

Finally, we need to make our block_on function tick the reactor

diff
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
    // setup ...

    loop {
+       // tick the reactor, we can be smarter about this, but it works
+       self.reactor.tick();

        // poll our futures ...
    }
}

Hooray! And that's basically all there is to making an async runtime.

Closing thoughts

Phew, that was a lot of text. If you made it this far, great job. I really appreciate it. This is my longest blog post by far and I want to thank Amos (fasterthanlime) for opening my mind into these long form educational blog posts. I hope I have done him proud <3

I also hope I've helped make some of the choices that the Future trait made make sense, I was confused by a lot of these things in the beginning and I never found a single definitive guide for why they are the way they are, and how to use them effectively. This is hoping to filling that gap.

Currently, what we have written in this post is only a single-threaded runtime. Making it multi-threaded is a bit more complex and requires an executor per thread, and a shared reactor between threads.

There's also the problem of having a queue per executor + a global queue. If you're interested, you can read into how tokio manages their executor scheduler.

The timers are also very naive. Tokio has another post talking about how they optimise their timers for both sub millisecond precision and also multi minute sleeps.

I've skipped over IO in this post too, but it can be added using the reactor model too. AsyncIO futures would register their IO interests onto the reactor, the reactor would poll the OS for events, these would be fed through and wake up the dependant tasks. Tokio does exactly this, and created the mio crate for a better OS abstraction.

If you want a complete code sample, I have published a multi-threaded version of this code: https://github.com/conradludgate/what-the-async

Finally, I want to thank all the tokio team/contributors for their early efforts in this space. Great code and great blogs, and they have been very helpful in guiding me. Also thanks to all those who hang out in the #async channel in the Rust Community Discord Server.