Claude Code Plugins

Community-maintained marketplace

Feedback

redis-state-management

@manutej/luxor-claude-marketplace
7
0

Comprehensive guide for Redis state management including caching strategies, session management, pub/sub patterns, distributed locks, and data structures

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name redis-state-management
description Comprehensive guide for Redis state management including caching strategies, session management, pub/sub patterns, distributed locks, and data structures
tags redis, state-management, caching, pub-sub, distributed-systems, sessions
tier tier-1

Redis State Management

A comprehensive skill for mastering Redis state management patterns in distributed systems. This skill covers caching strategies, session management, pub/sub messaging, distributed locks, data structures, and production-ready patterns using redis-py.

When to Use This Skill

Use this skill when:

  • Implementing high-performance caching layers for web applications
  • Managing user sessions in distributed environments
  • Building real-time messaging and event distribution systems
  • Coordinating distributed processes with locks and synchronization
  • Storing and querying structured data with Redis data structures
  • Optimizing application performance with Redis
  • Scaling applications horizontally with shared state
  • Implementing rate limiting, counters, and analytics
  • Building microservices with Redis as a communication layer
  • Managing temporary data with automatic expiration (TTL)
  • Implementing leaderboards, queues, and real-time features

Core Concepts

Redis Fundamentals

Redis (Remote Dictionary Server) is an in-memory data structure store used as:

  • Database: Persistent key-value storage
  • Cache: High-speed data layer
  • Message Broker: Pub/sub and stream messaging
  • Session Store: Distributed session management

Key Characteristics:

  • In-memory storage (microsecond latency)
  • Optional persistence (RDB snapshots, AOF logs)
  • Rich data structures beyond key-value
  • Atomic operations on complex data types
  • Built-in replication and clustering
  • Pub/sub messaging support
  • Lua scripting for complex operations
  • Pipelining for batch operations

Redis Data Structures

Redis provides multiple data types for different use cases:

  1. Strings: Simple key-value pairs, binary safe

    • Use for: Cache values, counters, flags, JSON objects
    • Max size: 512 MB
    • Commands: SET, GET, INCR, APPEND
  2. Hashes: Field-value maps (objects)

    • Use for: User profiles, configuration objects, small entities
    • Efficient for storing objects with multiple fields
    • Commands: HSET, HGET, HMGET, HINCRBY
  3. Lists: Ordered collections (linked lists)

    • Use for: Queues, activity feeds, recent items
    • Operations at head/tail are O(1)
    • Commands: LPUSH, RPUSH, LPOP, RPOP, LRANGE
  4. Sets: Unordered unique collections

    • Use for: Tags, unique visitors, relationships
    • Set operations: union, intersection, difference
    • Commands: SADD, SMEMBERS, SISMEMBER, SINTER
  5. Sorted Sets: Ordered sets with scores

    • Use for: Leaderboards, time-series, priority queues
    • Range queries by score or rank
    • Commands: ZADD, ZRANGE, ZRANGEBYSCORE, ZRANK
  6. Streams: Append-only logs with consumer groups

    • Use for: Event sourcing, activity logs, message queues
    • Built-in consumer group support
    • Commands: XADD, XREAD, XREADGROUP

Connection Management

Connection Pools: Redis connections are expensive to create. Always use connection pools:

import redis

# Connection pool (recommended)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)

# Direct connection (avoid in production)
r = redis.Redis(host='localhost', port=6379, db=0)

Best Practices:

  • Use connection pools for all applications
  • Set appropriate max_connections based on workload
  • Enable decode_responses=True for string data
  • Configure socket_timeout and socket_keepalive
  • Handle connection errors with retries

Data Persistence

Redis offers two persistence mechanisms:

RDB (Redis Database): Point-in-time snapshots

  • Compact binary format
  • Fast restart times
  • Lower disk I/O
  • Potential data loss between snapshots

AOF (Append-Only File): Log of write operations

  • Better durability (fsync policies)
  • Larger files, slower restarts
  • Can be automatically rewritten/compacted
  • Minimal data loss potential

Hybrid Approach: RDB + AOF for best of both worlds

RESP 3 Protocol

Redis Serialization Protocol version 3 offers:

  • Client-side caching support
  • Better data type support
  • Push notifications
  • Performance improvements
import redis
from redis.cache import CacheConfig

# Enable RESP3 with client-side caching
r = redis.Redis(host='localhost', port=6379, protocol=3,
                cache_config=CacheConfig())

Caching Strategies

Cache-Aside (Lazy Loading)

Pattern: Application checks cache first, loads from database on miss

import redis
import json
from typing import Optional, Dict, Any

r = redis.Redis(decode_responses=True)

def get_user(user_id: int) -> Optional[Dict[str, Any]]:
    """Cache-aside pattern for user data."""
    cache_key = f"user:{user_id}"

    # Try cache first
    cached_data = r.get(cache_key)
    if cached_data:
        return json.loads(cached_data)

    # Cache miss - load from database
    user_data = database.get_user(user_id)  # Your DB query
    if user_data:
        # Store in cache with 1 hour TTL
        r.setex(cache_key, 3600, json.dumps(user_data))

    return user_data

Advantages:

  • Only requested data is cached (efficient memory usage)
  • Cache failures don't break the application
  • Simple to implement

Disadvantages:

  • Cache miss penalty (latency spike)
  • Thundering herd on popular items
  • Stale data until cache expiration

Write-Through Cache

Pattern: Write to cache and database simultaneously

def update_user(user_id: int, user_data: Dict[str, Any]) -> bool:
    """Write-through pattern for user updates."""
    cache_key = f"user:{user_id}"

    # Write to database first
    success = database.update_user(user_id, user_data)

    if success:
        # Update cache immediately
        r.setex(cache_key, 3600, json.dumps(user_data))

    return success

Advantages:

  • Cache always consistent with database
  • No read penalty for recently written data

Disadvantages:

  • Write latency increases
  • Unused data may be cached
  • Extra cache write overhead

Write-Behind (Write-Back) Cache

Pattern: Write to cache immediately, sync to database asynchronously

import redis
import json
from queue import Queue
from threading import Thread

r = redis.Redis(decode_responses=True)
write_queue = Queue()

def async_writer():
    """Background worker to sync cache to database."""
    while True:
        user_id, user_data = write_queue.get()
        try:
            database.update_user(user_id, user_data)
        except Exception as e:
            # Log error, potentially retry
            print(f"Failed to write user {user_id}: {e}")
        finally:
            write_queue.task_done()

# Start background writer
Thread(target=async_writer, daemon=True).start()

def update_user_fast(user_id: int, user_data: Dict[str, Any]):
    """Write-behind pattern for fast writes."""
    cache_key = f"user:{user_id}"

    # Write to cache immediately (fast)
    r.setex(cache_key, 3600, json.dumps(user_data))

    # Queue database write (async)
    write_queue.put((user_id, user_data))

Advantages:

  • Minimal write latency
  • Can batch database writes
  • Handles write spikes

Disadvantages:

  • Risk of data loss if cache fails
  • Complex error handling
  • Consistency challenges

Cache Invalidation Strategies

Time-based Expiration (TTL):

# Set key with expiration
r.setex("session:abc123", 1800, session_data)  # 30 minutes

# Or set TTL on existing key
r.expire("user:profile:123", 3600)  # 1 hour

# Check remaining TTL
ttl = r.ttl("user:profile:123")

Event-based Invalidation:

def update_product(product_id: int, product_data: dict):
    """Invalidate cache on update."""
    # Update database
    database.update_product(product_id, product_data)

    # Invalidate related caches
    r.delete(f"product:{product_id}")
    r.delete(f"product_list:category:{product_data['category']}")
    r.delete("products:featured")

