Tokio Channels
Mental Model
Tokio channels are message-passing primitives for coordinating async tasks without sharing mutable state directly.
Use the channel shape to describe who can send and who can receive:
| Channel | Shape | Keeps | Use when |
|---|---|---|---|
mpsc | many producers, one consumer | queued messages | one task should process each job, command, or event |
oneshot | one producer, one consumer | one value | one task needs to return one result to one waiter |
broadcast | many producers, many consumers | bounded event history | every subscriber should see each event if it keeps up |
watch | many producers, many consumers | latest value | receivers only need the current state |
The practical rule is:
If every item matters, use mpsc or broadcast.
If only the newest value matters, use watch.
If there is exactly one reply, use oneshot.
mpsc
mpsc is a multi-producer, single-consumer queue. Use it for work items, actor commands, and event streams where each message should be handled by exactly one receiver.
use tokio::sync::mpsc;
#[derive(Debug)]
enum Command {
Refresh,
Stop,
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let worker = tokio::spawn(async move {
while let Some(command) = rx.recv().await {
println!("handle {command:?}");
}
});
tx.send(Command::Refresh).await.unwrap();
tx.send(Command::Stop).await.unwrap();
drop(tx);
worker.await.unwrap();
}
Prefer bounded mpsc::channel(capacity) for application queues because capacity gives backpressure. If the receiver falls behind, send().await waits instead of allowing unbounded memory growth.
Use mpsc when:
- one background task owns a resource, such as a socket, database connection, or mutable state machine;
- producers submit commands to that owner task;
- message order and intermediate messages matter;
- overload should slow producers through bounded capacity.
oneshot
oneshot sends one value from one sender to one receiver. It is commonly paired with mpsc for request-response commands.
use tokio::sync::{mpsc, oneshot};
enum Command {
GetStatus {
reply: oneshot::Sender<String>,
},
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Command>(16);
let worker = tokio::spawn(async move {
while let Some(command) = rx.recv().await {
match command {
Command::GetStatus { reply } => {
let _ = reply.send("ready".to_owned());
}
}
}
});
let (reply_tx, reply_rx) = oneshot::channel();
tx.send(Command::GetStatus { reply: reply_tx }).await.unwrap();
let status = reply_rx.await.unwrap();
assert_eq!(status, "ready");
drop(tx);
worker.await.unwrap();
}
Use a task JoinHandle instead of oneshot when the spawned task only needs to return its final value. Use oneshot when the reply is part of a larger command protocol or when the task keeps running after sending the reply.
broadcast
broadcast is a multi-producer, multi-consumer event channel. Every receiver has its own cursor and receives its own copy of each value.
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx_a) = broadcast::channel(16);
let mut rx_b = tx.subscribe();
tx.send("reloaded").unwrap();
assert_eq!(rx_a.recv().await.unwrap(), "reloaded");
assert_eq!(rx_b.recv().await.unwrap(), "reloaded");
}
Use broadcast when:
- events should fan out to multiple independent consumers;
- subscribers can tolerate lag detection and recovery;
- the message type is cheap enough to clone for each receiver.
broadcast is bounded. If a receiver is too slow, older messages can be overwritten and that receiver gets a lag error. If losing events is unacceptable, use one mpsc queue per consumer or persist events somewhere that supports replay.
watch
watch is a latest-value channel. It can have many senders and many receivers, but it stores only the most recent value.
use tokio::sync::watch;
#[derive(Clone, Debug, PartialEq, Eq)]
enum ShutdownState {
Running,
Draining,
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = watch::channel(ShutdownState::Running);
let worker = tokio::spawn(async move {
while rx.changed().await.is_ok() {
if *rx.borrow_and_update() == ShutdownState::Draining {
break;
}
}
});
tx.send(ShutdownState::Draining).unwrap();
worker.await.unwrap();
}
Use watch for current state:
- configuration snapshots;
- shutdown state;
- readiness or health state;
- leader epoch, mode, or feature-flag state.
Do not use watch for event logs or work queues. Slow receivers can skip intermediate values, by design.
Choosing Between Channels
| Need | Prefer |
|---|---|
| Send jobs to one worker task | mpsc |
| Send commands to a resource owner task | mpsc |
| Attach one reply to a command | mpsc plus oneshot |
| Return one final value from a spawned task | JoinHandle |
| Notify many subscribers of each event | broadcast |
| Share the latest config or shutdown state | watch |
| Wake tasks without data | Notify |
When the choice is unclear, decide whether the value is an event or state:
- Event: "this happened" and every occurrence may matter. Use
mpscorbroadcast. - State: "the current value is this" and old values can be skipped. Use
watch.