Claude Code Plugins

Community-maintained marketplace

Feedback
0
0

FastAPI patterns with layered architecture, typing, and dependency injection. Use when creating backend API endpoints.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: backend-api
description: FastAPI patterns with layered architecture, typing, and dependency injection. Use when creating backend API endpoints.

Backend API Patterns

When to Use

  • Creating new API endpoints
  • Implementing REST APIs
  • Structuring service layers
  • Adding request validation
  • Setting up dependency injection

Pattern

Layered architecture (Endpoint → Service → Model) with full typing

Checklist

  • Endpoint → Service → Model separation
  • Define response_model for validation
  • Use dependency injection for db and auth
  • Request validation (Pydantic schemas)
  • Error handling (custom exceptions)
  • Type hint return values
  • Async where I/O-bound
  • Include docstrings

Examples

Basic CRUD Endpoint

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.schemas import ScanCreate, ScanResponse
from app.models import Scan
from sqlalchemy import select

# Router for the scan endpoints: routes registered on it get the "/scans"
# prefix and the "scans" tag.
router = APIRouter(prefix="/scans", tags=["scans"])

@router.get("", response_model=list[ScanResponse])
async def list_scans(
    skip: int = Query(0, ge=0, description="Number of records to skip"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum records to return"),
    db: AsyncSession = Depends(get_db)
) -> list[ScanResponse]:
    """List scans one page at a time.

    Args:
        skip: Number of rows to skip; must be >= 0.
        limit: Page size; must be between 1 and 1000. The lower bound keeps
            negative values from reaching the database's LIMIT clause.
        db: Database session injected by FastAPI.

    Returns:
        The scans on the requested page, ordered by id.
    """
    # Without an explicit ORDER BY the database may return rows in any
    # order, so consecutive pages could overlap or skip rows.
    page_query = select(Scan).order_by(Scan.id).offset(skip).limit(limit)
    result = await db.execute(page_query)
    return result.scalars().all()

@router.get("/{scan_id}", response_model=ScanResponse)
async def get_scan(
    scan_id: int,
    db: AsyncSession = Depends(get_db)
) -> ScanResponse:
    """Fetch the scan whose id equals ``scan_id``.

    Raises:
        HTTPException: 404 when no scan has that id.
    """
    lookup = select(Scan).where(Scan.id == scan_id)
    found = (await db.execute(lookup)).scalar_one_or_none()

    if found:
        return found

    raise HTTPException(status_code=404, detail="Scan not found")

@router.post("", response_model=ScanResponse, status_code=201)
async def create_scan(
    scan_data: ScanCreate,
    db: AsyncSession = Depends(get_db)
) -> ScanResponse:
    """Store a new scan built from the request body and return it."""
    fields = scan_data.dict()
    new_scan = Scan(**fields)

    db.add(new_scan)
    await db.commit()
    # Re-read the row after the commit before handing it back.
    await db.refresh(new_scan)

    return new_scan

Layered Architecture with Service

# Endpoint Layer
from app.services.scan_service import ScanService

@router.post("", response_model=ScanResponse, status_code=201)
async def create_scan(
    scan_data: ScanCreate,
    service: ScanService = Depends()
) -> ScanResponse:
    """Create a new network scan by delegating to the service layer.

    Responds with 201 Created, matching the other create endpoint on this
    router, since a successful call creates a new resource.

    Args:
        scan_data: Validated request body.
        service: ScanService instance injected by FastAPI.

    Returns:
        The scan produced by ``service.create_scan``.
    """
    return await service.create_scan(scan_data)

# Service Layer
from sqlalchemy.ext.asyncio import AsyncSession

class ScanService:
    """Service layer for scans: target validation, persistence, task start."""

    def __init__(self, db: AsyncSession = Depends(get_db)):
        # The session arrives through FastAPI's dependency injection.
        self.db = db

    async def create_scan(self, scan_data: ScanCreate) -> Scan:
        """Validate the target, store a new scan, then start its task.

        Raises ValidationError when the target does not parse as an IP
        address or network.
        """
        target_ok = self._is_valid_target(scan_data.target)
        if not target_ok:
            raise ValidationError("Invalid target")

        record = Scan(**scan_data.dict())
        self.db.add(record)
        await self.db.commit()
        await self.db.refresh(record)

        # NOTE(review): _start_scan_task is not defined in the visible part
        # of this class - confirm it exists elsewhere.
        await self._start_scan_task(record.id)

        return record

    def _is_valid_target(self, target: str) -> bool:
        """Return True when ``target`` parses as an IP address or network."""
        import ipaddress
        try:
            ipaddress.ip_network(target, strict=False)
        except ValueError:
            return False
        else:
            return True

# Model Layer
from sqlalchemy import Column, DateTime, Integer, String, func

from app.database import Base

class Scan(Base):
    """ORM model mapped to the ``scans`` table."""

    __tablename__ = "scans"

    id = Column(Integer, primary_key=True)
    target = Column(String, nullable=False)
    # ports and scan_type mirror the ScanCreate schema fields: the create
    # paths build the model with Scan(**scan_data.dict()), which passes
    # every schema field as a keyword, and the default declarative
    # constructor rejects keywords that are not mapped attributes.
    ports = Column(String, default="1-1000")
    scan_type = Column(String, default="tcp")
    status = Column(String, default="pending")
    # Timestamp filled in by the database at insert time.
    created_at = Column(DateTime, server_default=func.now())
    # NOTE(review): run_scan_task also assigns scan.results and scan.error.
    # They need mapped columns to be persisted; the right column type for
    # results depends on what perform_scan returns - confirm and add.

Custom Validators

from pydantic import BaseModel, validator, Field

class ScanCreate(BaseModel):
    """Request schema for creating a scan.

    ``target`` must parse as an IP address or network; ``ports`` must be a
    ``start-end`` range with 1 <= start <= end <= 65535.
    """

    target: str = Field(..., description="Target IP or network")
    ports: str = Field("1-1000", description="Port range")
    scan_type: str = Field("tcp", description="Scan type")

    @validator('target')
    def validate_target(cls, v):
        """Validate target is valid IP or CIDR."""
        import ipaddress
        try:
            ipaddress.ip_network(v, strict=False)
        except ValueError as exc:
            # Keep the parser's own error attached as the cause.
            raise ValueError('Invalid IP address or network') from exc
        return v

    @validator('ports')
    def validate_ports(cls, v):
        """Validate port range format and bounds."""
        import re
        # fullmatch rather than match(...$): "$" also matches just before a
        # trailing newline, which would let "1-1000\n" through unchanged.
        # re.ASCII limits \d to 0-9 so non-ASCII digit characters are
        # rejected instead of being stored in the returned value.
        parsed = re.fullmatch(r'(\d+)-(\d+)', v, flags=re.ASCII)
        if parsed is None:
            raise ValueError('Invalid port range format (use: 1-1000)')
        start, end = int(parsed.group(1)), int(parsed.group(2))
        if start > end or start < 1 or end > 65535:
            raise ValueError('Invalid port range')
        return v

Background Tasks

from fastapi import BackgroundTasks

async def _execute_scan(scan_id: int, session: AsyncSession) -> None:
    """Run the scan with id ``scan_id`` using ``session`` and store its outcome."""
    result = await session.execute(
        select(Scan).where(Scan.id == scan_id)
    )
    scan = result.scalar_one()

    # Read the inputs before committing. NOTE(review): if the session
    # expires objects on commit, reading them afterwards would need a
    # reload - confirm how the session factory is configured.
    target, ports = scan.target, scan.ports

    scan.status = "running"
    await session.commit()

    try:
        results = await perform_scan(target, ports)
        scan.status = "completed"
        scan.results = results
    except Exception as e:
        # Deliberately broad: any scan failure is recorded on the row
        # instead of being lost inside the background task.
        scan.status = "failed"
        scan.error = str(e)

    await session.commit()


async def run_scan_task(scan_id: int, db: "AsyncSession | None" = None):
    """Background task to run the scan.

    Args:
        scan_id: Id of the scan to run.
        db: Optional session to use. When omitted, the task opens and closes
            its own session, so it does not depend on a request-scoped
            session that may be closed by the time the task runs.
    """
    if db is not None:
        await _execute_scan(scan_id, db)
        return

    # Local import keeps this block self-contained. Assumes AsyncSessionLocal
    # is importable from app.database, the module get_db is imported from
    # and which uses it - confirm.
    from app.database import AsyncSessionLocal

    async with AsyncSessionLocal() as session:
        await _execute_scan(scan_id, session)


@router.post("/{scan_id}/start", response_model=ScanResponse)
async def start_scan(
    scan_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
) -> ScanResponse:
    """Mark a scan as queued and schedule it to run in the background.

    Raises:
        HTTPException: 404 when no scan has the given id.
    """
    result = await db.execute(
        select(Scan).where(Scan.id == scan_id)
    )
    scan = result.scalar_one_or_none()

    if not scan:
        raise HTTPException(status_code=404, detail="Scan not found")

    scan.status = "queued"
    await db.commit()
    await db.refresh(scan)

    # Schedule only after the "queued" state is committed, and do not pass
    # the request-scoped session: the task runs after the response, when
    # that session may already have been closed by its dependency.
    background_tasks.add_task(run_scan_task, scan_id)

    return scan

WebSocket Streaming

from fastapi import WebSocket, WebSocketDisconnect

@router.websocket("/ws/traffic")
async def traffic_stream(websocket: WebSocket):
    """Accept the socket and send one JSON message per packet until the client disconnects."""
    await websocket.accept()

    try:
        while True:
            pkt = await get_next_packet()

            # Build the message first, then send it.
            message = {
                "timestamp": pkt.timestamp,
                "src": pkt.src_ip,
                "dst": pkt.dst_ip,
                "protocol": pkt.protocol
            }
            await websocket.send_json(message)

    except WebSocketDisconnect:
        logger.info("Client disconnected")
    finally:
        # Runs on disconnect and on any other exit from the loop.
        await cleanup_connection()

File Upload

from fastapi import File, UploadFile
import aiofiles

def _safe_evidence_name(raw_name):
    """Reduce a client-supplied filename to a bare file name, or '' if unusable."""
    import os

    # Treat both separator styles as path separators, then keep only the
    # final component so "../" sequences cannot escape the evidence folder.
    candidate = os.path.basename((raw_name or "").replace("\\", "/"))
    if candidate in ("", ".", "..") or "\x00" in candidate:
        return ""
    return candidate


@router.post("/{scan_id}/evidence", response_model=dict)
async def upload_evidence(
    scan_id: int,
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db)
) -> dict:
    """Upload evidence file for a scan.

    Args:
        scan_id: Id used as the prefix of the stored file name.
        file: Uploaded file; must be PNG, JPEG or PDF and at most 10MB.
        db: Injected database session (not used by this handler).

    Returns:
        A dict with the stored file name, its path and its size in bytes.

    Raises:
        HTTPException: 400 for a disallowed content type, an unusable
            file name, or a file larger than 10MB.
    """
    # Validate file type (the content type is whatever the client declared)
    allowed_types = ['image/png', 'image/jpeg', 'application/pdf']
    if file.content_type not in allowed_types:
        raise HTTPException(
            status_code=400,
            detail=f"File type not allowed. Allowed: {allowed_types}"
        )

    # The filename comes from the client, so never put it into a path as-is.
    safe_name = _safe_evidence_name(file.filename)
    if not safe_name:
        raise HTTPException(
            status_code=400,
            detail="Invalid filename"
        )

    # Validate file size (10MB max). Reading one byte past the limit is
    # enough to detect an oversized upload without buffering all of it.
    max_size = 10 * 1024 * 1024
    contents = await file.read(max_size + 1)
    if len(contents) > max_size:
        raise HTTPException(
            status_code=400,
            detail="File too large. Max 10MB"
        )

    # Save file
    file_path = f"volumes/evidence/{scan_id}_{safe_name}"
    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(contents)

    return {
        "filename": safe_name,
        "path": file_path,
        "size": len(contents)
    }