Pattern-based Invalidation:

# Delete all keys matching pattern
def invalidate_user_cache(user_id: int):
    """Invalidate all cache entries for a user."""
    pattern = f"user:{user_id}:*"

    # Find and delete matching keys
    for key in r.scan_iter(match=pattern, count=100):
        r.delete(key)

Cache Stampede Prevention

Problem: Multiple requests simultaneously miss cache and query database

Solution 1: Probabilistic Early Expiration

import time
import random

def get_with_early_expiration(key: str, ttl: int = 3600, beta: float = 1.0):
    """Prevent stampede with probabilistic early recomputation."""
    value = r.get(key)

    if value is None:
        # Cache miss - compute and cache
        value = compute_value(key)
        r.setex(key, ttl, value)
        return value

    # Check if we should recompute early
    current_time = time.time()
    delta = current_time - float(r.get(f"{key}:timestamp") or 0)
    expiry = ttl * random.random() * beta

    if delta > expiry:
        # Recompute in background
        value = compute_value(key)
        r.setex(key, ttl, value)
        r.set(f"{key}:timestamp", current_time)

    return value

Solution 2: Locking

from contextlib import contextmanager

@contextmanager
def cache_lock(key: str, timeout: int = 10):
    """Acquire lock for cache computation."""
    lock_key = f"{key}:lock"
    identifier = str(time.time())

    # Try to acquire lock
    if r.set(lock_key, identifier, nx=True, ex=timeout):
        try:
            yield True
        finally:
            # Release lock
            if r.get(lock_key) == identifier:
                r.delete(lock_key)
    else:
        yield False

def get_with_lock(key: str):
    """Use lock to prevent stampede."""
    value = r.get(key)

    if value is None:
        with cache_lock(key) as acquired:
            if acquired:
                # We got the lock - compute value
                value = compute_value(key)
                r.setex(key, 3600, value)
            else:
                # Someone else is computing - wait and retry
                time.sleep(0.1)
                value = r.get(key) or compute_value(key)

    return value

Session Management

Distributed Session Storage

Basic Session Management:

import redis
import json
import uuid
from datetime import datetime, timedelta

r = redis.Redis(decode_responses=True)

class SessionManager:
    def __init__(self, ttl: int = 1800):
        """Session manager with Redis backend.

        Args:
            ttl: Session timeout in seconds (default 30 minutes)
        """
        self.ttl = ttl

    def create_session(self, user_id: int, data: dict = None) -> str:
        """Create new session and return session ID."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        session_data = {
            "user_id": user_id,
            "created_at": datetime.utcnow().isoformat(),
            "data": data or {}
        }

        r.setex(session_key, self.ttl, json.dumps(session_data))
        return session_id

    def get_session(self, session_id: str) -> dict:
        """Retrieve session data and refresh TTL."""
        session_key = f"session:{session_id}"
        session_data = r.get(session_key)

        if session_data:
            # Refresh TTL on access (sliding expiration)
            r.expire(session_key, self.ttl)
            return json.loads(session_data)

        return None

    def update_session(self, session_id: str, data: dict) -> bool:
        """Update session data."""
        session_key = f"session:{session_id}"
        session_data = self.get_session(session_id)

        if session_data:
            session_data["data"].update(data)
            r.setex(session_key, self.ttl, json.dumps(session_data))
            return True

        return False

    def delete_session(self, session_id: str) -> bool:
        """Delete session (logout)."""
        session_key = f"session:{session_id}"
        return r.delete(session_key) > 0

Session with Hash Storage

More efficient for session objects:

class HashSessionManager:
    """Session manager using Redis hashes for better performance."""

    def __init__(self, ttl: int = 1800):
        self.ttl = ttl

    def create_session(self, user_id: int, **kwargs) -> str:
        """Create session using hash."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        # Store as hash for efficient field access
        session_fields = {
            "user_id": str(user_id),
            "created_at": datetime.utcnow().isoformat(),
            **{k: str(v) for k, v in kwargs.items()}
        }

        r.hset(session_key, mapping=session_fields)
        r.expire(session_key, self.ttl)

        return session_id

    def get_field(self, session_id: str, field: str) -> str:
        """Get single session field efficiently."""
        session_key = f"session:{session_id}"
        value = r.hget(session_key, field)

        if value:
            r.expire(session_key, self.ttl)  # Refresh TTL

        return value

    def set_field(self, session_id: str, field: str, value: str) -> bool:
        """Update single session field."""
        session_key = f"session:{session_id}"

        if r.exists(session_key):
            r.hset(session_key, field, value)
            r.expire(session_key, self.ttl)
            return True

        return False

    def get_all(self, session_id: str) -> dict:
        """Get all session fields."""
        session_key = f"session:{session_id}"
        data = r.hgetall(session_key)

        if data:
            r.expire(session_key, self.ttl)

        return data

User Activity Tracking

def track_user_activity(user_id: int, action: str):
    """Track user activity with automatic expiration."""
    activity_key = f"user:{user_id}:activity"
    timestamp = datetime.utcnow().isoformat()

    # Add activity to list
    r.lpush(activity_key, json.dumps({"action": action, "timestamp": timestamp}))

    # Keep only last 100 activities
    r.ltrim(activity_key, 0, 99)

    # Set expiration (30 days)
    r.expire(activity_key, 2592000)

def get_recent_activity(user_id: int, limit: int = 10) -> list:
    """Get recent user activities."""
    activity_key = f"user:{user_id}:activity"
    activities = r.lrange(activity_key, 0, limit - 1)

    return [json.loads(a) for a in activities]

Pub/Sub Patterns

Basic Publisher/Subscriber

Publisher:

import redis

r = redis.Redis(decode_responses=True)

def publish_event(channel: str, message: dict):
    """Publish event to channel."""
    import json
    r.publish(channel, json.dumps(message))

# Example usage
publish_event("notifications", {
    "type": "user_signup",
    "user_id": 12345,
    "timestamp": datetime.utcnow().isoformat()
})

Subscriber:

import redis
import json

def handle_message(message):
    """Process received message."""
    data = json.loads(message['data'])
    print(f"Received: {data}")

# Initialize pubsub
r = redis.Redis(decode_responses=True)
p = r.pubsub()

# Subscribe to channels
p.subscribe('notifications', 'alerts')

# Listen for messages
for message in p.listen():
    if message['type'] == 'message':
        handle_message(message)

Pattern-Based Subscriptions

# Subscribe to multiple channels with patterns
p = r.pubsub()
p.psubscribe('user:*', 'notification:*')

# Get messages from pattern subscriptions
for message in p.listen():
    if message['type'] == 'pmessage':
        channel = message['channel']
        pattern = message['pattern']
        data = message['data']
        print(f"Pattern {pattern} matched {channel}: {data}")

Async Pub/Sub with Background Thread

import redis
import time

r = redis.Redis(decode_responses=True)
p = r.pubsub()

def message_handler(message):
    """Handle messages in background thread."""
    print(f"Handler received: {message['data']}")

# Subscribe with handler
p.subscribe(**{'notifications': message_handler, 'alerts': message_handler})

# Run in background thread
thread = p.run_in_thread(sleep_time=0.001)

# Publish some messages
r.publish('notifications', 'Hello!')
r.publish('alerts', 'Warning!')

time.sleep(1)

# Stop background thread
thread.stop()

Async Pub/Sub with asyncio

import asyncio
import redis.asyncio as redis

async def reader(channel: redis.client.PubSub):
    """Async message reader."""
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            print(f"Received: {message}")

            # Stop on specific message
            if message["data"].decode() == "STOP":
                break

