Claude Code Plugins

Community-maintained marketplace


MongoDB, Redis, Cassandra, DynamoDB, and key-value store expertise

Install Skill

1. Download skill

2. Enable skills in Claude

   Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

   Click "Upload skill" and select the downloaded ZIP file

Note: Please review the skill's instructions to verify it before use.

SKILL.md

name: nosql-databases
description: MongoDB, Redis, Cassandra, DynamoDB, and distributed database patterns for scalable applications
sasmp_version: 1.3.0
bonded_agent: 01-data-engineer
bond_type: SECONDARY_BOND
skill_version: 2.0.0
last_updated: 2025-01
complexity: intermediate
estimated_mastery_hours: 110
prerequisites: python-programming, sql-databases
unlocks: big-data, cloud-platforms

NoSQL Databases

Production-grade NoSQL database patterns with MongoDB, Redis, Cassandra, and DynamoDB.

Quick Start

# MongoDB with PyMongo
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from datetime import datetime

client = MongoClient("mongodb://localhost:27017/")
db = client.analytics
events = db.events

# Create index for query performance
events.create_index([("user_id", 1), ("timestamp", -1)])
events.create_index([("event_type", 1)])

# Insert with retry pattern
def insert_event(event: dict, retries: int = 3):
    event["_id"] = f"{event['user_id']}_{event['timestamp'].isoformat()}"
    event["created_at"] = datetime.utcnow()

    for attempt in range(retries):
        try:
            events.insert_one(event)
            return True
        except DuplicateKeyError:
            return False  # Already exists
        except Exception:
            if attempt == retries - 1:
                raise  # re-raise after the final attempt
    return False

# Aggregation pipeline
pipeline = [
    {"$match": {"event_type": "purchase", "timestamp": {"$gte": datetime(2024, 1, 1)}}},
    {"$group": {"_id": "$user_id", "total_purchases": {"$sum": "$amount"}, "count": {"$sum": 1}}},
    {"$sort": {"total_purchases": -1}},
    {"$limit": 100}
]
top_customers = list(events.aggregate(pipeline))
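The created_at field set by insert_event also makes it easy to expire old events automatically with a TTL index, and update_one with upsert=True covers write-or-create cases. A minimal sketch (the 90-day window and the example _id are illustrative, not from the original):

# TTL index: a background task deletes documents once created_at is older than expireAfterSeconds
events.create_index([("created_at", 1)], expireAfterSeconds=90 * 24 * 3600)

# Upsert: update an event in place, inserting it if it does not exist yet
events.update_one(
    {"_id": "user42_2024-06-01T12:00:00"},  # hypothetical _id in the format built by insert_event
    {"$set": {"event_type": "purchase", "amount": 19.99}},
    upsert=True,
)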

Core Concepts

1. Redis for Caching & Real-time

import redis
import json
import time
from datetime import timedelta

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Cache pattern with TTL
def get_user_profile(user_id: str) -> dict:
    cache_key = f"user:{user_id}:profile"

    # Try cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache miss - fetch from DB (fetch_from_database is an application-specific helper)
    profile = fetch_from_database(user_id)

    # Set with 1 hour TTL
    r.setex(cache_key, timedelta(hours=1), json.dumps(profile))
    return profile
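
The cache-aside helper above needs a matching invalidation path when the profile changes, otherwise readers can see a stale copy for up to the TTL. A minimal sketch, assuming a save_to_database helper (not shown in the original):

# Write-through invalidation: update the source of truth, then drop the cached copy
def update_user_profile(user_id: str, profile: dict):
    save_to_database(user_id, profile)   # hypothetical persistence helper
    r.delete(f"user:{user_id}:profile")  # next read repopulates the cache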

# Rate limiting
def check_rate_limit(user_id: str, limit: int = 100, window: int = 60) -> bool:
    key = f"rate:{user_id}:{int(time.time()) // window}"
    current = r.incr(key)

    if current == 1:
        r.expire(key, window)

    return current <= limit
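
One caveat with the incr-then-expire pair: if the process dies between the two calls, the counter key is left without a TTL. Wrapping both commands in a transactional pipeline is one common hardening; a sketch using the same key scheme:

def check_rate_limit_atomic(user_id: str, limit: int = 100, window: int = 60) -> bool:
    key = f"rate:{user_id}:{int(time.time()) // window}"
    pipe = r.pipeline()        # transactional (MULTI/EXEC) by default in redis-py
    pipe.incr(key)
    pipe.expire(key, window)   # refreshing the TTL within the same fixed window is harmless
    current, _ = pipe.execute()
    return current <= limit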

# Real-time leaderboard with sorted sets
def update_leaderboard(user_id: str, score: float):
    r.zadd("leaderboard:daily", {user_id: score})

def get_top_users(n: int = 10) -> list:
    return r.zrevrange("leaderboard:daily", 0, n-1, withscores=True)

# Pub/Sub for event streaming
def publish_event(channel: str, event: dict):
    r.publish(channel, json.dumps(event))

def subscribe_events(channel: str):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield json.loads(message['data'])

2. DynamoDB Patterns

import boto3
from boto3.dynamodb.conditions import Key, Attr
from decimal import Decimal  # DynamoDB rejects float; numeric attributes must be Decimal

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Events')

# Single table design pattern
def put_event(event: dict):
    item = {
        'PK': f"USER#{event['user_id']}",
        'SK': f"EVENT#{event['timestamp']}#{event['event_id']}",
        'GSI1PK': f"TYPE#{event['event_type']}",
        'GSI1SK': f"DATE#{event['timestamp'][:10]}",
        'data': event
    }
    table.put_item(Item=item)

# Query by user
def get_user_events(user_id: str, limit: int = 100):
    response = table.query(
        KeyConditionExpression=Key('PK').eq(f"USER#{user_id}") & Key('SK').begins_with("EVENT#"),
        ScanIndexForward=False,
        Limit=limit
    )
    return response['Items']

# Query by event type (using GSI)
def get_events_by_type(event_type: str, date: str):
    response = table.query(
        IndexName='GSI1',
        KeyConditionExpression=Key('GSI1PK').eq(f"TYPE#{event_type}") & Key('GSI1SK').eq(f"DATE#{date}")
    )
    return response['Items']
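
Both query helpers above return only the first page of results; DynamoDB stops at 1 MB per response. A minimal sketch of following LastEvaluatedKey for the user query:

def get_all_user_events(user_id: str) -> list:
    items, start_key = [], None
    while True:
        kwargs = {
            'KeyConditionExpression': Key('PK').eq(f"USER#{user_id}") & Key('SK').begins_with("EVENT#")
        }
        if start_key:
            kwargs['ExclusiveStartKey'] = start_key  # resume where the previous page ended
        response = table.query(**kwargs)
        items.extend(response['Items'])
        start_key = response.get('LastEvaluatedKey')
        if not start_key:
            return items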

# Batch write (batch_writer buffers requests and retries unprocessed items automatically)
def batch_write_events(events: list):
    with table.batch_writer() as batch:
        for event in events:
            # Items must already contain the table's PK/SK attributes (see put_event above)
            batch.put_item(Item=event)
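
batch_writer retries unprocessed items for you but does not expose the backoff policy. If explicit control is needed, a hedged sketch using the low-level batch_write_item call (items must already be in DynamoDB's typed wire format, e.g. {'S': 'USER#123'}, and each call takes at most 25 items):

import time
import random

def batch_write_with_backoff(items: list, max_retries: int = 5):
    client = boto3.client('dynamodb')
    request = {'Events': [{'PutRequest': {'Item': item}} for item in items]}
    for attempt in range(max_retries):
        response = client.batch_write_item(RequestItems=request)
        unprocessed = response.get('UnprocessedItems', {})
        if not unprocessed:
            return
        # Exponential backoff with jitter, then retry only the unprocessed items
        time.sleep((2 ** attempt) * 0.1 + random.random() * 0.1)
        request = unprocessed
    raise RuntimeError("unprocessed items remain after retries")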

3. Cassandra for Time Series

from cassandra.cluster import Cluster
from datetime import date
import json

cluster = Cluster(['node1', 'node2', 'node3'])
session = cluster.connect('analytics')

# Create table with time-based partitioning
session.execute("""
    CREATE TABLE IF NOT EXISTS events_by_day (
        date date,
        user_id uuid,
        event_time timestamp,
        event_type text,
        data text,
        PRIMARY KEY ((date), event_time, user_id)
    ) WITH CLUSTERING ORDER BY (event_time DESC)
""")

# Insert with prepared statement
insert_stmt = session.prepare("""
    INSERT INTO events_by_day (date, user_id, event_time, event_type, data)
    VALUES (?, ?, ?, ?, ?)
""")

def insert_event(event: dict):
    session.execute(insert_stmt, [
        event['timestamp'].date(),
        event['user_id'],
        event['timestamp'],
        event['event_type'],
        json.dumps(event['data'])
    ])

# Query by date range
def get_events_for_date(day: date):
    rows = session.execute(
        "SELECT * FROM events_by_day WHERE date = %s",
        [day]
    )
    return list(rows)
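
Consistency is tunable per statement; the troubleshooting table below lists "appropriate consistency level" as the fix for stale reads. A minimal sketch, assuming a replication factor of 3 so that QUORUM writes plus QUORUM reads overlap on at least one replica (read-your-writes):

from cassandra import ConsistencyLevel

insert_stmt.consistency_level = ConsistencyLevel.QUORUM

read_stmt = session.prepare("SELECT * FROM events_by_day WHERE date = ?")
read_stmt.consistency_level = ConsistencyLevel.QUORUM
rows = session.execute(read_stmt, [date(2024, 1, 1)])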

Tools & Technologies

Tool          | Purpose                  | Version (2025)
--------------|--------------------------|---------------
MongoDB       | Document store           | 7.0+
Redis         | Cache, pub/sub           | 7.2+
Cassandra     | Time series, wide column | 5.0+
DynamoDB      | Managed key-value        | Latest
Elasticsearch | Search, analytics        | 8.12+
ScyllaDB      | High-perf Cassandra      | 5.4+

Troubleshooting Guide

Issue              | Symptoms                      | Root Cause                     | Fix
-------------------|-------------------------------|--------------------------------|---------------------------------
Hot Partition      | High latency on some keys     | Uneven partition key           | Redesign partition key
Memory Pressure    | Redis evictions, slow queries | Data > memory                  | Eviction policy, clustering
Query Timeout      | Slow reads in Cassandra       | Missing index, large partition | Add index, limit partition size
Consistency Issues | Stale reads                   | Eventual consistency           | Use appropriate consistency level
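
For the hot partition row, one common redesign is write sharding: spread a hot logical key across N physical partition keys and fan reads back out. A hedged sketch against the DynamoDB table from above (the shard count of 10 is illustrative):

import random

NUM_SHARDS = 10  # illustrative; size to your write throughput

def sharded_pk(user_id: str) -> str:
    # Writes pick a random shard so no single partition absorbs all the traffic
    return f"USER#{user_id}#{random.randint(0, NUM_SHARDS - 1)}"

def get_user_events_sharded(user_id: str) -> list:
    # Reads fan out across every shard and merge the results
    items = []
    for shard in range(NUM_SHARDS):
        resp = table.query(KeyConditionExpression=Key('PK').eq(f"USER#{user_id}#{shard}"))
        items.extend(resp['Items'])
    return items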

Best Practices

# ✅ DO: Design for access patterns (NoSQL)
# Primary key = partition key + sort key

# ✅ DO: Use connection pooling
pool = redis.ConnectionPool(max_connections=20)
r = redis.Redis(connection_pool=pool)

# ✅ DO: Set TTLs on cache data
r.setex(key, ttl_seconds, value)

# ✅ DO: Handle eventual consistency
# Read-your-writes with consistent reads where needed (see the sketch after this block)

# ❌ DON'T: Use NoSQL for complex joins
# ❌ DON'T: Store unbounded data in single document
# ❌ DON'T: Ignore partition sizing
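
For the read-your-writes point above, DynamoDB offers strongly consistent reads on the base table (not on GSIs); a minimal sketch with hypothetical key values:

item = table.get_item(
    Key={'PK': 'USER#123', 'SK': 'EVENT#2024-01-01T00:00:00#abc'},  # hypothetical keys
    ConsistentRead=True
).get('Item')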

Resources


Skill Certification Checklist:

  • Can design document schemas for MongoDB
  • Can implement caching patterns with Redis
  • Can model time series data in Cassandra
  • Can use DynamoDB single-table design
  • Can choose appropriate consistency levels