Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I convert a Stream into a Future?

I have an asynchronous Stream and I'd like to get the first value out of it. How can I do so?

use futures::Stream; // 0.3.5

async fn example<T>(s: impl Stream<Item = T>) -> Option<T> {
    todo!("What goes here?")
}
like image 941
Shepmaster Avatar asked Oct 28 '25 15:10

Shepmaster


1 Answers

You can use StreamExt::next:

use futures::{Stream, StreamExt}; // 0.3.5

async fn example<T>(mut s: impl Stream<Item = T> + Unpin) -> Option<T> {
    s.next().await
}

You can use StreamExt::into_future:

use futures::{FutureExt, Stream, StreamExt}; // 0.3.5

async fn example<T>(s: impl Stream<Item = T> + Unpin) -> Option<T> {
    s.into_future().map(|(v, _)| v).await
}

In rare cases, you may want to use future::poll_fn to have complete control:

use futures::{future, task::Poll, Stream, StreamExt}; // 0.3.5

async fn example<T>(mut s: impl Stream<Item = T> + Unpin) -> Option<T> {
    future::poll_fn(|ctx| {
        // Could use methods like `Poll::map` or
        // the `?` operator instead of a `match`.
        match s.poll_next_unpin(ctx) {
            Poll::Ready(v) => {
                // Do any special logic here
                Poll::Ready(v)
            }
            Poll::Pending => Poll::Pending,
        }
    })
    .await
}

See also:

  • How to convert a Future into a Stream?

More broadly

If you wanted to act upon all the values in the stream, producing a single value, you can use StreamExt::fold:

use futures::{Stream, StreamExt}; // 0.3.5

async fn example(s: impl Stream + Unpin) -> usize {
    s.fold(0, |st, _| async move { st + 1 }).await
}

If you wanted to act upon all the values in the stream without producing a value, you can use StreamExt::for_each:

use futures::{Stream, StreamExt}; // 0.3.5

async fn example<I: std::fmt::Debug>(s: impl Stream<Item = I> + Unpin) {
    s.for_each(|i| async {
        dbg!(i);
    })
    .await;
}

See also:

  • How to select between a future and stream in Rust?

Unpin

These example all require that the incoming Stream implement Unpin. You could also pin the stream yourself via Box::pin or the pin_mut! macro.

See also:

  • What are the use cases of the newly proposed Pin type?
like image 96
Shepmaster Avatar answered Oct 31 '25 13:10

Shepmaster



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!