async def pubsub_example():
    """Async pub/sub example."""
    r = await redis.from_url("redis://localhost")

    async with r.pubsub() as pubsub:
        # Subscribe to channels
        await pubsub.subscribe("channel:1", "channel:2")

        # Create reader task
        reader_task = asyncio.create_task(reader(pubsub))

        # Publish messages
        await r.publish("channel:1", "Hello")
        await r.publish("channel:2", "World")
        await r.publish("channel:1", "STOP")

        # Wait for reader to finish
        await reader_task

    await r.close()

# Run async example
asyncio.run(pubsub_example())

Sharded Pub/Sub (Redis 7.0+)

from redis.cluster import RedisCluster, ClusterNode

# Connect to cluster
rc = RedisCluster(startup_nodes=[
    ClusterNode('localhost', 6379),
    ClusterNode('localhost', 6380)
])

# Create sharded pubsub
p = rc.pubsub()
p.ssubscribe('foo')

# Get message from specific node
message = p.get_sharded_message(target_node=ClusterNode('localhost', 6379))

Distributed Locks

Simple Lock Implementation

import redis
import time
import uuid

class RedisLock:
    """Simple distributed lock using Redis."""

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """Acquire lock."""
        end_time = time.time() + (timeout or self.timeout)

        while True:
            # Try to set lock with NX (only if not exists) and EX (expiration)
            if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
                return True

            if not blocking:
                return False

            if timeout and time.time() > end_time:
                return False

            # Wait before retry
            time.sleep(0.01)

    def release(self) -> bool:
        """Release lock only if we own it."""
        # Use Lua script for atomic check-and-delete
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1

    def __enter__(self):
        """Context manager support."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager cleanup."""
        self.release()

# Usage example
r = redis.Redis()
lock = RedisLock(r, "resource:123", timeout=5)

with lock:
    # Critical section - only one process at a time
    print("Processing resource 123")
    process_resource()

Advanced Lock with Auto-Renewal

import threading

class RenewableLock:
    """Distributed lock with automatic renewal."""

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
        self.renewal_thread = None
        self.stop_renewal = threading.Event()

    def _renew_lock(self):
        """Background task to renew lock."""
        while not self.stop_renewal.is_set():
            time.sleep(self.timeout / 3)  # Renew at 1/3 of timeout

            # Renew only if we still own the lock
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("expire", KEYS[1], ARGV[2])
            else
                return 0
            end
            """

            result = self.redis.eval(lua_script, 1, self.key,
                                   self.identifier, self.timeout)

            if result == 0:
                # We lost the lock
                self.stop_renewal.set()

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """Acquire lock and start auto-renewal."""
        if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
            # Start renewal thread
            self.stop_renewal.clear()
            self.renewal_thread = threading.Thread(target=self._renew_lock, daemon=True)
            self.renewal_thread.start()
            return True

        return False

    def release(self) -> bool:
        """Release lock and stop renewal."""
        self.stop_renewal.set()

        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1

Redlock Algorithm (Multiple Redis Instances)

class Redlock:
    """Redlock algorithm for distributed locking across multiple Redis instances."""

    def __init__(self, redis_instances: list):
        """
        Args:
            redis_instances: List of Redis client connections
        """
        self.instances = redis_instances
        self.quorum = len(redis_instances) // 2 + 1

    def acquire(self, resource: str, ttl: int = 10000) -> tuple:
        """
        Acquire lock across multiple Redis instances.

        Returns:
            (success: bool, lock_identifier: str)
        """
        identifier = str(uuid.uuid4())
        start_time = int(time.time() * 1000)

        # Try to acquire lock on all instances
        acquired = 0
        for instance in self.instances:
            try:
                if instance.set(f"lock:{resource}", identifier,
                              nx=True, px=ttl):
                    acquired += 1
            except Exception:
                pass

        # Calculate elapsed time
        elapsed = int(time.time() * 1000) - start_time
        validity_time = ttl - elapsed - 100  # drift compensation

        # Check if we got quorum
        if acquired >= self.quorum and validity_time > 0:
            return True, identifier
        else:
            # Release locks if we didn't get quorum
            self._release_all(resource, identifier)
            return False, None

    def _release_all(self, resource: str, identifier: str):
        """Release lock on all instances."""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        for instance in self.instances:
            try:
                instance.eval(lua_script, 1, f"lock:{resource}", identifier)
            except Exception:
                pass

Data Structures and Operations

Working with Hashes

# User profile storage
def save_user_profile(user_id: int, profile: dict):
    """Save user profile as hash."""
    key = f"user:profile:{user_id}"
    r.hset(key, mapping=profile)
    r.expire(key, 86400)  # 24 hour TTL

def get_user_profile(user_id: int) -> dict:
    """Get complete user profile."""
    key = f"user:profile:{user_id}"
    return r.hgetall(key)

def update_user_field(user_id: int, field: str, value: str):
    """Update single profile field."""
    key = f"user:profile:{user_id}"
    r.hset(key, field, value)

# Example usage
save_user_profile(123, {
    "username": "alice",
    "email": "alice@example.com",
    "age": "30"
})

# Atomic increment
r.hincrby("user:profile:123", "login_count", 1)

Working with Lists

# Job queue implementation
def enqueue_job(queue_name: str, job_data: dict):
    """Add job to queue."""
    key = f"queue:{queue_name}"
    r.rpush(key, json.dumps(job_data))

def dequeue_job(queue_name: str, timeout: int = 0) -> dict:
    """Get job from queue (blocking)."""
    key = f"queue:{queue_name}"

    if timeout > 0:
        # Blocking pop with timeout
        result = r.blpop(key, timeout=timeout)
        if result:
            _, job_data = result
            return json.loads(job_data)
    else:
        # Non-blocking pop
        job_data = r.lpop(key)
        if job_data:
            return json.loads(job_data)

    return None

# Activity feed
def add_to_feed(user_id: int, activity: dict):
    """Add activity to user feed."""
    key = f"feed:{user_id}"
    r.lpush(key, json.dumps(activity))
    r.ltrim(key, 0, 99)  # Keep only latest 100 items
    r.expire(key, 604800)  # 7 days

def get_feed(user_id: int, start: int = 0, end: int = 19) -> list:
    """Get user feed with pagination."""
    key = f"feed:{user_id}"
    items = r.lrange(key, start, end)
    return [json.loads(item) for item in items]

Working with Sets

# Tags and relationships
def add_tags(item_id: int, tags: list):
    """Add tags to item."""
    key = f"item:{item_id}:tags"
    r.sadd(key, *tags)

def get_tags(item_id: int) -> set:
    """Get all tags for item."""
    key = f"item:{item_id}:tags"
    return r.smembers(key)

def find_items_with_all_tags(tags: list) -> set:
    """Find items having all specified tags."""
    keys = [f"item:*:tags" for _ in tags]
    # This is simplified - in practice, you'd need to track item IDs differently
    return r.sinter(*keys)

# Online users tracking
def user_online(user_id: int):
    """Mark user as online."""
    r.sadd("users:online", user_id)
    r.expire(f"user:{user_id}:heartbeat", 60)

def user_offline(user_id: int):
    """Mark user as offline."""
    r.srem("users:online", user_id)

def get_online_users() -> set:
    """Get all online users."""
    return r.smembers("users:online")

def get_online_count() -> int:
    """Get count of online users."""
    return r.scard("users:online")

Working with Sorted Sets

# Leaderboard implementation
def update_score(leaderboard: str, user_id: int, score: float):
    """Update user score in leaderboard."""
    key = f"leaderboard:{leaderboard}"
    r.zadd(key, {user_id: score})

def get_leaderboard(leaderboard: str, start: int = 0, end: int = 9) -> list:
    """Get top players (descending order)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANGE for descending order (highest scores first)
    return r.zrevrange(key, start, end, withscores=True)

