| name | api-dev |
| description | Modern API development patterns for building high-performance, scalable web services. Expert in async/await patterns, REST/GraphQL APIs, middleware, error handling, rate limiting, OpenAPI documentation, testing, and production optimizations. Framework-agnostic patterns that work with Python, Node.js, Go, and other languages. |
| license | MIT |
API Development Patterns & Best Practices
This skill provides comprehensive patterns for building modern APIs in 2025, focusing on async/await patterns, performance optimization, security, testing, and production-ready configurations that work across different frameworks and languages.
When to Use This Skill
Use this skill when you need to:
- Design RESTful or GraphQL APIs
- Implement async/await patterns for high performance
- Add middleware for authentication, logging, and validation
- Handle errors gracefully with proper HTTP status codes
- Implement rate limiting and throttling
- Generate OpenAPI/Swagger documentation
- Set up comprehensive testing strategies
- Optimize API performance with caching and connection pooling
- Implement API versioning and backward compatibility
- Set up monitoring and observability
Core API Design Principles
1. Async/Await Patterns for Performance
# patterns/async_patterns.py
import asyncio
import aiohttp
import aioredis
from typing import AsyncGenerator, Optional, List, Dict, Any
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import wraps
import time
@dataclass
class RequestMetrics:
    """Outcome and timing data captured for one handled request."""

    # Elapsed time of the call, in seconds.
    duration: float
    # HTTP status reported for the call.
    status_code: int
    # Label identifying what was called.
    endpoint: str
    # HTTP verb associated with the call.
    method: str
    # Optional identity of the caller; absent by default.
    user_id: Optional[str] = None
