| name | recommendation-system |
| description | Deploy production recommendation systems with feature stores, caching, and A/B testing. Use for personalization APIs and low-latency serving, or when troubleshooting cache invalidation, experiment tracking, or quality monitoring issues. |
| keywords | recommendation system, personalization, feature store, model serving, caching strategy, Redis, A/B testing, Thompson sampling, recommendation metrics, CTR, conversion rate, catalog coverage, diversity, Prometheus monitoring, recommendation API, real-time recommendations, collaborative filtering integration, production recommendations, experiment tracking |
| license | MIT |
Recommendation System
Production-ready architecture for scalable recommendation systems with feature stores, multi-tier caching, A/B testing, and comprehensive monitoring.
When to Use This Skill
Load this skill when:
- Building Recommendation APIs: Serving personalized recommendations at scale
- Implementing Caching: Multi-tier caching (in-memory, then Redis) for low-latency serving
- Running A/B Tests: Experimenting with recommendation algorithms
- Monitoring Quality: Tracking CTR, conversion, diversity, coverage
- Optimizing Performance: Reducing latency, increasing throughput
- Feature Engineering: Managing user/item features with feature stores
Quick Start: Recommendation API in 5 Steps
# 1. Install dependencies
pip install fastapi==0.109.0 uvicorn redis==5.0.0 prometheus-client==0.19.0
# 2. Start Redis (for caching and feature store)
docker run -d -p 6379:6379 redis:alpine
# 3. Create recommendation service: app.py
cat > app.py << 'EOF'
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import redis
import json

app = FastAPI()
cache = redis.Redis(host='localhost', port=6379, decode_responses=True)

class RecommendationResponse(BaseModel):
    user_id: str
    items: List[str]
    cached: bool

@app.post("/recommendations", response_model=RecommendationResponse)
async def get_recommendations(user_id: str, n: int = 10):
    # Check cache
    cache_key = f"recs:{user_id}:{n}"
    cached = cache.get(cache_key)
    if cached:
        return RecommendationResponse(
            user_id=user_id,
            items=json.loads(cached),
            cached=True
        )
    # Generate recommendations (simplified)
    items = [f"item_{i}" for i in range(n)]
    # Cache for 5 minutes
    cache.setex(cache_key, 300, json.dumps(items))
    return RecommendationResponse(
        user_id=user_id,
        items=items,
        cached=False
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}
EOF
# 4. Run API
uvicorn app:app --host 0.0.0.0 --port 8000
# 5. Test
curl -X POST "http://localhost:8000/recommendations?user_id=user_123&n=10"
Result: Working recommendation API with caching in under 5 minutes.
System Architecture
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ User Events │────▶│   Feature   │────▶│    Model    │
│  (clicks,   │     │    Store    │     │   Serving   │
│  purchases) │     │   (Redis)   │     │             │
└─────────────┘     └─────────────┘     └─────────────┘
                           │                   │
                           ▼                   ▼
                    ┌─────────────┐     ┌─────────────┐
                    │  Training   │     │     API     │
                    │  Pipeline   │     │  (FastAPI)  │
                    └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │ Monitoring  │
                                        │ (Prometheus)│
                                        └─────────────┘
Core Components
1. Feature Store
Centralized storage for user and item features:
import redis
import json

class FeatureStore:
    """Fast feature access with Redis caching."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # 1 hour

    def get_user_features(self, user_id: str) -> dict:
        cache_key = f"user_features:{user_id}"
        cached = self.redis.get(cache_key)
        if cached is not None:
            return json.loads(cached)
        # Fetch from database (fetch_from_db is a placeholder for your own data access code)
        features = fetch_from_db(user_id)
        # Cache
        self.redis.setex(cache_key, self.ttl, json.dumps(features))
        return features
2. Model Serving
Serve multiple models for A/B testing:
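from typing import Optional

class ModelServing:
    """Serve multiple recommendation models."""

    def __init__(self):
        self.models = {}
        self.default_model = None  # set by register_model

    def register_model(self, name: str, model, is_default: bool = False):
        self.models[name] = model
        # The first registered model becomes the default if none was chosen yet
        if is_default or self.default_model is None:
            self.default_model = name

    def predict(self, user_features: dict, item_features: list, model_name: Optional[str] = None):
        name = model_name or self.default_model
        if name not in self.models:
            raise KeyError(f"Unknown model: {name!r}")
        return self.models[name].predict(user_features, item_features)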
3. Caching Layer
Multi-tier caching for low latency:
import json

class TieredCache:
    """L1 (memory) -> L2 (Redis) -> L3 (database)."""

    def __init__(self, redis_client):
        self.l1_cache = {}  # In-memory; a plain dict has no eviction or expiry
        self.redis = redis_client  # L2

    def get(self, key: str):
        # L1: In-memory (fastest)
        if key in self.l1_cache:
            return self.l1_cache[key]
        # L2: Redis
        cached = self.redis.get(key)
        if cached:
            value = json.loads(cached)
            self.l1_cache[key] = value  # Promote to L1
            return value
        # L3: Miss (caller fetches from the database)
        return None
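The L1 tier above is a plain dict, so entries never expire and memory grows without bound. A minimal sketch of a bounded, expiring in-process cache that could stand in for it is shown here. The class name `BoundedTTLCache`, its parameters, and the default sizes are assumptions for illustration, not part of the original design.

```python
import time
from collections import OrderedDict

class BoundedTTLCache:
    """In-process LRU cache with per-entry expiry (illustrative L1 tier)."""

    def __init__(self, max_size: int = 10_000, ttl_seconds: float = 60.0, clock=time.monotonic):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._data = OrderedDict()  # key -> (expires_at, value)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: drop the entry and report a miss
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key, value):
        self._data[key] = (self._clock() + self.ttl_seconds, value)
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry
```

Keeping the L1 TTL shorter than the Redis TTL limits how long a process can serve a value that has already been invalidated in L2.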
Key Metrics
| Metric | Description | Target |
|---|---|---|
| CTR | Click-through rate | >5% |
| Conversion Rate | Purchases from recs | >2% |
| P95 Latency | 95th percentile response time | <200ms |
| Cache Hit Rate | % served from cache | >80% |
| Coverage | % of catalog recommended | >50% |
| Diversity | Variety in recommendations | >0.7 |
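The table does not say how coverage and diversity are computed. The sketch below uses one common set of definitions as an assumption: coverage as the share of catalog items that appear in any recommendation list, and diversity as the share of item pairs within a list that belong to different categories. The function names are hypothetical.

```python
from itertools import combinations

def catalog_coverage(recommendation_lists, catalog_size):
    """Fraction of the catalog that appears in at least one recommendation list."""
    recommended = {item for recs in recommendation_lists for item in recs}
    return len(recommended) / catalog_size if catalog_size else 0.0

def intra_list_diversity(categories):
    """Share of item pairs in one list whose categories differ (0 = all same, 1 = all different)."""
    pairs = list(combinations(categories, 2))
    if not pairs:
        return 0.0
    return sum(a != b for a, b in pairs) / len(pairs)

def click_through_rate(clicks, impressions):
    """Clicks divided by impressions, guarding against an empty window."""
    return clicks / impressions if impressions else 0.0
```

If item embeddings are available, pairwise embedding distance is a finer-grained alternative to the category comparison used here.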
Known Issues Prevention
1. Cold Start for New Users
Problem: No recommendations for users without history, poor initial experience.
Solution: Use popularity-based fallback:
def get_recommendations(user_id: str, n: int = 10):
    user_features = feature_store.get_user_features(user_id)
    # Check if new user (no purchase history)
    if user_features.get('total_purchases', 0) == 0:
        # Fallback to popular items
        return get_popular_items(n)
    # Personalized recommendations
    return generate_personalized_recs(user_id, n)
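The popularity fallback needs a ranked list of popular items from somewhere. As a rough sketch, assuming purchase events are available as `(user_id, item_id)` pairs, the list can be derived by counting. The helper name `popular_items_from_events` and the event shape are assumptions, and in production the result would typically be precomputed offline and cached.

```python
from collections import Counter

def popular_items_from_events(events, n=10):
    """Return the n most frequently purchased item IDs from (user_id, item_id) events."""
    counts = Counter(item_id for _, item_id in events)
    return [item_id for item_id, _ in counts.most_common(n)]
```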
2. Cache Invalidation on User Actions
Problem: User makes purchase, cache still shows purchased item in recommendations.
Solution: Invalidate cache on relevant actions:
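INVALIDATING_ACTIONS = {'purchase', 'rating', 'add_to_cart'}

def on_user_action(user_id: str, action: str):
    if action in INVALIDATING_ACTIONS:
        # DEL takes exact key names and does not expand glob patterns,
        # so scan for the matching keys first and delete those
        keys = list(redis_client.scan_iter(match=f"recs:{user_id}:*"))
        if keys:
            redis_client.delete(*keys)
        logger.info(f"Invalidated cache for {user_id} due to {action}")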
3. Thundering Herd on Cache Expiry
Problem: Many users' caches expire simultaneously, overload database/model.
Solution: Add random jitter to TTL:
import json
import random

def set_cache(key: str, value: dict, base_ttl: int = 300):
    # Add ±10% jitter
    jitter = random.uniform(-0.1, 0.1) * base_ttl
    ttl = int(base_ttl + jitter)
    redis_client.setex(key, ttl, json.dumps(value))
4. Poor Diversity = Filter Bubble
Problem: Recommendations too similar, users only see same category.
Solution: Implement diversity constraint:
def rank_with_diversity(items: list, scores: list, n: int = 10):
    selected = []
    category_counts = {}
    for item, score in sorted(zip(items, scores), key=lambda x: -x[1]):
        category = item['category']
        # Limit 3 items per category
        if category_counts.get(category, 0) >= 3:
            continue
        selected.append(item)
        category_counts[category] = category_counts.get(category, 0) + 1
        if len(selected) >= n:
            break
    return selected
5. No Monitoring = Silent Degradation
Problem: Recommendation quality drops, nobody notices until users complain.
Solution: Continuous monitoring with alerts:
import time
from prometheus_client import Counter, Histogram

# prometheus_client requires a documentation string as the second argument
recommendation_clicks = Counter('recommendation_clicks_total', 'Clicks on recommended items')
recommendation_latency = Histogram('recommendation_latency_seconds', 'Recommendation request latency')

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    start = time.time()
    recs = generate_recs(user_id)
    latency = time.time() - start
    recommendation_latency.observe(latency)
    return recs

@app.post("/track/click")
async def track_click(user_id: str, item_id: str):
    recommendation_clicks.inc()
    # Alert (for example, with a Prometheus alert rule) if CTR drops below 3%
    return {"status": "ok"}
6. Stale Features = Outdated Recommendations
Problem: User preferences change but features don't update, recommendations irrelevant.
Solution: Set appropriate TTLs and update triggers:
class FeatureStore:
    def __init__(self, redis_client):
        self.redis = redis_client
        # Shorter TTL for frequently changing features
        self.user_ttl = 300   # 5 minutes
        self.item_ttl = 3600  # 1 hour

    def update_on_event(self, user_id: str, event: str):
        # Invalidate on important events; the next read repopulates the cache
        if event in ['purchase', 'rating']:
            self.redis.delete(f"user_features:{user_id}")
            logger.info(f"Invalidated cached features for {user_id}")
7. A/B Test Sample Size Too Small
Problem: Declare winner too early, results not statistically significant.
Solution: Calculate required sample size first:
import math
from scipy import stats  # requires: pip install scipy

def calculate_sample_size(
    baseline_rate: float,
    min_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8
) -> int:
    """Calculate required sample size per variant (two-sided test)."""
    z_alpha = stats.norm.ppf(1 - alpha/2)
    z_beta = stats.norm.ppf(power)
    p1 = baseline_rate
    # min_detectable_effect is a relative lift over the baseline
    p2 = baseline_rate * (1 + min_detectable_effect)
    p_avg = (p1 + p2) / 2
    n = (
        (z_alpha + z_beta)**2 * 2 * p_avg * (1 - p_avg) /
        (p2 - p1)**2
    )
    # Round up so the experiment is never underpowered
    return math.ceil(n)

# Example: detect 10% relative lift with baseline CTR=5%
n_required = calculate_sample_size(
    baseline_rate=0.05,
    min_detectable_effect=0.10
)
print(f"Required sample size: {n_required} per variant")
# Wait until both variants reach this size before concluding
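Once both variants have reached the required size, the observed rates still need a significance check. The following is a minimal sketch of a pooled two-proportion z-test using only the standard library. The function name and argument names are assumptions, and it is a frequentist check only, not the framework described in the reference files.

```python
from math import sqrt
from statistics import NormalDist

def two_proportion_z_test(clicks_a, n_a, clicks_b, n_b):
    """Two-sided z-test for a difference between two conversion rates.

    Returns (z, p_value); z > 0 means variant B has the higher rate.
    """
    p_a, p_b = clicks_a / n_a, clicks_b / n_b
    p_pool = (clicks_a + clicks_b) / (n_a + n_b)
    se = sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    if se == 0:
        return 0.0, 1.0  # no variation at all: nothing to detect
    z = (p_b - p_a) / se
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))
    return z, p_value
```

Compare the returned p-value against the same `alpha` used when sizing the experiment, and avoid checking it repeatedly before the target sample size is reached.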
When to Load References
Load reference files for detailed production implementations:
- Production Architecture: Load references/production-architecture.md for complete FeatureStore, ModelServing, and RecommendationService implementations with batch fetching, caching integration, and FastAPI deployment patterns.
- Caching Strategies: Load references/caching-strategies.md when implementing multi-tier caching (L1/L2/L3), cache warming, invalidation strategies, probabilistic refresh, or thundering herd prevention.
- A/B Testing Framework: Load references/ab-testing-framework.md for deterministic variant assignment, Thompson sampling (multi-armed bandits), Bayesian and frequentist significance testing, and experiment tracking.
- Monitoring & Alerting: Load references/monitoring-alerting.md for Prometheus metrics integration, dashboard endpoints, alert rules, and quality monitoring (diversity, coverage).
Best Practices
- Feature Precomputation: Compute features offline, serve from cache
- Batch Fetching: Use Redis MGET for multiple users/items
- Cache Aggressively: 5-15 minute TTL for user recommendations
- Fail Gracefully: Return popular items if personalization fails
- Monitor Everything: Track CTR, latency, diversity, coverage
- A/B Test Continuously: Always be experimenting with new algorithms
- Diversity Constraint: Ensure varied recommendations
- Explain Recommendations: Provide reasons ("Highly rated", "Popular")
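As an illustration of the batch fetching practice, the sketch below reads cached features for several users with one MGET round trip instead of one GET per user. It assumes the `user_features:{user_id}` key format used by the feature store and a redis-py style client exposing `mget`. The function name and the `default` parameter are hypothetical.

```python
import json

def get_user_features_batch(redis_client, user_ids, default=None):
    """Fetch cached features for many users with a single MGET."""
    if not user_ids:
        return {}
    keys = [f"user_features:{uid}" for uid in user_ids]
    raw_values = redis_client.mget(keys)  # one value (or None) per key, in order
    result = {}
    for uid, raw in zip(user_ids, raw_values):
        # A missing key comes back as None; callers decide how to backfill it
        result[uid] = json.loads(raw) if raw is not None else default
    return result
```

Users that map to `default` are cache misses and can be loaded from the database in a second batched query.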
Common Patterns
Recommendation Service
class RecommendationService:
    def __init__(self, feature_store, model_serving, cache):
        self.feature_store = feature_store
        self.model_serving = model_serving
        self.cache = cache

    def get_recommendations(self, user_id: str, n: int = 10):
        # 1. Check cache
        cached = self.cache.get(f"recs:{user_id}:{n}")
        if cached:
            return cached
        # 2. Get features
        user_features = self.feature_store.get_user_features(user_id)
        candidates = self.get_candidates(user_id)
        # 3. Score candidates
        scores = self.model_serving.predict(user_features, candidates)
        # 4. Rank with diversity
        recommendations = self.rank_with_diversity(candidates, scores, n)
        # 5. Cache
        self.cache.set(f"recs:{user_id}:{n}", recommendations, ttl=300)
        return recommendations
A/B Testing
import hashlib

def assign_variant(user_id: str, experiment_id: str) -> str:
    """Deterministic assignment - same user always gets same variant."""
    hash_input = f"{user_id}:{experiment_id}"
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
    # 50/50 split
    return 'control' if hash_value % 2 == 0 else 'treatment'

# Usage
user_id = 'user_123'
variant = assign_variant(user_id, 'rec_algo_v2')
model_name = 'main' if variant == 'control' else 'experimental'
recs = get_recommendations(user_id, model_name=model_name)
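The same hashing idea extends to uneven splits, which is useful when a new algorithm should only reach a small share of traffic at first. This is a sketch under assumptions: the function name `assign_variant_weighted`, the `treatment_percent` parameter, and the choice of SHA-256 are illustrative and not taken from the reference framework.

```python
import hashlib

def assign_variant_weighted(user_id: str, experiment_id: str, treatment_percent: int = 10) -> str:
    """Deterministically place a user in one of 100 buckets and expose the first N to treatment."""
    digest = hashlib.sha256(f"{user_id}:{experiment_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in [0, 99]
    return 'treatment' if bucket < treatment_percent else 'control'
```

Because the experiment ID is part of the hash input, a user's bucket in one experiment is independent of their bucket in another.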
Monitoring
from prometheus_client import Counter, Histogram

# Argument order is (name, documentation, labelnames)
requests_total = Counter('recommendation_requests_total', 'Recommendation requests by outcome', ['status'])
latency_seconds = Histogram('recommendation_latency_seconds', 'Recommendation request latency')

@app.post("/recommendations")
async def get_recommendations(user_id: str):
    with latency_seconds.time():
        try:
            recs = generate_recs(user_id)
            requests_total.labels(status='success').inc()
            return recs
        except Exception:
            requests_total.labels(status='error').inc()
            raise