def get_user_rank(leaderboard: str, user_id: int) -> int:
    """Get user's rank (0-indexed)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANK for descending rank
    rank = r.zrevrank(key, user_id)
    return rank if rank is not None else -1

def get_user_score(leaderboard: str, user_id: int) -> float:
    """Get user's score."""
    key = f"leaderboard:{leaderboard}"
    score = r.zscore(key, user_id)
    return score if score is not None else 0.0

def get_score_range(leaderboard: str, min_score: float, max_score: float) -> list:
    """Get users within score range."""
    key = f"leaderboard:{leaderboard}"
    return r.zrangebyscore(key, min_score, max_score, withscores=True)

# Time-based sorted set (activity stream)
def add_activity(user_id: int, activity: str):
    """Add timestamped activity."""
    key = f"user:{user_id}:activities"
    timestamp = time.time()
    r.zadd(key, {activity: timestamp})

    # Keep only last 24 hours
    cutoff = timestamp - 86400
    r.zremrangebyscore(key, '-inf', cutoff)

def get_recent_activities(user_id: int, count: int = 10) -> list:
    """Get recent activities."""
    key = f"user:{user_id}:activities"
    # Get most recent (highest timestamps)
    return r.zrevrange(key, 0, count - 1, withscores=True)

Working with Streams

# Event stream
def add_event(stream_key: str, event_data: dict) -> str:
    """Add event to stream."""
    # Returns auto-generated ID (timestamp-sequence)
    event_id = r.xadd(stream_key, event_data)
    return event_id

def read_events(stream_key: str, count: int = 10, start_id: str = '0') -> list:
    """Read events from stream."""
    events = r.xread({stream_key: start_id}, count=count)

    # events format: [(stream_name, [(id, data), (id, data), ...])]
    if events:
        _, event_list = events[0]
        return event_list

    return []

# Consumer groups
def create_consumer_group(stream_key: str, group_name: str):
    """Create consumer group for stream."""
    try:
        r.xgroup_create(name=stream_key, groupname=group_name, id='0')
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise

def read_from_group(stream_key: str, group_name: str,
                   consumer_name: str, count: int = 10) -> list:
    """Read events as consumer in group."""
    # Read new messages with '>'
    events = r.xreadgroup(
        groupname=group_name,
        consumername=consumer_name,
        streams={stream_key: '>'},
        count=count,
        block=5000  # 5 second timeout
    )

    if events:
        _, event_list = events[0]
        return event_list

    return []

def acknowledge_event(stream_key: str, group_name: str, event_id: str):
    """Acknowledge processed event."""
    r.xack(stream_key, group_name, event_id)

# Example: Processing events with consumer group
def process_events(stream_key: str, group_name: str, consumer_name: str):
    """Process events from stream."""
    create_consumer_group(stream_key, group_name)

    while True:
        events = read_from_group(stream_key, group_name, consumer_name, count=10)

        for event_id, event_data in events:
            try:
                # Process event
                process_event(event_data)

                # Acknowledge successful processing
                acknowledge_event(stream_key, group_name, event_id)
            except Exception as e:
                print(f"Failed to process event {event_id}: {e}")
                # Event remains unacknowledged for retry

Performance Optimization

Pipelining for Batch Operations

# Without pipelining (slow - multiple round trips)
for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")

# With pipelining (fast - single round trip)
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()

# Pipelining with reads
pipe = r.pipeline()
for i in range(100):
    pipe.get(f"key:{i}")
values = pipe.execute()

# Builder pattern with pipeline
class DataLoader:
    def __init__(self):
        self.pipeline = r.pipeline()

    def add_user(self, user_id: int, user_data: dict):
        """Add user data."""
        self.pipeline.hset(f"user:{user_id}", mapping=user_data)
        return self

    def add_to_set(self, set_name: str, value: str):
        """Add to set."""
        self.pipeline.sadd(set_name, value)
        return self

    def execute(self):
        """Execute all pipelined commands."""
        return self.pipeline.execute()

# Usage
loader = DataLoader()
results = (loader
    .add_user(1, {"name": "Alice", "email": "alice@example.com"})
    .add_user(2, {"name": "Bob", "email": "bob@example.com"})
    .add_to_set("active_users", "1")
    .add_to_set("active_users", "2")
    .execute())

Transactions with WATCH

# Optimistic locking with WATCH
def transfer_credits(from_user: int, to_user: int, amount: int) -> bool:
    """Transfer credits between users with optimistic locking."""

    with r.pipeline() as pipe:
        while True:
            try:
                # Watch the keys we're going to modify
                pipe.watch(f"user:{from_user}:credits", f"user:{to_user}:credits")

                # Get current values
                from_credits = int(pipe.get(f"user:{from_user}:credits") or 0)
                to_credits = int(pipe.get(f"user:{to_user}:credits") or 0)

                # Check if transfer is possible
                if from_credits < amount:
                    pipe.unwatch()
                    return False

                # Start transaction
                pipe.multi()
                pipe.set(f"user:{from_user}:credits", from_credits - amount)
                pipe.set(f"user:{to_user}:credits", to_credits + amount)

                # Execute transaction
                pipe.execute()
                return True

            except redis.WatchError:
                # Key was modified by another client - retry
                continue

# Lua scripts for atomic operations
increment_script = """
local current = redis.call('GET', KEYS[1])
if not current then
    current = 0
end
local new_val = tonumber(current) + tonumber(ARGV[1])
redis.call('SET', KEYS[1], new_val)
return new_val
"""

# Register and use Lua script
increment = r.register_script(increment_script)
new_value = increment(keys=['counter:views'], args=[1])

Lua Scripts for Complex Operations

# Rate limiting with Lua
rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)

if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
else
    return 1
end
"""

rate_limiter = r.register_script(rate_limit_script)

def is_allowed(user_id: int, limit: int = 100, window: int = 60) -> bool:
    """Check if user is within rate limit."""
    key = f"rate_limit:{user_id}"
    result = rate_limiter(keys=[key], args=[limit, window])
    return result == 1

# Get-or-set pattern with Lua
get_or_set_script = """
local value = redis.call('GET', KEYS[1])
if value then
    return value
else
    redis.call('SET', KEYS[1], ARGV[1])
    redis.call('EXPIRE', KEYS[1], ARGV[2])
    return ARGV[1]
