Rust Stream
Mental model
Stream<Item = T> is the async counterpart to Iterator<Item = T>.
| Synchronous | Asynchronous |
|---|---|
Iterator | Stream |
next() | next().await from StreamExt |
Option<T> | Option<T> after awaiting |
A stream produces zero or more values over time. It is useful for incoming messages, timers, file or network chunks, server-sent events, and any API that should yield values before the full operation is complete.
The core trait is poll-based:
use futures_core::Stream;
use std::{
pin::Pin,
task::{Context, Poll},
};
struct Counter {
current: usize,
end: usize,
}
impl Stream for Counter {
type Item = usize;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.current >= self.end {
return Poll::Ready(None);
}
let value = self.current;
self.current += 1;
Poll::Ready(Some(value))
}
}
Most application code should not implement poll_next by hand. Prefer constructors, adapters, channels, or async-stream.
Consume a stream
Use StreamExt::next to await values:
use futures_util::{pin_mut, stream, StreamExt};
#[tokio::main]
async fn main() {
let values = stream::iter([1, 2, 3]);
pin_mut!(values);
while let Some(value) = values.next().await {
println!("{value}");
}
}
StreamExt also provides adapters such as map, filter, filter_map, take, collect, and for_each.
Create a stream with async-stream
Use async_stream::stream! when stream logic is naturally written as a loop with await and yield:
use async_stream::stream;
use futures_core::Stream;
use std::time::Duration;
fn ticks() -> impl Stream<Item = usize> {
stream! {
let mut count = 0;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
yield count;
count += 1;
}
}
}
Consume it the same way:
use futures_util::{pin_mut, StreamExt};
let ticks = ticks();
pin_mut!(ticks);
while let Some(value) = ticks.next().await {
println!("{value}");
}
async-stream also provides try_stream! for fallible streams. It yields Result<T, E> and supports ?:
use async_stream::try_stream;
use futures_core::Stream;
use std::{io, path::PathBuf};
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
};
fn read_lines(path: PathBuf) -> impl Stream<Item = io::Result<String>> {
try_stream! {
let file = File::open(path).await?;
let mut lines = BufReader::new(file).lines();
while let Some(line) = lines.next_line().await? {
yield line;
}
}
}
Convert a channel to a stream
Tokio mpsc::Receiver has recv().await, but it is not itself a Stream. Wrap it with ReceiverStream when an API wants a stream:
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(16);
tokio::spawn(async move {
let _ = tx.send("hello").await;
let _ = tx.send("stream").await;
});
let mut stream = ReceiverStream::new(rx);
while let Some(message) = stream.next().await {
println!("{message}");
}
}
Use this pattern when a producer task should send values independently from the stream consumer.
Axum SSE
Axum SSE responses accept a stream of Result<Event, E> values. async-stream is a convenient way to build that stream:
use async_stream::stream;
use axum::{
response::sse::{Event, KeepAlive, Sse},
routing::get,
Router,
};
use futures_core::Stream;
use std::{convert::Infallible, time::Duration};
async fn events() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let stream = stream! {
let mut count = 0;
loop {
yield Ok(Event::default()
.event("tick")
.data(count.to_string()));
count += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
};
Sse::new(stream).keep_alive(KeepAlive::default())
}
fn app() -> Router {
Router::new().route("/events", get(events))
}
For SSE:
- Use
KeepAliveso proxies and clients can detect a live connection. - Yield
Result<Event, E>so the stream can report errors when needed. - Stop producing values when the receiver is dropped; channel-based streams naturally stop when sends fail.
Return type choices
Prefer impl Stream<Item = T> when the caller does not need the concrete stream type:
use futures_core::Stream;
fn values() -> impl Stream<Item = usize> {
futures_util::stream::iter([1, 2, 3])
}
Use Pin<Box<dyn Stream<Item = T> + Send>> when different branches must return different stream implementations:
use futures_core::Stream;
use std::pin::Pin;
fn boxed_values(enabled: bool) -> Pin<Box<dyn Stream<Item = usize> + Send>> {
if enabled {
Box::pin(futures_util::stream::iter([1, 2, 3]))
} else {
Box::pin(futures_util::stream::empty())
}
}
Avoid exposing boxed streams unless the dynamic dispatch or type erasure is actually useful.