I thought I should be fair and go through many existing Stream implementations, simple and complex, and show off how they might look as async functions.
BufferedOrdered is a stream method which, given a stream of futures, can run several of those futures concurrently.
This implementation needs to change quite substantially to be written in async.
The naive implementation might be to tokio::select! on both the base stream and the FuturesOrdered stream, like so:
impl<St> AsyncIterator for BufferedOrdered<St>
where
    St: AsyncIterator,
    St::Item: Future,
{
    type Item = <St::Item as Future>::Output;

    async fn next(&mut self) -> Option<Self::Item> {
        loop {
            select! {
                // Only pull from the base stream while there is room in the buffer.
                Some(s) = self.stream.next(), if !self.ordered.is_full() => {
                    self.ordered.push_back(s);
                }
                Some(val) = self.ordered.next() => return Some(val),
                else => return None,
            }
        }
    }
}
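This has a big problem though: cancellation safety. Every pass through the loop drops whichever future lost the race, so a self.stream.next() that was part-way through producing an item can lose it. Let's use the cancel safe futures::future::select function instead, which hands the unfinished future back rather than dropping it.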
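To make the hazard concrete, here is a minimal std-only sketch. `Source`, `YieldOnce` and `NoopWake` are hypothetical helpers written for this illustration, not part of tokio or futures. The `next` method removes an item before an await point, so dropping the future there (which is what a losing select! branch does) discards the item.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// A future that returns `Pending` once, then `Ready`.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Hypothetical source whose `next` is NOT cancel safe.
struct Source {
    items: VecDeque<u32>,
}

impl Source {
    async fn next(&mut self) -> Option<u32> {
        let item = self.items.pop_front()?; // the item now lives only in this future
        YieldOnce(false).await; // dropping the future here loses the item
        Some(item)
    }
}

/// Polls `next()` once, drops it (as a losing `select!` branch would be),
/// and returns the items still left in the source.
fn items_left_after_cancelled_next() -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut source = Source { items: VecDeque::from(vec![1, 2, 3]) };
    {
        let mut fut = pin!(source.next());
        assert!(fut.as_mut().poll(&mut cx).is_pending());
    } // `fut` is dropped here: this is the cancellation
    source.items.into_iter().collect()
}
```

Calling `items_left_after_cancelled_next()` leaves only 2 and 3 in the source: item 1 was taken by the cancelled future and never returned to anyone.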
impl<St> AsyncIterator for BufferedOrdered<St>
where
    St: AsyncIterator,
    St::Item: Future,
{
    type Item = <St::Item as Future>::Output;

    async fn next(&mut self) -> Option<Self::Item> {
        // This future holds a mutable borrow of `self.ordered` for as long as it lives.
        let mut ordered_next = pin!(self.ordered.next());
        loop {
            if !self.ordered.is_full() {
                match futures::future::select(pin!(self.stream.next()), ordered_next).await {
                    Either::Left((s, _ordered_next)) => {
                        self.ordered.push_back(s); // ERROR, ALREADY BORROWED MUTABLY
                    },
                    // ...
                }
            }
        }
    }
}
Well, that's annoying. We have to cancel the ordered_next future in order to insert a new future into the set.
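The borrow conflict can be reproduced without any async runtime. The sketch below uses a hypothetical `Queue` as a stand-in for FuturesOrdered (it is not the real type, and unlike the real one it simply stays pending while empty). The future returned by `next` borrows the queue mutably, so a push is only accepted once that future has been dropped.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for `FuturesOrdered`.
struct Queue {
    items: VecDeque<u32>,
}

impl Queue {
    fn push_back(&mut self, item: u32) {
        self.items.push_back(item);
    }

    /// Stays pending while the queue is empty (a simplification for this demo;
    /// it never registers the waker, so it only works when polled by hand).
    async fn next(&mut self) -> Option<u32> {
        std::future::poll_fn(|_cx| match self.items.pop_front() {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        })
        .await
    }
}

fn push_requires_cancelling_next() -> Option<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut queue = Queue { items: VecDeque::new() };

    {
        let mut next = pin!(queue.next());
        assert!(next.as_mut().poll(&mut cx).is_pending());
        // Calling `queue.push_back(7)` here while still intending to poll
        // `next` afterwards is rejected by the borrow checker.
    } // `next` is dropped (cancelled), which ends the mutable borrow

    queue.push_back(7);
    let mut next = pin!(queue.next());
    match next.as_mut().poll(&mut cx) {
        Poll::Ready(item) => item,
        Poll::Pending => None,
    }
}
```

Here `push_requires_cancelling_next()` yields `Some(7)`, but only because the first `next` future was thrown away before the push.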
The only way I can see to implement this is to still expose a poll_next method on the FuturesOrdered; that way you only have to worry about the cancellation of the original AsyncIterator.
This will require PinnedAsyncIterator, however, since we need to store the future returned by St::next() in case it outlasts tasks already pushed into the FuturesOrdered. If we try to do this, we bump into this:
pin_project! {
    pub struct BufferedOrdered<St>
    where
        St: Stream,
        St::Item: Future,
    {
        pub(crate) stream: Option<St>,
        // 1. what type is this?
        // 2. this future borrows from `stream` so this is self-ref and a pain to implement safely...
        pub(crate) stream_next: Option<St::...>,
        pub(crate) in_progress_queue: FuturesOrdered<St::Item>,
    }
}
Therefore, I consider this effectively impossible to implement using safe Rust.
Throttle imposes a limit on how often items can be released from a stream.
pub fn throttle<T>(duration: Duration, iter: T) -> Throttle<T>
where
    T: AsyncIterator,
{
    Throttle {
        delay: None,
        duration,
        iter,
    }
}

/// AsyncIterator for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to
/// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`.
#[derive(Debug)]
#[must_use = "AsyncIterators do nothing unless polled"]
pub struct Throttle<T> {
    delay: Option<Instant>,
    duration: Duration,
    iter: T,
}
impl<T: AsyncIterator> AsyncIterator for Throttle<T> {
    type Item = T::Item;

    async fn next(&mut self) -> Option<Self::Item> {
        // Wait out the delay left over from the previous item, if any.
        if let Some(delay) = self.delay.take() {
            tokio::time::sleep_until(delay).await;
        }
        match self.iter.next().await {
            Some(x) => {
                // Earliest instant at which the next item may be released.
                self.delay = Some(Instant::now() + self.duration);
                Some(x)
            },
            None => None
        }
    }
}
This one has genuinely simplified things. We no longer need to keep track of whether we're in the sleep state or the iter.next() state.
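For contrast, here is a rough sketch of the bookkeeping a poll-based version has to do by hand. It is not the tokio implementation: `PollThrottle`, `Delay` and `NoopWake` are hypothetical, `Delay` counts polls instead of measuring time, the inner source is a plain always-ready iterator, and `poll_next` takes `&mut self` rather than a pinned receiver to keep the example short.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical timer stand-in: pending for `ticks` polls, then ready.
struct Delay {
    ticks: u32,
}

impl Future for Delay {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.ticks == 0 {
            return Poll::Ready(());
        }
        self.ticks -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Poll-based throttle: `delay` is the explicit "are we sleeping?" state.
struct PollThrottle<I> {
    delay: Option<Delay>,
    ticks: u32,
    iter: I,
}

impl<I: Iterator> PollThrottle<I> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        // State 1: still sleeping. Return `Pending` until the delay completes.
        if let Some(delay) = self.delay.as_mut() {
            ready!(Pin::new(delay).poll(cx));
            self.delay = None;
        }
        // State 2: fetch the next item and arm the delay for the one after it.
        let item = self.iter.next();
        if item.is_some() {
            self.delay = Some(Delay { ticks: self.ticks });
        }
        Poll::Ready(item)
    }
}

/// Drives `poll_next` by hand, recording how many polls each item needed.
fn polls_per_item() -> Vec<(u32, u32)> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut throttle = PollThrottle {
        delay: None,
        ticks: 2,
        iter: vec![10u32, 20, 30].into_iter(),
    };
    let mut out = Vec::new();
    let mut polls = 0;
    loop {
        polls += 1;
        match throttle.poll_next(&mut cx) {
            Poll::Ready(Some(item)) => {
                out.push((item, polls));
                polls = 0;
            }
            Poll::Ready(None) => return out,
            Poll::Pending => {}
        }
    }
}
```

The first item is ready on the first poll; each later item needs two extra polls while the delay counts down. In the async version that state lives implicitly in the suspended `next` future.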
This seems like a simple one. Discover is essentially a stream of its own kind. Let's assume this is also async.
impl<D, C> AsyncIterator for PeakEwmaDiscover<D, C>
where
    D: Discover,
    C: Clone,
{
    type Item = Result<Change<D::Key, PeakEwma<D::Service, C>>, D::Error>;

    async fn next(&mut self) -> Option<Self::Item> {
        let change = match self.discover().await? {
            Err(e) => return Some(Err(e)),
            Ok(change) => change,
        };
        let change = match change {
            Change::Remove(k) => Change::Remove(k),
            Change::Insert(k, svc) => {
                let peak_ewma = PeakEwma::new(
                    svc,
                    *self.default_rtt,
                    *self.decay_ns,
                    self.completion.clone(),
                );
                Change::Insert(k, peak_ewma)
            }
        };
        Some(Ok(change))
    }
}
Not much to say about this one. It's simple, but no simpler. We've replaced a ready!(poll_discover(cx)) with a discover().await.
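The correspondence between the two spellings can be shown with a small std-only sketch. `Source`, its `poll_discover` and `discover` methods, and the busy-polling `block_on` are hypothetical stand-ins written for this example, not tower's API. `std::task::ready!` returns early with `Poll::Pending`, which is the same suspension point that `.await` expresses.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for a busy-polling loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical source: pending on the first poll, then yields 42.
struct Source {
    polled: bool,
}

impl Source {
    fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(42)
        } else {
            self.polled = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    /// Async wrapper around the poll function.
    async fn discover(&mut self) -> u32 {
        std::future::poll_fn(|cx| self.poll_discover(cx)).await
    }
}

/// Poll style: `ready!` returns `Poll::Pending` early if the source is not ready.
fn poll_doubled(src: &mut Source, cx: &mut Context<'_>) -> Poll<u32> {
    let value = ready!(src.poll_discover(cx));
    Poll::Ready(value * 2)
}

/// Async style: the same logic, with `.await` as the suspension point.
async fn doubled(src: &mut Source) -> u32 {
    src.discover().await * 2
}

/// Minimal busy-polling executor, only suitable for this demo.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Runs both styles against fresh sources and returns both results.
fn both_styles() -> (u32, u32) {
    let mut a = Source { polled: false };
    let via_poll = block_on(std::future::poll_fn(|cx| poll_doubled(&mut a, cx)));
    let mut b = Source { polled: false };
    let via_await = block_on(doubled(&mut b));
    (via_poll, via_await)
}
```

Both halves of `both_styles()` produce 84; the only difference is who writes the early return.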