end
"""

get_or_set = r.register_script(get_or_set_script)

def get_or_compute(key: str, compute_fn, ttl: int = 3600):
    """Get value from cache or compute and cache it."""
    value = get_or_set(keys=[key], args=["__COMPUTING__", ttl])

    if value == "__COMPUTING__":
        # We set the placeholder - compute the real value
        computed = compute_fn()
        r.setex(key, ttl, computed)
        return computed

    return value

Production Patterns

High Availability with Sentinel

from redis.sentinel import Sentinel

# Connect to Sentinel
sentinel = Sentinel([
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379)
], socket_timeout=0.5)

# Get master connection
master = sentinel.master_for('mymaster', socket_timeout=0.5)

# Get replica connection (for read-only operations)
replica = sentinel.slave_for('mymaster', socket_timeout=0.5)

# Use master for writes
master.set('key', 'value')

# Use replica for reads (optional, for load distribution)
value = replica.get('key')

Async Redis with asyncio

import asyncio
import redis.asyncio as redis

async def async_redis_operations():
    """Async Redis operations example."""
    # Create async connection
    r = await redis.from_url("redis://localhost")

    try:
        # Async operations
        await r.set("async_key", "async_value")
        value = await r.get("async_key")
        print(f"Value: {value}")

        # Async pipeline
        async with r.pipeline(transaction=True) as pipe:
            await pipe.set("key1", "value1")
            await pipe.set("key2", "value2")
            await pipe.get("key1")
            results = await pipe.execute()

        print(f"Pipeline results: {results}")

    finally:
        await r.close()

# Run async operations
asyncio.run(async_redis_operations())

Connection Pool Configuration

# Production-ready connection pool
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,           # Max pool size
    socket_timeout=5,             # Socket timeout
    socket_connect_timeout=5,     # Connection timeout
    socket_keepalive=True,        # Keep TCP connection alive
    socket_keepalive_options={
        socket.TCP_KEEPIDLE: 60,
        socket.TCP_KEEPINTVL: 10,
        socket.TCP_KEEPCNT: 3
    },
    retry_on_timeout=True,        # Retry on timeout
    health_check_interval=30,     # Health check every 30s
    decode_responses=True         # Auto-decode bytes to strings
)

r = redis.Redis(connection_pool=pool)

Error Handling and Resilience

import redis
from redis.exceptions import ConnectionError, TimeoutError
import time

class ResilientRedisClient:
    """Redis client with retry logic and circuit breaker."""

    def __init__(self, max_retries: int = 3, backoff: float = 0.1):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            socket_timeout=5,
            retry_on_timeout=True
        )
        self.max_retries = max_retries
        self.backoff = backoff

    def get_with_retry(self, key: str, default=None):
        """Get value with exponential backoff retry."""
        for attempt in range(self.max_retries):
            try:
                return self.redis.get(key) or default
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    # Log error and return default
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return default

                # Exponential backoff
                wait_time = self.backoff * (2 ** attempt)
                time.sleep(wait_time)

    def set_with_retry(self, key: str, value: str, ttl: int = None) -> bool:
        """Set value with retry logic."""
        for attempt in range(self.max_retries):
            try:
                if ttl:
                    return self.redis.setex(key, ttl, value)
                else:
                    return self.redis.set(key, value)
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return False

                wait_time = self.backoff * (2 ** attempt)
                time.sleep(wait_time)

Monitoring and Metrics

def get_redis_info(section: str = None) -> dict:
    """Get Redis server information."""
    return r.info(section=section)

def monitor_memory_usage():
    """Monitor Redis memory usage."""
    info = r.info('memory')

    used_memory = info['used_memory_human']
    peak_memory = info['used_memory_peak_human']
    memory_fragmentation = info['mem_fragmentation_ratio']

    print(f"Used Memory: {used_memory}")
    print(f"Peak Memory: {peak_memory}")
    print(f"Fragmentation Ratio: {memory_fragmentation}")

    return info

def monitor_stats():
    """Monitor Redis statistics."""
    info = r.info('stats')

    total_connections = info['total_connections_received']
    total_commands = info['total_commands_processed']
    ops_per_sec = info['instantaneous_ops_per_sec']

    print(f"Total Connections: {total_connections}")
    print(f"Total Commands: {total_commands}")
    print(f"Ops/sec: {ops_per_sec}")

    return info

def get_slow_log(count: int = 10):
    """Get slow query log."""
    slow_log = r.slowlog_get(count)

    for entry in slow_log:
        print(f"Command: {entry['command']}")
        print(f"Duration: {entry['duration']} microseconds")
        print(f"Time: {entry['start_time']}")
        print("---")

    return slow_log

Best Practices

Key Naming Conventions

Use consistent, hierarchical naming:

# Good naming patterns
user:123:profile              # User profile data
user:123:sessions:abc         # User session
cache:product:456             # Cached product
queue:emails:pending          # Email queue
lock:resource:789             # Resource lock
counter:api:requests:daily    # Daily API request counter
leaderboard:global:score      # Global leaderboard

# Avoid
u123                          # Too cryptic
user_profile_123              # Underscores less common
123:user                      # Wrong hierarchy

Memory Management

# Set TTL on all temporary data
r.setex("temp:data", 3600, value)  # Expires in 1 hour

# Limit collection sizes
r.lpush("activity_log", entry)
r.ltrim("activity_log", 0, 999)  # Keep only 1000 items

# Use appropriate data structures
# Hash is more memory-efficient than multiple keys
r.hset("user:123", mapping={"name": "Alice", "email": "alice@example.com"})
# vs
r.set("user:123:name", "Alice")
r.set("user:123:email", "alice@example.com")

# Monitor memory usage
if r.info('memory')['used_memory'] > threshold:
    # Implement eviction or cleanup
    cleanup_old_data()

Security

# Use authentication
r = redis.Redis(
    host='localhost',
    port=6379,
    password='your-secure-password',
    username='your-username'  # Redis 6+
)

# Use SSL/TLS for production
pool = redis.ConnectionPool(
    host='redis.example.com',
    port=6380,
    connection_class=redis.SSLConnection,
    ssl_cert_reqs='required',
    ssl_ca_certs='/path/to/ca-cert.pem'
)

# Credential provider pattern
from redis import UsernamePasswordCredentialProvider

creds_provider = UsernamePasswordCredentialProvider("username", "password")
r = redis.Redis(
    host="localhost",
    port=6379,
    credential_provider=creds_provider
)

Testing

import fakeredis
import pytest

@pytest.fixture
def redis_client():
    """Provide fake Redis client for testing."""
    return fakeredis.FakeRedis(decode_responses=True)

def test_caching(redis_client):
    """Test caching logic."""
    # Test cache miss
    assert redis_client.get("test_key") is None

    # Test cache set
    redis_client.setex("test_key", 60, "test_value")
    assert redis_client.get("test_key") == "test_value"

    # Test expiration
    assert redis_client.ttl("test_key") <= 60

def test_session_management(redis_client):
    """Test session operations."""
    session_manager = SessionManager(redis_client)

    # Create session
    session_id = session_manager.create_session(user_id=123)
    assert session_id is not None

    # Get session
    session = session_manager.get_session(session_id)
    assert session['user_id'] == 123

    # Delete session
    assert session_manager.delete_session(session_id) is True
    assert session_manager.get_session(session_id) is None

Examples

Example 1: User Session Management with Redis

import redis
import json
import uuid
from datetime import datetime, timedelta

class UserSessionManager:
    """Complete user session management with Redis."""

    def __init__(self, redis_client: redis.Redis, ttl: int = 1800):
        self.redis = redis_client
        self.ttl = ttl

    def create_session(self, user_id: int, user_data: dict = None) -> str:
        """Create new user session."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        session_data = {
            "user_id": user_id,
            "created_at": datetime.utcnow().isoformat(),
            "last_accessed": datetime.utcnow().isoformat(),
            "data": user_data or {}
        }

        # Store session with TTL
        self.redis.setex(session_key, self.ttl, json.dumps(session_data))

        # Track user's active sessions
        self.redis.sadd(f"user:{user_id}:sessions", session_id)

        return session_id

    def get_session(self, session_id: str) -> dict:
        """Get session and refresh TTL."""
        session_key = f"session:{session_id}"
        session_data = self.redis.get(session_key)

        if session_data:
            session = json.loads(session_data)
            session['last_accessed'] = datetime.utcnow().isoformat()

            # Refresh TTL
            self.redis.setex(session_key, self.ttl, json.dumps(session))

            return session

        return None

    def delete_session(self, session_id: str) -> bool:
        """Delete session."""
        session = self.get_session(session_id)
        if not session:
            return False

        user_id = session['user_id']

        # Remove session
        self.redis.delete(f"session:{session_id}")

        # Remove from user's session set
        self.redis.srem(f"user:{user_id}:sessions", session_id)

        return True

    def delete_all_user_sessions(self, user_id: int):
        """Delete all sessions for a user."""
        sessions_key = f"user:{user_id}:sessions"
        session_ids = self.redis.smembers(sessions_key)

        for session_id in session_ids:
            self.redis.delete(f"session:{session_id}")

        self.redis.delete(sessions_key)

    def get_user_sessions(self, user_id: int) -> list:
        """Get all active sessions for a user."""
        sessions_key = f"user:{user_id}:sessions"
        session_ids = self.redis.smembers(sessions_key)

        sessions = []
        for session_id in session_ids:
            session = self.get_session(session_id)
            if session:
                session['session_id'] = session_id
                sessions.append(session)

        return sessions

