SKILL.md

name: llm-caching-patterns
description: Multi-level caching strategies for LLM applications - semantic caching (Redis), prompt caching (Claude/OpenAI native), cache hierarchies, cost optimization, and Langfuse cost tracking with hierarchical trace rollup for 70-95% cost reduction
version: 2.0.0
author: YG Starter Template
tags: llm, caching, redis, cost-optimization, semantic-cache, prompt-cache, langfuse, trace-hierarchy, 2025

LLM Caching Patterns

Overview

Modern LLM applications can reduce costs by 70-95% through intelligent multi-level caching. This skill covers the multi-tier caching architecture (2025 best practice): combining in-memory LRU, Redis semantic caching, and provider-native prompt caching for maximum efficiency.

Real-World Use Cases:

  • High-Volume Chatbot: Semantic cache for FAQ variations (80% cache hit rate)
  • Code Review Automation: Prompt cache for system instructions (90% savings)
  • Content Moderation: L1/L2 cache for repeat content detection
  • Multi-Agent Analysis: Hierarchical cache strategy across agents
  • Customer Support: Session-aware caching for context continuity

When to use this skill:

  • High-volume LLM applications with repeated queries
  • Cost-sensitive AI features
  • Similar query patterns (e.g., analyzing similar content types)
  • Applications requiring sub-second response times
  • Multi-agent systems with redundant LLM calls

Expected Impact:

  • L1 (LRU) Cache: 10-20% hit rate, ~1ms latency, 100% cost savings
  • L2 (Redis Semantic): 30-50% hit rate, ~5-10ms latency, 100% cost savings
  • L3 (Prompt Cache): 80-100% coverage, ~2s latency, 90% token cost savings
  • Combined: 70-95% total cost reduction
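
These layer savings compound rather than add. A quick back-of-the-envelope script (illustrative hit rates, not measurements; the 40% prompt-cache saving comes from the Cost Calculation section below) shows how the tiers stack into the 70-95% range:

# Illustrative composition of the cache layers (assumed hit rates, not benchmarks)
l1_hit = 0.15                # exact-match hits served from memory at $0
l2_hit = 0.45                # of the remainder, served from the semantic cache at $0
prompt_cache_saving = 0.40   # per-request saving on calls that reach the LLM

uncached_fraction = (1 - l1_hit) * (1 - l2_hit)                 # 0.4675
relative_cost = uncached_fraction * (1 - prompt_cache_saving)   # 0.2805

print(f"Requests reaching the LLM: {uncached_fraction:.0%}")    # 47%
print(f"Spend vs. no caching:      {relative_cost:.0%}")        # ~28%, i.e. ~72% saved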

Core Concepts

Double Caching Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    CACHE HIERARCHY (2025 BEST PRACTICE)         │
└─────────────────────────────────────────────────────────────────┘

Request → L1 (Exact Hash) → L2 (Semantic) → L3 (Prompt) → L4 (Full LLM)
           ↓ Hit: ~1ms      ↓ Hit: ~10ms     ↓ Cached     ↓ Full Cost
         100% savings      100% savings      90% savings   $$$

L1: In-Memory LRU Cache
────────────────────────
• Exact content hash matching
• 1,000-10,000 entry size
• TTL: 5-10 minutes
• Use Case: Duplicate requests within session
• Implementation: Python functools.lru_cache or cachetools

L2: Redis Semantic Cache
─────────────────────────
• Vector similarity search (cosine distance < 0.08)
• Configurable similarity threshold (0.85-0.95)
• TTL: 1-24 hours
• Use Case: Similar but not identical queries
• Implementation: RedisVL SemanticCache + RediSearch

L3: Prompt Caching (Provider Native)
────────────────────────────────────
• Cache identical prompt PREFIXES (system prompts, examples)
• Claude: cache_control ephemeral markers
• GPT: Cached prefix automatically detected
• TTL: 5 minutes (auto-refresh on use)
• Use Case: Same prompts, different user content
• March 2025: Cache reads don't count against rate limits!

L4: Full LLM Call
─────────────────
• No cache hit - full generation required
• Store response in L2 and L1 for future hits
• Full token cost
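
The L1 tier above names functools.lru_cache and cachetools as candidate implementations; a minimal sketch of that tier with cachetools.TTLCache follows (entry count and TTL are the illustrative values from the tier description):

from cachetools import TTLCache
import hashlib

# L1: exact-match cache, 5,000 entries, 10-minute TTL
l1_cache: TTLCache = TTLCache(maxsize=5_000, ttl=600)

def hash_content(content: str) -> str:
    """Stable key for exact-match lookups."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

def l1_get(query: str) -> dict | None:
    return l1_cache.get(hash_content(query))

def l1_set(query: str, response: dict) -> None:
    l1_cache[hash_content(query)] = response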

Cache Decision Flow

async def get_llm_response(query: str, agent_type: str) -> dict:
    """Multi-level cache lookup."""

    # L1: Exact match (in-memory)
    cache_key = hash_content(query)
    if cache_key in lru_cache:
        return lru_cache[cache_key]  # ~1ms, 100% savings

    # L2: Semantic similarity (Redis)
    embedding = await embed_text(query)
    similar = await redis_cache.find_similar(
        embedding=embedding,
        agent_type=agent_type,
        threshold=0.92  # Configurable
    )
    if similar and similar.distance < 0.08:
        lru_cache[cache_key] = similar.response  # Promote to L1
        return similar.response  # ~10ms, 100% savings

    # L3 + L4: Prompt caching + LLM call
    # Prompt cache breakpoints reduce L4 cost by 90%
    response = await llm.generate(
        messages=build_cached_messages(
            system_prompt=AGENT_PROMPT,  # ← Cached
            examples=few_shot_examples,   # ← Cached
            user_content=query            # ← NOT cached
        )
    )

    # Store in L2 and L1
    await redis_cache.set(embedding, response, agent_type)
    lru_cache[cache_key] = response

    return response  # L3: ~2s, 90% savings | L4: ~3s, full cost

Similarity Threshold Tuning

Problem: How similar is "similar enough" to return a cached response?

Threshold Guidelines (cosine similarity):

  • 0.98-1.00 (distance 0.00-0.02): Nearly identical - safe to return
  • 0.95-0.98 (distance 0.02-0.05): Very similar - usually safe
  • 0.92-0.95 (distance 0.05-0.08): Similar - validate with reranking
  • 0.85-0.92 (distance 0.08-0.15): Moderately similar - risky
  • < 0.85 (distance > 0.15): Different - do not return

Recommended Starting Point: 0.92 (distance < 0.08)

Tuning Process:

  1. Start at 0.92 threshold
  2. Monitor false positives (wrong cached responses)
  3. Monitor false negatives (cache misses that should've hit)
  4. Adjust threshold based on precision/recall tradeoff
  5. Different thresholds per agent type (security=0.95, general=0.90)
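
One way to run steps 2-4 offline is to replay a small labeled set of similarity scores and count decision errors at each candidate threshold. A sketch (the labeled pairs are made-up examples):

def evaluate_threshold(labeled_pairs: list[tuple[float, bool]], threshold: float) -> dict:
    """Count cache-decision errors for one candidate similarity threshold."""
    false_pos = sum(1 for sim, should_match in labeled_pairs if sim >= threshold and not should_match)
    false_neg = sum(1 for sim, should_match in labeled_pairs if sim < threshold and should_match)
    return {"threshold": threshold, "false_positives": false_pos, "false_negatives": false_neg}

labeled_pairs = [
    (0.99, True),   # paraphrase of a cached query
    (0.93, True),
    (0.91, False),  # similar wording, different intent
    (0.80, False),
]

# Sweep the recommended range and pick the best precision/recall tradeoff
for t in (0.90, 0.92, 0.95):
    print(evaluate_threshold(labeled_pairs, t))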

Cache Warming Strategy

Pre-populate cache from golden dataset for instant hit rates:

async def warm_cache_from_golden_dataset(
    cache: SemanticCache,
    min_quality: float = 0.8
) -> int:
    """Warm cache with high-quality historical responses."""

    # Load golden dataset analyses
    analyses = await db.query(
        "SELECT * FROM analyses WHERE confidence_score >= ?",
        (min_quality,)
    )

    warmed = 0
    for analysis in analyses:
        # Extract agent findings
        for finding in analysis.findings:
            await cache.set(
                content=analysis.content,
                response=finding.output,
                agent_type=finding.agent_type,
                quality_score=finding.confidence_score
            )
            warmed += 1

    return warmed

Redis Semantic Cache Implementation

Schema Design

# RedisVL Index Schema
CACHE_INDEX_SCHEMA = {
    "index": {
        "name": "llm_semantic_cache",
        "prefix": "cache:",
    },
    "fields": [
        {"name": "agent_type", "type": "tag"},
        {"name": "content_type", "type": "tag"},
        {"name": "input_hash", "type": "tag"},
        {
            "name": "embedding",
            "type": "vector",
            "attrs": {
                "dims": 1536,  # OpenAI text-embedding-3-small
                "distance_metric": "cosine",
                "algorithm": "hnsw",  # Fast approximate search
            }
        },
        {"name": "response", "type": "text"},
        {"name": "created_at", "type": "numeric"},
        {"name": "hit_count", "type": "numeric"},
        {"name": "quality_score", "type": "numeric"},
    ]
}

Service Class

import json
import time

import numpy as np
from redis import Redis
from redisvl.index import SearchIndex
from redisvl.query import VectorQuery
from redisvl.schema import IndexSchema

# Assumed project helpers: EmbeddingService, CacheEntry, hash_content

class SemanticCacheService:
    """Redis semantic cache for LLM responses."""

    def __init__(self, redis_url: str, similarity_threshold: float = 0.92):
        self.client = Redis.from_url(redis_url)
        self.threshold = similarity_threshold
        self.embedding_service = EmbeddingService()

        # Initialize RedisVL index
        schema = IndexSchema.from_dict(CACHE_INDEX_SCHEMA)
        self.index = SearchIndex(schema, self.client)
        self.index.create(overwrite=False)

    async def get(
        self,
        content: str,
        agent_type: str,
        content_type: str | None = None
    ) -> CacheEntry | None:
        """Look up cached response by semantic similarity."""

        # Generate embedding
        embedding = await self.embedding_service.embed_text(content[:2000])

        # Build query with filters
        filter_expr = f"@agent_type:{{{agent_type}}}"
        if content_type:
            filter_expr += f" @content_type:{{{content_type}}}"

        query = VectorQuery(
            vector=embedding,
            vector_field_name="embedding",
            return_fields=["response", "quality_score", "hit_count"],
            num_results=1,
            filter_expression=filter_expr
        )

        results = self.index.query(query)

        if results and len(results) > 0:
            result = results[0]
            distance = float(result.get("vector_distance", 1.0))

            # Check similarity threshold
            if distance <= (1 - self.threshold):
                # Increment hit count
                self.client.hincrby(result["id"], "hit_count", 1)

                return CacheEntry(
                    response=json.loads(result["response"]),
                    quality_score=float(result["quality_score"]),
                    hit_count=int(result["hit_count"]),
                    distance=distance
                )

        return None

    async def set(
        self,
        content: str,
        response: dict,
        agent_type: str,
        content_type: str | None = None,
        quality_score: float = 1.0
    ) -> None:
        """Store response in cache."""
        content_preview = content[:2000]
        embedding = await self.embedding_service.embed_text(content_preview)

        key = f"cache:{agent_type}:{hash_content(content_preview)}"

        data = {
            "agent_type": agent_type,
            "content_type": content_type or "",
            "input_hash": hash_content(content_preview),
            # HASH storage requires the vector as a float32 byte blob
            "embedding": np.array(embedding, dtype=np.float32).tobytes(),
            "response": json.dumps(response),
            "created_at": time.time(),
            "hit_count": 0,
            "quality_score": quality_score,
        }

        self.client.hset(key, mapping=data)
        self.client.expire(key, 86400)  # 24-hour TTL
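
A hypothetical usage of the service inside an agent call path (the Redis URL, agent name, and run_security_agent are placeholders):

cache = SemanticCacheService("redis://localhost:6379", similarity_threshold=0.92)

async def analyze_with_cache(content: str) -> dict:
    # L2 lookup: semantically similar prior response for this agent type
    hit = await cache.get(content, agent_type="security_auditor")
    if hit is not None:
        return hit.response  # served from cache, no LLM cost

    response = await run_security_agent(content)  # assumed project function
    await cache.set(content, response, agent_type="security_auditor", quality_score=0.9)
    return response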

Prompt Caching (Claude Native)

Cache Breakpoint Strategy

class PromptCacheManager:
    """Manage Claude prompt caching with cache breakpoints."""

    def build_cached_messages(
        self,
        system_prompt: str,
        few_shot_examples: str | None = None,
        schema_prompt: str | None = None,
        dynamic_content: str = ""
    ) -> list[dict]:
        """Build messages with cache breakpoints.

        Cache structure:
        1. System prompt (always cached)
        2. Few-shot examples (cached per content type)
        3. Schema documentation (always cached)
        ──────────────── CACHE BREAKPOINT ────────────────
        4. Dynamic content (NEVER cached)
        """

        content_parts = []

        # Breakpoint 1: System prompt
        content_parts.append({
            "type": "text",
            "text": system_prompt,
            "cache_control": {"type": "ephemeral"}
        })

        # Breakpoint 2: Few-shot examples (if provided)
        if few_shot_examples:
            content_parts.append({
                "type": "text",
                "text": few_shot_examples,
                "cache_control": {"type": "ephemeral"}
            })

        # Breakpoint 3: Schema documentation (if provided)
        if schema_prompt:
            content_parts.append({
                "type": "text",
                "text": schema_prompt,
                "cache_control": {"type": "ephemeral"}
            })

        # Dynamic content (NOT cached)
        content_parts.append({
            "type": "text",
            "text": dynamic_content
        })

        return [{"role": "user", "content": content_parts}]
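
A sketch of sending these messages with the Anthropic SDK. On the first request the marked blocks are written to the prompt cache; later requests with an identical prefix read them back at the reduced rate (SYSTEM_PROMPT and FEW_SHOT_EXAMPLES are placeholders, and cached blocks must meet the model's minimum cacheable length):

from anthropic import AsyncAnthropic

client = AsyncAnthropic()
manager = PromptCacheManager()

async def review_code(diff: str) -> str:
    messages = manager.build_cached_messages(
        system_prompt=SYSTEM_PROMPT,          # large, stable -> cached
        few_shot_examples=FEW_SHOT_EXAMPLES,  # stable per content type -> cached
        dynamic_content=diff,                 # changes every call -> not cached
    )
    response = await client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=4096,
        messages=messages,
    )
    # usage.cache_read_input_tokens > 0 signals a prompt-cache hit
    return response.content[0].text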

Cost Calculation

Without Prompt Caching:
─────────────────────────
System prompt:    2,000 tokens @ $3/MTok = $0.006
Few-shot examples: 5,000 tokens @ $3/MTok = $0.015
Schema docs:      1,000 tokens @ $3/MTok = $0.003
User content:    10,000 tokens @ $3/MTok = $0.030
────────────────────────────────────────
Total input:     18,000 tokens           = $0.054 per request

With Prompt Caching (90% hit rate):
───────────────────────────────────
Cached prefix:    8,000 tokens @ $0.30/MTok = $0.0024 (cache read)
User content:    10,000 tokens @ $3/MTok    = $0.0300
────────────────────────────────────────
Total:           18,000 tokens              = $0.0324 per request

Savings: 40% per request

With Semantic Cache (35% hit rate) + Prompt Cache:
──────────────────────────────────────────────────
35% requests: $0.00 (semantic cache hit)
65% requests: $0.0324 (prompt cache benefit)
Average: $0.021 per request

Total Savings: 61% vs no caching
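
The same arithmetic as a quick script, using the rates quoted above ($3/MTok input, $0.30/MTok cache reads):

INPUT_RATE = 3.00 / 1_000_000       # USD per input token
CACHE_READ_RATE = 0.30 / 1_000_000  # USD per cached-prefix token

prefix_tokens, user_tokens = 8_000, 10_000

no_cache = (prefix_tokens + user_tokens) * INPUT_RATE                           # $0.054
with_prompt_cache = prefix_tokens * CACHE_READ_RATE + user_tokens * INPUT_RATE  # $0.0324
blended = 0.65 * with_prompt_cache  # 35% of requests hit the semantic cache at ~$0

print(f"No caching:       ${no_cache:.4f}")
print(f"Prompt cache:     ${with_prompt_cache:.4f} ({1 - with_prompt_cache / no_cache:.0%} saved)")
print(f"+ semantic cache: ${blended:.4f} ({1 - blended / no_cache:.0%} saved)")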

Optimization Techniques

1. LLM Reranking (Optional)

For higher precision, rerank top-k semantic cache candidates:

async def get_with_reranking(
    query: str,
    agent_type: str,
    top_k: int = 3
) -> CacheEntry | None:
    """Retrieve with LLM reranking for better precision."""

    # Get top-k candidates
    candidates = await semantic_cache.get_topk(query, agent_type, k=top_k)

    if not candidates:
        return None

    # Use lightweight model to rerank
    rerank_prompt = f"""
    Query: {query}

    Rank these cached responses by relevance (1 = most relevant):
    {format_candidates(candidates)}
    """

    ranking = await lightweight_llm.rank(rerank_prompt)
    best_candidate = candidates[ranking[0] - 1]  # ranking positions are 1-based per the prompt

    if best_candidate.score > 0.8:  # Rerank threshold
        return best_candidate

    return None

2. Metadata Filtering

Filter before vector search to improve precision:

# Good: Filter by agent_type + content_type
query = VectorQuery(
    vector=embedding,
    vector_field_name="embedding",
    filter_expression="@agent_type:{security_auditor} @content_type:{article}"
)

# Better: Add difficulty level
query = VectorQuery(
    vector=embedding,
    vector_field_name="embedding",
    filter_expression="""
        @agent_type:{security_auditor}
        @content_type:{article}
        @difficulty_level:{advanced}
    """
)

3. Quality-Based Eviction

Prioritize keeping high-quality responses:

async def evict_low_quality_entries(cache: SemanticCache, max_size: int):
    """Evict low-quality entries when cache is full."""

    # Get all entries sorted by quality score
    entries = await cache.get_all_sorted_by_quality()

    if len(entries) > max_size:
        # Keep top N by quality, evict rest
        to_evict = entries[max_size:]
        for entry in to_evict:
            await cache.delete(entry.key)

4. Dynamic Threshold Adjustment

Adjust similarity threshold based on cache hit rate:

class AdaptiveThresholdManager:
    """Dynamically adjust threshold based on metrics."""

    def __init__(self, target_hit_rate: float = 0.35):
        self.target = target_hit_rate
        self.threshold = 0.92

    async def adjust(self, actual_hit_rate: float):
        """Adjust threshold to reach target hit rate."""

        if actual_hit_rate < self.target - 0.05:
            # Too many misses, lower threshold (more permissive)
            self.threshold = max(0.85, self.threshold - 0.01)
        elif actual_hit_rate > self.target + 0.05:
            # Too many hits (possibly false positives), raise threshold
            self.threshold = min(0.98, self.threshold + 0.01)

        logger.info(f"Adjusted threshold to {self.threshold}")

Monitoring & Observability

Key Metrics

@dataclass
class CacheMetrics:
    """Track cache performance."""

    # Hit rates
    l1_hit_rate: float
    l2_hit_rate: float
    l3_hit_rate: float
    combined_hit_rate: float

    # Latency
    l1_avg_latency_ms: float
    l2_avg_latency_ms: float
    l3_avg_latency_ms: float
    l4_avg_latency_ms: float

    # Cost
    estimated_cost_saved_usd: float
    total_requests: int

    # Quality
    false_positive_rate: float  # Wrong cached responses
    false_negative_rate: float  # Missed valid cache hits

Langfuse Cost Tracking (2025 Best Practice)

Langfuse automatically tracks token usage and costs for all LLM calls. This eliminates manual cost calculation and provides accurate cost attribution.

Automatic Cost Tracking with Custom Trace IDs

from langfuse.decorators import observe, langfuse_context
from uuid import UUID

@observe(as_type="generation")
async def call_llm_with_cache(
    prompt: str,
    agent_type: str,
    analysis_id: UUID | None = None
) -> str:
    """LLM call with automatic cost tracking via Langfuse.

    CRITICAL: Always link to parent trace for cost attribution!
    """

    # Link to parent analysis trace (for cost rollup)
    if analysis_id:
        langfuse_context.update_current_trace(
            name=f"{agent_type}_generation",
            session_id=str(analysis_id),  # Group by analysis
            tags=[agent_type, "cached"],
            metadata={"analysis_id": str(analysis_id)}
        )

    # Langfuse decorator automatically:
    # 1. Captures input/output tokens
    # 2. Calculates costs using model pricing
    # 3. Tags with agent_type for cost attribution
    # 4. Records cache hit/miss status

    # L1: Check exact cache
    cache_key = hash_content(prompt)
    if cache_key in lru_cache:
        # Mark as cache hit (zero cost)
        langfuse_context.update_current_observation(
            metadata={"cache_layer": "L1", "cache_hit": True}
        )
        return lru_cache[cache_key]

    # L2: Check semantic cache
    embedding = await embed_text(prompt)
    similar = await redis_cache.find_similar(embedding, agent_type)
    if similar:
        langfuse_context.update_current_observation(
            metadata={"cache_layer": "L2", "cache_hit": True, "distance": similar.distance}
        )
        return similar.response

    # L3/L4: LLM call with prompt caching
    # Langfuse automatically tracks token usage and cost
    response = await llm.generate(
        messages=build_cached_messages(prompt),
        model="claude-3-5-sonnet-20241022"
    )

    # Langfuse records:
    # - input_tokens (total)
    # - output_tokens
    # - cache_creation_input_tokens (prompt cache breakpoints)
    # - cache_read_input_tokens (cached prefix tokens)
    # - total_cost (calculated from model pricing)

    langfuse_context.update_current_observation(
        metadata={
            "cache_layer": "L3/L4",
            "cache_hit": False,
            "prompt_cache_hit": response.usage.cache_read_input_tokens > 0
        }
    )

    # Store in L2 and L1 for future hits
    await redis_cache.set(embedding, response.content, agent_type)
    lru_cache[cache_key] = response.content

    return response.content

Trace Hierarchy for Cost Attribution (Production Pattern)

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from uuid import uuid4, UUID

class CodeReviewWorkflow:
    """Multi-agent code review with hierarchical cost tracking."""

    @observe(as_type="trace")
    async def run_code_review(self, pr_id: int, diff: str, review_id: UUID) -> dict:
        """Parent trace - aggregates all child agent costs.

        Trace Hierarchy:
        run_code_review (trace)
        ├── security_scan_generation (generation)
        ├── performance_analysis_generation (generation)
        ├── style_check_generation (generation)
        ├── test_coverage_generation (generation)
        └── synthesis_generation (generation)

        Langfuse automatically rolls up costs to parent trace.
        """

        # Set trace metadata for filtering/grouping
        langfuse_context.update_current_trace(
            name="code_review",
            session_id=str(review_id),
            user_id=f"pr_{pr_id}",  # Group by PR for analysis
            tags=["multi-agent", "production", "code-review"],
            metadata={
                "review_id": str(review_id),
                "pr_id": pr_id,
                "agent_count": 5,
                "diff_size": len(diff)
            }
        )

        # Each review agent creates a child generation
        findings = {}
        for agent in self.review_agents:
            # Child generation auto-linked to parent trace
            result = await self.run_review_agent(
                agent=agent,
                code_diff=diff,
                review_id=review_id  # Links to parent
            )
            findings[agent.name] = result

        # Synthesis also tracked as child generation
        synthesis = await self.synthesize_review(
            findings=findings,
            review_id=review_id
        )

        # Langfuse dashboard shows:
        # - Total cost for this review (sum of all child generations)
        # - Token breakdown by review agent type
        # - Cache hit rate per agent
        # - Latency per agent
        # - Quality score trends

        return {"findings": findings, "synthesis": synthesis, "approved": synthesis.approved}

    @observe(as_type="generation")
    async def run_agent(
        self,
        agent: Agent,
        content: str,
        analysis_id: UUID
    ) -> dict:
        """Child generation - costs roll up to parent trace."""

        langfuse_context.update_current_observation(
            name=f"{agent.name}_generation",
            metadata={
                "agent_type": agent.name,
                "content_length": len(content)
            }
        )

        # LLM call automatically tracked
        response = await agent.analyze(content)

        return response
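
Kicking off the workflow might look like this (review_agents wiring and the diff source are project-specific; the PR number is illustrative):

import asyncio
from uuid import uuid4

workflow = CodeReviewWorkflow()
result = asyncio.run(
    workflow.run_code_review(
        pr_id=1234,
        diff=open("changes.diff").read(),
        review_id=uuid4(),  # becomes the Langfuse session_id for cost rollup
    )
)
print(result["approved"])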

Cost Rollup Query Pattern

from langfuse import Langfuse
from datetime import datetime, timedelta

async def get_analysis_costs(analysis_id: UUID) -> dict:
    """Get total cost for an analysis (parent trace + all child generations)."""

    langfuse = Langfuse()

    # Fetch parent trace by session_id
    traces = langfuse.get_traces(
        session_id=str(analysis_id),
        limit=1
    )

    if not traces.data:
        return {"error": "Trace not found"}

    trace = traces.data[0]

    # Langfuse automatically aggregates child costs
    return {
        "trace_id": trace.id,
        "total_cost": trace.total_cost,  # Sum of all child generations
        "input_tokens": trace.usage.input_tokens,
        "output_tokens": trace.usage.output_tokens,
        "cache_read_tokens": trace.usage.cache_read_input_tokens,
        "observations_count": trace.observation_count,  # Number of child LLM calls
        "latency_ms": trace.latency,
        "created_at": trace.timestamp
    }

async def get_daily_costs_by_agent() -> list[dict]:
    """Get cost breakdown by agent type for last 30 days."""

    langfuse = Langfuse()

    # Fetch all generations from last 30 days
    from_date = datetime.now() - timedelta(days=30)
    generations = langfuse.get_generations(
        from_timestamp=from_date,
        limit=10000
    )

    # Group by agent type (from metadata)
    costs_by_agent = {}
    for gen in generations.data:
        agent_type = gen.metadata.get("agent_type", "unknown")
        cost = gen.calculated_total_cost or 0.0

        if agent_type not in costs_by_agent:
            costs_by_agent[agent_type] = {
                "agent_type": agent_type,
                "total_cost": 0.0,
                "call_count": 0,
                "total_input_tokens": 0,
                "total_output_tokens": 0,
                "cache_hits": 0
            }

        costs_by_agent[agent_type]["total_cost"] += cost
        costs_by_agent[agent_type]["call_count"] += 1
        costs_by_agent[agent_type]["total_input_tokens"] += gen.usage.input or 0
        costs_by_agent[agent_type]["total_output_tokens"] += gen.usage.output or 0

        if gen.metadata.get("cache_hit"):
            costs_by_agent[agent_type]["cache_hits"] += 1

    # Calculate averages
    results = []
    for stats in costs_by_agent.values():
        stats["avg_cost_per_call"] = stats["total_cost"] / stats["call_count"]
        stats["cache_hit_rate"] = stats["cache_hits"] / stats["call_count"]
        results.append(stats)

    # Sort by total cost descending
    results.sort(key=lambda x: x["total_cost"], reverse=True)

    return results

Cost Attribution by Agent Type

# Langfuse dashboard query:
# GROUP BY metadata.agent_type
# SUM(total_cost) AS cost_per_agent
#
# Results show:
# - security_auditor: $12.45 (35% cache hit rate)
# - implementation_planner: $8.23 (42% cache hit rate)
# - tech_comparator: $5.67 (58% cache hit rate)

Cache Effectiveness Analysis

from langfuse import Langfuse

langfuse = Langfuse()

# Query all generations with cache metadata
generations = langfuse.get_generations(
    limit=1000,
    from_timestamp=datetime.now() - timedelta(days=7)
)

cache_hits = 0
cache_misses = 0
total_cost = 0.0
cost_saved = 0.0

for gen in generations.data:
    metadata = gen.metadata or {}
    is_cache_hit = metadata.get("cache_hit", False)

    if is_cache_hit:
        cache_hits += 1
        # Rough lower bound: a cache hit records little or no cost itself;
        # a tighter estimate would use the average cost of an equivalent uncached call
        cost_saved += gen.calculated_total_cost or 0
    else:
        cache_misses += 1
        total_cost += gen.calculated_total_cost or 0

hit_rate = cache_hits / (cache_hits + cache_misses)
print(f"Cache Hit Rate: {hit_rate:.1%}")
print(f"Cost Saved: ${cost_saved:.2f}")
print(f"Total Cost: ${total_cost:.2f}")
print(f"Savings Rate: {(cost_saved / (cost_saved + total_cost)):.1%}")

Model Pricing Registry

from dataclasses import dataclass

@dataclass
class ModelInfo:
    """Model configuration with pricing."""

    model_id: str
    display_name: str
    max_tokens: int
    input_cost_per_1m: float  # USD per 1M input tokens
    output_cost_per_1m: float  # USD per 1M output tokens

    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate total cost for token usage."""
        input_cost = (input_tokens / 1_000_000) * self.input_cost_per_1m
        output_cost = (output_tokens / 1_000_000) * self.output_cost_per_1m
        return input_cost + output_cost

# Model pricing registry (rates as of March 2025)
MODEL_REGISTRY = {
    "claude-3-5-sonnet-20241022": ModelInfo(
        model_id="claude-3-5-sonnet-20241022",
        display_name="Claude 3.5 Sonnet (New)",
        max_tokens=8192,
        input_cost_per_1m=3.00,  # $3 per 1M tokens
        output_cost_per_1m=15.00,  # $15 per 1M tokens
    ),
    "gpt-4-turbo-2024-04-09": ModelInfo(
        model_id="gpt-4-turbo-2024-04-09",
        display_name="GPT-4 Turbo",
        max_tokens=4096,
        input_cost_per_1m=10.00,
        output_cost_per_1m=30.00,
    ),
}
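
For example, pricing the uncached request from the Cost Calculation section against the registry (the 2,000 output tokens are illustrative):

sonnet = MODEL_REGISTRY["claude-3-5-sonnet-20241022"]
cost = sonnet.calculate_cost(input_tokens=18_000, output_tokens=2_000)
print(f"${cost:.4f}")  # 18,000 * $3/MTok + 2,000 * $15/MTok = $0.084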

Langfuse Dashboard Views

Access cost insights at http://localhost:3000:

Cost Dashboard:

  • Total cost by day/week/month
  • Cost breakdown by model
  • Cost attribution by agent type
  • Cache hit rate impact on costs
  • Top 10 most expensive traces

Cache Effectiveness:

  • L1/L2/L3 hit rates over time
  • Cost savings from semantic cache
  • Cost savings from prompt cache
  • False positive rate (wrong cache hits)

Agent Performance:

  • Average cost per agent invocation
  • Token usage distribution
  • Cache hit rate by agent type
  • Quality score vs. cost correlation

RedisInsight Dashboard

Access Redis cache visualization at http://localhost:8001:

  • View cache entries
  • Monitor vector similarity distributions
  • Track hit/miss rates by agent type
  • Analyze quality score distributions
  • Identify hot keys

Local Model Considerations (Ollama)

When using local models via Ollama, the caching calculus changes:

Cost Impact:

  Provider       Caching Value   Reason
  ─────────────  ──────────────  ─────────────────────────
  Cloud APIs     Critical        $3-15 per MTok
  Ollama Local   Optional        Free (no per-token cost)

When to still cache with Ollama:

  • Latency reduction: Cache provides ~1-10ms vs ~50-200ms for local inference
  • Memory pressure: Avoid loading multiple models for repeated queries
  • Batch CI runs: Same queries across test runs benefit from L1 cache

Simplified Cache Strategy for Local:

# With Ollama, L1 (LRU) cache is usually sufficient
# Skip L2 (Redis semantic) unless latency-critical

async def get_local_llm_response(query: str) -> str:
    # L1: Exact match only (sufficient for local)
    cache_key = hash_content(query)
    if cache_key in lru_cache:
        return lru_cache[cache_key]  # ~1ms

    # Direct local inference (FREE, fast enough)
    response = await ollama_provider.ainvoke(query)  # ~50-200ms

    # Store in L1 only
    lru_cache[cache_key] = response.content
    return response.content

Best Practice: Use factory pattern to apply full caching hierarchy only for cloud APIs:

if settings.OLLAMA_ENABLED:
    # Minimal caching for local models
    return LocalCacheStrategy(l1_only=True)
else:
    # Full L1/L2/L3 caching for cloud APIs
    return CloudCacheStrategy(l1=True, l2=True, l3=True)

See ai-native-development skill section "10. Local LLM Inference with Ollama" for provider setup.


References

Integration Examples

See:

  • references/redis-setup.md - Docker Compose + RedisVL setup
  • references/cache-hierarchy.md - Multi-level cache implementation
  • references/cost-optimization.md - ROI calculations and benchmarks
  • templates/semantic-cache-service.py - Production-ready service
  • templates/prompt-cache-wrapper.py - Claude caching wrapper
  • examples/project-integration.md - project-specific integration patterns

Skill Version: 1.3.0
Last Updated: 2025-12-28
Maintained by: this project AI Agent Hub

Changelog

v1.3.0 (2025-12-28)

  • Added "Local Model Considerations (Ollama)" section
  • Added cost comparison table for cloud vs local caching value
  • Added simplified caching strategy for local models
  • Added factory pattern example for adaptive caching
  • Cross-referenced ai-native-development skill for Ollama setup

v1.2.0 (2025-12-27)

  • Added hierarchical trace pattern for multi-agent cost rollup
  • Added session_id linking pattern for cost attribution to parent analysis
  • Added cost rollup query patterns with Langfuse API
  • Added daily cost breakdown by agent type example
  • Updated automatic cost tracking with custom trace ID support
  • Added project-specific multi-agent workflow cost-tracking pattern

v1.1.0 (2025-12-27)

  • Added comprehensive Langfuse cost tracking section
  • Added automatic cost tracking with @observe decorator
  • Added cost attribution by agent type patterns
  • Added cache effectiveness analysis with Langfuse API
  • Added model pricing registry with calculate_cost() method
  • Added Langfuse dashboard views for cost insights
  • Updated monitoring section with cost tracking best practices

v1.0.0 (2025-12-14)

  • Initial skill with double caching architecture (L1/L2/L3/L4)
  • Redis semantic cache implementation with RedisVL
  • Claude prompt caching patterns
  • Cache warming strategies
  • Similarity threshold tuning guidelines
  • Optimization techniques (reranking, metadata filtering, quality-based eviction)