Common Patterns

Dependency Injection

# Database
async def get_db():
    """Yield a session created by AsyncSessionLocal, inside its ``async with`` block."""
    async with AsyncSessionLocal() as db_session:
        yield db_session

# Service
class ScanService:
    """Service object that holds the injected database session."""

    def __init__(self, db: AsyncSession = Depends(get_db)) -> None:
        # db defaults to Depends(get_db), so FastAPI supplies the session
        # when the service is itself requested through Depends().
        self.db = db

# In endpoint
@router.get("/")
async def handler(service: ScanService = Depends()):
    """Return the result of ``get_all()`` on the injected service."""
    everything = await service.get_all()
    return everything

Error Responses

from app.exceptions import NotFoundError, ValidationError

@router.get("/{id}")
async def get_item(id: int, db: AsyncSession = Depends(get_db)):
    """Fetch the item whose id equals ``id``.

    Raises:
        NotFoundError: When no item has that id.
        HTTPException: 500 when the database lookup itself fails.
    """
    # Only the lookup sits inside the try, so the not-found case below is
    # never caught and re-raised by this handler's own except clause.
    try:
        result = await db.execute(
            select(Item).where(Item.id == id)
        )
        item = result.scalar_one_or_none()
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Error fetching item %s: %s", id, e)
        raise HTTPException(
            status_code=500,
            detail="Internal server error"
        ) from e

    if item is None:
        raise NotFoundError(f"Item {id} not found")

    return item