# Usage
r = redis.Redis(decode_responses=True)
session_mgr = UserSessionManager(r)

# Create session
session_id = session_mgr.create_session(
    user_id=123,
    user_data={"role": "admin", "permissions": ["read", "write"]}
)

# Get session
session = session_mgr.get_session(session_id)
print(f"User ID: {session['user_id']}")

# List all user sessions
sessions = session_mgr.get_user_sessions(123)
print(f"Active sessions: {len(sessions)}")

# Logout (delete session)
session_mgr.delete_session(session_id)

Example 2: Real-Time Leaderboard

import redis
import time

class Leaderboard:
    """Real-time leaderboard using Redis sorted sets."""

    def __init__(self, redis_client: redis.Redis, name: str):
        self.redis = redis_client
        self.key = f"leaderboard:{name}"

    def add_score(self, player_id: str, score: float):
        """Add or update player score."""
        self.redis.zadd(self.key, {player_id: score})

    def increment_score(self, player_id: str, increment: float):
        """Increment player score."""
        self.redis.zincrby(self.key, increment, player_id)

    def get_top(self, count: int = 10) -> list:
        """Get top players."""
        # ZREVRANGE for highest scores first
        players = self.redis.zrevrange(self.key, 0, count - 1, withscores=True)

        return [
            {
                "rank": idx + 1,
                "player_id": player_id,
                "score": score
            }
            for idx, (player_id, score) in enumerate(players)
        ]

    def get_rank(self, player_id: str) -> dict:
        """Get player rank and score."""
        score = self.redis.zscore(self.key, player_id)
        if score is None:
            return None

        # ZREVRANK for rank (0-indexed, highest first)
        rank = self.redis.zrevrank(self.key, player_id)

        return {
            "player_id": player_id,
            "rank": rank + 1 if rank is not None else None,
            "score": score
        }

    def get_around(self, player_id: str, count: int = 5) -> list:
        """Get players around a specific player."""
        rank = self.redis.zrevrank(self.key, player_id)
        if rank is None:
            return []

        # Get players before and after
        start = max(0, rank - count)
        end = rank + count

        players = self.redis.zrevrange(self.key, start, end, withscores=True)

        return [
            {
                "rank": start + idx + 1,
                "player_id": pid,
                "score": score,
                "is_current": pid == player_id
            }
            for idx, (pid, score) in enumerate(players)
        ]

    def get_total_players(self) -> int:
        """Get total number of players."""
        return self.redis.zcard(self.key)

    def remove_player(self, player_id: str) -> bool:
        """Remove player from leaderboard."""
        return self.redis.zrem(self.key, player_id) > 0

# Usage
r = redis.Redis(decode_responses=True)
leaderboard = Leaderboard(r, "global")

# Add scores
leaderboard.add_score("alice", 1500)
leaderboard.add_score("bob", 2000)
leaderboard.add_score("charlie", 1800)
leaderboard.increment_score("alice", 200)  # alice now at 1700

# Get top 10
top_players = leaderboard.get_top(10)
for player in top_players:
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}")

# Get player rank
alice_stats = leaderboard.get_rank("alice")
print(f"Alice is rank {alice_stats['rank']} with {alice_stats['score']} points")

# Get players around alice
nearby = leaderboard.get_around("alice", count=2)
for player in nearby:
    marker = " <-- YOU" if player['is_current'] else ""
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}{marker}")

Example 3: Distributed Rate Limiter

import redis
import time