def with_metrics(func):
    """Decorator that records a ``RequestMetrics`` entry for every call.

    The wrapped coroutine is awaited and its result (or exception) is passed
    through unchanged. After the call, a ``RequestMetrics`` record is handed
    to ``send_metrics``.

    Recorded fields:
        endpoint: the wrapped function's ``__name__``.
        method: the ``method`` keyword argument if given; otherwise the
            function name when it is an HTTP verb (``get``, ``post``, ...),
            else ``'GET'``.
        status_code: ``result.status_code`` when present, 200 otherwise; on
            failure the exception's ``status_code`` or ``status`` attribute,
            falling back to 500.
        duration: elapsed seconds measured with a monotonic clock.

    Metrics delivery is best-effort: an error raised by ``send_metrics`` is
    logged and never replaces the wrapped call's result or exception.
    """
    import logging

    logger = logging.getLogger(__name__)

    # Without this, a decorated ``post`` would be reported as a GET request.
    http_verbs = {'GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'HEAD', 'OPTIONS'}
    name_as_verb = func.__name__.upper()
    default_method = name_as_verb if name_as_verb in http_verbs else 'GET'

    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so durations are immune to clock changes.
        start_time = time.perf_counter()
        status_code = 200
        endpoint = func.__name__
        method = kwargs.get('method', default_method)
        try:
            result = await func(*args, **kwargs)
            if hasattr(result, 'status_code'):
                status_code = result.status_code
            return result
        except Exception as e:
            # aiohttp's ClientResponseError exposes ``status`` rather than
            # ``status_code``; honour both before defaulting to 500.
            status_code = getattr(e, 'status_code', getattr(e, 'status', 500))
            raise
        finally:
            metrics = RequestMetrics(
                duration=time.perf_counter() - start_time,
                status_code=status_code,
                endpoint=endpoint,
                method=method
            )
            # Send metrics to monitoring system. A failure here must not mask
            # the return value or exception of the wrapped call.
            try:
                await send_metrics(metrics)
            except Exception:
                logger.exception("Failed to send request metrics")

    return wrapper
async def send_metrics(metrics: RequestMetrics):
    """Hand ``metrics`` to the monitoring backend (placeholder: does nothing)."""
    # Plug the real exporter in here; which one depends on your monitoring
    # system -- for example Prometheus, Datadog, or custom analytics.
    return None
class AsyncAPIClient:
    """Async HTTP client that reuses a single pooled ``aiohttp`` session.

    Usable as an async context manager; leaving the ``async with`` block
    closes the session.
    """

    def __init__(self, base_url: str, timeout: int = 30):
        # ``timeout`` is applied as the total per-request time budget.
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None
        self._session_lock = asyncio.Lock()

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the live session, creating a new pooled one when needed."""

        def session_usable() -> bool:
            return self._session is not None and not self._session.closed

        if session_usable():
            return self._session

        async with self._session_lock:
            # Re-check under the lock: another task may have created the
            # session while this one was waiting.
            if not session_usable():
                pool = aiohttp.TCPConnector(
                    limit=100,           # Total connection pool size
                    limit_per_host=30,   # Connections per host
                    force_close=False,
                    enable_cleanup_closed=True,
                )
                self._session = aiohttp.ClientSession(
                    connector=pool,
                    timeout=self.timeout,
                )
        return self._session

    @with_metrics
    async def get(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a GET to ``base_url + endpoint`` and return the decoded JSON body.

        ``raise_for_status`` is invoked on the response before the body is
        read. NOTE: no retry is performed in this method.
        """
        client = await self._get_session()
        target = f"{self.base_url}{endpoint}"
        async with client.get(target, params=params, headers=headers) as reply:
            reply.raise_for_status()
            return await reply.json()

    @with_metrics
    async def post(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a POST to ``base_url + endpoint`` and return the decoded JSON body.

        ``data`` and ``json`` are forwarded to ``aiohttp`` unchanged.
        ``raise_for_status`` is invoked on the response before the body is
        read. NOTE: no retry is performed in this method.
        """
        client = await self._get_session()
        target = f"{self.base_url}{endpoint}"
        async with client.post(
            target, data=data, json=json, headers=headers
        ) as reply:
            reply.raise_for_status()
            return await reply.json()

    async def close(self):
        """Close the underlying session if one has been created."""
        if self._session:
            await self._session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
2. Circuit Breaker Pattern
# patterns/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
from functools import wraps
class CircuitState(Enum):
    """States a circuit breaker moves through."""

    CLOSED = "closed"        # calls pass through normally
    OPEN = "open"            # calls are rejected without being attempted
    HALF_OPEN = "half_open"  # one trial call is allowed to probe recovery


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working.
    """


class CircuitBreaker:
    """Circuit breaker for fault tolerance, used as a coroutine decorator.

    The circuit opens after ``failure_threshold`` *consecutive* failures of
    type ``expected_exception``. While open, calls are rejected with
    ``CircuitBreakerOpenError`` until ``timeout`` seconds have elapsed since
    the last failure; the next call is then attempted in the half-open state.
    A success closes the circuit; a failure re-opens it.

    Args:
        failure_threshold: consecutive failures required to open the circuit.
        timeout: seconds to stay open before allowing a trial call.
        expected_exception: exception class (or tuple of classes) counted as
            a failure; other exceptions propagate without being counted.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type[Exception] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def __call__(self, func: Callable) -> Callable:
        """Wrap the coroutine function ``func`` with this breaker."""

        @wraps(func)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitBreakerOpenError("Circuit breaker is OPEN")

            try:
                result = await func(*args, **kwargs)
            except self.expected_exception:
                self._record_failure()
                raise

            # Any success clears the streak. Without this, failures that are
            # far apart would accumulate and eventually trip the breaker.
            self._reset()
            return result

        return wrapper

    def _should_attempt_reset(self) -> bool:
        """Return True once ``timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            # State was forced open without a recorded failure; allow a trial.
            return True
        return time.time() - self.last_failure_time >= self.timeout

    def _record_failure(self):
        """Count a failure and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failed trial call re-opens immediately, whatever the count is.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

    def _reset(self):
        """Clear the failure streak and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
# Usage example
# The decorator is a CircuitBreaker *instance*; there is no lowercase
# ``circuit_breaker`` helper defined in this module.
@CircuitBreaker(failure_threshold=3, timeout=30)
async def external_api_call():
    """Fetch JSON from the example API behind a circuit breaker.

    The breaker opens after 3 failures and allows a trial call 30 seconds
    after the last failure.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()
3. Rate Limiting with Redis
# patterns/rate_limiting.py
import asyncio
import aioredis
import time
from typing import Optional
from fastapi import HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware
class RateLimiter:
    """Rate limiter using a Redis sorted set as a sliding window.

    Each accepted request is stored as one sorted-set member scored by its
    timestamp. Members older than the window are pruned on every check.

    Args:
        redis_url: connection URL passed to ``aioredis.from_url``.
        requests_per_minute: default number of requests allowed per window.
        window_size: default window length in seconds.
    """

    def __init__(
        self,
        redis_url: str,
        requests_per_minute: int = 100,
        window_size: int = 60
    ):
        self.redis = None
        self.redis_url = redis_url
        self.requests_per_minute = requests_per_minute
        self.window_size = window_size

    async def initialize(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_url)

    async def is_allowed(
        self,
        key: str,
        limit: Optional[int] = None,
        window: Optional[int] = None
    ) -> bool:
        """Check whether a request for ``key`` fits in the current window.

        Args:
            key: sorted-set key identifying the client/bucket.
            limit: maximum requests per window; falls back to
                ``requests_per_minute`` when omitted (or falsy).
            window: window length in seconds; falls back to ``window_size``
                when omitted (or falsy).

        Returns:
            True if the request was admitted (and recorded), False if the
            window is already full. Rejected requests are not recorded.
        """
        import uuid

        if not self.redis:
            await self.initialize()

        limit = limit or self.requests_per_minute
        window = window or self.window_size
        current_time = time.time()

        # Remove old requests from the sliding window
        await self.redis.zremrangebyscore(key, 0, current_time - window)

        # Count current requests in window
        current_requests = await self.redis.zcard(key)
        if current_requests >= limit:
            return False

        # Add current request to window. The member must be unique: using the
        # bare timestamp made two requests with the same clock reading
        # collapse into one entry, so bursts were under-counted.
        member = f"{current_time}:{uuid.uuid4().hex}"
        await self.redis.zadd(key, {member: current_time})
        await self.redis.expire(key, window)
        # NOTE(review): the count and the add are separate commands, so
        # concurrent callers can both pass the check; a MULTI/Lua script
        # would be needed for a strict limit -- confirm if that matters here.
        return True
class RateLimitMiddleware(BaseHTTPMiddleware):
    """FastAPI/Starlette middleware that applies ``RateLimiter`` per client and path.

    Args:
        app: the ASGI application being wrapped.
        redis_url: Redis URL handed to ``RateLimiter``.
        requests_per_minute: request budget handed to ``RateLimiter``.
        identifier_func: callable mapping a ``Request`` to a client
            identifier; defaults to the client's IP address.
    """

    def __init__(
        self,
        app,
        redis_url: str,
        requests_per_minute: int = 100,
        identifier_func=None
    ):
        super().__init__(app)
        self.limiter = RateLimiter(redis_url, requests_per_minute)
        self.identifier_func = identifier_func or self._default_identifier

    def _default_identifier(self, request: Request) -> str:
        """Default identifier function"""
        # Use IP address as identifier. ``request.client`` can be None (for
        # example with some test clients or transports), so fall back to a
        # shared bucket instead of raising AttributeError.
        client = request.client
        return client.host if client is not None else "unknown"

    async def dispatch(self, request: Request, call_next):
        """Reject the request with 429 when over the limit, else pass it on."""
        from starlette.responses import JSONResponse

        identifier = self.identifier_func(request)
        key = f"rate_limit:{identifier}:{request.url.path}"

        if not await self.limiter.is_allowed(key):
            # Return the response directly: an HTTPException raised inside
            # BaseHTTPMiddleware is not routed through the application's
            # exception handlers and would reach the client as a 500.
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={"Retry-After": str(self.limiter.window_size)},
            )

        return await call_next(request)
4. API Versioning Strategy
# patterns/versioning.py
from enum import Enum
from typing import Optional, Dict, Any, Type
from abc import ABC, abstractmethod
class APIVersion(str, Enum):
    """Supported API versions; the value is the token clients send."""

    V1 = "v1"
    V2 = "v2"
    V3 = "v3"


def _parse_version(raw: Optional[str]) -> Optional[APIVersion]:
    """Return the ``APIVersion`` whose *value* equals ``raw``, else None.

    The lookup is by value ("v1"), not by member name ("V1"): checking
    ``raw in APIVersion.__members__`` compared against the names, so every
    valid token was rejected and a name such as "V1" raised ``ValueError``.
    """
    if raw is None:
        return None
    try:
        return APIVersion(raw)
    except ValueError:
        return None


class APIVersionStrategy(ABC):
    """Base class for API versioning strategies"""

    @abstractmethod
    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Extract version from request"""
        pass


class HeaderVersionStrategy(APIVersionStrategy):
    """Read the version from a request header (``API-Version`` by default)."""

    def __init__(self, header_name: str = "API-Version"):
        self.header_name = header_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version named in the header, or None if absent/unknown."""
        return _parse_version(request.headers.get(self.header_name))


class URLPathVersionStrategy(APIVersionStrategy):
    """Read the version from the path segment that follows ``prefix``."""

    def __init__(self, prefix: str = "/api"):
        self.prefix = prefix

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version in ``<prefix>/<version>/...``, or None.

        None is returned when the path is not under the prefix, has no
        segment after it, or that segment is not a known version.
        """
        path = request.url.path
        prefix = self.prefix.rstrip("/")
        # Require a segment boundary so "/apix/v1" does not match "/api".
        if not path.startswith(prefix + "/"):
            return None
        # The remainder starts with "/", so index 0 is "" and index 1 is the
        # first segment after the prefix.
        segments = path[len(prefix):].split("/")
        return _parse_version(segments[1])


class QueryParamVersionStrategy(APIVersionStrategy):
    """Read the version from a query parameter (``version`` by default)."""

    def __init__(self, param_name: str = "version"):
        self.param_name = param_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version named in the query string, or None if absent/unknown."""
        return _parse_version(request.query_params.get(self.param_name))


class CompositeVersionStrategy(APIVersionStrategy):
    """Try several strategies in order and fall back to a default version."""

    def __init__(self, strategies: list[APIVersionStrategy]):
        self.strategies = strategies
        self.default_version = APIVersion.V1

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the first version any strategy finds, else ``default_version``."""
        for strategy in self.strategies:
            version = strategy.get_version_from_request(request)
            if version is not None:
                return version
        return self.default_version
# Version-specific handlers
class APIHandlerRegistry:
    """Lookup table of handlers keyed first by API version, then by endpoint."""

    def __init__(self):
        # Start with an empty endpoint table for every known version.
        self.handlers: Dict[APIVersion, Dict[str, Any]] = {}
        for version in APIVersion:
            self.handlers[version] = {}

    def register_handler(
        self,
        version: APIVersion,
        endpoint: str,
        handler: Any
    ):
        """Store ``handler`` for ``endpoint`` under ``version``.

        A table for ``version`` is created on demand; an existing entry for
        the same endpoint is overwritten.
        """
        self.handlers.setdefault(version, {})[endpoint] = handler

    def get_handler(self, version: APIVersion, endpoint: str) -> Optional[Any]:
        """Return the handler registered for the pair, or None when missing."""
        per_version = self.handlers.get(version, {})
        return per_version.get(endpoint)
# Usage example: the strategies are consulted in the order listed here --
# header first, then URL path, then query parameter.
version_strategy = CompositeVersionStrategy(
    [HeaderVersionStrategy(), URLPathVersionStrategy(), QueryParamVersionStrategy()]
)
5. OpenAPI Documentation Enhancement
# patterns/openapi.py
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Any, List, Optional

from pydantic import BaseModel, Field
class ErrorSchema(BaseModel):
    """Standard error response schema.

    ``timestamp`` defaults to the current time as a timezone-aware UTC
    ``datetime``.
    """

    error: str = Field(..., description="Error type")
    message: str = Field(..., description="Error message")
    details: Optional[Dict[str, Any]] = Field(
        None,
        description="Additional error details"
    )
    # ``datetime.utcnow`` is deprecated since Python 3.12 and returns a naive
    # value that is easily mistaken for local time; use an aware UTC time.
    timestamp: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        description="Error timestamp"
    )
class PaginationLinks(BaseModel):
    """Navigation links for a paginated listing; every link is optional."""

    first: Optional[str] = Field(default=None, description="First page link")
    last: Optional[str] = Field(default=None, description="Last page link")
    next: Optional[str] = Field(default=None, description="Next page link")
    prev: Optional[str] = Field(default=None, description="Previous page link")
class PaginationMeta(BaseModel):
    """Counters describing a paginated listing; all four fields are required."""

    total: int = Field(
        ...,
        description="Total number of items",
    )
    page: int = Field(
        ...,
        description="Current page number",
    )
    per_page: int = Field(
        ...,
        description="Items per page",
    )
    pages: int = Field(
        ...,
        description="Total number of pages",
    )
class PaginatedResponse(BaseModel):
    """Envelope for a page of results: the items plus metadata and links."""

    data: List[Any] = Field(
        ...,
        description="Response data",
    )
    meta: PaginationMeta = Field(
        ...,
        description="Pagination metadata",
    )
    links: PaginationLinks = Field(
        ...,
        description="Pagination links",
    )
# Enhanced OpenAPI configuration
def _error_response(description: str) -> Dict[str, Any]:
    """Build a reusable JSON error response whose body follows ``ErrorSchema``."""
    return {
        "description": description,
        "content": {
            "application/json": {
                "schema": ErrorSchema.model_json_schema()
            }
        }
    }


OPENAPI_CONFIG = {
    # Document identity and where the generated docs are served.
    "title": "Modern API",
    "description": "A modern REST API with async patterns",
    "version": "1.0.0",
    "docs_url": "/docs",
    "redoc_url": "/redoc",
    "openapi_url": "/openapi.json",
    "servers": [
        {"url": "https://api.example.com/v1", "description": "Production server"},
        {"url": "https://staging-api.example.com/v1", "description": "Staging server"},
    ],
    "components": {
        "schemas": {
            "Error": ErrorSchema.model_json_schema(),
            "PaginatedResponse": PaginatedResponse.model_json_schema(),
        },
        # Every shared error response uses the same ErrorSchema body.
        "responses": {
            "ValidationError": _error_response("Validation error"),
            "UnauthorizedError": _error_response("Unauthorized error"),
            "NotFoundError": _error_response("Resource not found"),
            "RateLimitError": _error_response("Rate limit exceeded"),
        },
    },
}
6. Testing Patterns for APIs
# patterns/testing.py
import pytest
import asyncio
from typing import AsyncGenerator
import httpx
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch
class AsyncAPITestCase:
    """Base class for async API tests.

    NOTE(review): ``self.app`` is not defined here -- presumably each
    subclass sets an ``app`` attribute to the ASGI application under test;
    confirm.
    """

    @pytest.fixture(scope="class")
    async def async_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield an ``httpx.AsyncClient`` that calls the app in-process."""
        # The ``app=`` shortcut on AsyncClient is deprecated and has been
        # removed from current httpx releases; an explicit ASGITransport is
        # the supported way to route requests to an ASGI app.
        transport = httpx.ASGITransport(app=self.app)
        async with httpx.AsyncClient(
            transport=transport,
            base_url="http://test"
        ) as client:
            yield client

    @pytest.fixture
    def mock_external_service(self):
        """Patch ``external_api_client.ExternalAPIClient`` and yield the mocked instance."""
        # NOTE(review): ``get`` is a plain mock here, so ``await client.get()``
        # would fail if the code under test awaits it -- confirm whether an
        # AsyncMock is needed.
        with patch("external_api_client.ExternalAPIClient") as mock:
            client = mock.return_value
            client.get.return_value = {"status": "ok"}
            yield client
# Example test cases
@pytest.mark.asyncio
class TestUserAPI(AsyncAPITestCase):
    """Endpoint-level checks for the ``/api/users`` resource."""

    async def test_create_user(self, async_client: httpx.AsyncClient):
        """Creating a user returns 201 and echoes the profile without the password."""
        payload = {
            "email": "test@example.com",
            "username": "testuser",
            "password": "securepassword123"
        }

        response = await async_client.post("/api/users", json=payload)
        assert response.status_code == 201

        body = response.json()
        assert body["email"] == payload["email"]
        assert body["username"] == payload["username"]
        assert "password" not in body

    async def test_get_user(self, async_client: httpx.AsyncClient):
        """A freshly created user can be fetched back by its id."""
        # Create the user that will be looked up.
        new_user = {
            "email": "test2@example.com",
            "username": "testuser2",
            "password": "securepassword123"
        }
        created = await async_client.post("/api/users", json=new_user)
        user_id = created.json()["id"]

        # Fetch it by id.
        response = await async_client.get(f"/api/users/{user_id}")
        assert response.status_code == 200
        assert response.json()["id"] == user_id

    async def test_list_users_with_pagination(
        self,
        async_client: httpx.AsyncClient
    ):
        """The listing returns the paginated envelope for the requested page."""
        response = await async_client.get("/api/users?page=1&per_page=10")
        assert response.status_code == 200

        body = response.json()
        for section in ("data", "meta", "links"):
            assert section in body

        meta = body["meta"]
        assert meta["page"] == 1
        assert meta["per_page"] == 10

    async def test_rate_limiting(
        self,
        async_client: httpx.AsyncClient
    ):
        """Sending 150 requests in a row yields at least one 429 response."""
        # Assuming rate limit is 100/minute, 150 sequential calls exceed it.
        responses = [
            await async_client.get("/api/users")
            for _ in range(150)
        ]

        # At least one response must have been rate limited.
        status_codes = [r.status_code for r in responses]
        assert 429 in status_codes
# Integration tests
@pytest.mark.asyncio
async def test_full_user_flow():
    """Exercise create -> read -> update -> delete -> read-after-delete for one user.

    NOTE(review): ``app`` is not defined in this listing -- presumably the
    ASGI application is imported by the real test module; confirm.
    """
    # ``AsyncClient(app=...)` is deprecated/removed in current httpx; route
    # requests to the ASGI app through an explicit transport instead.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(
        transport=transport,
        base_url="http://test"
    ) as client:
        # Create user
        user_data = {
            "email": "flowtest@example.com",
            "username": "flowtest",
            "password": "securepassword123"
        }
        create_resp = await client.post("/api/users", json=user_data)
        assert create_resp.status_code == 201
        user = create_resp.json()
        user_id = user["id"]

        # Get user
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 200
        assert get_resp.json()["id"] == user_id

        # Update user
        update_data = {"username": "updateduser"}
        update_resp = await client.patch(
            f"/api/users/{user_id}",
            json=update_data
        )
        assert update_resp.status_code == 200
        assert update_resp.json()["username"] == "updateduser"

        # Delete user
        delete_resp = await client.delete(f"/api/users/{user_id}")
        assert delete_resp.status_code == 204

        # Verify user is deleted
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 404
7. Performance Optimization Patterns
# patterns/performance.py
import asyncio
import asyncio.cache
import aioredis
from typing import Any, Optional, Callable
from functools import wraps
import hashlib
import json
import pickle
class ResponseCache:
    """Response caching with Redis.

    Values are serialized with ``pickle``. The Redis connection is opened
    lazily on the first ``get``, ``set`` or ``delete``.

    Args:
        redis_url: connection URL passed to ``aioredis.from_url``.
        default_ttl: expiry in seconds used when ``set`` is given no ``ttl``.
    """

    def __init__(self, redis_url: str, default_ttl: int = 300):
        self.redis = None
        self.redis_url = redis_url
        self.default_ttl = default_ttl

    async def initialize(self):
        """Initialize Redis connection"""
        self.redis = await aioredis.from_url(self.redis_url)

    def _make_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Return ``"<prefix>:<sha256>"`` derived from the call arguments.

        Arguments are JSON-encoded with sorted keys; values JSON cannot
        encode are converted with ``str``.
        """
        key_data = {
            "args": args,
            "kwargs": kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()
        return f"{prefix}:{key_hash}"

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None on a miss."""
        if not self.redis:
            await self.initialize()

        cached = await self.redis.get(key)
        if not cached:
            return None
        # SECURITY: pickle.loads executes arbitrary code embedded in the
        # payload. This is only safe while the Redis instance is trusted and
        # not writable by untrusted parties; prefer JSON for plain data.
        return pickle.loads(cached)

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ):
        """Store ``value`` under ``key`` for ``ttl`` seconds (default ``default_ttl``)."""
        if not self.redis:
            await self.initialize()

        ttl = ttl or self.default_ttl
        serialized = pickle.dumps(value)
        await self.redis.setex(key, ttl, serialized)

    async def delete(self, key: str):
        """Delete the cached value for ``key``."""
        # Connect on demand like get/set do. Previously a delete issued
        # before any other operation was silently skipped, which left stale
        # entries in Redis.
        if not self.redis:
            await self.initialize()
        await self.redis.delete(key)
def cache_response(
    prefix: str,
    ttl: Optional[int] = None,
    cache: Optional[ResponseCache] = None
):
    """Decorator that caches a coroutine's result keyed by its arguments.

    Args:
        prefix: namespace placed in front of the generated cache key.
        ttl: expiry in seconds forwarded to the cache's ``set``.
        cache: cache to use. When omitted, one ``ResponseCache`` pointing at
            ``redis://localhost`` is created on first use and then reused by
            every call of the decorated function.

    A cached value of ``None`` is treated as a miss, so functions returning
    ``None`` are executed on every call.
    """
    def decorator(func: Callable) -> Callable:
        # Shared by all calls of this decorated function. Building the
        # default cache inside the wrapper opened a new Redis connection on
        # every single call.
        cache_instance = cache

        @wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal cache_instance
            if cache_instance is None:
                cache_instance = ResponseCache("redis://localhost")

            # Generate cache key
            cache_key = cache_instance._make_cache_key(prefix, *args, **kwargs)

            # Try to get from cache
            cached_result = await cache_instance.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function
            result = await func(*args, **kwargs)

            # Cache result
            await cache_instance.set(cache_key, result, ttl)
            return result

        return wrapper

    return decorator
# Usage example
@cache_response(prefix="user_data", ttl=300)
async def get_user_data(user_id: int) -> Dict[str, Any]:
    """Return a user's record; results are cached for 300 seconds."""
    # Stand-in for an expensive database operation or API call.
    simulated_latency = 1
    await asyncio.sleep(simulated_latency)

    record = {"id": user_id, "name": "John Doe", "email": "john@example.com"}
    return record
# Batch processing for performance
class BatchProcessor:
    """Batch processor for optimizing multiple operations.

    Items passed to ``add`` are queued and handed to ``_process_batch_items``
    in groups of at most ``batch_size``. A batch is flushed when it is full
    or when ``flush_interval`` seconds have passed since collection started.
    The background worker is started on demand and stops once the queue has
    stayed empty for a full flush interval; the next ``add`` restarts it.

    Args:
        batch_size: maximum number of items per batch.
        flush_interval: seconds to wait for a batch to fill before flushing.
    """

    def __init__(self, batch_size: int = 100, flush_interval: int = 5):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = asyncio.Queue()
        self.processing = False
        # Strong reference to the worker task: the event loop only keeps weak
        # references, so an unreferenced task can be garbage-collected while
        # it is still running.
        self._worker = None

    async def add(self, item: Any):
        """Queue ``item`` and make sure the background worker is running."""
        await self.queue.put(item)

        if not self.processing:
            self.processing = True
            self._worker = asyncio.create_task(self._process_batch())

    async def _collect_batch(self, loop) -> list:
        """Gather up to ``batch_size`` items, waiting at most ``flush_interval`` seconds."""
        batch = []
        deadline = loop.time() + self.flush_interval
        while len(batch) < self.batch_size:
            timeout = max(0, deadline - loop.time())
            try:
                item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
            except asyncio.TimeoutError:
                break
            batch.append(item)
        return batch

    async def _process_batch(self):
        """Worker loop: collect and process batches until the queue goes idle."""
        loop = asyncio.get_running_loop()
        try:
            while True:
                try:
                    batch = await self._collect_batch(loop)
                    if batch:
                        await self._process_batch_items(batch)
                    elif self.queue.empty():
                        # Idle for a whole interval: stop instead of waking
                        # up forever. No await happens between this check
                        # and clearing the flag, so ``add`` cannot slip an
                        # item in unnoticed.
                        return
                except Exception as e:
                    # A failing batch must not kill the worker.
                    print(f"Error processing batch: {e}")
                    continue
        finally:
            # Also runs on cancellation, so a later ``add`` can restart us.
            self.processing = False

    async def _process_batch_items(self, batch: list[Any]):
        """Process a batch of items"""
        # Override in subclasses
        # Example: batch database insert, batch API calls, etc.
        print(f"Processing batch of {len(batch)} items")
8. Production Configuration Checklist
# api/production_checklist.yaml
# Production-readiness checklist for an API service.
# NOTE(review): the nesting indentation of this file appears to have been
# lost; as written, every key is parsed at the top level. The grouping
# comments below describe how the keys look like they were meant to be
# organised -- confirm against the original before relying on the structure.

# --- Performance: connection handling, caching, response compression ---
performance:
async_patterns:
connection_pooling: true
max_connections: 100
connection_timeout: 30
keep_alive: true
caching:
redis_cache: true
default_ttl: 300
cache_headers: true
compression:
gzip: true
level: 6
threshold: 1024
# --- Rate limiting ---
rate_limiting:
enabled: true
default_limit: 100/minute
burst_limit: 200
sliding_window: true
# --- Security: authentication, authorization, validation, headers ---
security:
authentication:
jwt_validation: true
token_refresh: true
revoke_tokens: true
authorization:
rbac: true
rate_limit_by_role: true
validation:
input_validation: true
sql_injection_protection: true
xss_protection: true
headers:
security_headers: true
cors: true
csrf_protection: true
# --- Monitoring: metrics, logging, health checks ---
monitoring:
metrics:
prometheus: true
request_duration: true
error_rate: true
throughput: true
logging:
structured_logging: true
correlation_ids: true
error_tracking: true
health_checks:
liveness_probe: true
readiness_probe: true
dependency_checks: true
# --- Documentation: OpenAPI generation and versioning policy ---
documentation:
openapi:
auto_generation: true
examples: true
schemas: true
versioning:
strategy: "header_path_query"
deprecation_warnings: true
backward_compatibility: true
# --- Testing: kinds of test suites expected ---
testing:
unit_tests: true
integration_tests: true
performance_tests: true
contract_tests: true
This skill collects modern patterns for building high-performance APIs in 2025: async/await clients, circuit breakers, rate limiting, versioning strategies, OpenAPI documentation, testing, and production optimization techniques that work across different frameworks.