본문으로 건너뛰기

Tokio Channels

Mental Model

Tokio channels are message-passing primitives for coordinating async tasks without sharing mutable state directly.

Use the channel shape to describe who can send and who can receive:

ChannelShapeKeepsUse when
mpscmany producers, one consumerqueued messagesone task should process each job, command, or event
oneshotone producer, one consumerone valueone task needs to return one result to one waiter
broadcastmany producers, many consumersbounded event historyevery subscriber should see each event if it keeps up
watchmany producers, many consumerslatest valuereceivers only need the current state

The practical rule is:

If every item matters, use mpsc or broadcast.
If only the newest value matters, use watch.
If there is exactly one reply, use oneshot.

mpsc

mpsc is a multi-producer, single-consumer queue. Use it for work items, actor commands, and event streams where each message should be handled by exactly one receiver.

use tokio::sync::mpsc;

#[derive(Debug)]
enum Command {
Refresh,
Stop,
}

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);

let worker = tokio::spawn(async move {
while let Some(command) = rx.recv().await {
println!("handle {command:?}");
}
});

tx.send(Command::Refresh).await.unwrap();
tx.send(Command::Stop).await.unwrap();
drop(tx);

worker.await.unwrap();
}

Prefer bounded mpsc::channel(capacity) for application queues because capacity gives backpressure. If the receiver falls behind, send().await waits instead of allowing unbounded memory growth.

Use mpsc when:

  • one background task owns a resource, such as a socket, database connection, or mutable state machine;
  • producers submit commands to that owner task;
  • message order and intermediate messages matter;
  • overload should slow producers through bounded capacity.

oneshot

oneshot sends one value from one sender to one receiver. It is commonly paired with mpsc for request-response commands.

use tokio::sync::{mpsc, oneshot};

enum Command {
GetStatus {
reply: oneshot::Sender<String>,
},
}

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Command>(16);

let worker = tokio::spawn(async move {
while let Some(command) = rx.recv().await {
match command {
Command::GetStatus { reply } => {
let _ = reply.send("ready".to_owned());
}
}
}
});

let (reply_tx, reply_rx) = oneshot::channel();
tx.send(Command::GetStatus { reply: reply_tx }).await.unwrap();

let status = reply_rx.await.unwrap();
assert_eq!(status, "ready");

drop(tx);
worker.await.unwrap();
}

Use a task JoinHandle instead of oneshot when the spawned task only needs to return its final value. Use oneshot when the reply is part of a larger command protocol or when the task keeps running after sending the reply.

broadcast

broadcast is a multi-producer, multi-consumer event channel. Every receiver has its own cursor and receives its own copy of each value.

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
let (tx, mut rx_a) = broadcast::channel(16);
let mut rx_b = tx.subscribe();

tx.send("reloaded").unwrap();

assert_eq!(rx_a.recv().await.unwrap(), "reloaded");
assert_eq!(rx_b.recv().await.unwrap(), "reloaded");
}

Use broadcast when:

  • events should fan out to multiple independent consumers;
  • subscribers can tolerate lag detection and recovery;
  • the message type is cheap enough to clone for each receiver.

broadcast is bounded. If a receiver is too slow, older messages can be overwritten and that receiver gets a lag error. If losing events is unacceptable, use one mpsc queue per consumer or persist events somewhere that supports replay.

watch

watch is a latest-value channel. It can have many senders and many receivers, but it stores only the most recent value.

use tokio::sync::watch;

#[derive(Clone, Debug, PartialEq, Eq)]
enum ShutdownState {
Running,
Draining,
}

#[tokio::main]
async fn main() {
let (tx, mut rx) = watch::channel(ShutdownState::Running);

let worker = tokio::spawn(async move {
while rx.changed().await.is_ok() {
if *rx.borrow_and_update() == ShutdownState::Draining {
break;
}
}
});

tx.send(ShutdownState::Draining).unwrap();
worker.await.unwrap();
}

Use watch for current state:

  • configuration snapshots;
  • shutdown state;
  • readiness or health state;
  • leader epoch, mode, or feature-flag state.

Do not use watch for event logs or work queues. Slow receivers can skip intermediate values, by design.

Choosing Between Channels

NeedPrefer
Send jobs to one worker taskmpsc
Send commands to a resource owner taskmpsc
Attach one reply to a commandmpsc plus oneshot
Return one final value from a spawned taskJoinHandle
Notify many subscribers of each eventbroadcast
Share the latest config or shutdown statewatch
Wake tasks without dataNotify

When the choice is unclear, decide whether the value is an event or state:

  • Event: "this happened" and every occurrence may matter. Use mpsc or broadcast.
  • State: "the current value is this" and old values can be skipped. Use watch.