class RateLimiter:
    """Distributed rate limiter using Redis."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

        # Lua script for atomic rate limiting
        self.rate_limit_script = self.redis.register_script("""
            local key = KEYS[1]
            local limit = tonumber(ARGV[1])
            local window = tonumber(ARGV[2])

            local current = redis.call('INCR', key)

            if current == 1 then
                redis.call('EXPIRE', key, window)
            end

            if current > limit then
                return {0, limit, current - 1}
            else
                return {1, limit, current}
            end
        """)

    def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
        """
        Check if request is within rate limit.

        Args:
            identifier: User ID, IP address, or API key
            limit: Maximum requests allowed
            window: Time window in seconds

        Returns:
            dict with allowed (bool), limit, current, remaining
        """
        key = f"rate_limit:{identifier}:{int(time.time() // window)}"

        allowed, max_limit, current = self.rate_limit_script(
            keys=[key],
            args=[limit, window]
        )

        return {
            "allowed": bool(allowed),
            "limit": max_limit,
            "current": current,
            "remaining": max(0, max_limit - current),
            "reset_at": (int(time.time() // window) + 1) * window
        }

    def sliding_window_check(self, identifier: str, limit: int, window: int) -> dict:
        """
        Sliding window rate limiter using sorted sets.
        More accurate but slightly more expensive.
        """
        key = f"rate_limit:sliding:{identifier}"
        now = time.time()
        window_start = now - window

        # Remove old entries
        self.redis.zremrangebyscore(key, 0, window_start)

        # Count current requests
        current = self.redis.zcard(key)

        if current < limit:
            # Add new request
            self.redis.zadd(key, {str(now): now})
            self.redis.expire(key, window)

            return {
                "allowed": True,
                "limit": limit,
                "current": current + 1,
                "remaining": limit - current - 1
            }
        else:
            return {
                "allowed": False,
                "limit": limit,
                "current": current,
                "remaining": 0
            }

# Usage
r = redis.Redis(decode_responses=True)
limiter = RateLimiter(r)

# API rate limiting: 100 requests per minute
user_id = "user_123"
result = limiter.check_rate_limit(user_id, limit=100, window=60)

if result["allowed"]:
    print(f"Request allowed. {result['remaining']} requests remaining.")
    # Process request
else:
    print(f"Rate limit exceeded. Try again at {result['reset_at']}")
    # Return 429 Too Many Requests

# More accurate sliding window
result = limiter.sliding_window_check(user_id, limit=100, window=60)

Example 4: Distributed Job Queue

import redis
import json
import time
import uuid
from typing import Optional, Callable

class JobQueue:
    """Distributed job queue with Redis."""

    def __init__(self, redis_client: redis.Redis, queue_name: str = "default"):
        self.redis = redis_client
        self.queue_name = queue_name
        self.queue_key = f"queue:{queue_name}"
        self.processing_key = f"queue:{queue_name}:processing"

    def enqueue(self, job_type: str, payload: dict, priority: int = 0) -> str:
        """
        Add job to queue.

        Args:
            job_type: Type of job (for routing to workers)
            payload: Job data
            priority: Higher priority = processed first (0 = normal)

        Returns:
            job_id
        """
        job_id = str(uuid.uuid4())

        job_data = {
            "id": job_id,
            "type": job_type,
            "payload": payload,
            "enqueued_at": time.time(),
            "attempts": 0
        }

        # Add to queue (use ZADD for priority queue)
        score = -priority  # Negative for higher priority first
        self.redis.zadd(self.queue_key, {json.dumps(job_data): score})

        return job_id

    def dequeue(self, timeout: int = 0) -> Optional[dict]:
        """
        Get next job from queue.

        Args:
            timeout: Block for this many seconds (0 = no blocking)

        Returns:
            Job data or None
        """
        # Get highest priority job (lowest score)
        jobs = self.redis.zrange(self.queue_key, 0, 0)

        if not jobs:
            if timeout > 0:
                time.sleep(min(timeout, 1))
                return self.dequeue(timeout - 1)
            return None

        job_json = jobs[0]

        # Move to processing set atomically
        pipe = self.redis.pipeline()
        pipe.zrem(self.queue_key, job_json)
        pipe.zadd(self.processing_key, {job_json: time.time()})
        pipe.execute()

        job_data = json.loads(job_json)
        job_data['attempts'] += 1

        return job_data

    def complete(self, job_data: dict):
        """Mark job as completed."""
        job_json = json.dumps({
            k: v for k, v in job_data.items()
            if k != 'attempts'
        })

        # Remove from processing
        self.redis.zrem(self.processing_key, job_json)

    def retry(self, job_data: dict, delay: int = 0):
        """Retry failed job."""
        job_json = json.dumps({
            k: v for k, v in job_data.items()
            if k != 'attempts'
        })

        # Remove from processing
        self.redis.zrem(self.processing_key, job_json)

        # Re-enqueue with delay
        if delay > 0:
            time.sleep(delay)

        self.redis.zadd(self.queue_key, {job_json: 0})

    def get_stats(self) -> dict:
        """Get queue statistics."""
        return {
            "queued": self.redis.zcard(self.queue_key),
            "processing": self.redis.zcard(self.processing_key)
        }

# Worker example
class Worker:
    """Job worker."""

    def __init__(self, queue: JobQueue, handlers: dict):
        self.queue = queue
        self.handlers = handlers

    def process_jobs(self):
        """Process jobs from queue."""
        print("Worker started. Waiting for jobs...")

        while True:
            job = self.queue.dequeue(timeout=5)

            if job:
                print(f"Processing job {job['id']} (type: {job['type']})")

                try:
                    # Get handler for job type
                    handler = self.handlers.get(job['type'])

                    if handler:
                        handler(job['payload'])
                        self.queue.complete(job)
                        print(f"Job {job['id']} completed")
                    else:
                        print(f"No handler for job type: {job['type']}")
                        self.queue.complete(job)

                except Exception as e:
                    print(f"Job {job['id']} failed: {e}")

                    if job['attempts'] < 3:
                        # Retry with exponential backoff
                        delay = 2 ** job['attempts']
                        print(f"Retrying in {delay}s...")
                        self.queue.retry(job, delay=delay)
                    else:
                        print(f"Job {job['id']} failed permanently")
                        self.queue.complete(job)

# Usage
r = redis.Redis(decode_responses=True)
queue = JobQueue(r, "email_queue")

# Enqueue jobs
job_id = queue.enqueue("send_email", {
    "to": "user@example.com",
    "subject": "Welcome!",
    "body": "Thanks for signing up."
}, priority=1)

# Define handlers
def send_email_handler(payload):
    print(f"Sending email to {payload['to']}")
    # Email sending logic here
    time.sleep(1)  # Simulate work

handlers = {
    "send_email": send_email_handler
}

# Start worker
worker = Worker(queue, handlers)
# worker.process_jobs()  # This blocks - run in separate process

Example 5: Real-Time Event Streaming

import redis
import json
import time
from typing import Callable, Optional

class EventStream:
    """Real-time event streaming with Redis Streams."""

    def __init__(self, redis_client: redis.Redis, stream_name: str):
        self.redis = redis_client
        self.stream_name = stream_name

    def publish(self, event_type: str, data: dict) -> str:
        """Publish event to stream."""
        event = {
            "type": event_type,
            "data": json.dumps(data),
            "timestamp": time.time()
        }

        # Add to stream (returns auto-generated ID)
        event_id = self.redis.xadd(self.stream_name, event, maxlen=10000)
        return event_id

    def read_events(self, last_id: str = '0', count: int = 10) -> list:
        """Read events from stream."""
        events = self.redis.xread(
            {self.stream_name: last_id},
            count=count,
            block=1000  # 1 second timeout
        )

        if not events:
            return []

        _, event_list = events[0]

        return [
            {
                "id": event_id,
                "type": event_data[b'type'].decode(),
                "data": json.loads(event_data[b'data'].decode()),
                "timestamp": float(event_data[b'timestamp'])
            }
            for event_id, event_data in event_list
        ]

    def create_consumer_group(self, group_name: str):
        """Create consumer group for parallel processing."""
        try:
            self.redis.xgroup_create(
                name=self.stream_name,
                groupname=group_name,
                id='0',
                mkstream=True
            )
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    def consume_events(self, group_name: str, consumer_name: str,
                      count: int = 10) -> list:
        """Consume events as part of consumer group."""
        events = self.redis.xreadgroup(
            groupname=group_name,
            consumername=consumer_name,
            streams={self.stream_name: '>'},
            count=count,
            block=5000
        )

        if not events:
            return []

        _, event_list = events[0]

        return [
            {
                "id": event_id,
                "type": event_data[b'type'].decode(),
                "data": json.loads(event_data[b'data'].decode()),
                "timestamp": float(event_data[b'timestamp'])
            }
            for event_id, event_data in event_list
        ]

    def acknowledge(self, group_name: str, event_id: str):
        """Acknowledge processed event."""
        self.redis.xack(self.stream_name, group_name, event_id)

    def get_pending(self, group_name: str) -> list:
        """Get pending (unacknowledged) events."""
        pending = self.redis.xpending_range(
            name=self.stream_name,
            groupname=group_name,
            min='-',
            max='+',
            count=100
        )

        return pending

# Usage Example: Activity Feed
r = redis.Redis()
activity_stream = EventStream(r, "user_activity")

# Publish events
activity_stream.publish("user_signup", {
    "user_id": 123,
    "email": "alice@example.com"
})

activity_stream.publish("post_created", {
    "user_id": 123,
    "post_id": 456,
    "title": "My First Post"
})

# Read events (simple consumer)
last_id = '0'
while True:
    events = activity_stream.read_events(last_id, count=10)

    for event in events:
        print(f"Event: {event['type']}")
        print(f"Data: {event['data']}")
        last_id = event['id']

    if not events:
        break

# Consumer group example
activity_stream.create_consumer_group("processors")

# Worker consuming events
while True:
    events = activity_stream.consume_events(
        group_name="processors",
        consumer_name="worker-1",
        count=10
    )

    for event in events:
        try:
            # Process event
            process_event(event)

            # Acknowledge
            activity_stream.acknowledge("processors", event['id'])
        except Exception as e:
            print(f"Failed to process event {event['id']}: {e}")
            # Event remains unacknowledged for retry

Example 6: Cache-Aside Pattern with Multi-Level Caching

import redis
import json
import hashlib
from typing import Optional, Any, Callable

class MultiLevelCache:
    """Multi-level caching with Redis and local cache."""

    def __init__(self, redis_client: redis.Redis,
                 local_cache_size: int = 100,
                 local_ttl: int = 60,
                 redis_ttl: int = 3600):
        self.redis = redis_client
        self.local_cache = {}
        self.local_cache_size = local_cache_size
        self.local_ttl = local_ttl
        self.redis_ttl = redis_ttl

    def _make_key(self, namespace: str, key: str) -> str:
        """Generate cache key."""
        return f"cache:{namespace}:{key}"

    def get(self, namespace: str, key: str,
            compute_fn: Optional[Callable] = None) -> Optional[Any]:
        """
        Get value from cache with fallback to compute function.

        Lookup order: Local cache → Redis → Compute function
        """
        cache_key = self._make_key(namespace, key)

        # Level 1: Local cache
        if cache_key in self.local_cache:
            entry = self.local_cache[cache_key]
            if time.time() < entry['expires_at']:
                return entry['value']
            else:
                del self.local_cache[cache_key]

        # Level 2: Redis cache
        redis_value = self.redis.get(cache_key)
        if redis_value:
            value = json.loads(redis_value)

            # Populate local cache
            self._set_local(cache_key, value)

            return value

        # Level 3: Compute function
        if compute_fn:
            value = compute_fn()
            if value is not None:
                self.set(namespace, key, value)
            return value

        return None

    def set(self, namespace: str, key: str, value: Any):
        """Set value in both cache levels."""
        cache_key = self._make_key(namespace, key)
        serialized = json.dumps(value)

        # Set in Redis
        self.redis.setex(cache_key, self.redis_ttl, serialized)

        # Set in local cache
        self._set_local(cache_key, value)

    def _set_local(self, key: str, value: Any):
        """Set value in local cache with LRU eviction."""
        # Simple LRU: remove oldest if at capacity
        if len(self.local_cache) >= self.local_cache_size:
            # Remove oldest entry
            oldest_key = min(
                self.local_cache.keys(),
                key=lambda k: self.local_cache[k]['expires_at']
            )
            del self.local_cache[oldest_key]

        self.local_cache[key] = {
            'value': value,
            'expires_at': time.time() + self.local_ttl
        }

    def delete(self, namespace: str, key: str):
        """Delete from all cache levels."""
        cache_key = self._make_key(namespace, key)

        # Delete from Redis
        self.redis.delete(cache_key)

        # Delete from local cache
        if cache_key in self.local_cache:
            del self.local_cache[cache_key]

    def invalidate_namespace(self, namespace: str):
        """Invalidate all keys in namespace."""
        pattern = f"cache:{namespace}:*"

        # Delete from Redis
        for key in self.redis.scan_iter(match=pattern, count=100):
            self.redis.delete(key)

        # Delete from local cache
        to_delete = [
            k for k in self.local_cache.keys()
            if k.startswith(f"cache:{namespace}:")
        ]
        for k in to_delete:
            del self.local_cache[k]

# Usage
r = redis.Redis(decode_responses=True)
cache = MultiLevelCache(r)

def get_user(user_id: int) -> dict:
    """Get user with multi-level caching."""
    return cache.get(
        namespace="users",
        key=str(user_id),
        compute_fn=lambda: database.query_user(user_id)
    )

# First call: Queries database, caches result
user = get_user(123)

# Second call: Returns from local cache (fastest)
user = get_user(123)

# Update user
def update_user(user_id: int, data: dict):
    database.update_user(user_id, data)

    # Invalidate cache
    cache.delete("users", str(user_id))

# Invalidate all user caches
cache.invalidate_namespace("users")

Example 7: Geo-Location with Redis

import redis

class GeoLocation:
    """Geo-spatial indexing and queries with Redis."""

    def __init__(self, redis_client: redis.Redis, index_name: str):
        self.redis = redis_client
        self.key = f"geo:{index_name}"

    def add_location(self, location_id: str, longitude: float, latitude: float):
        """Add location to geo index."""
        self.redis.geoadd(self.key, longitude, latitude, location_id)

    def add_locations(self, locations: list):
        """Batch add locations.

        Args:
            locations: List of (location_id, longitude, latitude) tuples
        """
        self.redis.geoadd(self.key, *[
            item for loc in locations
            for item in (loc[1], loc[2], loc[0])
        ])

    def get_position(self, location_id: str) -> tuple:
        """Get coordinates of a location."""
        result = self.redis.geopos(self.key, location_id)
        if result and result[0]:
            return result[0]  # (longitude, latitude)
        return None

    def find_nearby(self, longitude: float, latitude: float,
                   radius: float, unit: str = 'km', count: int = None) -> list:
        """
        Find locations within radius.

        Args:
            longitude: Center longitude
            latitude: Center latitude
            radius: Search radius
            unit: Distance unit ('m', 'km', 'mi', 'ft')
            count: Maximum results
        """
        args = {
            'longitude': longitude,
            'latitude': latitude,
            'radius': radius,
            'unit': unit,
            'withdist': True,
            'withcoord': True,
            'sort': 'ASC'
        }

        if count:
            args['count'] = count

        results = self.redis.georadius(self.key, **args)

        return [
            {
                'location_id': location_id,
                'distance': distance,
                'coordinates': (longitude, latitude)
            }
            for location_id, distance, (longitude, latitude) in results
        ]

    def find_nearby_member(self, location_id: str, radius: float,
                          unit: str = 'km', count: int = None) -> list:
        """Find locations near an existing member."""
        args = {
            'member': location_id,
            'radius': radius,
            'unit': unit,
            'withdist': True,
            'sort': 'ASC'
        }

        if count:
            args['count'] = count

        results = self.redis.georadiusbymember(self.key, **args)

        return [
            {
                'location_id': loc_id,
                'distance': distance
            }
            for loc_id, distance in results
            if loc_id != location_id  # Exclude self
        ]

    def distance_between(self, location_id1: str, location_id2: str,
                        unit: str = 'km') -> float:
        """Calculate distance between two locations."""
        return self.redis.geodist(self.key, location_id1, location_id2, unit)

# Usage Example: Restaurant finder
r = redis.Redis(decode_responses=True)
restaurants = GeoLocation(r, "restaurants")

# Add restaurants
restaurants.add_locations([
    ("rest1", -122.4194, 37.7749),  # San Francisco
    ("rest2", -122.4068, 37.7849),
    ("rest3", -122.4312, 37.7652),
])

# Find restaurants near coordinates
nearby = restaurants.find_nearby(
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    count=10
)

for restaurant in nearby:
    print(f"{restaurant['location_id']}: {restaurant['distance']:.2f} km away")

# Find restaurants near a specific restaurant
similar = restaurants.find_nearby_member("rest1", radius=2, unit='km')

# Get distance between two restaurants
distance = restaurants.distance_between("rest1", "rest2", unit='km')
print(f"Distance: {distance:.2f} km")

Quick Reference

Common Operations

# Connection
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Strings
r.set('key', 'value')
r.setex('key', 3600, 'value')  # With TTL
r.get('key')
r.incr('counter')

# Hashes
r.hset('user:123', 'name', 'Alice')
r.hset('user:123', mapping={'name': 'Alice', 'age': 30})
r.hget('user:123', 'name')
r.hgetall('user:123')

# Lists
r.lpush('queue', 'item')
r.rpush('queue', 'item')
r.lpop('queue')
r.lrange('queue', 0, -1)

# Sets
r.sadd('tags', 'python', 'redis')
r.smembers('tags')
r.sismember('tags', 'python')

# Sorted Sets
r.zadd('leaderboard', {'alice': 100, 'bob': 200})
r.zrange('leaderboard', 0, -1, withscores=True)
r.zrank('leaderboard', 'alice')

# Expiration
r.expire('key', 3600)
r.ttl('key')

# Pipelining
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()

Time Complexity

  • GET, SET: O(1)
  • HGET, HSET: O(1)
  • LPUSH, RPUSH, LPOP, RPOP: O(1)
  • SADD, SREM, SISMEMBER: O(1)
  • ZADD, ZREM: O(log(N))
  • ZRANGE, ZREVRANGE: O(log(N)+M) where M is result size
  • SCAN, SSCAN, HSCAN, ZSCAN: O(1) per iteration

Skill Version: 1.0.0 Last Updated: October 2025 Skill Category: State Management, Distributed Systems, Performance Optimization Compatible With: redis-py, Redis 6.0+, Redis 7.0+