This is a deep dive into the land of async Rust. Get a drink, grab a snack, because this is going to be a long one.
Async code exists to give a programming language more control over how code runs concurrently. A well-known example is JavaScript, which executes code on a single thread but still needs to let multiple streams of work make progress at the same time.
For instance, say you want to make a network request, but while you're communicating with the external service, you want to update the web page with a live updating progress bar.
One way we can accomplish this in a single-threaded setup is multiplexing: allowing one task to temporarily pause so that another task can resume. Eventually both tasks run to completion, but neither blocks the other.
I've already hinted at another mechanism for concurrency: threads! Rust supports multithreading, so why do we need async code? Well, there are many reasons. Maybe you're writing code for an embedded device that only supports single-threaded operation. Maybe you're working in the Linux kernel itself and can't just create threads as you please. Maybe you just want more control over which tasks run and in which order, rather than being at the mercy of your OS scheduler.
Whatever your reasoning, async Rust is general enough to support your needs.
Before we dive too deep into async runtimes and how the OS handles IO etc., we need to talk about the future (trait).
If you're familiar with async/await in JS, you might have seen that it's syntax sugar for the Promise class.
It's similar in Rust, but instead we have the Future trait.
A future represents a value that might not be ready immediately, but will be at some later point in time. Let's take a look at the definition:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
There's a lot going on here... I don't want to talk about some of this yet, so for now, let's give it a bit of a haircut...
pub trait Future2 /* electric boogaloo */ {
type Output;
fn poll(&mut self) -> Poll<Self::Output>;
}
This is actually quite similar to another trait that Rust has, FnMut (which is the trait behind a lot of closures):
pub trait FnMut<Args> {
type Output;
fn call_mut(&mut self, args: Args) -> Self::Output;
}
A future isn't called with arguments, so we can get rid of Args, but it does still return a value.
But since a Future's output might not be ready immediately, we make it return an Option-like value: one variant indicates that the value is Ready, and the other indicates that it is still Pending.
pub enum Poll<T> {
Ready(T),
Pending,
}
Much like functions can be made of more functions, our futures can be made of more futures:
async fn foo() {
bar().await
}
async fn bar() {}
When these are turned into futures, our foo future will call poll() on bar.
If that returns Pending, then foo also returns Pending.
If it returns Ready(()), then foo can carry on making progress with the rest of its body.
You'll also notice that the poll function takes a mutable reference to itself. This is how a future keeps track of the progress it has made: each call to poll tries to make some progress and stores it for next time.
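To make the foo/bar relationship concrete, here is a minimal hand-written sketch using the simplified Future2 trait: an outer Foo polls an inner Bar and forwards Pending until Bar is ready. Bar, Foo and run are hypothetical names for illustration (not what the compiler actually generates), and Bar's countdown field is the kind of saved progress that the &mut self receiver makes possible.

```rust
/// Our simplified `Poll` and `Future2` from above, repeated so this block stands alone.
pub enum Poll<T> {
    Ready(T),
    Pending,
}

pub trait Future2 {
    type Output;
    fn poll(&mut self) -> Poll<Self::Output>;
}

/// Hypothetical inner future ("bar"): Pending `remaining` times, then Ready(10).
pub struct Bar {
    pub remaining: u32,
}

impl Future2 for Bar {
    type Output = u32;
    fn poll(&mut self) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(10)
        } else {
            // record our progress in `self` for the next call
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Hypothetical outer future ("foo"), roughly `async { bar().await + 1 }`.
pub struct Foo {
    pub bar: Bar,
}

impl Future2 for Foo {
    type Output = u32;
    fn poll(&mut self) -> Poll<u32> {
        match self.bar.poll() {
            // inner future isn't ready, so neither are we
            Poll::Pending => Poll::Pending,
            // inner future finished: run the rest of our body
            Poll::Ready(n) => Poll::Ready(n + 1),
        }
    }
}

/// Polls until Ready, returning the output and how many times we saw Pending.
pub fn run<F: Future2>(mut fut: F) -> (F::Output, usize) {
    let mut pendings = 0;
    loop {
        match fut.poll() {
            Poll::Ready(value) => return (value, pendings),
            Poll::Pending => pendings += 1,
        }
    }
}
```

With `Bar { remaining: 3 }`, run sees three Pending results before Foo finally returns Ready(11).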
Let's write a future by hand and see how it works:
#[derive(Debug)]
enum MyFuture {
Init,
Step1(usize),
Done,
}
impl Future2 for MyFuture {
type Output = usize;
fn poll(&mut self) -> Poll<Self::Output> {
// take ownership of the current state, we pinky-promise to put it back
let this = std::mem::replace(self, Self::Done); // (4)
let new = match this {
Self::Init => Self::Step1(6), // (1)
Self::Step1(n) => return Poll::Ready(7 * n), // (3)
Self::Done => panic!("please stop polling me"), // (5)
};
*self = new; // (2)
Poll::Pending
}
}
This is an implementation of our simple Future2 trait over an enum state machine.
From the Init state, we compute the value 6 (1), moving us into the Step1 state (2).
On the next call, we compute 7 * 6 and return Ready (3).
The first line swaps the current state out for Done (4), so once we have returned Ready, the future is left in the Done state.
Calling poll on a Done future is a bug in the caller, so we are free to panic (5).
Does it work? Let's try it!
fn main() {
// initialise the future
let mut fut = MyFuture::Init;
let n = loop {
println!("polling - {fut:?}");
// call poll
match fut.poll() {
// if ready, break the poll loop with our value
Poll::Ready(n) => break n,
// if pending, continue the loop
Poll::Pending => println!("pending"),
}
};
// Done!
println!("ready! - {n:?}");
}
Compiling playground v0.0.1 (/playground)
Finished dev [unoptimized + debuginfo] target(s) in 1.28s
Running `target/debug/playground`
polling - Init
pending
polling - Step1(6)
ready! - 42
Woo! We successfully made a future and polled it to completion.
Ok, so we've seen a little bit of async code and how to write futures by hand. Let's go a bit deeper.
Can we capture?
let x = "foo";
async {
println!("{x}");
};
Yes! Just like closures, async blocks can capture outside scope.
Since it's borrowing x, we should expect the future to be bound by the lifetime of x.
To see if we understand, let's write this out by hand:
enum CaptureFuture<'x> {
Init {
x: &'x str
},
Done,
}
impl Future2 for CaptureFuture<'_> {
type Output = ();
fn poll(&mut self) -> Poll<Self::Output> {
match self {
Self::Init { x } => {
println!("{x}");
*self = Self::Done;
Poll::Ready(())
}
Self::Done => panic!("please stop polling me"),
}
}
}
Note that we could also use async move {} (analogous to move || {}), which would not borrow but instead move the captures inside.
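Here is a small sketch of the difference, using std's real Future trait and a tiny busy-polling block_on with a do-nothing waker (both are illustration-only helpers, not part of any library). The first function moves the &str itself into the future, so the future is bound by 'a; the second moves an owned String in, so the future is 'static.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Moves the `&str` itself into the future, so the future is bound by `'a`.
/// A plain `async { x.len() }` here would borrow the *parameter* `x`, which
/// dies when this function returns, so the compiler would reject it.
pub fn borrowing<'a>(x: &'a str) -> impl Future<Output = usize> + 'a {
    async move { x.len() }
}

/// Moves an owned `String` into the future, so the future is `'static`
/// and can outlive the scope that created it.
pub fn owning(s: String) -> impl Future<Output = usize> + 'static {
    async move { s.len() }
}

/// A waker that does nothing, good enough for busy-polling in this example.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // Safety: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Minimal busy-polling executor, only for this example.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```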
Ok, let's try something else. This time, maybe even a bit more realistic!
Let's say we have a Vec<u8> and we use an AsyncRead (which works the same as Read, but async!).
async {
let mut buf = Vec::new();
let mut reader = TcpStream::new().await; // use your imagination
let n = reader.read(&mut buf).await.unwrap();
// ^^^^ ^^^^^
// like read but async :)
println!("Read {:?}", &buf[..n]);
}
This seems simple enough. Let's apply our magic and try and build this by hand.
First, let's dissect this .read() future. It mutably borrows both our buffer and the reader, so its declaration must look something like:
struct Read<'buf, 'read, R> {
reader: &'read mut R,
buf: &'buf mut [u8],
// more stuff here...
}
Ok, now for our actual future
enum TcpRead {
// first, we create our buffer
Init {
buf: Vec<u8>
},
// then we create our reader to go along with it
Reader {
buf: Vec<u8>,
reader: TcpStream,
},
// then we borrow buf and reader while we read...
Reading {
buf: Vec<u8>,
reader: TcpStream,
reading: Read<'buf, 'read, TcpStream>,
},
// then we print and finish
Done,
}
Seems simple enough
Compiling playground v0.0.1 (/playground)
error[E0261]: use of undeclared lifetime name `'buf`
--> src/lib.rs:15:23
|
1 | enum TcpRead {
| - help: consider introducing lifetime `'buf` here: `<'buf>`
...
15 | reading: Read<'buf, 'read, TcpStream>,
| ^^^^ undeclared lifetime
error[E0261]: use of undeclared lifetime name `'read`
--> src/lib.rs:15:29
|
1 | enum TcpRead {
| - help: consider introducing lifetime `'read` here: `<'read>`
...
15 | reading: Read<'buf, 'read, TcpStream>,
| ^^^^^ undeclared lifetime
For more information about this error, try `rustc --explain E0261`.
error: could not compile `playground` due to 2 previous errors
Oh, we didn't define these lifetimes!
Rust is suggesting we add these lifetimes to our enum... ok!
- enum TcpRead {
+ enum TcpRead<'buf, 'read> {
Ok, cool, it compiles.
Let's go ahead and implement our future
impl<'buf, 'read> Future2 for TcpRead<'buf, 'read> {
type Output = ();
fn poll(&mut self) -> Poll<Self::Output> {
let this = std::mem::replace(self, Done);
let new = match this {
Init { buf } => {
Reader { buf, reader: TcpStream }
}
Reader { mut buf, mut reader } => {
let reading = reader.read(&mut buf);
Reading { buf, reader, reading }
}
Reading { buf, reader, mut reading } => {
match reading.poll() {
Poll::Pending => Reading { buf, reader, reading },
Poll::Ready(n) => {
let n = n.unwrap();
println!("Read {:?}", &buf[..n]);
return Poll::Ready(())
}
}
}
Done => panic!("I'm done already!"),
};
*self = new;
Poll::Pending
}
}
Aaaanndddd
error[E0597]: `buf` does not live long enough
--> src/lib.rs:31:43
|
22 | impl<'buf, 'read> Future2 for TcpRead<'buf, 'read> {
| ---- lifetime `'buf` defined here
...
31 | let reading = reader.read(&mut buf);
| ^^^^^^^^ borrowed value does not live long enough
32 | Reading { buf, reader, reading }
| -------------------------------- assignment requires that `buf` is borrowed for `'buf`
33 | }
| - `buf` dropped here while still borrowed
error[E0505]: cannot move out of `buf` because it is borrowed
--> src/lib.rs:32:27
|
22 | impl<'buf, 'read> Future2 for TcpRead<'buf, 'read> {
| ---- lifetime `'buf` defined here
...
31 | let reading = reader.read(&mut buf);
| -------- borrow of `buf` occurs here
32 | Reading { buf, reader, reading }
| ----------^^^-------------------
| | |
| | move out of `buf` occurs here
| assignment requires that `buf` is borrowed for `'buf`
Some errors have detailed explanations: E0505, E0597.
For more information about an error, try `rustc --explain E0505`.
error: could not compile `playground` due to 4 previous errors
Well, that's a lot of errors. Let's see what they are saying:
borrow of buf occurs here
move out of buf occurs here
So we are trying to store a borrow of buf inside the very value that owns buf. This is the classic self-referential data type problem.
Self-referential structs are hard.
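Why are they hard? A reference is just an address, and moving a struct copies its fields to a new address. This small sketch shows the one part that does survive a move: a Vec only carries its heap pointer along, so the heap data stays put. Fields stored inline (the hypothetical inline array below, or a TcpStream in our enum) move with the struct, so a reference to them would dangle after the move. State and buf_ptr_across_move are illustration-only names.

```rust
/// Hypothetical stand-in for our future's state: `buf` owns a heap buffer,
/// while `inline` (playing the part of `TcpStream`) lives inside the struct itself.
pub struct State {
    pub buf: Vec<u8>,
    pub inline: [u8; 16],
}

/// Moves a `State` into a `Box` and reports where `buf`'s heap data lives
/// before and after the move.
pub fn buf_ptr_across_move() -> (usize, usize) {
    let state = State { buf: vec![1, 2, 3], inline: [0; 16] };
    let before = state.buf.as_ptr() as usize;
    // moving copies the struct (including `inline`) to a new address, but
    // `buf` only carries its pointer along; the heap data stays where it is
    let boxed = Box::new(state);
    let after = boxed.buf.as_ptr() as usize;
    (before, after)
}
```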
So where do we go from here? We could use unsafe to break out of this problem. Let's try that!
Ok, maybe futures are just unsafe, and our async blocks will write the unsafe code for us, and our executors will have to deal with that unsafe too?
enum TcpRead {
    // ...
    Reading {
        buf: Vec<u8>,
        reader: TcpStream,
        reading: Read<'static, 'static, TcpStream>, // static borrows!!!
    },
    // ...
}
impl Future2 for TcpRead {
    type Output = ();
    fn poll(&mut self) -> Poll<Self::Output> {
        let this = std::mem::replace(self, Done);
        let new = match this {
            // ..
            Reader { mut buf, mut reader } => {
                unsafe {
                    // lifetime extension
                    let buf2: &'static mut [u8] = &mut *(&mut *buf as *mut _);
                    // careful: `reader` itself moves into `Reading` below, so `reader2`
                    // would dangle; the borrows must point at where the fields finally live
                    let reader2: &'static mut TcpStream = &mut *(&mut reader as *mut _);
                    let reading = reader2.read(buf2);
                    Reading { buf, reader, reading }
                }
            }
            // ..
        };
        *self = new;
        Poll::Pending
    }
}
Ok, this is probably a #BadIdea. But what safety issues might we have? Our 'static borrows are only valid as long as buf and reader do not move, and buf and reader won't move as long as our TcpRead future doesn't move. So our safety requirement is that the TcpRead future doesn't move. Got it, cool. Let's slap unsafe in front of our poll method and call it a day?
Not so fast. Instead of making it unsafe to poll futures, what we really want is to make it unsafe to move them. But every type in Rust can be moved, except while it's borrowed. A borrow can end at any point, and once it does, the value is free to move again, so ordinary borrows don't help us.
Maybe we can design a borrow that is unsafe to construct? Yeah!
That's where Pin comes in! It provides an unsafe Pin::new_unchecked(pointer) constructor that wraps a pointer to our future (such as &mut fut). It's unsafe to call because the caller must promise that the value behind the pointer will never be moved again (until it is dropped).
There are safe ways to create a Pin, but they involve either a special trait (Unpin, via Pin::new) or a place that won't move: a heap allocation with Box::pin or, since Rust 1.68, the stack via the std::pin::pin! macro.
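A short sketch of the three safe pinning routes just mentioned, each polled once with a do-nothing waker. poll_once, noop_waker and safe_pins are illustration-only helpers; Box::pin, std::pin::pin! (Rust 1.68+), Pin::new and std::future::ready are real std APIs.

```rust
use std::future::{self, Future};
use std::pin::{pin, Pin};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A waker that does nothing (illustration only).
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // Safety: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Polls a pinned future once, expecting it to be ready immediately.
fn poll_once<F: Future>(fut: Pin<&mut F>) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    match fut.poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => panic!("expected the future to be ready"),
    }
}

pub fn safe_pins() -> (u8, u8, u8) {
    // 1. heap pinning: the future lives in a Box and never moves again
    let mut heap = Box::pin(async { 1u8 });
    // 2. stack pinning: `pin!` takes ownership of the value, so it can't be
    //    accessed (or moved) except through the returned Pin
    let mut stack = pin!(async { 2u8 });
    // 3. `Unpin` types don't care about being moved, so `Pin::new` is safe for them
    let mut ready = future::ready(3u8);
    let unpinned = Pin::new(&mut ready);
    (poll_once(heap.as_mut()), poll_once(stack.as_mut()), poll_once(unpinned))
}
```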
Let's update our future trait again:
pub trait Future3 /* Revenge of the Pin */ {
type Output;
fn poll(self: Pin<&mut Self>) -> Poll<Self::Output>;
// ^^^ pinned! our callers have promised not to move us
}
Ok, now that we have futures out of the way (sorry, that took a while) we can get to executors.
I showed off a simple executor earlier, now let's update it to work with our new future trait:
fn main() {
// Initialise our future
let mut fut = TcpRead::Init { buf: vec![] };
// Pin the future
// Safety: we'll be good and not move it until we stop polling it!
let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
let n = loop {
// call poll (as_mut does a reborrow for us so we can poll it again next time)
match fut.as_mut().poll() {
Poll::Ready(n) => break n,
Poll::Pending => println!("value was not ready"),
}
};
// Done!
println!("value is ready -> {n:?}");
}
Ok, so what about spawning more tasks to run alongside this one?
Well, I gave a solution earlier! We just keep a list of tasks and loop through them!
fn main() {
// a ring buffer with efficient FIFO operations
let mut tasks = VecDeque::new();
// Pin the future (using box now because our tasks will be moving around :flushed:)
let fut = Box::pin(/* fut */);
// insert our task!
tasks.push_back(fut);
loop {
let mut fut = tasks.pop_front().unwrap(); // take the first task
match fut.as_mut().poll() {
Poll::Ready(_) => break,
Poll::Pending => {
tasks.push_back(fut); // we need to re-queue our task now!
},
}
};
}
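For a version of this round-robin loop you can actually run, here is a sketch on std's real Future trait. Real futures need a Context (which we get to later), so it passes a do-nothing waker for now. YieldNow, run_all and interleaving are hypothetical helpers, and unlike the loop above, this one keeps going until every task has finished, which makes the interleaving of two tasks visible.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type Task = Pin<Box<dyn Future<Output = ()>>>;

/// A waker that does nothing (illustration only).
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // Safety: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Hypothetical helper: Pending the first time it's polled, Ready after that.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Round-robin: poll the front task, re-queue it if still pending, until none are left.
fn run_all(mut tasks: VecDeque<Task>) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    while let Some(mut task) = tasks.pop_front() {
        if task.as_mut().poll(&mut cx).is_pending() {
            tasks.push_back(task);
        }
    }
}

/// Runs two tasks that each yield once, recording the order of their steps.
pub fn interleaving() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut tasks: VecDeque<Task> = VecDeque::new();
    for name in ["a", "b"] {
        let log = Rc::clone(&log);
        tasks.push_back(Box::pin(async move {
            log.borrow_mut().push(format!("{name}1"));
            YieldNow(false).await;
            log.borrow_mut().push(format!("{name}2"));
        }));
    }
    run_all(tasks);
    let out = log.borrow().clone();
    out
}
```

The log comes out as a1, b1, a2, b2: each task pauses at its await and lets the other one run, which is exactly the multiplexing described at the start.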
Now, our TcpRead future just needs access to some spawn() function that pushes a second task into our tasks queue.
Ok, let's extract the queue into a struct and store it in a thread local, so that we can easily call spawn() from anywhere in our code!
use std::{cell::{OnceCell, RefCell}, collections::VecDeque, pin::Pin, rc::Rc};
// Task alias (`Future` is still our context-free `Future3` at this point)
type Task = Pin<Box<dyn Future<Output = ()>>>;
// our executor struct owns our task queue
#[derive(Default)]
pub struct Executor {
    // Using a RefCell because we'll need interior mutability :O
    tasks: RefCell<VecDeque<Task>>,
}
thread_local! {
    // Our thread local executor. Will be initialised later.
    // Uses an Rc because it will need multiple owners
    static EXECUTOR: OnceCell<Rc<Executor>> = OnceCell::new();
}
/// Spawns a future in our thread-local executor
pub fn spawn(fut: impl Future<Output = ()> + 'static) {
    EXECUTOR.with(|e| e.get().unwrap().spawn(fut));
}
impl Executor {
    /// registers this executor onto the current thread
    fn register(self: &Rc<Self>) {
        // `set` fails if an executor is already registered; we ignore that here
        let _ = EXECUTOR.with(|e| e.set(Rc::clone(self)));
    }
    pub fn spawn(&self, fut: impl Future<Output = ()> + 'static) {
        self.tasks.borrow_mut().push_back(Box::pin(fut));
    }
    /// Waits for the future to complete
    pub fn block_on<F>(self: &Rc<Self>, fut: F) -> F::Output
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.register(); // we're now in executor land
        // a way to store the output of the future, which will also signal
        // that we are done
        let output: Rc<RefCell<Option<F::Output>>> = Rc::new(RefCell::new(None));
        let output2 = Rc::clone(&output);
        self.spawn(async move {
            let output = fut.await;
            *output2.borrow_mut() = Some(output); // set our output value
        });
        loop {
            let mut fut = self.tasks.borrow_mut().pop_front().unwrap(); // take the first task
            if fut.as_mut().poll().is_pending() {
                self.tasks.borrow_mut().push_back(fut); // we need to re-queue our task now!
            }
            // exit once we have our final value and the queue has drained,
            // so that spawned tasks get to run too :)
            if self.tasks.borrow().is_empty() {
                if let Some(output) = output.borrow_mut().take() {
                    break output;
                }
            }
        }
    }
}
And that's it! This is a functioning single threaded async executor. Let's try it out!
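fn main() {
    // block_on registers the executor in a thread local, so it needs to live in an Rc
    let executor = std::rc::Rc::new(Executor::default());
    executor.block_on(start());
}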
async fn start() {
// our magic business logic goes here
println!("start!");
for i in 0..10 {
spawn(async move {
println!("hello from task {i}");
});
}
println!("spawned 10 tasks!");
}
Compiling playground v0.0.1 (/playground)
Finished dev [unoptimized + debuginfo] target(s) in 1.28s
Running `target/debug/playground`
start!
spawned 10 tasks!
hello from task 0
hello from task 1
hello from task 2
hello from task 3
hello from task 4
hello from task 5
hello from task 6
hello from task 7
hello from task 8
hello from task 9
You might notice that our executor doesn't do anything interesting yet.
Let's say we want a way to sleep in our async tasks.
We could just use std::thread::sleep() in our task, but that would block our whole thread loop. Ideally, our sleep feature would be async-aware, and would just return Pending whenever it is not ready.
struct Sleep {
until: Instant,
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.until > Instant::now() {
Poll::Pending
} else {
Poll::Ready(())
}
}
}
pub fn sleep(dur: Duration) -> Sleep {
sleep_until(Instant::now() + dur)
}
pub fn sleep_until(until: Instant) -> Sleep {
Sleep { until }
}
Cool, this works. It will allow other tasks to run while it's sleeping, but if you're paying attention, you'll notice that we've turned our efficient thread::sleep(dur) into
let until = Instant::now() + dur;
while Instant::now() < until {}
Busy loops like this are usually not a good idea...
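To see that cost, here is a sketch that busy-polls the naive Sleep with a do-nothing waker and counts the polls. block_on_counting and noop_waker are hypothetical helpers; on a typical machine, even a few milliseconds of sleep turns into a large number of polls, all of them burning CPU.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::{Duration, Instant};

/// The naive `Sleep` from above: Pending until the deadline, no waker registration.
pub struct Sleep {
    until: Instant,
}

impl Future for Sleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.until > Instant::now() {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

pub fn sleep(dur: Duration) -> Sleep {
    Sleep { until: Instant::now() + dur }
}

/// A waker that does nothing (illustration only).
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // Safety: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Busy-polls `fut` to completion, returning its output and the number of polls.
pub fn block_on_counting<F: Future>(fut: F) -> (F::Output, u64) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return (value, polls);
        }
    }
}
```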
Wouldn't it be nice if we could tell our executor when our tasks are ready to make progress?
This is where the Context type comes in. We're ready to see the full Future trait again:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
// ^^^^^^^^^^^^^^^^^^^^ New!
}
Let's see the docs for Context:
/// The Context of an asynchronous task.
pub struct Context<'a> { /* private fields */ }
impl<'a> Context<'a> {
/// Create a new Context from a &Waker.
pub fn from_waker(waker: &'a Waker) -> Context<'a>;
/// Returns a reference to the Waker for the current task.
pub fn waker(&self) -> &'a Waker;
}
Ok, so Context is tied to an async task, and is seemingly a light wrapper over Waker:
/// A Waker is a handle for waking up a task by notifying its executor that it is ready to be run.
pub struct Waker { /* private fields */ }
impl Waker {
/// Wake up the task associated with this Waker.
pub fn wake(self);
}
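To see wake() in action, here is a tiny waker built with std's Wake trait that just counts wake-ups. CountingWaker and count_wakes are hypothetical names; Wake, Waker::from, Context::from_waker and Waker::wake_by_ref are real std APIs. Note that Waker::from only accepts an Arc<W> where W: Wake + Send + Sync + 'static, which is why the counter is an AtomicUsize rather than a plain integer in a RefCell.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical waker that only counts how many times it was woken.
pub struct CountingWaker {
    pub wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

pub fn count_wakes() -> usize {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    // `Waker::from` requires `Arc<W>` with W: Wake + Send + Sync + 'static
    let waker = Waker::from(Arc::clone(&counter));
    let cx = Context::from_waker(&waker);
    // wake through the Context without consuming the waker
    cx.waker().wake_by_ref();
    // `wake` consumes a Waker, so wake a clone
    waker.clone().wake();
    counter.wakes.load(Ordering::SeqCst)
}
```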
Right, so Waker has a wake() method that lets us wake up a task.
Currently, in our executor's block_on function, we have:
if fut.as_mut().poll().is_pending() {
self.tasks.borrow_mut().push_back(fut);
}
When our task returns Pending, instead of giving it back to the executor queue, we can give it to a Waker.
The sleep future then hands that Waker to a scheduler that calls its wake method once the deadline has passed, and waking places the task back onto the executor queue.
First, we need our waker:
struct TaskWaker {
task: RefCell<Option<Task>>,
}
impl Wake for TaskWaker {
fn wake(self: Arc<Self>) {
// if we have a task
if let Some(task) = self.task.borrow_mut().take() {
// place it into our executor queue
EXECUTOR.with(|e| {
let e = e.get().unwrap();
e.tasks.borrow_mut().push_back(task)
});
}
}
}
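Now, each time we poll a task, we construct a waker and a context for it:
let wake = Arc::new(TaskWaker { task: RefCell::new(None) });
// note: Waker::from(Arc<W>) requires W: Send + Sync, which a RefCell holding a
// non-Send task is not, so a compiling version needs e.g. a Mutex and Send tasks
let waker = Waker::from(Arc::clone(&wake));
let mut cx = Context::from_waker(&waker);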
Finally, we update the task polling to park the task in its waker instead of the main queue:
if fut.as_mut().poll(&mut cx).is_pending() {
    // our task is now in charge of scheduling itself!
    *wake.task.borrow_mut() = Some(fut);
}
Yay! Now we're not wasting cycles polling tasks that aren't ready. Since the queue can now be empty while tasks wait to be woken, we should add a check in our main loop so that it doesn't panic when the tasks list is empty:
- let mut fut = self.tasks.borrow_mut().pop_front().unwrap(); // take the first task
+ let mut fut = match self.tasks.borrow_mut().pop_front() {
+     Some(fut) => fut,
+     None => continue,
+ };
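Here is a runnable, self-contained sketch of the same parking idea. Because Waker::from(Arc<W>) requires W: Send + Sync, it uses Arc<Mutex<...>> and Send tasks instead of RefCell. WaitTick and ToyReactor are hypothetical stand-ins for the sleep future and the scheduler it registers with: WaitTick registers its waker and returns Pending once, and the run loop wakes everything registered during a "tick" after the queue drains. One caveat of this simple design: a wake that fires during poll itself, before the task has been parked, finds no task and is lost, which is why waking here only happens during the tick.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
type Queue = Arc<Mutex<VecDeque<Task>>>;
/// Hypothetical toy reactor: wakers waiting for the next "tick".
type ToyReactor = Arc<Mutex<Vec<Waker>>>;

/// Holds a parked task; waking moves it back onto the run queue.
struct TaskWaker {
    task: Mutex<Option<Task>>,
    queue: Queue,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        if let Some(task) = self.task.lock().unwrap().take() {
            self.queue.lock().unwrap().push_back(task);
        }
    }
}

/// Hypothetical future: registers with the reactor and is Pending once,
/// then Ready the next time it is polled (i.e. after being woken).
struct WaitTick {
    reactor: ToyReactor,
    registered: bool,
}

impl Future for WaitTick {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.registered {
            return Poll::Ready(());
        }
        self.registered = true;
        self.reactor.lock().unwrap().push(cx.waker().clone());
        Poll::Pending
    }
}

/// Runs queued tasks, parking pending ones in their wakers, then "ticks" the
/// reactor; stops once nothing is queued and nothing is waiting.
fn run(queue: &Queue, reactor: &ToyReactor) {
    loop {
        loop {
            let next = queue.lock().unwrap().pop_front();
            let Some(mut task) = next else { break };
            let parked = Arc::new(TaskWaker { task: Mutex::new(None), queue: Arc::clone(queue) });
            let waker = Waker::from(Arc::clone(&parked));
            let mut cx = Context::from_waker(&waker);
            if task.as_mut().poll(&mut cx).is_pending() {
                // the task now waits inside its waker until someone wakes it
                *parked.task.lock().unwrap() = Some(task);
            }
        }
        let wakers = std::mem::take(&mut *reactor.lock().unwrap());
        if wakers.is_empty() {
            break;
        }
        for waker in wakers {
            waker.wake();
        }
    }
}

pub fn park_and_wake_demo() -> Vec<String> {
    let queue: Queue = Arc::new(Mutex::new(VecDeque::new()));
    let reactor: ToyReactor = Arc::new(Mutex::new(Vec::new()));
    let log = Arc::new(Mutex::new(Vec::new()));
    for name in ["a", "b"] {
        let task_log = Arc::clone(&log);
        let task_reactor = Arc::clone(&reactor);
        queue.lock().unwrap().push_back(Box::pin(async move {
            task_log.lock().unwrap().push(format!("{name} start"));
            WaitTick { reactor: task_reactor, registered: false }.await;
            task_log.lock().unwrap().push(format!("{name} end"));
        }));
    }
    run(&queue, &reactor);
    let out = log.lock().unwrap().clone();
    out
}
```

Both tasks start, park themselves, and only finish after the tick wakes them, so the log reads "a start", "b start", "a end", "b end".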
Currently, there's nothing for our sleep future to register with in order to be woken up. Let's fix that.
Let's give our executor a 'reactor', which will react to side effects. In this case, our side effect is time progressing.
(Time doesn't progress as a result of our code; it's an intrinsic property of the computer, which makes it an effect from outside the program.)
use std::cmp::Reverse;
pub struct TimeEntry {
    instant: Instant,
    waker: Waker,
}
#[derive(Default)]
pub struct Timers {
    // we'll be inserting the entries ordered by time
    // the earlier times will be at the end
    queue: Vec<TimeEntry>,
}
impl Timers {
    pub fn insert(&mut self, instant: Instant, waker: Waker) {
        // reverse means it sorts with earlier times at the end (faster pop)
        let index = match self.queue.binary_search_by_key(&Reverse(instant), |e| Reverse(e.instant)) {
            Ok(index) | Err(index) => index,
        };
        let entry = TimeEntry { instant, waker };
        self.queue.insert(index, entry);
    }
    fn pop(&mut self) -> Option<Waker> {
        // get the next timer
        let TimeEntry { instant, waker } = self.queue.pop()?;
        if instant > Instant::now() {
            // push it back if it's not ready yet
            self.queue.push(TimeEntry { instant, waker });
            None
        } else {
            Some(waker)
        }
    }
}
#[derive(Default)]
pub struct Reactor {
    timers: RefCell<Timers>,
}
impl Reactor {
pub fn insert_timer(&self, instant: Instant, waker: Waker) {
self.timers.borrow_mut().insert(instant, waker);
}
pub fn tick(&self) {
// try get a timer and wake it up
if let Some(task) = self.timers.borrow_mut().pop() {
task.wake()
}
}
}
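To check the ordering trick used by Timers, here is a sketch of the same binary-search insertion over (Instant, id) pairs, with a plain id standing in for the Waker and "now" passed in explicitly so the result is deterministic. TimerQueue, pop_expired and fired_after_25ms are hypothetical names. Unlike Reactor::tick, which wakes at most one timer per call, this drains every expired timer in a loop.

```rust
use std::cmp::Reverse;
use std::time::{Duration, Instant};

/// Hypothetical simplified `Timers`: stores an id instead of a `Waker`,
/// sorted so the earliest deadline is at the end of the Vec.
#[derive(Default)]
pub struct TimerQueue {
    queue: Vec<(Instant, u32)>,
}

impl TimerQueue {
    pub fn insert(&mut self, instant: Instant, id: u32) {
        // sorting by Reverse(instant) keeps earlier times at the end (cheap pop)
        let index = match self.queue.binary_search_by_key(&Reverse(instant), |e| Reverse(e.0)) {
            Ok(index) | Err(index) => index,
        };
        self.queue.insert(index, (instant, id));
    }

    /// Pops the earliest timer if its deadline is at or before `now`.
    pub fn pop_expired(&mut self, now: Instant) -> Option<u32> {
        let expired = matches!(self.queue.last(), Some(&(instant, _)) if instant <= now);
        if expired {
            self.queue.pop().map(|(_, id)| id)
        } else {
            None
        }
    }
}

/// Inserts three timers out of order, then drains everything expired at +25ms.
pub fn fired_after_25ms() -> Vec<u32> {
    let start = Instant::now();
    let mut timers = TimerQueue::default();
    timers.insert(start + Duration::from_millis(30), 3);
    timers.insert(start + Duration::from_millis(10), 1);
    timers.insert(start + Duration::from_millis(20), 2);
    let mut fired = Vec::new();
    // drain in a loop, firing every expired timer rather than one per tick
    while let Some(id) = timers.pop_expired(start + Duration::from_millis(25)) {
        fired.push(id);
    }
    fired
}
```

The 10ms and 20ms timers fire in deadline order, while the 30ms timer stays queued.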
Now let's give the reactor to our executor
pub struct Executor {
// Using a RefCell because we'll need interior mutability :O
    tasks: RefCell<VecDeque<Task>>,
+   reactor: Reactor,
}
And now let's update our sleep future to insert into the reactor
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.until > Instant::now() {
+ EXECUTOR.with(|e| e.get().unwrap().reactor.insert_timer(self.until, cx.waker().clone()));
Poll::Pending
} else {
Poll::Ready(())
}
}
}
Finally, we need to make our block_on function tick the reactor:
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
// setup ...
loop {
+ // tick the reactor, we can be smarter about this, but it works
+ self.reactor.tick();
// poll our futures ...
}
}
Hooray! And that's basically all there is to making an async runtime.
Phew, that was a lot of text. If you made it this far, great job. I really appreciate it. This is my longest blog post by far and I want to thank Amos (fasterthanlime) for opening my mind into these long form educational blog posts. I hope I have done him proud <3
I also hope I've helped some of the design choices behind the Future trait make sense.
I was confused by a lot of these things in the beginning, and I never found a single definitive guide explaining why they are the way they are and how to use them effectively. This post is my attempt to fill that gap.
Currently, what we have written in this post is only a single-threaded runtime. Making it multi-threaded is a bit more complex and requires an executor per thread, and a shared reactor between threads.
There's also the question of balancing a queue per executor with a global queue. If you're interested, you can read about how tokio manages its executor scheduler.
The timers are also very naive. Tokio has another post talking about how they optimise their timers for both sub millisecond precision and also multi minute sleeps.
I've skipped over IO in this post too, but it can be added using the reactor model as well. Async IO futures would register their IO interests with the reactor, the reactor would poll the OS for events, and these events would be fed through to wake up the dependent tasks. Tokio does exactly this, and created the mio crate for a better OS abstraction.
If you want a complete code sample, I have published a multi-threaded version of this code: https://github.com/conradludgate/what-the-async
Finally, I want to thank all the tokio team/contributors for their early efforts in this space. Great code and great blogs, and they have been very helpful in guiding me. Also thanks to all those who hang out in the #async channel in the Rust Community Discord Server.