| name | tokio-patterns |
| description | Common Tokio patterns and idioms for async programming. Use when implementing worker pools, request-response patterns, pub/sub, timeouts, retries, or graceful shutdown. |
Tokio Patterns
This skill provides common patterns and idioms for building robust async applications with Tokio.
Worker Pool Pattern
Limit concurrent task execution using a semaphore:
use tokio::sync::Semaphore;
use std::sync::Arc;
pub struct WorkerPool {
semaphore: Arc<Semaphore>,
}
impl WorkerPool {
pub fn new(size: usize) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(size)),
}
}
pub async fn execute<F, T>(&self, f: F) -> T
where
F: Future<Output = T>,
{
let _permit = self.semaphore.acquire().await.unwrap();
f.await
}
}
// Usage
let pool = WorkerPool::new(10);
let results = futures::future::join_all(
(0..100).map(|i| pool.execute(process_item(i)))
).await;
Request-Response Pattern
Use oneshot channels for request-response communication:
use tokio::sync::{mpsc, oneshot};
pub enum Command {
Get { key: String, respond_to: oneshot::Sender<Option<String>> },
Set { key: String, value: String },
}
pub async fn actor(mut rx: mpsc::Receiver<Command>) {
let mut store = HashMap::new();
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, respond_to } => {
let value = store.get(&key).cloned();
let _ = respond_to.send(value);
}
Command::Set { key, value } => {
store.insert(key, value);
}
}
}
}
// Client usage
let (tx, rx) = mpsc::channel(32);
tokio::spawn(actor(rx));
let (respond_to, response) = oneshot::channel();
tx.send(Command::Get { key: "foo".into(), respond_to }).await.unwrap();
let value = response.await.unwrap();
Pub/Sub with Channels
Use broadcast channels for pub/sub messaging:
use tokio::sync::broadcast;
pub struct PubSub<T: Clone> {
tx: broadcast::Sender<T>,
}
impl<T: Clone> PubSub<T> {
pub fn new(capacity: usize) -> Self {
let (tx, _) = broadcast::channel(capacity);
Self { tx }
}
pub fn subscribe(&self) -> broadcast::Receiver<T> {
self.tx.subscribe()
}
pub fn publish(&self, message: T) -> Result<usize, broadcast::error::SendError<T>> {
self.tx.send(message)
}
}
// Usage
let pubsub = PubSub::new(100);
// Subscriber 1
let mut rx1 = pubsub.subscribe();
tokio::spawn(async move {
while let Ok(msg) = rx1.recv().await {
println!("Subscriber 1: {:?}", msg);
}
});
// Subscriber 2
let mut rx2 = pubsub.subscribe();
tokio::spawn(async move {
while let Ok(msg) = rx2.recv().await {
println!("Subscriber 2: {:?}", msg);
}
});
// Publisher
pubsub.publish("Hello".to_string()).unwrap();
Timeout Pattern
Wrap operations with timeouts:
use tokio::time::{timeout, Duration};
pub async fn with_timeout<F, T>(duration: Duration, future: F) -> Result<T, TimeoutError>
where
F: Future<Output = Result<T, Error>>,
{
match timeout(duration, future).await {
Ok(Ok(result)) => Ok(result),
Ok(Err(e)) => Err(TimeoutError::Inner(e)),
Err(_) => Err(TimeoutError::Elapsed),
}
}
// Usage
let result = with_timeout(
Duration::from_secs(5),
fetch_data()
).await?;
Retry with Exponential Backoff
Retry failed operations with backoff:
use tokio::time::{sleep, Duration};
pub async fn retry_with_backoff<F, T, E>(
mut operation: F,
max_retries: u32,
initial_backoff: Duration,
) -> Result<T, E>
where
F: FnMut() -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
{
let mut retries = 0;
let mut backoff = initial_backoff;
loop {
match operation().await {
Ok(result) => return Ok(result),
Err(e) if retries < max_retries => {
retries += 1;
sleep(backoff).await;
backoff *= 2; // Exponential backoff
}
Err(e) => return Err(e),
}
}
}
// Usage
let result = retry_with_backoff(
|| Box::pin(fetch_data()),
3,
Duration::from_millis(100)
).await?;
Graceful Shutdown
Coordinate graceful shutdown across components:
use tokio::sync::broadcast;
use tokio::select;
pub struct ShutdownCoordinator {
tx: broadcast::Sender<()>,
}
impl ShutdownCoordinator {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(1);
Self { tx }
}
pub fn subscribe(&self) -> broadcast::Receiver<()> {
self.tx.subscribe()
}
pub fn shutdown(&self) {
let _ = self.tx.send(());
}
}
// Worker pattern
pub async fn worker(mut shutdown: broadcast::Receiver<()>) {
loop {
select! {
_ = shutdown.recv() => {
// Cleanup
break;
}
result = do_work() => {
// Process result
}
}
}
}
// Main
let coordinator = ShutdownCoordinator::new();
let shutdown_rx1 = coordinator.subscribe();
let h1 = tokio::spawn(worker(shutdown_rx1));
let shutdown_rx2 = coordinator.subscribe();
let h2 = tokio::spawn(worker(shutdown_rx2));
// Wait for signal
tokio::signal::ctrl_c().await.unwrap();
coordinator.shutdown();
// Wait for workers
let _ = tokio::join!(h1, h2);
Async Initialization
Lazy async initialization with OnceCell:
use tokio::sync::OnceCell;
pub struct Service {
connection: OnceCell<Connection>,
}
impl Service {
pub fn new() -> Self {
Self {
connection: OnceCell::new(),
}
}
async fn get_connection(&self) -> &Connection {
self.connection
.get_or_init(|| async {
Connection::connect().await.unwrap()
})
.await
}
pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
let conn = self.get_connection().await;
conn.query(sql).await
}
}
Resource Cleanup with Drop
Ensure cleanup even on task cancellation:
pub struct Resource {
handle: SomeHandle,
}
impl Resource {
pub async fn new() -> Self {
Self {
handle: acquire_resource().await,
}
}
pub async fn use_resource(&self) -> Result<()> {
// Use the resource
Ok(())
}
}
impl Drop for Resource {
fn drop(&mut self) {
// Synchronous cleanup
// For async cleanup, use a separate shutdown method
self.handle.close();
}
}
// For async cleanup
impl Resource {
pub async fn shutdown(self) {
// Async cleanup
self.handle.close_async().await;
}
}
Select Multiple Futures
Use select! to race multiple operations:
use tokio::select;
pub async fn select_example() {
let mut rx1 = channel1();
let mut rx2 = channel2();
loop {
select! {
msg = rx1.recv() => {
if let Some(msg) = msg {
handle_channel1(msg).await;
} else {
break;
}
}
msg = rx2.recv() => {
if let Some(msg) = msg {
handle_channel2(msg).await;
} else {
break;
}
}
_ = tokio::time::sleep(Duration::from_secs(60)) => {
check_timeout().await;
}
}
}
}
Cancellation Token Pattern
Use tokio_util::sync::CancellationToken for cooperative cancellation:
use tokio_util::sync::CancellationToken;
pub async fn worker(token: CancellationToken) {
loop {
tokio::select! {
_ = token.cancelled() => {
// Cleanup
break;
}
_ = do_work() => {
// Continue
}
}
}
}
// Hierarchical cancellation
let parent_token = CancellationToken::new();
let child_token = parent_token.child_token();
tokio::spawn(worker(child_token));
// Cancel all
parent_token.cancel();
Best Practices
- Use semaphores for limiting concurrent operations
- Implement graceful shutdown in all long-running tasks
- Add timeouts to external operations
- Use channels for inter-task communication
- Handle cancellation properly in all tasks
- Clean up resources in Drop or explicit shutdown methods
- Use appropriate channel types for different patterns
- Implement retries for transient failures
- Use select! for coordinating multiple async operations
- Document lifetime and ownership patterns clearly