Claude Code Plugins

Community-maintained marketplace


Comprehensive caching strategies and patterns for performance optimization. Use when implementing cache layers, cache invalidation, TTL policies, or distributed caching with Redis/Memcached. Triggers: cache, caching, Redis, Memcached, TTL, invalidation, cache-aside, write-through, cache stampede, distributed cache.

Install Skill

1. Download skill

2. Enable skills in Claude

   Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

   Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reviewing its instructions before using it.

SKILL.md


---
name: caching
description: Comprehensive caching strategies and patterns for performance optimization. Use when implementing cache layers, cache invalidation, TTL policies, or distributed caching with Redis/Memcached. Triggers: cache, caching, Redis, Memcached, TTL, invalidation, cache-aside, write-through, cache stampede, distributed cache.
---

Caching

Overview

Caching improves application performance by storing frequently accessed data closer to the consumer. This skill covers cache strategies (cache-aside, read-through, write-through, write-behind), invalidation patterns, TTL management, Redis/Memcached usage, stampede prevention, and distributed caching.

Instructions

1. Cache Strategies

Cache-Aside (Lazy Loading)

from typing import TypeVar, Optional, Callable, Awaitable, Any
import json

T = TypeVar('T')

class CacheAside:
    """Application manages cache explicitly."""

    def __init__(self, cache_client, default_ttl: int = 3600):
        self.cache = cache_client
        self.default_ttl = default_ttl

    async def get_or_load(
        self,
        key: str,
        loader: Callable[[], Awaitable[T]],
        ttl: Optional[int] = None
    ) -> T:
        # Try cache first
        cached = await self.cache.get(key)
        if cached is not None:
            return json.loads(cached)

        # Load from source
        value = await loader()

        # Store in cache
        await self.cache.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value)
        )

        return value

    async def invalidate(self, key: str):
        await self.cache.delete(key)

# Usage (assumes an async Redis client and an async database accessor)
cache = CacheAside(redis_client)

async def get_user(user_id: str) -> User:
    return await cache.get_or_load(
        f"user:{user_id}",
        lambda: database.get_user(user_id),
        ttl=300
    )

Write-Through Cache

class WriteThrough:
    """Writes go to cache and database simultaneously."""

    def __init__(self, cache_client, database, default_ttl: int = 3600):
        self.cache = cache_client
        self.database = database
        self.default_ttl = default_ttl

    async def write(self, key: str, value: Any, ttl: Optional[int] = None):
        # Write to database first
        await self.database.save(key, value)

        # Then update cache
        await self.cache.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value)
        )

    async def read(self, key: str) -> Optional[Any]:
        cached = await self.cache.get(key)
        if cached:
            return json.loads(cached)

        # Fallback to database
        value = await self.database.get(key)
        if value:
            await self.cache.setex(key, self.default_ttl, json.dumps(value))
        return value
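
A minimal usage sketch. Here `redis_client` is an async Redis connection and `database` is a hypothetical object exposing async `save`/`get` methods (neither is part of the pattern itself):

wt = WriteThrough(redis_client, database, default_ttl=600)

async def save_profile(user_id: str, profile: dict):
    # One call keeps cache and database in sync
    await wt.write(f"profile:{user_id}", profile)

async def load_profile(user_id: str) -> Optional[dict]:
    return await wt.read(f"profile:{user_id}")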

Write-Behind (Write-Back) Cache

import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Any
import time

@dataclass
class PendingWrite:
    key: str
    value: Any
    timestamp: float

class WriteBehind:
    """Writes go to cache immediately, database asynchronously."""

    def __init__(
        self,
        cache_client,
        database,
        flush_interval: float = 5.0,
        batch_size: int = 100
    ):
        self.cache = cache_client
        self.database = database
        self.flush_interval = flush_interval
        self.batch_size = batch_size
        self.pending_writes: Dict[str, PendingWrite] = {}
        self._lock = asyncio.Lock()
        self._flush_task = None

    async def start(self):
        self._flush_task = asyncio.create_task(self._flush_loop())

    async def stop(self):
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass
            # Drain anything still queued so writes are not lost on shutdown
            await self._flush_pending()

    async def write(self, key: str, value: Any):
        # Write to cache immediately
        await self.cache.set(key, json.dumps(value))

        # Queue for database write
        async with self._lock:
            self.pending_writes[key] = PendingWrite(
                key=key,
                value=value,
                timestamp=time.time()
            )

        if len(self.pending_writes) >= self.batch_size:
            await self._flush_pending()

    async def _flush_loop(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush_pending()

    async def _flush_pending(self):
        async with self._lock:
            if not self.pending_writes:
                return

            writes = list(self.pending_writes.values())
            self.pending_writes.clear()

        # Batch write to database
        await self.database.batch_save(
            [(w.key, w.value) for w in writes]
        )

Read-Through Cache

class ReadThrough:
    """Cache automatically loads from database on miss."""

    def __init__(self, cache_client, loader, default_ttl: int = 3600):
        self.cache = cache_client
        self.loader = loader
        self.default_ttl = default_ttl

    async def get(self, key: str) -> Optional[Any]:
        # Check cache
        cached = await self.cache.get(key)
        if cached:
            return json.loads(cached)

        # Auto-load on miss
        value = await self.loader(key)
        if value is not None:
            await self.cache.setex(key, self.default_ttl, json.dumps(value))

        return value
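
A usage sketch; the loader receives the key, so one ReadThrough instance can serve a whole namespace (`database` is hypothetical, as above):

user_cache = ReadThrough(
    redis_client,
    loader=lambda key: database.get_user(key.removeprefix("user:")),
    default_ttl=300,
)

async def get_user(user_id: str):
    return await user_cache.get(f"user:{user_id}")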

2. Invalidation Strategies

from enum import Enum
from typing import Any, List

class InvalidationStrategy(Enum):
    TIME_BASED = "time_based"
    EVENT_BASED = "event_based"
    VERSION_BASED = "version_based"

class CacheInvalidator:
    def __init__(self, cache_client):
        self.cache = cache_client

    # Tag-based invalidation
    async def set_with_tags(
        self,
        key: str,
        value: Any,
        tags: List[str],
        ttl: int = 3600
    ):
        await self.cache.setex(key, ttl, json.dumps(value))

        # Track tag membership in Redis so invalidation works across processes
        for tag in tags:
            await self.cache.sadd(f"tag:{tag}", key)

    async def invalidate_by_tag(self, tag: str):
        keys = await self.cache.smembers(f"tag:{tag}")
        if keys:
            await self.cache.delete(*keys)
            await self.cache.delete(f"tag:{tag}")

    # Pattern-based invalidation
    async def invalidate_by_pattern(self, pattern: str):
        cursor = 0
        while True:
            cursor, keys = await self.cache.scan(
                cursor,
                match=pattern,
                count=100
            )
            if keys:
                await self.cache.delete(*keys)
            if cursor == 0:
                break

    # Version-based invalidation
    async def get_versioned(self, key: str, version: int) -> Optional[Any]:
        versioned_key = f"{key}:v{version}"
        return await self.cache.get(versioned_key)

    async def set_versioned(
        self,
        key: str,
        value: Any,
        version: int,
        ttl: int = 3600
    ):
        versioned_key = f"{key}:v{version}"
        await self.cache.setex(versioned_key, ttl, json.dumps(value))

# Event-based invalidation with pub/sub
class EventBasedInvalidator(CacheInvalidator):
    """Subclasses CacheInvalidator so tag/pattern invalidation is available."""

    def __init__(self, cache_client, pubsub_client):
        super().__init__(cache_client)
        self.pubsub = pubsub_client

    async def start_listener(self):
        await self.pubsub.subscribe("cache:invalidate")

        async for message in self.pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                await self._handle_invalidation(data)

    async def _handle_invalidation(self, data: dict):
        if "key" in data:
            await self.cache.delete(data["key"])
        elif "pattern" in data:
            await self.invalidate_by_pattern(data["pattern"])
        elif "tag" in data:
            await self.invalidate_by_tag(data["tag"])

    async def publish_invalidation(self, **kwargs):
        await self.pubsub.publish(
            "cache:invalidate",
            json.dumps(kwargs)
        )

3. TTL and Expiration

import random
from datetime import datetime, timezone

class TTLManager:
    def __init__(self, base_ttl: int = 3600):
        self.base_ttl = base_ttl

    def get_ttl_with_jitter(self, ttl: Optional[int] = None) -> int:
        """Add randomness to prevent synchronized expiration."""
        base = ttl or self.base_ttl
        jitter = random.uniform(-0.1, 0.1) * base
        return int(base + jitter)

    def get_sliding_ttl(
        self,
        last_access: datetime,
        min_ttl: int = 60,
        max_ttl: int = 3600
    ) -> int:
        """TTL based on access frequency."""
        # last_access is assumed to be timezone-aware (UTC)
        age = (datetime.now(timezone.utc) - last_access).total_seconds()

        if age < 60:
            return max_ttl  # Frequently accessed
        elif age < 300:
            return max_ttl // 2
        else:
            return min_ttl

    def get_tiered_ttl(self, data_type: str) -> int:
        """Different TTL for different data types."""
        ttl_tiers = {
            "user_session": 86400,      # 1 day
            "user_profile": 3600,       # 1 hour
            "product_catalog": 300,     # 5 minutes
            "search_results": 60,       # 1 minute
            "real_time_data": 10,       # 10 seconds
        }
        return ttl_tiers.get(data_type, self.base_ttl)

# Sliding window expiration
class SlidingCache:
    def __init__(self, cache_client, default_ttl: int = 3600):
        self.cache = cache_client
        self.default_ttl = default_ttl

    async def get(self, key: str) -> Optional[Any]:
        value = await self.cache.get(key)
        if value:
            # Refresh TTL on access
            await self.cache.expire(key, self.default_ttl)
            return json.loads(value)
        return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        await self.cache.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value)
        )

4. Redis/Memcached Patterns

import redis.asyncio as redis
from typing import List, Optional, Tuple

class RedisCache:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    # Hash operations for structured data
    async def set_hash(self, key: str, data: dict, ttl: int = 3600):
        await self.redis.hset(key, mapping=data)
        await self.redis.expire(key, ttl)

    async def get_hash(self, key: str) -> Optional[dict]:
        data = await self.redis.hgetall(key)
        return {k.decode(): v.decode() for k, v in data.items()} if data else None

    async def update_hash_field(self, key: str, field: str, value: str):
        await self.redis.hset(key, field, value)

    # Sorted sets for leaderboards/rankings
    async def add_to_ranking(self, key: str, member: str, score: float):
        await self.redis.zadd(key, {member: score})

    async def get_top_n(self, key: str, n: int) -> List[Tuple[str, float]]:
        return await self.redis.zrevrange(key, 0, n - 1, withscores=True)

    # Lists for queues
    async def push_to_queue(self, key: str, *values):
        await self.redis.lpush(key, *values)

    async def pop_from_queue(self, key: str, timeout: int = 0):
        return await self.redis.brpop(key, timeout=timeout)

    # Pub/Sub for cache invalidation
    async def publish(self, channel: str, message: str):
        await self.redis.publish(channel, message)

    # Lua scripts for atomic operations
    async def increment_with_cap(self, key: str, cap: int) -> int:
        script = """
        local current = redis.call('GET', KEYS[1])
        if current and tonumber(current) >= tonumber(ARGV[1]) then
            return -1
        end
        return redis.call('INCR', KEYS[1])
        """
        return await self.redis.eval(script, 1, key, cap)

    # Pipeline for batch operations
    async def batch_get(self, keys: List[str]) -> List[Optional[str]]:
        async with self.redis.pipeline() as pipe:
            for key in keys:
                pipe.get(key)
            return await pipe.execute()

    async def batch_set(
        self,
        items: List[Tuple[str, str]],
        ttl: int = 3600
    ):
        async with self.redis.pipeline() as pipe:
            for key, value in items:
                pipe.setex(key, ttl, value)
            await pipe.execute()
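
The section title also names Memcached; for completeness, a minimal async sketch using the aiomcache library (an assumption, any memcached client works). Memcached is a flat key-value store with no hashes, sorted sets, or pub/sub, so it covers only the plain get/set-with-TTL parts of the patterns above:

import json
from typing import Optional

import aiomcache  # assumed client library: pip install aiomcache

class MemcachedCache:
    def __init__(self, host: str = "127.0.0.1", port: int = 11211):
        self.client = aiomcache.Client(host, port)

    async def set(self, key: str, value: dict, ttl: int = 3600):
        # Memcached keys and values are bytes; exptime is the TTL in seconds
        await self.client.set(key.encode(), json.dumps(value).encode(), exptime=ttl)

    async def get(self, key: str) -> Optional[dict]:
        raw = await self.client.get(key.encode())
        return json.loads(raw) if raw else None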

5. Cache Stampede Prevention

import asyncio
import math
import random
import time
from typing import Any, Callable, Dict, Optional

class StampedeProtection:
    """Prevents cache stampede using various strategies."""

    def __init__(self, cache_client):
        self.cache = cache_client
        self._locks: Dict[str, asyncio.Lock] = {}

    # Strategy 1: Locking (prevents concurrent regeneration)
    async def get_with_lock(
        self,
        key: str,
        loader: Callable,
        ttl: int = 3600
    ) -> Any:
        cached = await self.cache.get(key)
        if cached:
            return json.loads(cached)

        # Get or create lock for this key
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()

        async with self._locks[key]:
            # Double-check after acquiring lock
            cached = await self.cache.get(key)
            if cached:
                return json.loads(cached)

            value = await loader()
            await self.cache.setex(key, ttl, json.dumps(value))
            return value

    # Strategy 2: Probabilistic early expiration
    async def get_with_early_recompute(
        self,
        key: str,
        loader: Callable,
        ttl: int = 3600,
        beta: float = 1.0
    ) -> Any:
        data = await self.cache.get(key)

        if data:
            cached = json.loads(data)
            expiry = cached.get("_expiry", 0)
            delta = cached.get("_delta", 0)

            # Probabilistic early recomputation (XFetch): -log(random()) is a
            # positive value, scaled by how long a recompute takes (delta), so
            # the chance of refreshing early rises as expiry approaches
            now = time.time()
            if now - delta * beta * math.log(random.random()) < expiry:
                return cached["value"]

        # Recompute
        start = time.time()
        value = await loader()
        delta = time.time() - start

        cache_data = {
            "value": value,
            "_expiry": time.time() + ttl,
            "_delta": delta
        }
        await self.cache.setex(key, ttl, json.dumps(cache_data))
        return value

    # Strategy 3: Stale-while-revalidate
    async def get_stale_while_revalidate(
        self,
        key: str,
        loader: Callable,
        ttl: int = 3600,
        stale_ttl: int = 300
    ) -> Any:
        data = await self.cache.get(key)

        if data:
            cached = json.loads(data)

            if cached.get("_fresh", True):
                return cached["value"]

            # Return stale, revalidate in background (in production, keep a
            # reference to this task so it is not garbage-collected mid-flight)
            asyncio.create_task(self._revalidate(key, loader, ttl, stale_ttl))
            return cached["value"]

        return await self._load_and_cache(key, loader, ttl, stale_ttl)

    async def _revalidate(
        self,
        key: str,
        loader: Callable,
        ttl: int,
        stale_ttl: int
    ):
        lock_key = f"lock:{key}"
        acquired = await self.cache.setnx(lock_key, "1")

        if acquired:
            try:
                await self.cache.expire(lock_key, 30)
                await self._load_and_cache(key, loader, ttl, stale_ttl)
            finally:
                await self.cache.delete(lock_key)

    async def _load_and_cache(
        self,
        key: str,
        loader: Callable,
        ttl: int,
        stale_ttl: int
    ):
        value = await loader()

        cache_data = {"value": value, "_fresh": True}
        await self.cache.setex(key, ttl, json.dumps(cache_data))

        # Mark as stale after TTL
        async def mark_stale():
            await asyncio.sleep(ttl - stale_ttl)
            data = await self.cache.get(key)
            if data:
                cached = json.loads(data)
                cached["_fresh"] = False
                await self.cache.setex(key, stale_ttl, json.dumps(cached))

        asyncio.create_task(mark_stale())
        return value

6. Distributed Caching

import bisect
import hashlib
from typing import Dict, List, Optional

class ConsistentHashing:
    """Consistent hashing for distributed cache nodes."""

    def __init__(self, nodes: List[str], replicas: int = 100):
        self.replicas = replicas
        self.ring: Dict[int, str] = {}
        self.sorted_keys: List[int] = []

        for node in nodes:
            self.add_node(node)

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            self.sorted_keys.append(key)
        self.sorted_keys.sort()

    def remove_node(self, node: str):
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            del self.ring[key]
            self.sorted_keys.remove(key)

    def get_node(self, key: str) -> str:
        if not self.ring:
            raise ValueError("No nodes in ring")

        h = self._hash(key)

        # Binary search: first ring position at or after h, wrapping around
        idx = bisect.bisect_left(self.sorted_keys, h) % len(self.sorted_keys)
        return self.ring[self.sorted_keys[idx]]
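
A quick sketch of the property that makes this worthwhile: removing a node remaps only the keys that lived on it, instead of reshuffling nearly everything the way `hash(key) % n` would:

ring = ConsistentHashing(["cache-a", "cache-b", "cache-c"])
keys = [f"user:{i}" for i in range(10_000)]

before = {k: ring.get_node(k) for k in keys}
ring.remove_node("cache-c")
after = {k: ring.get_node(k) for k in keys}

moved = sum(1 for k in keys if before[k] != after[k])
print(f"{moved / len(keys):.0%} of keys moved")  # roughly 1/3, not 100%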

class DistributedCache:
    """Distributed cache with consistent hashing."""

    def __init__(self, nodes: List[str]):
        self.ring = ConsistentHashing(nodes)
        self.clients: Dict[str, RedisCache] = {
            node: RedisCache(node) for node in nodes
        }

    def _get_client(self, key: str) -> RedisCache:
        node = self.ring.get_node(key)
        return self.clients[node]

    async def get(self, key: str) -> Optional[str]:
        client = self._get_client(key)
        return await client.redis.get(key)

    async def set(self, key: str, value: str, ttl: int = 3600):
        client = self._get_client(key)
        await client.redis.setex(key, ttl, value)

    async def delete(self, key: str):
        client = self._get_client(key)
        await client.redis.delete(key)

    # Multi-get across nodes
    async def mget(self, keys: List[str]) -> Dict[str, Optional[str]]:
        # Group keys by node
        node_keys: Dict[str, List[str]] = defaultdict(list)
        for key in keys:
            node = self.ring.get_node(key)
            node_keys[node].append(key)

        # Fetch in parallel
        results = {}
        tasks = []

        for node, node_key_list in node_keys.items():
            client = self.clients[node]
            tasks.append(self._fetch_from_node(client, node_key_list, results))

        await asyncio.gather(*tasks)
        return results

    async def _fetch_from_node(
        self,
        client: RedisCache,
        keys: List[str],
        results: Dict
    ):
        values = await client.batch_get(keys)
        for key, value in zip(keys, values):
            results[key] = value
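
Wiring it up is just a list of node URLs; each key is routed by the ring, and `mget` fans out one pipelined fetch per node. The hostnames here are placeholders:

async def demo():
    cache = DistributedCache([
        "redis://cache-1:6379",
        "redis://cache-2:6379",
        "redis://cache-3:6379",
    ])

    await cache.set("session:abc", "user-42", ttl=1800)
    print(await cache.get("session:abc"))

    # Keys grouped by owning node, fetched in parallel
    print(await cache.mget(["session:abc", "user:42", "user:43"]))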

Best Practices

  1. Cache What Matters: Focus on frequently accessed, expensive-to-compute data.

  2. Set Appropriate TTLs: Balance freshness vs performance. Use jitter to prevent thundering herd.

  3. Handle Cache Failures Gracefully: The cache should enhance operation, not be required for it; a fail-open sketch follows this list.

  4. Monitor Cache Performance: Track hit rates, latency, and memory usage.

  5. Use Appropriate Data Structures: Hashes for objects, sorted sets for rankings, lists for queues.

  6. Implement Proper Invalidation: Prefer event-driven invalidation over TTL alone for critical data.

  7. Prevent Stampedes: Use locking, early recomputation, or stale-while-revalidate.

  8. Size Your Cache Appropriately: Monitor eviction rates and adjust size accordingly.
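
Practice 3 deserves a sketch of its own, since none of the code above shows it: a thin wrapper that treats the cache as best-effort, falling back to the loader when Redis is unreachable rather than failing the request. Names here are illustrative:

import json
import logging

logger = logging.getLogger("cache")

class FailOpenCache:
    """Cache errors degrade to a cache miss instead of an exception."""

    def __init__(self, cache_client, default_ttl: int = 3600):
        self.cache = cache_client
        self.default_ttl = default_ttl

    async def get_or_load(self, key, loader, ttl=None):
        try:
            cached = await self.cache.get(key)
            if cached is not None:
                return json.loads(cached)
        except Exception:
            # Cache read failed: log and fall through to the source of truth
            logger.warning("cache read failed for %s", key, exc_info=True)

        value = await loader()

        try:
            await self.cache.setex(key, ttl or self.default_ttl, json.dumps(value))
        except Exception:
            # Cache write failed: the caller still gets a correct value
            logger.warning("cache write failed for %s", key, exc_info=True)

        return value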

Examples

Complete Caching Layer

class CacheLayer:
    """Production-ready caching layer with multiple strategies."""

    def __init__(self, redis_url: str, database, default_ttl: int = 3600):
        self.redis = RedisCache(redis_url)
        self.database = database  # async data-access object used on cache misses
        self.stampede = StampedeProtection(self.redis.redis)
        self.ttl_manager = TTLManager(default_ttl)
        self.invalidator = CacheInvalidator(self.redis.redis)

    async def get_user(self, user_id: str) -> User:
        return await self.stampede.get_with_lock(
            f"user:{user_id}",
            lambda: self.database.get_user(user_id),
            ttl=self.ttl_manager.get_tiered_ttl("user_profile")
        )

    async def update_user(self, user_id: str, data: dict):
        await self.database.update_user(user_id, data)
        # get_user caches by key (not by tag), so delete the key directly;
        # a tag-based invalidation here would be a no-op
        await self.redis.redis.delete(f"user:{user_id}")

    async def get_product_listing(self, category: str) -> List[Product]:
        return await self.stampede.get_stale_while_revalidate(
            f"products:{category}",
            lambda: self.database.get_products(category),
            ttl=self.ttl_manager.get_tiered_ttl("product_catalog"),
            stale_ttl=60
        )