Claude Code Plugins

Community-maintained marketplace

Feedback

FastAPI 2026 advanced patterns including lifespan, dependencies, middleware, settings, and async best practices. Use when building production FastAPI applications.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name fastapi-advanced
description FastAPI 2026 advanced patterns including lifespan, dependencies, middleware, settings, and async best practices. Use when building production FastAPI applications.
context fork
agent backend-system-architect
version 1.0.0
tags fastapi, python, async, middleware, dependencies, 2026
hooks [object Object]

FastAPI Advanced Patterns (2026)

Production-ready FastAPI patterns for modern Python backends.

When to Use

  • Building production FastAPI applications
  • Managing application lifecycle (startup/shutdown)
  • Creating reusable dependencies
  • Implementing custom middleware
  • Configuring settings with validation

Lifespan Management (2026)

Modern Lifespan Context Manager

from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
import redis.asyncio as redis

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan with resource management."""
    # Startup
    app.state.db_engine = create_async_engine(
        settings.database_url,
        pool_size=5,
        max_overflow=10,
    )
    app.state.redis = redis.from_url(settings.redis_url)

    # Health check connections
    async with app.state.db_engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
    await app.state.redis.ping()

    yield  # Application runs

    # Shutdown
    await app.state.db_engine.dispose()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)

Lifespan with Services

from app.services import EmbeddingsService, LLMService

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize services
    app.state.embeddings = EmbeddingsService(
        model=settings.embedding_model,
        batch_size=100,
    )
    app.state.llm = LLMService(
        providers=["openai", "anthropic"],
        default="anthropic",
    )

    # Warm up models (optional)
    await app.state.embeddings.warmup()

    yield

    # Cleanup
    await app.state.embeddings.close()
    await app.state.llm.close()

Dependency Injection Patterns

Database Session

from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends, Request

async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    """Yield database session from app state."""
    async with AsyncSession(
        request.app.state.db_engine,
        expire_on_commit=False,
    ) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Service Dependencies

from functools import lru_cache

class AnalysisService:
    def __init__(
        self,
        db: AsyncSession,
        embeddings: EmbeddingsService,
        llm: LLMService,
    ):
        self.db = db
        self.embeddings = embeddings
        self.llm = llm

def get_analysis_service(
    db: AsyncSession = Depends(get_db),
    request: Request = None,
) -> AnalysisService:
    return AnalysisService(
        db=db,
        embeddings=request.app.state.embeddings,
        llm=request.app.state.llm,
    )

@router.post("/analyses")
async def create_analysis(
    data: AnalysisCreate,
    service: AnalysisService = Depends(get_analysis_service),
):
    return await service.create(data)

Cached Dependencies

from functools import lru_cache
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    redis_url: str
    api_key: str

    model_config = {"env_file": ".env"}

@lru_cache
def get_settings() -> Settings:
    return Settings()

# Usage in dependencies
def get_db_url(settings: Settings = Depends(get_settings)) -> str:
    return settings.database_url

Authenticated User

from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = credentials.credentials
    payload = decode_jwt(token)

    user = await db.get(User, payload["sub"])
    if not user:
        raise HTTPException(401, "Invalid credentials")
    return user

async def get_admin_user(
    user: User = Depends(get_current_user),
) -> User:
    if not user.is_admin:
        raise HTTPException(403, "Admin access required")
    return user

Middleware Patterns

Request ID Middleware

import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id

        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

app.add_middleware(RequestIDMiddleware)

Timing Middleware

import time
from starlette.middleware.base import BaseHTTPMiddleware

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start

        response.headers["X-Response-Time"] = f"{duration:.3f}s"
        return response

Structured Logging Middleware

import structlog
from starlette.middleware.base import BaseHTTPMiddleware

logger = structlog.get_logger()

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        log = logger.bind(
            request_id=getattr(request.state, "request_id", None),
            method=request.method,
            path=request.url.path,
        )

        try:
            response = await call_next(request)
            log.info(
                "request_completed",
                status_code=response.status_code,
            )
            return response
        except Exception as exc:
            log.exception("request_failed", error=str(exc))
            raise

CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["*"],
    expose_headers=["X-Request-ID", "X-Response-Time"],
)

Settings with Pydantic

from pydantic import Field, field_validator, PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # Database
    database_url: PostgresDsn
    db_pool_size: int = Field(default=5, ge=1, le=20)
    db_max_overflow: int = Field(default=10, ge=0, le=50)

    # Redis
    redis_url: str = "redis://localhost:6379"

    # API
    api_key: str = Field(min_length=32)
    debug: bool = False

    # LLM
    openai_api_key: str | None = None
    anthropic_api_key: str | None = None

    @field_validator("database_url", mode="before")
    @classmethod
    def validate_database_url(cls, v: str) -> str:
        if v and "+asyncpg" not in v:
            return v.replace("postgresql://", "postgresql+asyncpg://")
        return v

    @property
    def async_database_url(self) -> str:
        return str(self.database_url)

Exception Handlers

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from sqlalchemy.exc import IntegrityError
from app.core.exceptions import ProblemException

@app.exception_handler(ProblemException)
async def problem_exception_handler(request: Request, exc: ProblemException):
    return JSONResponse(
        status_code=exc.status_code,
        content=exc.to_problem_detail(),
        media_type="application/problem+json",
    )

@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
    return JSONResponse(
        status_code=409,
        content={
            "type": "https://api.example.com/problems/conflict",
            "title": "Conflict",
            "status": 409,
            "detail": "Resource already exists or constraint violated",
        },
        media_type="application/problem+json",
    )

Response Optimization

from fastapi.responses import ORJSONResponse

# Use orjson for faster JSON serialization
app = FastAPI(default_response_class=ORJSONResponse)

# Streaming response
from fastapi.responses import StreamingResponse

@router.get("/export")
async def export_data():
    async def generate():
        async for chunk in fetch_large_dataset():
            yield json.dumps(chunk) + "\n"

    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson",
    )

Health Checks

from fastapi import APIRouter

health_router = APIRouter(tags=["health"])

@health_router.get("/health")
async def health_check(request: Request):
    checks = {}

    # Database
    try:
        async with request.app.state.db_engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        checks["database"] = "healthy"
    except Exception as e:
        checks["database"] = f"unhealthy: {e}"

    # Redis
    try:
        await request.app.state.redis.ping()
        checks["redis"] = "healthy"
    except Exception as e:
        checks["redis"] = f"unhealthy: {e}"

    status = "healthy" if all(v == "healthy" for v in checks.values()) else "unhealthy"
    return {"status": status, "checks": checks}

Anti-Patterns (FORBIDDEN)

# NEVER use global state
db_session = None  # Global mutable state!

# NEVER block the event loop
def sync_db_query():  # Blocking in async context!
    return session.query(User).all()

# NEVER skip dependency injection
@router.get("/users")
async def get_users():
    db = create_session()  # Creating session in route!
    return db.query(User).all()

# NEVER ignore lifespan cleanup
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.pool = create_pool()
    yield
    # Missing cleanup! Pool never closed

Key Decisions

Decision Recommendation
Lifespan Use asynccontextmanager (not events)
Dependencies Class-based services with DI
Settings Pydantic Settings with .env
Response ORJSONResponse for performance
Middleware Order: CORS → RequestID → Timing → Logging
Health Check all critical dependencies

Related Skills

  • clean-architecture - Service layer patterns
  • database-schema-designer - SQLAlchemy models
  • observability-monitoring - Logging and metrics

Capability Details

lifespan

Keywords: lifespan, startup, shutdown, asynccontextmanager Solves:

  • FastAPI startup/shutdown
  • Resource management in FastAPI

dependencies

Keywords: dependency injection, Depends, get_db, service dependency Solves:

  • FastAPI dependency injection patterns
  • Reusable dependencies

middleware

Keywords: middleware, request id, timing, cors, logging middleware Solves:

  • Custom FastAPI middleware
  • Request/response interceptors

settings

Keywords: settings, pydantic settings, env, configuration Solves:

  • FastAPI configuration management
  • Environment variables

health-checks

Keywords: health check, readiness, liveness, health endpoint Solves:

  • Kubernetes health checks
  • Service health monitoring