---
name: tokio-networking
description: Network programming patterns with Hyper, Tonic, and Tower. Use when building HTTP services, gRPC applications, implementing middleware, connection pooling, or health checks.
---
# Tokio Networking Patterns
This skill provides network programming patterns for building production-grade services with the Tokio ecosystem.
## HTTP Service with Hyper and Axum
Build HTTP services with routing and middleware:
```rust
use axum::{
    Router,
    routing::get,
    extract::{State, Path, Request},
    http::StatusCode,
    middleware::{self, Next},
    response::IntoResponse,
    Json,
};
use std::sync::Arc;
use std::time::Instant;

#[derive(Clone)]
struct AppState {
    db: Arc<Database>,
    cache: Arc<Cache>,
}

async fn create_app() -> Router {
    let state = AppState {
        db: Arc::new(Database::new().await),
        cache: Arc::new(Cache::new()),
    };

    Router::new()
        .route("/health", get(health_check))
        .route("/api/v1/users", get(list_users).post(create_user))
        .route("/api/v1/users/:id", get(get_user).delete(delete_user))
        // With `Router::layer`, the last layer added is the outermost,
        // so auth_middleware runs before logging_middleware here.
        .layer(middleware::from_fn(logging_middleware))
        .layer(middleware::from_fn(auth_middleware))
        .with_state(state)
}

async fn health_check() -> impl IntoResponse {
    "OK"
}

async fn get_user(
    State(state): State<AppState>,
    Path(id): Path<u64>,
) -> Result<Json<User>, StatusCode> {
    state.db.get_user(id)
        .await
        .map(Json)
        .ok_or(StatusCode::NOT_FOUND)
}

async fn logging_middleware(req: Request, next: Next) -> impl IntoResponse {
    let method = req.method().clone();
    let uri = req.uri().clone();
    let start = Instant::now();

    let response = next.run(req).await;

    tracing::info!(
        method = %method,
        uri = %uri,
        status = %response.status(),
        duration_ms = start.elapsed().as_millis(),
        "request completed"
    );

    response
}
```
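Wiring the router into a server takes a few more lines. A minimal entry point, assuming axum 0.7's `axum::serve` (earlier versions bound a `hyper::Server` instead; the bind address is illustrative):

```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let app = create_app().await;

    // Bind a TCP listener and hand it to axum together with the router.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
    axum::serve(listener, app).await?;

    Ok(())
}
```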
## gRPC Service with Tonic
Build type-safe gRPC services:
```rust
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{transport::Server, Request, Response, Status};

pub mod proto {
    tonic::include_proto!("myservice");
}

use proto::my_service_server::{MyService, MyServiceServer};

pub struct MyServiceImpl {
    db: Arc<Database>,
}

#[tonic::async_trait]
impl MyService for MyServiceImpl {
    async fn get_user(
        &self,
        request: Request<proto::GetUserRequest>,
    ) -> Result<Response<proto::User>, Status> {
        let req = request.into_inner();

        let user = self.db.get_user(req.id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?
            .ok_or_else(|| Status::not_found("User not found"))?;

        Ok(Response::new(proto::User {
            id: user.id,
            name: user.name,
            email: user.email,
        }))
    }

    type ListUsersStream = ReceiverStream<Result<proto::User, Status>>;

    async fn list_users(
        &self,
        _request: Request<proto::ListUsersRequest>,
    ) -> Result<Response<Self::ListUsersStream>, Status> {
        let (tx, rx) = mpsc::channel(100);
        let db = self.db.clone();

        tokio::spawn(async move {
            let mut users = match db.list_users().await {
                Ok(users) => users,
                Err(e) => {
                    // Report the failure to the client instead of panicking.
                    let _ = tx.send(Err(Status::internal(e.to_string()))).await;
                    return;
                }
            };
            while let Some(user) = users.next().await {
                let proto_user = proto::User {
                    id: user.id,
                    name: user.name,
                    email: user.email,
                };
                // The client hung up; stop producing.
                if tx.send(Ok(proto_user)).await.is_err() {
                    break;
                }
            }
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }
}

async fn serve(db: Arc<Database>) -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let service = MyServiceImpl { db };

    Server::builder()
        .add_service(MyServiceServer::new(service))
        .serve(addr)
        .await?;

    Ok(())
}
```
## Tower Middleware Composition
Layer middleware for cross-cutting concerns:
```rust
use std::time::Duration;
use tower::limit::RateLimitLayer; // note: in tower, not tower_http
use tower::ServiceBuilder;
use tower_http::{
    compression::CompressionLayer,
    timeout::TimeoutLayer,
    trace::TraceLayer,
};

// With ServiceBuilder, the first layer added is the outermost: it sees
// each request first and each response last. `inner_service` is whatever
// Service is being wrapped.
let service = ServiceBuilder::new()
    .layer(TraceLayer::new_for_http())                        // outermost
    .layer(CompressionLayer::new())
    .layer(TimeoutLayer::new(Duration::from_secs(30)))
    .layer(RateLimitLayer::new(100, Duration::from_secs(1)))  // innermost layer
    .service(inner_service);
```
```rust
// Custom middleware: a Layer that wraps any Service with metrics.
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::{Layer, Service};

#[derive(Clone)]
struct MetricsLayer {
    metrics: Arc<Metrics>,
}

impl<S> Layer<S> for MetricsLayer {
    type Service = MetricsService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        MetricsService {
            inner,
            metrics: self.metrics.clone(),
        }
    }
}

#[derive(Clone)]
struct MetricsService<S> {
    inner: S,
    metrics: Arc<Metrics>,
}

impl<S, Req> Service<Req> for MetricsService<S>
where
    S: Service<Req>,
    S::Future: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    // Box the future so the wrapper can run code after the inner call completes.
    type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        self.metrics.requests_total.inc();
        let timer = self.metrics.request_duration.start_timer();
        let future = self.inner.call(req);

        Box::pin(async move {
            let result = future.await;
            timer.observe_duration();
            result
        })
    }
}
```
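Applying the custom layer is then one more entry in the stack (`metrics` being the shared `Arc<Metrics>` handle from above):

```rust
let service = ServiceBuilder::new()
    .layer(MetricsLayer { metrics: metrics.clone() })
    .service(inner_service);
```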
## Connection Pooling
Manage connection pools efficiently:
```rust
use std::future::Future;
use deadpool_postgres::{Client, Config, Pool, Runtime};
use tokio_postgres::NoTls;

pub struct DatabasePool {
    pool: Pool,
}

impl DatabasePool {
    pub async fn new(config: &DatabaseConfig) -> Result<Self, Error> {
        let mut cfg = Config::new();
        cfg.host = Some(config.host.clone());
        cfg.port = Some(config.port);
        cfg.dbname = Some(config.database.clone());
        cfg.user = Some(config.user.clone());
        cfg.password = Some(config.password.clone());

        let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls)?;
        Ok(Self { pool })
    }

    pub async fn get(&self) -> Result<Client, Error> {
        self.pool.get().await.map_err(Into::into)
    }

    // Run a closure against a pooled connection. The client is handed to
    // the closure by value so the returned future does not borrow from the
    // closure; the connection returns to the pool when the client drops.
    pub async fn query<T, F, Fut>(&self, f: F) -> Result<T, Error>
    where
        F: FnOnce(Client) -> Fut,
        Fut: Future<Output = Result<T, Error>>,
    {
        let client = self.get().await?;
        f(client).await
    }
}

// Usage
let pool = DatabasePool::new(&config).await?;
let users = pool.query(|client| async move {
    client.query("SELECT * FROM users", &[])
        .await
        .map_err(Into::into)
}).await?;
```
## Health Checks and Readiness Probes
Implement comprehensive health checks:
```rust
use std::sync::Arc;
use std::time::Instant;
use axum::{extract::State, http::StatusCode, routing::get, Json, Router};
use serde::Serialize;

#[derive(Serialize)]
struct HealthResponse {
    status: String,
    version: String,
    dependencies: Vec<DependencyHealth>,
}

#[derive(Serialize)]
struct DependencyHealth {
    name: String,
    status: String,
    latency_ms: Option<u64>,
    message: Option<String>,
}

async fn health_check(State(state): State<Arc<AppState>>) -> Json<HealthResponse> {
    let mut dependencies = Vec::new();

    // Check database
    let db_start = Instant::now();
    let db_status = match state.db.ping().await {
        Ok(_) => DependencyHealth {
            name: "database".into(),
            status: "healthy".into(),
            latency_ms: Some(db_start.elapsed().as_millis() as u64),
            message: None,
        },
        Err(e) => DependencyHealth {
            name: "database".into(),
            status: "unhealthy".into(),
            latency_ms: None,
            message: Some(e.to_string()),
        },
    };
    dependencies.push(db_status);

    // Check cache
    let cache_start = Instant::now();
    let cache_status = match state.cache.ping().await {
        Ok(_) => DependencyHealth {
            name: "cache".into(),
            status: "healthy".into(),
            latency_ms: Some(cache_start.elapsed().as_millis() as u64),
            message: None,
        },
        Err(e) => DependencyHealth {
            name: "cache".into(),
            status: "unhealthy".into(),
            latency_ms: None,
            message: Some(e.to_string()),
        },
    };
    dependencies.push(cache_status);

    let all_healthy = dependencies.iter().all(|d| d.status == "healthy");

    Json(HealthResponse {
        status: if all_healthy { "healthy" } else { "unhealthy" }.into(),
        version: env!("CARGO_PKG_VERSION").into(),
        dependencies,
    })
}

async fn readiness_check(State(state): State<Arc<AppState>>) -> StatusCode {
    if state.is_ready().await {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    }
}

pub fn health_routes() -> Router<Arc<AppState>> {
    Router::new()
        .route("/health", get(health_check))
        .route("/ready", get(readiness_check))
        .route("/live", get(|| async { StatusCode::OK }))
}
```
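The probe routes can then be merged into the main application router (a sketch; `state` is the shared `Arc<AppState>`):

```rust
let app = Router::new()
    .merge(health_routes())
    .with_state(state);
```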
## Circuit Breaker Pattern
Protect against cascading failures:
```rust
pub struct ServiceClient {
    client: reqwest::Client,
    circuit_breaker: CircuitBreaker,
}

impl ServiceClient {
    pub async fn call(&self, req: reqwest::Request) -> Result<reqwest::Response, Error> {
        // The breaker fails fast while the downstream service is unhealthy.
        self.circuit_breaker.call(async {
            self.client
                .execute(req)
                .await
                .map_err(Into::into)
        }).await
    }
}
```
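`CircuitBreaker` above is application code, not a library type. A minimal sketch of the state machine, assuming a consecutive-failure threshold, a recovery timeout, and an app-level `Error` enum with a `CircuitOpen` variant:

```rust
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub struct CircuitBreaker {
    consecutive_failures: AtomicU64,
    failure_threshold: u64,
    // Unix millis when the circuit opened; 0 means closed.
    opened_at_ms: AtomicU64,
    recovery: Duration,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u64, recovery: Duration) -> Self {
        Self {
            consecutive_failures: AtomicU64::new(0),
            failure_threshold,
            opened_at_ms: AtomicU64::new(0),
            recovery,
        }
    }

    fn now_ms() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64
    }

    pub async fn call<T, Fut>(&self, fut: Fut) -> Result<T, Error>
    where
        Fut: Future<Output = Result<T, Error>>,
    {
        let opened_at = self.opened_at_ms.load(Ordering::Acquire);
        if opened_at != 0 {
            if Self::now_ms() - opened_at < self.recovery.as_millis() as u64 {
                // Open: fail fast without touching the downstream service.
                return Err(Error::CircuitOpen);
            }
            // Half-open: let this probe request through.
            self.opened_at_ms.store(0, Ordering::Release);
        }

        match fut.await {
            Ok(v) => {
                self.consecutive_failures.store(0, Ordering::Release);
                Ok(v)
            }
            Err(e) => {
                let failures = self.consecutive_failures.fetch_add(1, Ordering::AcqRel) + 1;
                if failures >= self.failure_threshold {
                    self.opened_at_ms.store(Self::now_ms(), Ordering::Release);
                }
                Err(e)
            }
        }
    }
}
```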
## Load Balancing
Distribute requests across multiple backends:
```rust
use tower::balance::p2c::Balance;
use tower::discover::ServiceList;
use tower::load::{CompleteOnResponse, PendingRequests};
use tower::{BoxError, Service, ServiceExt};

pub struct LoadBalancer {
    // p2c picks the less-loaded of two random endpoints, so each client is
    // wrapped in `PendingRequests` to track its in-flight request count.
    // `HttpClient` / `create_client` stand in for your concrete client service.
    balancer: Balance<ServiceList<Vec<PendingRequests<HttpClient>>>, Request>,
}

impl LoadBalancer {
    pub fn new(endpoints: Vec<String>) -> Self {
        let services: Vec<_> = endpoints
            .into_iter()
            .map(|endpoint| {
                PendingRequests::new(create_client(endpoint), CompleteOnResponse::default())
            })
            .collect();

        let balancer = Balance::new(ServiceList::new(services));
        Self { balancer }
    }

    pub async fn call(&mut self, req: Request) -> Result<Response, BoxError> {
        // A tower service must be driven to readiness before each call.
        self.balancer.ready().await?.call(req).await
    }
}
```
## Request Deduplication
Deduplicate concurrent identical requests:
```rust
use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::sync::Arc;
use tokio::sync::{watch, Mutex};

pub struct RequestDeduplicator<K, V> {
    // One watch channel per in-flight key; followers wait on the receiver
    // and are woken when the leader drops the sender. (A bare `Notify` can
    // miss wakeups here: a waiter that has not yet polled `notified()` when
    // `notify_waiters` fires would block forever.)
    in_flight: Arc<Mutex<HashMap<K, watch::Receiver<()>>>>,
    cache: Arc<Mutex<HashMap<K, Arc<V>>>>,
}

impl<K: Eq + Hash + Clone, V> RequestDeduplicator<K, V> {
    pub fn new() -> Self {
        Self {
            in_flight: Arc::new(Mutex::new(HashMap::new())),
            cache: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    pub async fn get_or_fetch<F, Fut>(
        &self,
        key: K,
        fetch: F,
    ) -> Result<Arc<V>, Error>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<V, Error>>,
    {
        // Exactly one caller per key becomes the leader and performs the
        // fetch; everyone else follows and waits for the result.
        let tx = loop {
            // Fast path: value already cached.
            if let Some(value) = self.cache.lock().await.get(&key) {
                return Ok(value.clone());
            }

            let mut rx = {
                let mut in_flight = self.in_flight.lock().await;
                match in_flight.get(&key) {
                    // A request for this key is already in flight: follow it.
                    Some(rx) => rx.clone(),
                    // No request in flight: become the leader.
                    None => {
                        let (tx, rx) = watch::channel(());
                        in_flight.insert(key.clone(), rx);
                        break tx;
                    }
                }
            };

            // Wait for the leader to finish (it drops its sender), then
            // loop around and re-check the cache.
            let _ = rx.changed().await;
        };

        // Leader: perform the fetch.
        match fetch().await {
            Ok(v) => {
                let value = Arc::new(v);
                self.cache.lock().await.insert(key.clone(), value.clone());
                self.in_flight.lock().await.remove(&key);
                // Dropping `tx` wakes every follower blocked on `changed()`.
                drop(tx);
                Ok(value)
            }
            Err(e) => {
                // Clear the marker so a later caller can retry the fetch;
                // dropping `tx` wakes followers, which will retry too.
                self.in_flight.lock().await.remove(&key);
                drop(tx);
                Err(e)
            }
        }
    }
}
```
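Usage: concurrent calls that share a key trigger a single fetch (`load_user` is a hypothetical loader):

```rust
let dedup = RequestDeduplicator::new();
let user = dedup
    .get_or_fetch(user_id, || async move { load_user(user_id).await })
    .await?;
```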
## Best Practices
- Use connection pooling for database and HTTP connections
- Implement health checks for all dependencies
- Add circuit breakers for external service calls
- Use appropriate timeouts for all network operations
- Implement retry logic with exponential backoff (see the sketch after this list)
- Add comprehensive middleware for logging, metrics, auth
- Use load balancing for high availability
- Deduplicate requests to reduce load
- Monitor latency and error rates
- Design for graceful degradation when services fail
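A minimal backoff helper for the retry guidance above (a sketch; the attempt count, base delay, and cap are illustrative):

```rust
use std::future::Future;
use std::time::Duration;

// Retry a fallible async operation, doubling the delay between attempts.
async fn retry_with_backoff<T, E, F, Fut>(mut op: F, max_attempts: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    let mut attempt = 1;
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(10)); // cap the backoff
                attempt += 1;
            }
        }
    }
}
```

Production retries usually also add jitter so that synchronized clients do not retry in lockstep.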