# NoSQL Databases
Production-grade NoSQL database patterns with MongoDB, Redis, Cassandra, and DynamoDB.
## Quick Start
```python
# MongoDB with PyMongo
from datetime import datetime

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

client = MongoClient("mongodb://localhost:27017/")
db = client.analytics
events = db.events

# Create indexes for query performance
events.create_index([("user_id", 1), ("timestamp", -1)])
events.create_index([("event_type", 1)])

# Insert with retry pattern; the deterministic _id makes the write idempotent
def insert_event(event: dict, retries: int = 3) -> bool:
    event["_id"] = f"{event['user_id']}_{event['timestamp'].isoformat()}"
    event["created_at"] = datetime.utcnow()
    for attempt in range(retries):
        try:
            events.insert_one(event)
            return True
        except DuplicateKeyError:
            return False  # Already exists
        except Exception:
            if attempt == retries - 1:
                raise
    return False

# Aggregation pipeline: total purchase amount and event count per user, top 100 spenders
pipeline = [
    {"$match": {"event_type": "purchase", "timestamp": {"$gte": datetime(2024, 1, 1)}}},
    {"$group": {"_id": "$user_id", "total_purchases": {"$sum": "$amount"}, "count": {"$sum": 1}}},
    {"$sort": {"total_purchases": -1}},
    {"$limit": 100},
]
top_customers = list(events.aggregate(pipeline))
```
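
For bulk ingestion, batching writes into a single round trip avoids per-document latency. Below is a minimal sketch using PyMongo's `bulk_write` with unordered upserts, reusing the `events` collection, `datetime` import, and deterministic `_id` scheme from the block above; the `insert_events_bulk` helper itself is illustrative, not part of the original example.

```python
from pymongo import UpdateOne

def insert_events_bulk(batch: list[dict]) -> int:
    """Upsert a batch of events in one round trip; unordered so one
    duplicate does not abort the rest of the batch."""
    ops = [
        UpdateOne(
            {"_id": f"{e['user_id']}_{e['timestamp'].isoformat()}"},
            {"$setOnInsert": {**e, "created_at": datetime.utcnow()}},
            upsert=True,
        )
        for e in batch
    ]
    result = events.bulk_write(ops, ordered=False)
    return result.upserted_count
```
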
## Core Concepts
### 1. Redis for Caching & Real-time
```python
import json
import time
from datetime import timedelta

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Cache-aside pattern with TTL
def get_user_profile(user_id: str) -> dict:
    cache_key = f"user:{user_id}:profile"
    # Try cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss - fetch from the primary database
    profile = fetch_from_database(user_id)  # your data-access layer
    # Set with 1 hour TTL
    r.setex(cache_key, timedelta(hours=1), json.dumps(profile))
    return profile

# Fixed-window rate limiting
def check_rate_limit(user_id: str, limit: int = 100, window: int = 60) -> bool:
    key = f"rate:{user_id}:{int(time.time()) // window}"
    current = r.incr(key)
    if current == 1:
        r.expire(key, window)
    return current <= limit

# Real-time leaderboard with sorted sets
def update_leaderboard(user_id: str, score: float):
    r.zadd("leaderboard:daily", {user_id: score})

def get_top_users(n: int = 10) -> list:
    return r.zrevrange("leaderboard:daily", 0, n - 1, withscores=True)

# Pub/Sub for event streaming
def publish_event(channel: str, event: dict):
    r.publish(channel, json.dumps(event))

def subscribe_events(channel: str):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message["type"] == "message":
            yield json.loads(message["data"])
```
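
The fixed-window counter above can admit short bursts around window boundaries. One common refinement is a sliding window built on a sorted set; here is a sketch assuming the same `r` client as above (key naming is illustrative).

```python
def check_rate_limit_sliding(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Sliding-window rate limit: one sorted-set member per request,
    scored by timestamp; count the members still inside the window."""
    now = time.time()
    key = f"rate:sliding:{user_id}"
    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window)  # drop requests outside the window
    pipe.zadd(key, {f"{now}": now})              # record this request
    pipe.zcard(key)                              # count requests in the window
    pipe.expire(key, window)                     # let idle keys expire
    _, _, count, _ = pipe.execute()
    return count <= limit
```
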
### 2. DynamoDB Patterns
```python
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("Events")

# Single-table design: generic PK/SK plus a GSI for the secondary access pattern
def put_event(event: dict):
    # Note: with the resource API, numeric values must be decimal.Decimal, not float
    item = {
        "PK": f"USER#{event['user_id']}",
        "SK": f"EVENT#{event['timestamp']}#{event['event_id']}",
        "GSI1PK": f"TYPE#{event['event_type']}",
        "GSI1SK": f"DATE#{event['timestamp'][:10]}",
        "data": event,
    }
    table.put_item(Item=item)

# Query by user (newest first)
def get_user_events(user_id: str, limit: int = 100):
    response = table.query(
        KeyConditionExpression=Key("PK").eq(f"USER#{user_id}") & Key("SK").begins_with("EVENT#"),
        ScanIndexForward=False,
        Limit=limit,
    )
    return response["Items"]

# Query by event type and date (using the GSI)
def get_events_by_type(event_type: str, date: str):
    response = table.query(
        IndexName="GSI1",
        KeyConditionExpression=Key("GSI1PK").eq(f"TYPE#{event_type}") & Key("GSI1SK").eq(f"DATE#{date}"),
    )
    return response["Items"]

# Batch write; items must already be in PK/SK form.
# batch_writer buffers writes and retries unprocessed items automatically.
def batch_write_events(items: list):
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)
```
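
A single `table.query` call returns at most one page (roughly 1 MB of data), so long histories need pagination. Here is a sketch that follows `LastEvaluatedKey`, assuming the same `table` and key schema as above; the `get_all_user_events` helper is illustrative.

```python
def get_all_user_events(user_id: str):
    """Iterate every event for a user, following LastEvaluatedKey across pages."""
    kwargs = {
        "KeyConditionExpression": Key("PK").eq(f"USER#{user_id}") & Key("SK").begins_with("EVENT#"),
        "ScanIndexForward": False,
    }
    while True:
        response = table.query(**kwargs)
        yield from response["Items"]
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            break
        kwargs["ExclusiveStartKey"] = last_key
```
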
### 3. Cassandra for Time Series
```python
import json
from datetime import date

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

cluster = Cluster(["node1", "node2", "node3"])
session = cluster.connect("analytics")

# Create table partitioned by day so each partition stays bounded
session.execute("""
    CREATE TABLE IF NOT EXISTS events_by_day (
        date date,
        user_id uuid,
        event_time timestamp,
        event_type text,
        data text,
        PRIMARY KEY ((date), event_time, user_id)
    ) WITH CLUSTERING ORDER BY (event_time DESC, user_id ASC)
""")

# Insert with a prepared statement
insert_stmt = session.prepare("""
    INSERT INTO events_by_day (date, user_id, event_time, event_type, data)
    VALUES (?, ?, ?, ?, ?)
""")

def insert_event(event: dict):
    session.execute(insert_stmt, [
        event["timestamp"].date(),
        event["user_id"],
        event["timestamp"],
        event["event_type"],
        json.dumps(event["data"]),
    ])

# Query one day's partition
def get_events_for_date(day: date):
    rows = session.execute(
        "SELECT * FROM events_by_day WHERE date = %s",
        [day],
    )
    return list(rows)
```
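
`BatchStatement` is imported above but not used; when several events land in the same daily partition, an unlogged single-partition batch avoids coordinator fan-out. A sketch reusing `session` and `insert_stmt` from above (the helper name and the choice of `BatchType.UNLOGGED` are illustrative):

```python
from cassandra.query import BatchType

def insert_events_batch(events_for_one_day: list[dict]):
    """Batch inserts that all target a single (date) partition."""
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    for event in events_for_one_day:
        batch.add(insert_stmt, [
            event["timestamp"].date(),
            event["user_id"],
            event["timestamp"],
            event["event_type"],
            json.dumps(event["data"]),
        ])
    session.execute(batch)
```
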
## Tools & Technologies

| Tool | Purpose | Version (2025) |
|------|---------|----------------|
| MongoDB | Document store | 7.0+ |
| Redis | Cache, pub/sub | 7.2+ |
| Cassandra | Time series, wide column | 5.0+ |
| DynamoDB | Managed key-value | Latest |
| Elasticsearch | Search, analytics | 8.12+ |
| ScyllaDB | High-performance Cassandra alternative | 5.4+ |
## Troubleshooting Guide

| Issue | Symptoms | Root Cause | Fix |
|-------|----------|------------|-----|
| Hot partition | High latency on some keys | Uneven partition key distribution | Redesign the partition key |
| Memory pressure | Redis evictions, slow queries | Dataset larger than memory | Set an eviction policy, shard/cluster |
| Query timeout | Slow reads in Cassandra | Missing index or oversized partition | Add an index, limit partition size |
| Consistency issues | Stale reads | Eventual consistency | Use an appropriate consistency level |
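
For the hot-partition fix, one common redesign is write sharding: append a small random suffix to the heavy partition key and fan out on read. Below is a sketch against the DynamoDB `Events` table and `Key` import from the section above; the shard count and helper names are illustrative assumptions.

```python
import random

SHARD_COUNT = 8  # illustrative; size this to the hot key's write rate

def put_event_sharded(event: dict):
    """Spread one user's writes across SHARD_COUNT partitions."""
    shard = random.randrange(SHARD_COUNT)
    item = {
        "PK": f"USER#{event['user_id']}#{shard}",
        "SK": f"EVENT#{event['timestamp']}#{event['event_id']}",
        "data": event,
    }
    table.put_item(Item=item)

def get_user_events_sharded(user_id: str):
    """Read fan-out: query every shard and merge, newest first."""
    items = []
    for shard in range(SHARD_COUNT):
        resp = table.query(
            KeyConditionExpression=Key("PK").eq(f"USER#{user_id}#{shard}")
        )
        items.extend(resp["Items"])
    return sorted(items, key=lambda i: i["SK"], reverse=True)
```
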
## Best Practices

```python
# ✅ DO: Design for access patterns (NoSQL schemas follow queries, not entities)
#        Primary key = partition key + sort key

# ✅ DO: Use connection pooling
pool = redis.ConnectionPool(max_connections=20)
r = redis.Redis(connection_pool=pool)

# ✅ DO: Set TTLs on cached data
r.setex(key, ttl_seconds, value)

# ✅ DO: Handle eventual consistency
#        Read-your-writes: use consistent reads where needed

# ❌ DON'T: Use NoSQL for complex joins
# ❌ DON'T: Store unbounded data in a single document or partition
# ❌ DON'T: Ignore partition sizing
```
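
On the eventual-consistency point, DynamoDB supports strongly consistent reads per request on the base table (not on GSIs). A sketch of a read-your-writes point read against the `Events` table from above (the helper is illustrative):

```python
def get_event_consistent(user_id: str, sort_key: str):
    """Strongly consistent point read; costs roughly twice the read capacity."""
    response = table.get_item(
        Key={"PK": f"USER#{user_id}", "SK": sort_key},
        ConsistentRead=True,
    )
    return response.get("Item")
```
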
## Resources

Skill Certification Checklist: