fastapi-development
@tachyon-beep/skillpacks

SKILL.md

name: fastapi-development
description: Use when building FastAPI applications, encountering async/sync issues, dependency injection questions, testing strategies, or production deployment patterns - covers FastAPI-specific patterns, anti-patterns, and production best practices

FastAPI Development

Overview

FastAPI specialist skill providing production-ready patterns, anti-patterns to avoid, and testing strategies.

Core principle: FastAPI's type hints, dependency injection, and async-first design enable fast, maintainable APIs - but require understanding async/sync boundaries, proper dependency management, and production hardening patterns.

When to Use This Skill

Use when encountering:

  • Dependency injection: Database connections, auth, shared resources, testing overrides
  • Async/sync boundaries: Mixing blocking I/O with async endpoints, performance issues
  • Background tasks: Choosing between BackgroundTasks, Celery, or other task queues
  • File uploads: Streaming large files, memory management
  • Testing: Dependency overrides, async test clients, fixture patterns
  • Production deployment: ASGI servers, lifespan management, connection pooling
  • Security: SQL injection, CORS, authentication patterns
  • Performance: Connection pooling, query optimization, caching

Quick Reference - Common Patterns

| Pattern | Use Case | Code Snippet |
| --- | --- | --- |
| DB dependency with pooling | Per-request database access | `def get_db(): db = SessionLocal(); try: yield db; finally: db.close()` |
| Dependency override for testing | Test with mock/test DB | `app.dependency_overrides[get_db] = override_get_db` |
| Lifespan events | Startup/shutdown resources | `@asynccontextmanager` + `async def lifespan(app): ... yield ...` |
| Streaming file upload | Large files without memory issues | `while chunk := await file.read(CHUNK_SIZE): await f.write(chunk)` |
| Background tasks (short) | < 30 sec tasks | `background_tasks.add_task(func, args)` |
| Task queue (long) | > 1 min tasks, retries needed | Use Celery/Arq with Redis |
| Parameterized queries | Prevent SQL injection | `cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))` |

Core Patterns

1. Dependency Injection Architecture

Pattern: Connection pooling with yield dependencies

from typing import Iterator

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from fastapi import Depends, FastAPI

# One-time pool creation at module level
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=20,          # Max connections
    max_overflow=0,        # No overflow beyond pool_size
    pool_pre_ping=True,    # Verify connection health before use
    pool_recycle=3600      # Recycle connections every hour
)
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)

# Dependency pattern with automatic cleanup
def get_db() -> Iterator[Session]:
    """
    Yields a database session from the pool.
    Ensures cleanup even if the endpoint raises an exception.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Usage in endpoints
@app.get("/items/{item_id}")
def get_item(item_id: int, db: Session = Depends(get_db)):
    return db.query(Item).filter(Item.id == item_id).first()

Why this pattern:

  • Pool created once (expensive operation)
  • Per-request connections from pool (cheap)
  • yield ensures cleanup on success AND exceptions
  • pool_pre_ping prevents stale connection errors
  • pool_recycle prevents long-lived connection issues

Testing pattern:

# conftest.py
import pytest
from fastapi.testclient import TestClient

@pytest.fixture
def test_db():
    """Test database fixture"""
    db = TestSessionLocal()
    try:
        yield db
    finally:
        db.rollback()
        db.close()

@pytest.fixture
def client(test_db):
    """Test client with overridden dependencies"""
    def override_get_db():
        yield test_db

    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()

# test_items.py
def test_get_item(client, test_db):
    # Setup test data
    test_db.add(Item(id=1, name="Test"))
    test_db.commit()

    # Test endpoint
    response = client.get("/items/1")
    assert response.status_code == 200

2. Async/Sync Boundary Management

❌ Anti-pattern: Blocking calls in async endpoints

# BAD - Blocks event loop
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    conn = psycopg2.connect(...)  # Blocking!
    cursor = conn.cursor()
    cursor.execute(...)           # Blocking!
    return cursor.fetchone()

✅ Pattern: Use async libraries or run_in_threadpool

# GOOD Option 1: Async database library
from databases import Database

database = Database("postgresql://...")

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    query = "SELECT * FROM users WHERE id = :user_id"
    return await database.fetch_one(query=query, values={"user_id": user_id})

# GOOD Option 2: Run blocking code in thread pool
import psycopg2
from fastapi.concurrency import run_in_threadpool

def blocking_db_call(user_id: int):
    conn = psycopg2.connect(...)
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    return cursor.fetchone()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    return await run_in_threadpool(blocking_db_call, user_id)

Decision table:

| Scenario | Use |
| --- | --- |
| PostgreSQL, async needed | `asyncpg` or the `databases` library |
| PostgreSQL, sync is fine | `psycopg2` with `def` (not `async def`) endpoints |
| MySQL, async | `aiomysql` |
| SQLite | `aiosqlite` (async) or sync with `def` endpoints |
| External API calls | `httpx.AsyncClient` |
| CPU-intensive work | `run_in_threadpool` or Celery |

3. Lifespan Management (Modern Pattern)

✅ Use lifespan context manager (replaces deprecated @app.on_event)

from contextlib import asynccontextmanager

import aioredis  # aioredis 1.x API shown; newer projects may prefer redis.asyncio
import asyncpg
from fastapi import FastAPI

# Global resources
resources = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    resources["db_pool"] = await asyncpg.create_pool(
        "postgresql://...",
        min_size=10,
        max_size=20
    )
    resources["redis"] = await aioredis.create_redis_pool("redis://...")
    resources["ml_model"] = load_ml_model()  # Illustrative helper; can be sync or async

    yield  # Application runs

    # Shutdown
    await resources["db_pool"].close()
    resources["redis"].close()
    await resources["redis"].wait_closed()
    resources.clear()

app = FastAPI(lifespan=lifespan)

# Access resources in endpoints
@app.get("/predict")
async def predict(data: dict):
    model = resources["ml_model"]
    return {"prediction": model.predict(data)}

4. File Upload Patterns

For 100MB+ files: Stream to disk, never load into memory

from fastapi import UploadFile, File, HTTPException
import aiofiles
import os
import uuid

UPLOAD_DIR = "/var/uploads"
CHUNK_SIZE = 1024 * 1024  # 1MB chunks
MAX_FILE_SIZE = 500 * 1024 * 1024  # 500MB

@app.post("/upload")
async def upload_large_file(file: UploadFile = File(...)):
    # Validate content type (may be None if the client omits it)
    if not file.content_type or not file.content_type.startswith("video/"):
        raise HTTPException(400, "Only video files accepted")

    filepath = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}")
    size = 0

    try:
        async with aiofiles.open(filepath, 'wb') as f:
            while chunk := await file.read(CHUNK_SIZE):
                size += len(chunk)
                if size > MAX_FILE_SIZE:
                    raise HTTPException(413, "File too large")
                await f.write(chunk)
    except Exception as e:
        # Cleanup on failure
        if os.path.exists(filepath):
            os.remove(filepath)
        raise

    return {"filename": file.filename, "size": size}

For very large files (1GB+): Direct S3 upload with presigned URLs

import uuid

import boto3

@app.post("/upload/presigned-url")
async def get_presigned_upload_url(filename: str):
    s3_client = boto3.client('s3')
    presigned_post = s3_client.generate_presigned_post(
        Bucket='my-bucket',
        Key=f'uploads/{uuid.uuid4()}_{filename}',
        ExpiresIn=3600
    )
    return presigned_post  # Client uploads directly to S3

5. Background Task Decision Matrix

| Task Duration | Needs Retries? | Needs Monitoring? | Solution |
| --- | --- | --- | --- |
| < 30 seconds | No | No | BackgroundTasks |
| < 30 seconds | Yes | Maybe | Celery/Arq |
| > 1 minute | Don't care | Don't care | Celery/Arq |
| Any | Yes | Yes | Celery/Arq with monitoring |

BackgroundTasks pattern (simple, in-process):

import asyncio

from fastapi import BackgroundTasks

async def send_email(email: str):
    await asyncio.sleep(2)  # Async work (e.g. SMTP over an async client)
    print(f"Email sent to {email}")

@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
    # ... save user ...
    background_tasks.add_task(send_email, email)
    return {"status": "registered"}  # Returns immediately

Celery pattern (distributed, persistent):

# celery_app.py
from celery import Celery

celery_app = Celery('tasks', broker='redis://localhost:6379/0')

@celery_app.task(bind=True, max_retries=3)
def process_video(self, filepath: str):
    try:
        # Long-running work
        extract_frames(filepath)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

# main.py
from celery_app import process_video

@app.post("/upload")
async def upload(file: UploadFile):
    filepath = await save_file(file)
    task = process_video.delay(filepath)
    return {"task_id": task.id}

@app.get("/status/{task_id}")
async def get_status(task_id: str):
    from celery_app import celery_app
    result = celery_app.AsyncResult(task_id)
    return {"status": result.state, "result": result.result}

Security Patterns

SQL Injection Prevention

❌ NEVER use f-strings or string concatenation

# DANGEROUS
cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
cursor.execute("SELECT * FROM users WHERE email = '" + email + "'")

✅ ALWAYS use parameterized queries

# SQLAlchemy ORM (safe)
db.query(User).filter(User.id == user_id).first()

# Raw SQL (safe with parameters)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
cursor.execute("SELECT * FROM users WHERE email = :email", {"email": email})

CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],  # Specific origins, not "*" in production
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

Authentication Pattern

import jwt  # PyJWT

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
        return await get_user_by_id(user_id)  # Illustrative lookup helper
    except jwt.PyJWTError:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")

@app.get("/protected")
async def protected_route(current_user = Depends(get_current_user)):
    return {"user": current_user}

Middleware Ordering

Critical: each add_middleware call wraps the existing stack, so the LAST middleware added is the OUTERMOST - it sees requests first and responses last

# Correct order:
app.add_middleware(AuthenticationMiddleware)  # 1. FIRST added - innermost, closest to routes
app.add_middleware(ErrorHandlingMiddleware)   # 2. Catches errors from auth/routes
app.add_middleware(RequestLoggingMiddleware)  # 3. Logs the entire request
app.add_middleware(CORSMiddleware, ...)       # 4. LAST added - outermost, handles preflight before auth

Common Anti-Patterns

| Anti-Pattern | Why Bad | Fix |
| --- | --- | --- |
| Global database connection | Not thread-safe, connection leaks | Connection pool with dependency injection |
| `async def` with blocking I/O | Blocks event loop, kills performance | Async libraries or `run_in_threadpool` |
| `time.sleep()` in async code | Blocks entire event loop | `asyncio.sleep()` |
| Loading large files into memory | Memory exhaustion, OOM crashes | Stream with `aiofiles` and chunks |
| BackgroundTasks for long work | Lost on restart, no retries | Celery/Arq |
| String formatting in SQL | SQL injection vulnerability | Parameterized queries only |
| `allow_origins=["*"]` with credentials | Security vulnerability | Specify exact origins |
| Not closing database connections | Connection pool exhaustion | `yield` in dependencies |
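The time.sleep() row is easy to demonstrate with the stdlib alone. In this sketch, two concurrent asyncio.sleep(0.1) calls overlap, so total wall time stays near 0.1 s; replacing them with time.sleep(0.1) would serialize the coroutines to ~0.2 s, because each call blocks the entire event loop.

```python
import asyncio
import time

async def io_task() -> None:
    await asyncio.sleep(0.1)  # yields control; other tasks run during the wait

async def timed() -> float:
    start = time.perf_counter()
    await asyncio.gather(io_task(), io_task())  # both sleeps run concurrently
    return time.perf_counter() - start

elapsed = asyncio.run(timed())
print(f"{elapsed:.2f}s")  # ~0.10s, not 0.20s - the waits overlap
```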

Testing Best Practices

import pytest
from fastapi.testclient import TestClient
from httpx import ASGITransport, AsyncClient

# Sync tests (simpler, faster for most cases)
def test_read_item(client):
    response = client.get("/items/1")
    assert response.status_code == 200

# Async tests (needed for testing async endpoints with real async operations)
@pytest.mark.asyncio
async def test_async_endpoint():
    # httpx >= 0.27 removed the app= shortcut; pass an ASGITransport instead
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
        response = await ac.get("/items/1")
    assert response.status_code == 200

# Dependency override pattern
def test_with_mock_db(client):
    def override_get_db():
        yield mock_db

    app.dependency_overrides[get_db] = override_get_db
    response = client.get("/items/1")
    app.dependency_overrides.clear()
    assert response.status_code == 200

Production Deployment

ASGI server configuration (Uvicorn + Gunicorn):

# gunicorn with uvicorn workers (production)
gunicorn main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30 \
  --keep-alive 5

Environment-based configuration:

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")  # pydantic-settings v2 style

    database_url: str
    redis_url: str
    secret_key: str
    debug: bool = False

settings = Settings()

# Use in app
engine = create_engine(settings.database_url)

Cross-References

Related skills:

  • Security → ordis-security-architect (threat modeling, OWASP top 10)
  • Python patterns → axiom-python-engineering (async patterns, type hints)
  • API testing → api-testing (contract testing, integration tests)
  • API documentation → api-documentation or muna-technical-writer
  • Database optimization → database-integration (query optimization, migrations)
  • Authentication deep dive → api-authentication (OAuth2, JWT patterns)
  • GraphQL alternative → graphql-api-design

Performance Tips

  1. Use connection pooling - Create pool once, not per-request
  2. Enable response caching - Use fastapi-cache2 for expensive queries
  3. Limit response size - Paginate large result sets
  4. Use async for I/O - Database, HTTP calls, file operations
  5. Profile slow endpoints - Use starlette-prometheus for monitoring
  6. Enable gzip compression - GZipMiddleware for large JSON responses

When NOT to Use FastAPI

  • Simple CRUD with admin panel → Django (has built-in admin)
  • Heavy template rendering → Django or Flask
  • Mature ecosystem needed → Django (more third-party packages)
  • Team unfamiliar with async → Flask or Django (simpler mental model)

FastAPI excels at: Modern APIs, microservices, ML model serving, real-time features, high performance requirements.