| name | rate-limiting |
| description | API rate limiting with token bucket, sliding window, and Redis distributed patterns. Use when protecting APIs from abuse, implementing tiered limits, or scaling rate limiting across instances. |
| context | fork |
| agent | backend-system-architect |
| version | 1.0.0 |
| tags | rate-limiting, redis, token-bucket, fastapi, security, 2026 |
# Rate Limiting Patterns
Protect APIs with distributed rate limiting using Redis and modern algorithms.
## When to Use
- Protecting public APIs from abuse
- Implementing tiered rate limits (free/pro/enterprise)
- Scaling rate limiting across multiple instances
- Preventing brute force attacks on auth endpoints
- Managing third-party API consumption
## Algorithm Selection
| Algorithm | Use Case | Burst Handling |
|---|---|---|
| Token Bucket | General API, allows bursts | Excellent |
| Sliding Window | Precise, no burst spikes | Good |
| Leaky Bucket | Steady rate, queue excess | None |
| Fixed Window | Simple, some edge issues | Moderate |
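For comparison, a fixed-window counter is the simplest of the four and costs one Redis INCR per request. The sketch below is illustrative (it assumes a local Redis and a per-caller key) and shows why its burst handling is only moderate: the whole window resets at once.

```python
import redis.asyncio as redis


async def fixed_window_allowed(
    client: redis.Redis, key: str, limit: int, window_seconds: int = 60
) -> bool:
    """Illustrative fixed-window check: count requests per key, reset every window."""
    count = await client.incr(key)
    if count == 1:
        # First request in this window starts the countdown
        await client.expire(key, window_seconds)
    return count <= limit
```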
## SlowAPI + Redis (FastAPI)

### Basic Setup

```python
from fastapi import FastAPI
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address

limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
    strategy="moving-window",  # sliding window
)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
```
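A global default limit and automatic headers can also be configured at construction time. The options below mirror the flask-limiter parameters that slowapi ports; treat the exact names as an assumption to verify against your slowapi version.

```python
limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
    strategy="moving-window",
    default_limits=["200/minute"],  # applied to every route unless overridden
    headers_enabled=True,           # emit X-RateLimit-* headers automatically
)
```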
### Endpoint Limits

```python
# `limiter` is the instance created above; `router` is a FastAPI APIRouter
@router.post("/api/v1/auth/login")
@limiter.limit("10/minute")  # Strict for auth
async def login(request: Request, credentials: LoginRequest):
    ...


@router.get("/api/v1/analyses")
@limiter.limit("100/minute")  # Normal for reads
async def list_analyses(request: Request):
    ...


@router.post("/api/v1/analyses")
@limiter.limit("20/minute")  # Moderate for writes
async def create_analysis(request: Request, data: AnalysisCreate):
    ...
```
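Limits can also be layered so a short burst cap and a sustained-volume cap apply together. The semicolon-separated form below uses the notation of the limits library that slowapi builds on; treat it as an assumption to verify, or stack two @limiter.limit decorators instead.

```python
@router.post("/api/v1/exports")
@limiter.limit("5/minute;50/hour")  # burst cap and hourly volume cap evaluated together
async def create_export(request: Request):
    ...
```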
### User-Based Limits

```python
from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address


def get_user_identifier(request: Request) -> str:
    """Rate limit by user ID if authenticated, else IP."""
    if hasattr(request.state, "user"):
        return f"user:{request.state.user.id}"
    return f"ip:{get_remote_address(request)}"


limiter = Limiter(key_func=get_user_identifier)
```
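The same idea extends to service-to-service clients keyed by API key. A sketch, assuming a hypothetical X-API-Key header:

```python
def get_client_identifier(request: Request) -> str:
    """Prefer an API key (hypothetical X-API-Key header), then user ID, then IP."""
    api_key = request.headers.get("X-API-Key")
    if api_key:
        return f"key:{api_key}"
    if hasattr(request.state, "user"):
        return f"user:{request.state.user.id}"
    return f"ip:{get_remote_address(request)}"
```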
## Token Bucket with Redis (Custom)

```python
import time

import redis.asyncio as redis


class TokenBucketLimiter:
    def __init__(
        self,
        redis_client: redis.Redis,
        capacity: int = 100,
        refill_rate: float = 10.0,  # tokens per second
    ):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate

    async def is_allowed(self, key: str, tokens: int = 1) -> bool:
        """Check if request is allowed, consume tokens atomically."""
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local tokens_requested = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local current_tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now

        -- Calculate refill
        local elapsed = now - last_update
        local refill = elapsed * refill_rate
        current_tokens = math.min(capacity, current_tokens + refill)

        -- Check and consume
        if current_tokens >= tokens_requested then
            current_tokens = current_tokens - tokens_requested
            redis.call('HMSET', key, 'tokens', current_tokens, 'last_update', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            return 0
        end
        """
        now = time.time()  # epoch seconds; avoids naive-datetime timezone pitfalls
        result = await self.redis.eval(
            lua_script, 1, key,
            self.capacity, self.refill_rate, tokens, now,
        )
        return result == 1
```
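A minimal usage sketch wiring the bucket into FastAPI via a dependency; the client construction and names like enforce_token_bucket are illustrative.

```python
import redis.asyncio as redis
from fastapi import Depends, HTTPException, Request

redis_client = redis.from_url("redis://localhost:6379")
bucket_limiter = TokenBucketLimiter(redis_client, capacity=100, refill_rate=10.0)


async def enforce_token_bucket(request: Request) -> None:
    """Reject with 429 when the caller's bucket is empty."""
    key = f"tb:{request.client.host if request.client else 'unknown'}"
    if not await bucket_limiter.is_allowed(key):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")


@router.get("/api/v1/reports", dependencies=[Depends(enforce_token_bucket)])
async def list_reports():
    ...
```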
## Sliding Window Counter

```python
import time
import uuid

import redis.asyncio as redis


class SlidingWindowLimiter:
    def __init__(self, redis_client: redis.Redis, window_seconds: int = 60):
        self.redis = redis_client
        self.window = window_seconds

    async def is_allowed(self, key: str, limit: int) -> tuple[bool, int]:
        """Returns (allowed, remaining)."""
        now = time.time()
        window_start = now - self.window
        member = f"{now}:{uuid.uuid4().hex}"  # unique even for same-instant requests

        pipe = self.redis.pipeline()
        # Remove entries that fell out of the window
        pipe.zremrangebyscore(key, 0, window_start)
        # Count the current window
        pipe.zcard(key)
        # Record this request (note: denied requests still occupy window space)
        pipe.zadd(key, {member: now})
        # Expire idle keys
        pipe.expire(key, self.window * 2)
        results = await pipe.execute()

        current_count = results[1]
        if current_count < limit:
            return True, limit - current_count - 1
        return False, 0
```
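A quick smoke test of the behavior, assuming a local Redis (illustrative; the test key is cleared first so the run is deterministic):

```python
import asyncio

import redis.asyncio as redis


async def smoke_test() -> None:
    limiter = SlidingWindowLimiter(redis.from_url("redis://localhost:6379"), window_seconds=1)
    await limiter.redis.delete("sw:test")
    results = [await limiter.is_allowed("sw:test", limit=3) for _ in range(5)]
    # First three allowed, the rest denied within the same one-second window
    print(results)  # [(True, 2), (True, 1), (True, 0), (False, 0), (False, 0)]


asyncio.run(smoke_test())
```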
## Tiered Rate Limits

```python
from enum import Enum


class UserTier(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"


TIER_LIMITS = {
    UserTier.FREE: {"requests": 100, "window": 3600},          # 100/hour
    UserTier.PRO: {"requests": 1000, "window": 3600},          # 1000/hour
    UserTier.ENTERPRISE: {"requests": 10000, "window": 3600},  # 10000/hour
}
```
```python
def get_rate_limit(key: str) -> str:
    """slowapi calls a callable limit with the rate-limit key produced by
    key_func (e.g. "user:<id>" from get_user_identifier above)."""
    tier = lookup_tier_for_key(key)  # hypothetical cache/DB lookup, defaults to FREE
    limits = TIER_LIMITS[tier]
    return f"{limits['requests']}/{limits['window']} seconds"


@router.get("/api/v1/data")
@limiter.limit(get_rate_limit)  # limit string resolved per request
async def get_data(request: Request, user: User = Depends(get_current_user)):
    ...
```
## Rate Limit Response Headers

The 429 status code is defined in RFC 6585; the X-RateLimit-* headers are a widely used convention, while Retry-After is standard HTTP.

```python
from datetime import datetime, timezone

from fastapi import Response


async def add_rate_limit_headers(
    response: Response,
    limit: int,
    remaining: int,
    reset_at: datetime,  # timezone-aware
):
    now = datetime.now(timezone.utc)
    response.headers["X-RateLimit-Limit"] = str(limit)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
    response.headers["Retry-After"] = str(max(0, int((reset_at - now).total_seconds())))
```
## Error Response (429)

```python
from fastapi import Request
from fastapi.responses import JSONResponse


def rate_limit_exceeded_handler(request: Request, exc: Exception) -> JSONResponse:
    # RFC 7807 problem-details payload
    return JSONResponse(
        status_code=429,
        media_type="application/problem+json",
        content={
            "type": "https://api.example.com/errors/rate-limit-exceeded",
            "title": "Too Many Requests",
            "status": 429,
            "detail": "Rate limit exceeded. Please retry after the reset time.",
            "instance": str(request.url),
        },
        headers={
            "Retry-After": "60",
            "X-RateLimit-Limit": "100",
            "X-RateLimit-Remaining": "0",
        },
    )
```
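To use this handler instead of slowapi's default, register it for the RateLimitExceeded exception (in a real handler, derive Retry-After from the exception rather than hard-coding it):

```python
from slowapi.errors import RateLimitExceeded

app.add_exception_handler(RateLimitExceeded, rate_limit_exceeded_handler)
```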
## Anti-Patterns (FORBIDDEN)

```python
# NEVER use in-memory counters in distributed systems
request_counts = {}  # Lost on restart, not shared across instances


# NEVER skip rate limiting on internal APIs (defense in depth)
@router.get("/internal/admin")
async def admin_endpoint():  # No rate limit = vulnerable
    ...


# NEVER use fixed window without considering edge spikes:
# a user can hit 100 at 0:59 and 100 at 1:01 = 200 requests in 2 seconds
```
## Key Decisions
| Decision | Recommendation |
|---|---|
| Storage | Redis (distributed, atomic) |
| Algorithm | Token bucket for most APIs |
| Key | User ID if auth, else IP + fingerprint |
| Auth endpoints | 10/min (strict) |
| Read endpoints | 100-1000/min (based on tier) |
| Write endpoints | 20-100/min (moderate) |
## Related Skills

- auth-patterns - Authentication integration
- resilience-patterns - Circuit breakers
- observability-monitoring - Rate limit metrics
## Capability Details

### token-bucket
Keywords: token bucket, rate limit, burst, capacity
Solves:
- How do I implement token bucket rate limiting?
- Allow bursts while limiting rate

### sliding-window
Keywords: sliding window, moving window, rate limit
Solves:
- How to implement precise rate limiting?
- Avoid fixed window edge cases

### slowapi-redis
Keywords: slowapi, fastapi rate limit, redis limiter
Solves:
- How to add rate limiting to FastAPI?
- Distributed rate limiting

### tiered-limits
Keywords: tiered, user tier, free pro enterprise
Solves:
- Different rate limits per subscription tier
- User-based rate limiting