Rust Stream

Mental model

Stream<Item = T> is the async counterpart to Iterator<Item = T>.

Synchronous | Asynchronous
Iterator | Stream
next() | next().await from StreamExt
Option<T> | Option<T> after awaiting

A stream produces zero or more values over time. It is useful for incoming messages, timers, file or network chunks, server-sent events, and any API that should yield values before the full operation is complete.

The core trait is poll-based:

use futures_core::Stream;
use std::{
    pin::Pin,
    task::{Context, Poll},
};

struct Counter {
    current: usize,
    end: usize,
}

impl Stream for Counter {
    type Item = usize;

    // Every value is available immediately, so this never returns
    // Poll::Pending and does not need the waker in `_cx`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.current >= self.end {
            // None signals the end of the stream.
            return Poll::Ready(None);
        }

        let value = self.current;
        self.current += 1;
        Poll::Ready(Some(value))
    }
}

Most application code should not implement poll_next by hand. Prefer constructors, adapters, channels, or async-stream.

Consume a stream

Use StreamExt::next to await values:

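use futures_util::{pin_mut, stream, StreamExt};

#[tokio::main]
async fn main() {
    let values = stream::iter([1, 2, 3]);

    // next() needs an Unpin stream. stream::iter is already Unpin, so
    // pinning is optional here, but it is required for many other streams.
    pin_mut!(values);

    while let Some(value) = values.next().await {
        println!("{value}");
    }
}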

StreamExt also provides adapters such as map, filter, filter_map, and take, plus consuming methods such as collect and for_each.

Create a stream with async-stream

Use async_stream::stream! when stream logic is naturally written as a loop with await and yield:

use async_stream::stream;
use futures_core::Stream;
use std::time::Duration;

fn ticks() -> impl Stream<Item = usize> {
    stream! {
        let mut count = 0;

        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
            yield count;
            count += 1;
        }
    }
}

Consume it the same way:

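use futures_util::{pin_mut, StreamExt};

// Assumes the ticks() function defined above is in scope.
#[tokio::main]
async fn main() {
    let ticks = ticks();
    // Streams built with stream! are not Unpin, so pin before calling next().
    pin_mut!(ticks);

    while let Some(value) = ticks.next().await {
        println!("{value}");
    }
}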

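async-stream also provides try_stream! for fallible streams. The resulting stream has Item = Result<T, E>: inside the block, yield takes a plain T, and a failing ? emits the error as an Err item and ends the stream: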

use async_stream::try_stream;
use futures_core::Stream;
use std::{io, path::PathBuf};
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

fn read_lines(path: PathBuf) -> impl Stream<Item = io::Result<String>> {
    try_stream! {
        let file = File::open(path).await?;
        let mut lines = BufReader::new(file).lines();

        while let Some(line) = lines.next_line().await? {
            yield line;
        }
    }
}

Convert a channel to a stream

Tokio's mpsc::Receiver exposes recv().await but does not implement Stream. Wrap it in tokio_stream::wrappers::ReceiverStream when an API expects a stream:

use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);

    tokio::spawn(async move {
        let _ = tx.send("hello").await;
        let _ = tx.send("stream").await;
    });

    let mut stream = ReceiverStream::new(rx);

    while let Some(message) = stream.next().await {
        println!("{message}");
    }
}

Use this pattern when a producer task should send values independently from the stream consumer.

Axum SSE

Axum SSE responses accept a stream of Result<Event, E> values. async-stream is a convenient way to build that stream:

use async_stream::stream;
use axum::{
    response::sse::{Event, KeepAlive, Sse},
    routing::get,
    Router,
};
use futures_core::Stream;
use std::{convert::Infallible, time::Duration};

async fn events() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream! {
        let mut count = 0;

        loop {
            yield Ok(Event::default()
                .event("tick")
                .data(count.to_string()));

            count += 1;
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::default())
}

fn app() -> Router {
    Router::new().route("/events", get(events))
}

For SSE:

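  • Use KeepAlive so the server sends periodic keep-alive messages; this stops proxies and clients from closing a connection that is merely idle.
  • Yield Result<Event, E> so the stream can report errors when needed.
  • Stop producing values when the receiver is dropped. A stream! body stops running once the response stream is dropped, and a channel-based producer should exit when send returns an error.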

Return type choices

Prefer impl Stream<Item = T> when the caller does not need the concrete stream type:

use futures_core::Stream;

fn values() -> impl Stream<Item = usize> {
    futures_util::stream::iter([1, 2, 3])
}

Use Pin<Box<dyn Stream<Item = T> + Send>> when different branches must return different stream implementations:

use futures_core::Stream;
use std::pin::Pin;

fn boxed_values(enabled: bool) -> Pin<Box<dyn Stream<Item = usize> + Send>> {
    if enabled {
        Box::pin(futures_util::stream::iter([1, 2, 3]))
    } else {
        Box::pin(futures_util::stream::empty())
    }
}

Avoid exposing boxed streams unless the dynamic dispatch or type erasure is actually useful.