Conrad Ludgate

Async Iterators 2

I thought I should be fair and go through many existing Stream implementations, simple and complex, and show off how they might look as async functions.

BufferedOrdered

source

BufferedOrdered is a stream method which, given a stream of futures, can execute multiple futures concurrently while still yielding their outputs in the order of the original stream.

This implementation needs to change quite substantially to be written with async functions. The naive implementation might be to tokio::select! on both the base stream and the FuturesOrdered stream, like so:

rust
impl<St> AsyncIterator for BufferedOrdered<St>
where
    St: AsyncIterator,
    St::Item: Future,
{
    type Item = <St::Item as Future>::Output;

    async fn next(&mut self) -> Option<Self::Item> {
        loop {
            select! {
                Some(s) = self.stream.next(), if !self.ordered.is_full() => {
                    self.ordered.push_back(s);
                }
                Some(val) = self.ordered.next() => return Some(val),
                else => return None,
            }
        }
    }
}

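This has a big problem, though: cancellation safety. When one branch of select! completes, the futures for the other branches are dropped, along with any progress they had made. Let's use the cancel-safe futures::future::select function instead, which hands the unfinished future back to us.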

rust
impl<St> AsyncIterator for BufferedOrdered<St>
where
    St: AsyncIterator,
    St::Item: Future,
{
    type Item = <St::Item as Future>::Output;

    async fn next(&mut self) -> Option<Self::Item> {
        let mut ordered_next = pin!(self.ordered.next());
        loop {
            if !self.ordered.is_full() {
                match futures::future::select(pin!(self.stream.next()), ordered_next).await {
                    Either::Left((s, _ordered_next)) => {
                        self.ordered.push_back(s); // ERROR, ALREADY BORROWED MUTABLY
                    },
                    // ...
                }
            }
        }
    }
}

Well, that's annoying. We have to cancel the ordered_next future in order to insert a new future into the set.
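To make concrete what cancelling a next() future can cost, here is a minimal std-only sketch. The `Queue` type, the `YieldOnce` future and the helper function are hypothetical stand-ins invented for this illustration; they are not part of futures or tokio. The sketch polls a `next()` future once and then drops it, which is what happens to the losing branch of a select.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Returns `Pending` once, then `Ready`; stands in for any real await point.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

// Hypothetical iterator whose `next` removes an item *before* its await point.
struct Queue {
    items: VecDeque<u32>,
}

impl Queue {
    async fn next(&mut self) -> Option<u32> {
        let item = self.items.pop_front()?; // the state change happens here...
        YieldOnce { yielded: false }.await; // ...and the future may be dropped here
        Some(item)
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls `next()` once, drops it, and returns the items still in the queue.
fn items_after_cancelled_next() -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut queue = Queue { items: VecDeque::from([1, 2, 3]) };
    {
        let mut fut = pin!(queue.next());
        assert!(fut.as_mut().poll(&mut cx).is_pending());
    } // `fut` is dropped here; item 1 lived inside it and is gone for good
    queue.items.into_iter().collect()
}
```

Calling `items_after_cancelled_next()` leaves only 2 and 3 in the queue: the first item was moved into the future's state and was dropped with it. A poll_next method avoids this because any in-progress state lives in the struct rather than in a temporary future.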

The only way I can see to implement this is to still expose a poll_next method on the FuturesOrdered; that way you only have to worry about the cancellation of the original AsyncIterator.

This will require PinnedAsyncIterator, however, since we need to store the future returned by St::next() in case it outlasts tasks already pushed into the FuturesOrdered. If we try to do this, we bump into the following:

rust
pin_project! {
    pub struct BufferedOrdered<St>
    where
        St: Stream,
        St::Item: Future,
    {
        pub(crate) stream: Option<St>,
        // 1. what type is this?
        // 2. this future borrows from `stream` so this is self-ref and a pain to implement safely...
        pub(crate) stream_next: Option<St::...>,
        pub(crate) in_progress_queue: FuturesOrdered<St::Item>,
    }
}

Therefore, I consider this effectively impossible to implement using safe Rust.

Throttle

source

Throttle imposes a limit on how often items can be released from a stream.

rust
pub fn throttle<T>(duration: Duration, iter: T) -> Throttle<T>
where
    T: AsyncIterator,
{
    Throttle {
        delay: None,
        duration,
        iter,
    }
}

/// AsyncIterator for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to
/// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`.
#[derive(Debug)]
#[must_use = "AsyncIterators do nothing unless polled"]
pub struct Throttle<T> {
    delay: Option<Instant>,
    duration: Duration,
    iter: T,
}

impl<T: AsyncIterator> AsyncIterator for Throttle<T> {
    type Item = T::Item;

    async fn next(&mut self) -> Option<Self::Item> {
        if let Some(delay) = self.delay.take() {
            // `sleep_until` lives in `tokio::time` and takes a `tokio::time::Instant`.
            tokio::time::sleep_until(delay).await;
        }
        match self.iter.next().await {
            Some(x) => {
                self.delay = Some(Instant::now() + self.duration);
                Some(x)
            },
            None => None
        }
    }
}

This one has genuinely simplified things. We no longer need to keep track of whether we're in the sleep state or the iter.next() state.
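For readers who want to run this without tokio, here is a rough std-only sketch of the same idea. The `AsyncIterator` trait below is a local stand-in for the trait assumed throughout this post, and `FromIter`, `SleepUntil`, `block_on` and `run_throttled` are hypothetical helpers made up for the example; `SleepUntil` busy-polls and is only a placeholder for a real timer.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

// Stand-in for the `AsyncIterator` trait this post assumes (not a std API).
trait AsyncIterator {
    type Item;
    async fn next(&mut self) -> Option<Self::Item>;
}

// Hypothetical adapter: lets a plain iterator act as the inner async iterator.
struct FromIter<I>(I);

impl<I: Iterator> AsyncIterator for FromIter<I> {
    type Item = I::Item;
    async fn next(&mut self) -> Option<I::Item> {
        self.0.next()
    }
}

// Busy-polled stand-in for `tokio::time::sleep_until`.
struct SleepUntil {
    deadline: Instant,
}

impl Future for SleepUntil {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

struct Throttle<T> {
    delay: Option<Instant>,
    duration: Duration,
    iter: T,
}

impl<T: AsyncIterator> AsyncIterator for Throttle<T> {
    type Item = T::Item;
    async fn next(&mut self) -> Option<T::Item> {
        // "Sleeping" and "waiting on the inner iterator" are just two await
        // points; the compiler builds the state machine for us.
        if let Some(deadline) = self.delay.take() {
            SleepUntil { deadline }.await;
        }
        let item = self.iter.next().await?;
        self.delay = Some(Instant::now() + self.duration);
        Some(item)
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor; a real program would use a runtime instead.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

// Drains a throttled iterator, returning the items and the total time taken.
fn run_throttled(duration: Duration) -> (Vec<u32>, Duration) {
    let start = Instant::now();
    let mut throttle = Throttle { delay: None, duration, iter: FromIter(1..=3u32) };
    let items = block_on(async {
        let mut out = Vec::new();
        while let Some(x) = throttle.next().await {
            out.push(x);
        }
        out
    });
    (items, start.elapsed())
}
```

Calling `run_throttled(Duration::from_millis(10))` yields the three items in order, and the elapsed time covers at least the two delays that separate them.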

PeakEwmaDiscover

source

This seems like a simple one. Discover is essentially a stream of its own kind. Let's assume it also has an async equivalent of poll_discover, called discover().

rust
impl<D, C> AsyncIterator for PeakEwmaDiscover<D, C>
where
    D: Discover,
    C: Clone,
{
    type Item = Result<Change<D::Key, PeakEwma<D::Service, C>>, D::Error>;

    async fn next(&mut self) -> Option<Self::Item> {
        let change = match self.discover.discover().await? {
            Err(e) => return Some(Err(e)),
            Ok(change) => change,
        };
        let change = match change {
            Change::Remove(k) => Change::Remove(k),
            Change::Insert(k, svc) => {
                let peak_ewma = PeakEwma::new(
                    svc,
                    self.default_rtt,
                    self.decay_ns,
                    self.completion.clone(),
                );
                Change::Insert(k, peak_ewma)
            }
        };
        Some(Ok(change))
    }
}

Not much to say about this one. It's simple, but no simpler. We've replaced a ready!(poll_discover(cx)) with a discover().await.
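The correspondence between ready! and .await can be sketched with std alone. The `Source` type and its `poll_discover` and `discover` methods below are hypothetical stand-ins written for this example and are not tower's Discover trait; `std::task::ready!` and `std::future::poll_fn` are the real std items.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

// Hypothetical poll-based source, standing in for `Discover::poll_discover`.
struct Source {
    remaining: u32,
    yielded: bool,
}

impl Source {
    fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // Return `Pending` before every item to exercise the early-return path.
        if !self.yielded {
            self.yielded = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.yielded = false;
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(self.remaining))
    }

    // Async view of the same method, built with `std::future::poll_fn`.
    async fn discover(&mut self) -> Option<u32> {
        poll_fn(|cx| self.poll_discover(cx)).await
    }
}

// Poll style: `ready!` returns `Poll::Pending` from this function early.
fn poll_next_doubled(src: &mut Source, cx: &mut Context<'_>) -> Poll<Option<u32>> {
    let item = ready!(src.poll_discover(cx));
    Poll::Ready(item.map(|x| x * 2))
}

// Async style: `.await` suspends at the same point instead.
async fn next_doubled(src: &mut Source) -> Option<u32> {
    let item = src.discover().await?;
    Some(item * 2)
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Collects every item through both styles so they can be compared.
fn collect_both(n: u32) -> (Vec<u32>, Vec<u32>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Drive the poll-based version by hand.
    let mut src = Source { remaining: n, yielded: false };
    let mut polled = Vec::new();
    loop {
        match poll_next_doubled(&mut src, &mut cx) {
            Poll::Pending => continue,
            Poll::Ready(Some(x)) => polled.push(x),
            Poll::Ready(None) => break,
        }
    }

    // Drive the async version by busy-polling its future.
    let mut src = Source { remaining: n, yielded: false };
    let mut fut = pin!(async {
        let mut out = Vec::new();
        while let Some(x) = next_doubled(&mut src).await {
            out.push(x);
        }
        out
    });
    let awaited = loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            break out;
        }
    };
    (polled, awaited)
}
```

Both halves of `collect_both(3)` produce the same sequence, which is the sense in which the async version is simple, but no simpler: the control flow is identical, only the spelling of the suspension point changes.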