| name | llm-caching-patterns |
| description | Multi-level caching strategies for LLM applications - semantic caching (Redis), prompt caching (Claude/OpenAI native), cache hierarchies, cost optimization, and Langfuse cost tracking with hierarchical trace rollup for 70-95% cost reduction |
| version | 1.3.0 |
| author | YG Starter Template |
| tags | llm, caching, redis, cost-optimization, semantic-cache, prompt-cache, langfuse, trace-hierarchy, 2025 |
LLM Caching Patterns
Overview
Modern LLM applications can reduce costs by 70-95% through intelligent multi-level caching. This skill covers a multi-tier caching architecture (2025 best practice) that combines an in-memory LRU cache, Redis semantic caching, and provider-native prompt caching for maximum efficiency.
Real-World Use Cases:
- High-Volume Chatbot: Semantic cache for FAQ variations (80% cache hit rate)
- Code Review Automation: Prompt cache for system instructions (90% savings)
- Content Moderation: L1/L2 cache for repeat content detection
- Multi-Agent Analysis: Hierarchical cache strategy across agents
- Customer Support: Session-aware caching for context continuity
When to use this skill:
- High-volume LLM applications with repeated queries
- Cost-sensitive AI features
- Similar query patterns (e.g., analyzing similar content types)
- Applications requiring sub-second response times
- Multi-agent systems with redundant LLM calls
Expected Impact:
- L1 (LRU) Cache: 10-20% hit rate, ~1ms latency, 100% cost savings
- L2 (Redis Semantic): 30-50% hit rate, ~5-10ms latency, 100% cost savings
- L3 (Prompt Cache): 80-100% coverage, ~2s latency, 90% token cost savings
- Combined: 70-95% total cost reduction
Core Concepts
Multi-Level Caching Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│              CACHE HIERARCHY (2025 BEST PRACTICE)               │
└─────────────────────────────────────────────────────────────────┘

Request → L1 (Exact Hash) → L2 (Semantic) → L3 (Prompt) → L4 (Full LLM)
           ↓ Hit: ~1ms       ↓ Hit: ~10ms    ↓ Cached      ↓ Full Cost
           100% savings      100% savings    90% savings   $$$
```
L1: In-Memory LRU Cache
────────────────────────
• Exact content hash matching
• 1,000-10,000 entries
• TTL: 5-10 minutes
• Use Case: Duplicate requests within a session
• Implementation: Python functools.lru_cache or cachetools (see the sketch below)
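A minimal L1 sketch, assuming cachetools as the dependency (`TTLCache` provides both the size bound and the TTL described above):

```python
from hashlib import sha256

from cachetools import TTLCache

# Bounded, time-limited exact-match cache (L1)
lru_cache: TTLCache = TTLCache(maxsize=10_000, ttl=600)  # 10-minute TTL

def hash_content(content: str) -> str:
    """Stable exact-match key used for L1 lookups throughout this skill."""
    return sha256(content.encode("utf-8")).hexdigest()
```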
L2: Redis Semantic Cache
─────────────────────────
• Vector similarity search (cosine distance < 0.08)
• Configurable similarity threshold (0.85-0.95)
• TTL: 1-24 hours
• Use Case: Similar but not identical queries
• Implementation: RedisVL SemanticCache + RediSearch
L3: Prompt Caching (Provider Native)
────────────────────────────────────
• Caches identical prompt PREFIXES (system prompts, examples)
• Claude: explicit cache_control ephemeral markers
• OpenAI: cached prefixes detected automatically (no markers required)
• TTL: 5 minutes (auto-refreshed on use)
• Use Case: Same prompts, different user content
• March 2025: Anthropic cache reads no longer count against rate limits
L4: Full LLM Call
─────────────────
• No cache hit - full generation required
• Store response in L2 and L1 for future hits
• Full token cost
Cache Decision Flow
```python
async def get_llm_response(query: str, agent_type: str) -> dict:
    """Multi-level cache lookup."""
    # L1: Exact match (in-memory)
    cache_key = hash_content(query)
    if cache_key in lru_cache:
        return lru_cache[cache_key]  # ~1ms, 100% savings

    # L2: Semantic similarity (Redis)
    embedding = await embed_text(query)
    similar = await redis_cache.find_similar(
        embedding=embedding,
        agent_type=agent_type,
        threshold=0.92,  # Configurable
    )
    if similar and similar.distance < 0.08:
        lru_cache[cache_key] = similar.response  # Promote to L1
        return similar.response  # ~10ms, 100% savings

    # L3 + L4: Prompt caching + LLM call
    # Prompt cache breakpoints reduce L4 cost by 90%
    response = await llm.generate(
        messages=build_cached_messages(
            system_prompt=AGENT_PROMPT,   # cached
            examples=few_shot_examples,   # cached
            user_content=query,           # NOT cached
        )
    )

    # Store in L2 and L1 for future hits
    await redis_cache.set(embedding, response, agent_type)
    lru_cache[cache_key] = response
    return response  # L3: ~2s, 90% savings | L4: ~3s, full cost
```
Similarity Threshold Tuning
Problem: How similar is "similar enough" to return a cached response?
Threshold Guidelines (cosine similarity):
- 0.98-1.00 (distance 0.00-0.02): Nearly identical - safe to return
- 0.95-0.98 (distance 0.02-0.05): Very similar - usually safe
- 0.92-0.95 (distance 0.05-0.08): Similar - validate with reranking
- 0.85-0.92 (distance 0.08-0.15): Moderately similar - risky
- < 0.85 (distance > 0.15): Different - do not return
Recommended Starting Point: 0.92 (distance < 0.08)
Tuning Process:
- Start at 0.92 threshold
- Monitor false positives (wrong cached responses)
- Monitor false negatives (cache misses that should've hit)
- Adjust threshold based on precision/recall tradeoff
- Different thresholds per agent type (security=0.95, general=0.90), as sketched below
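A simple way to express per-agent thresholds (values are illustrative starting points, not tuned numbers):

```python
# Per-agent similarity thresholds; tune against measured
# false-positive / false-negative rates.
SIMILARITY_THRESHOLDS: dict[str, float] = {
    "security_auditor": 0.95,  # high stakes: prefer misses over wrong hits
    "general": 0.90,
}

def threshold_for(agent_type: str) -> float:
    return SIMILARITY_THRESHOLDS.get(agent_type, 0.92)  # recommended default
```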
Cache Warming Strategy
Pre-populate cache from golden dataset for instant hit rates:
```python
async def warm_cache_from_golden_dataset(
    cache: SemanticCache,
    min_quality: float = 0.8,
) -> int:
    """Warm cache with high-quality historical responses."""
    # Load golden dataset analyses
    analyses = await db.query(
        "SELECT * FROM analyses WHERE confidence_score >= ?",
        (min_quality,),
    )

    warmed = 0
    for analysis in analyses:
        # Extract agent findings
        for finding in analysis.findings:
            await cache.set(
                content=analysis.content,
                response=finding.output,
                agent_type=finding.agent_type,
                quality_score=finding.confidence_score,
            )
            warmed += 1
    return warmed
```
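A hypothetical startup hook (the `cache` instance and `logger` are assumed to exist in your application):

```python
# Warm the semantic cache before serving traffic.
warmed = await warm_cache_from_golden_dataset(cache, min_quality=0.85)
logger.info(f"Semantic cache warmed with {warmed} entries")
```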
Redis Semantic Cache Implementation
Schema Design
```python
# RedisVL index schema
CACHE_INDEX_SCHEMA = {
    "index": {
        "name": "llm_semantic_cache",
        "prefix": "cache:",
    },
    "fields": [
        {"name": "agent_type", "type": "tag"},
        {"name": "content_type", "type": "tag"},
        {"name": "input_hash", "type": "tag"},
        {
            "name": "embedding",
            "type": "vector",
            "attrs": {
                "dims": 1536,                 # OpenAI text-embedding-3-small
                "distance_metric": "cosine",
                "algorithm": "hnsw",          # Fast approximate search
            },
        },
        {"name": "response", "type": "text"},
        {"name": "created_at", "type": "numeric"},
        {"name": "hit_count", "type": "numeric"},
        {"name": "quality_score", "type": "numeric"},
    ],
}
```
Service Class
```python
import json
import time

from redis import Redis
from redisvl.index import SearchIndex
from redisvl.query import VectorQuery
from redisvl.schema import IndexSchema


class SemanticCacheService:
    """Redis semantic cache for LLM responses."""

    def __init__(self, redis_url: str, similarity_threshold: float = 0.92):
        self.client = Redis.from_url(redis_url)
        self.threshold = similarity_threshold
        self.embedding_service = EmbeddingService()
        # Initialize RedisVL index
        schema = IndexSchema.from_dict(CACHE_INDEX_SCHEMA)
        self.index = SearchIndex(schema, self.client)
        self.index.create(overwrite=False)

    async def get(
        self,
        content: str,
        agent_type: str,
        content_type: str | None = None,
    ) -> CacheEntry | None:
        """Look up cached response by semantic similarity."""
        # Generate embedding (truncate long inputs for stable keys)
        embedding = await self.embedding_service.embed_text(content[:2000])

        # Build query with filters
        filter_expr = f"@agent_type:{{{agent_type}}}"
        if content_type:
            filter_expr += f" @content_type:{{{content_type}}}"

        query = VectorQuery(
            vector=embedding,
            vector_field_name="embedding",
            return_fields=["response", "quality_score", "hit_count"],
            num_results=1,
            filter_expression=filter_expr,
        )
        results = self.index.query(query)

        if results:
            result = results[0]
            distance = float(result.get("vector_distance", 1.0))
            # Check similarity threshold (distance = 1 - similarity)
            if distance <= (1 - self.threshold):
                # Increment hit count
                self.client.hincrby(result["id"], "hit_count", 1)
                return CacheEntry(
                    response=json.loads(result["response"]),
                    quality_score=float(result["quality_score"]),
                    hit_count=int(result["hit_count"]),
                    distance=distance,
                )
        return None

    async def set(
        self,
        content: str,
        response: dict,
        agent_type: str,
        content_type: str | None = None,
        quality_score: float = 1.0,
    ) -> None:
        """Store response in cache."""
        content_preview = content[:2000]
        embedding = await self.embedding_service.embed_text(content_preview)
        key = f"cache:{agent_type}:{hash_content(content_preview)}"
        data = {
            "agent_type": agent_type,
            "content_type": content_type or "",
            "input_hash": hash_content(content_preview),
            "embedding": embedding,
            "response": json.dumps(response),
            "created_at": time.time(),
            "hit_count": 0,
            "quality_score": quality_score,
        }
        self.client.hset(key, mapping=data)
        self.client.expire(key, 86400)  # 24-hour TTL
```
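A hypothetical round trip with the service above; the Redis URL, payloads, and `EmbeddingService` wiring are assumptions for illustration:

```python
cache = SemanticCacheService("redis://localhost:6379", similarity_threshold=0.92)

await cache.set(
    content="How do I rotate my API key?",
    response={"answer": "POST /keys/rotate, then update your clients."},
    agent_type="support",
)

# A paraphrased query should land within the distance threshold.
entry = await cache.get("How can I rotate an API key?", agent_type="support")
if entry:
    print(entry.response, f"distance={entry.distance:.3f}")  # semantic hit
```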
Prompt Caching (Claude Native)
Cache Breakpoint Strategy
```python
class PromptCacheManager:
    """Manage Claude prompt caching with cache breakpoints."""

    def build_cached_messages(
        self,
        system_prompt: str,
        few_shot_examples: str | None = None,
        schema_prompt: str | None = None,
        dynamic_content: str = "",
    ) -> list[dict]:
        """Build messages with cache breakpoints.

        Cache structure:
        1. System prompt (always cached)
        2. Few-shot examples (cached per content type)
        3. Schema documentation (always cached)
        ──────────────── CACHE BREAKPOINT ────────────────
        4. Dynamic content (NEVER cached)
        """
        content_parts = []

        # Breakpoint 1: System prompt
        content_parts.append({
            "type": "text",
            "text": system_prompt,
            "cache_control": {"type": "ephemeral"},
        })

        # Breakpoint 2: Few-shot examples (if provided)
        if few_shot_examples:
            content_parts.append({
                "type": "text",
                "text": few_shot_examples,
                "cache_control": {"type": "ephemeral"},
            })

        # Breakpoint 3: Schema documentation (if provided)
        if schema_prompt:
            content_parts.append({
                "type": "text",
                "text": schema_prompt,
                "cache_control": {"type": "ephemeral"},
            })

        # Dynamic content (NOT cached)
        content_parts.append({
            "type": "text",
            "text": dynamic_content,
        })

        return [{"role": "user", "content": content_parts}]
```
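A hedged usage sketch with the Anthropic Python SDK; `AGENT_PROMPT`, `FEW_SHOT_EXAMPLES`, and `user_query` are placeholders for your own prompt assets:

```python
from anthropic import AsyncAnthropic

client = AsyncAnthropic()
manager = PromptCacheManager()

response = await client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=manager.build_cached_messages(
        system_prompt=AGENT_PROMPT,
        few_shot_examples=FEW_SHOT_EXAMPLES,
        dynamic_content=user_query,
    ),
)
# On a warm prefix, response.usage.cache_read_input_tokens > 0
```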
Cost Calculation
```
Without Prompt Caching:
───────────────────────
System prompt:      2,000 tokens @ $3/MTok = $0.006
Few-shot examples:  5,000 tokens @ $3/MTok = $0.015
Schema docs:        1,000 tokens @ $3/MTok = $0.003
User content:      10,000 tokens @ $3/MTok = $0.030
────────────────────────────────────────────────────
Total input:       18,000 tokens           = $0.054 per request

With Prompt Caching (warm prefix, ~90% of requests):
────────────────────────────────────────────────────
Cached prefix:      8,000 tokens @ $0.30/MTok = $0.0024 (cache read)
User content:      10,000 tokens @ $3/MTok    = $0.0300
────────────────────────────────────────────────────
Total:             18,000 tokens              = $0.0324 per request
Savings: 40% per request

With Semantic Cache (35% hit rate) + Prompt Cache:
──────────────────────────────────────────────────
35% of requests: $0.00   (semantic cache hit)
65% of requests: $0.0324 (prompt cache benefit)
Average: ~$0.021 per request
Total Savings: 61% vs no caching
```
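The same arithmetic as a quick sanity check; the rates mirror the figures above ($3/MTok input, $0.30/MTok cache read), and the function name is illustrative:

```python
def request_cost(prefix_tokens: int, user_tokens: int, prefix_cached: bool) -> float:
    """Input cost in USD for one request, per the pricing above."""
    prefix_rate = 0.30 if prefix_cached else 3.00
    return (prefix_tokens * prefix_rate + user_tokens * 3.00) / 1_000_000

cold = request_cost(8_000, 10_000, prefix_cached=False)  # $0.054
warm = request_cost(8_000, 10_000, prefix_cached=True)   # $0.0324
blended = 0.35 * 0.0 + 0.65 * warm                       # ~$0.021 with 35% semantic hits
```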
Optimization Techniques
1. LLM Reranking (Optional)
For higher precision, rerank top-k semantic cache candidates:
```python
async def get_with_reranking(
    query: str,
    agent_type: str,
    top_k: int = 3,
) -> CacheEntry | None:
    """Retrieve with LLM reranking for better precision."""
    # Get top-k candidates
    candidates = await semantic_cache.get_topk(query, agent_type, k=top_k)
    if not candidates:
        return None

    # Use a lightweight model to rerank
    rerank_prompt = f"""
    Query: {query}
    Rank these cached responses by relevance (1 = most relevant):
    {format_candidates(candidates)}
    """
    ranking = await lightweight_llm.rank(rerank_prompt)
    best_candidate = candidates[ranking[0]]

    if best_candidate.score > 0.8:  # Rerank threshold
        return best_candidate
    return None
```
2. Metadata Filtering
Filter before vector search to improve precision:
```python
# Good: filter by agent_type + content_type
query = VectorQuery(
    vector=embedding,
    vector_field_name="embedding",
    filter_expression="@agent_type:{security_auditor} @content_type:{article}",
)

# Better: also filter by difficulty level
query = VectorQuery(
    vector=embedding,
    vector_field_name="embedding",
    filter_expression=(
        "@agent_type:{security_auditor} "
        "@content_type:{article} "
        "@difficulty_level:{advanced}"
    ),
)
```
3. Quality-Based Eviction
Prioritize keeping high-quality responses:
```python
async def evict_low_quality_entries(cache: SemanticCache, max_size: int):
    """Evict low-quality entries when the cache is full."""
    # Get all entries sorted by quality score (best first)
    entries = await cache.get_all_sorted_by_quality()
    if len(entries) > max_size:
        # Keep top N by quality, evict the rest
        for entry in entries[max_size:]:
            await cache.delete(entry.key)
```
4. Dynamic Threshold Adjustment
Adjust similarity threshold based on cache hit rate:
```python
class AdaptiveThresholdManager:
    """Dynamically adjust the threshold based on metrics."""

    def __init__(self, target_hit_rate: float = 0.35):
        self.target = target_hit_rate
        self.threshold = 0.92

    async def adjust(self, actual_hit_rate: float):
        """Adjust the threshold to approach the target hit rate."""
        if actual_hit_rate < self.target - 0.05:
            # Too many misses: lower threshold (more permissive)
            self.threshold = max(0.85, self.threshold - 0.01)
        elif actual_hit_rate > self.target + 0.05:
            # Too many hits (possible false positives): raise threshold
            self.threshold = min(0.98, self.threshold + 0.01)
        logger.info(f"Adjusted threshold to {self.threshold}")
```
Monitoring & Observability
Key Metrics
```python
from dataclasses import dataclass


@dataclass
class CacheMetrics:
    """Track cache performance."""

    # Hit rates
    l1_hit_rate: float
    l2_hit_rate: float
    l3_hit_rate: float
    combined_hit_rate: float

    # Latency
    l1_avg_latency_ms: float
    l2_avg_latency_ms: float
    l3_avg_latency_ms: float
    l4_avg_latency_ms: float

    # Cost
    estimated_cost_saved_usd: float
    total_requests: int

    # Quality
    false_positive_rate: float  # Wrong cached responses
    false_negative_rate: float  # Missed valid cache hits
```
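A minimal sketch of deriving the headline fields from raw counters; the counter names and the `avg_miss_cost_usd` input are assumptions, not an existing API:

```python
def summarize_cache_metrics(
    l1_hits: int,
    l2_hits: int,
    l3_hits: int,
    total_requests: int,
    avg_miss_cost_usd: float,
) -> dict:
    """Derive headline metrics from per-layer hit counters."""
    answered_from_cache = l1_hits + l2_hits  # L1/L2 avoid the LLM entirely
    return {
        "l1_hit_rate": l1_hits / max(total_requests, 1),
        "l2_hit_rate": l2_hits / max(total_requests, 1),
        "l3_hit_rate": l3_hits / max(total_requests, 1),
        "combined_hit_rate": answered_from_cache / max(total_requests, 1),
        # Each L1/L2 hit avoids roughly one full-priced call
        "estimated_cost_saved_usd": answered_from_cache * avg_miss_cost_usd,
    }
```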
Langfuse Cost Tracking (2025 Best Practice)
Langfuse automatically tracks token usage and costs for all LLM calls. This eliminates manual cost calculation and provides accurate cost attribution.
Automatic Cost Tracking with Custom Trace IDs
```python
from uuid import UUID

from langfuse.decorators import observe, langfuse_context


@observe(as_type="generation")
async def call_llm_with_cache(
    prompt: str,
    agent_type: str,
    analysis_id: UUID | None = None,
) -> str:
    """LLM call with automatic cost tracking via Langfuse.

    CRITICAL: Always link to the parent trace for cost attribution!
    """
    # Link to parent analysis trace (for cost rollup)
    if analysis_id:
        langfuse_context.update_current_trace(
            name=f"{agent_type}_generation",
            session_id=str(analysis_id),  # Group by analysis
            tags=[agent_type, "cached"],
            metadata={"analysis_id": str(analysis_id)},
        )

    # The Langfuse decorator automatically:
    # 1. Captures input/output tokens
    # 2. Calculates costs using model pricing
    # 3. Tags with agent_type for cost attribution
    # 4. Records cache hit/miss status

    # L1: Check exact cache
    cache_key = hash_content(prompt)
    if cache_key in lru_cache:
        # Mark as cache hit (zero cost)
        langfuse_context.update_current_observation(
            metadata={"cache_layer": "L1", "cache_hit": True}
        )
        return lru_cache[cache_key]

    # L2: Check semantic cache
    embedding = await embed_text(prompt)
    similar = await redis_cache.find_similar(embedding, agent_type)
    if similar:
        langfuse_context.update_current_observation(
            metadata={"cache_layer": "L2", "cache_hit": True, "distance": similar.distance}
        )
        return similar.response

    # L3/L4: LLM call with prompt caching
    # Langfuse automatically tracks token usage and cost
    response = await llm.generate(
        messages=build_cached_messages(prompt),
        model="claude-3-5-sonnet-20241022",
    )

    # Langfuse records:
    # - input_tokens (total)
    # - output_tokens
    # - cache_creation_input_tokens (prompt cache breakpoints)
    # - cache_read_input_tokens (cached prefix tokens)
    # - total_cost (calculated from model pricing)
    langfuse_context.update_current_observation(
        metadata={
            "cache_layer": "L3/L4",
            "cache_hit": False,
            "prompt_cache_hit": response.usage.cache_read_input_tokens > 0,
        }
    )

    # Store in L2 and L1 for future hits
    await redis_cache.set(embedding, response.content, agent_type)
    lru_cache[cache_key] = response.content
    return response.content
```
Trace Hierarchy for Cost Attribution (Production Pattern)
```python
from uuid import UUID

from langfuse.decorators import observe, langfuse_context


class CodeReviewWorkflow:
    """Multi-agent code review with hierarchical cost tracking."""

    @observe()  # Root observation: creates the parent trace
    async def run_code_review(self, pr_id: int, diff: str, review_id: UUID) -> dict:
        """Parent trace - aggregates all child agent costs.

        Trace hierarchy:
        run_code_review (trace)
        ├── security_scan_generation (generation)
        ├── performance_analysis_generation (generation)
        ├── style_check_generation (generation)
        ├── test_coverage_generation (generation)
        └── synthesis_generation (generation)

        Langfuse automatically rolls up costs to the parent trace.
        """
        # Set trace metadata for filtering/grouping
        langfuse_context.update_current_trace(
            name="code_review",
            session_id=str(review_id),
            user_id=f"pr_{pr_id}",  # Group by PR for analysis
            tags=["multi-agent", "production", "code-review"],
            metadata={
                "review_id": str(review_id),
                "pr_id": pr_id,
                "agent_count": 5,
                "diff_size": len(diff),
            },
        )

        # Each review agent creates a child generation
        findings = {}
        for agent in self.review_agents:
            # Child generation auto-linked to parent trace
            result = await self.run_review_agent(
                agent=agent,
                code_diff=diff,
                review_id=review_id,  # Links to parent
            )
            findings[agent.name] = result

        # Synthesis also tracked as a child generation
        synthesis = await self.synthesize_review(
            findings=findings,
            review_id=review_id,
        )

        # The Langfuse dashboard shows:
        # - Total cost for this review (sum of all child generations)
        # - Token breakdown by review agent type
        # - Cache hit rate per agent
        # - Latency per agent
        # - Quality score trends
        return {"findings": findings, "synthesis": synthesis, "approved": synthesis.approved}

    @observe(as_type="generation")
    async def run_review_agent(
        self,
        agent: Agent,
        code_diff: str,
        review_id: UUID,
    ) -> dict:
        """Child generation - costs roll up to the parent trace."""
        langfuse_context.update_current_observation(
            name=f"{agent.name}_generation",
            metadata={
                "agent_type": agent.name,
                "content_length": len(code_diff),
            },
        )
        # LLM call automatically tracked
        return await agent.analyze(code_diff)
```
Cost Rollup Query Pattern
```python
from datetime import datetime, timedelta
from uuid import UUID

from langfuse import Langfuse


async def get_analysis_costs(analysis_id: UUID) -> dict:
    """Get total cost for an analysis (parent trace + all child generations)."""
    langfuse = Langfuse()

    # Fetch the parent trace by session_id
    traces = langfuse.get_traces(
        session_id=str(analysis_id),
        limit=1,
    )
    if not traces.data:
        return {"error": "Trace not found"}

    trace = traces.data[0]

    # Langfuse automatically aggregates child costs
    return {
        "trace_id": trace.id,
        "total_cost": trace.total_cost,  # Sum of all child generations
        "input_tokens": trace.usage.input_tokens,
        "output_tokens": trace.usage.output_tokens,
        "cache_read_tokens": trace.usage.cache_read_input_tokens,
        "observations_count": trace.observation_count,  # Number of child LLM calls
        "latency_ms": trace.latency,
        "created_at": trace.timestamp,
    }


async def get_daily_costs_by_agent() -> list[dict]:
    """Get cost breakdown by agent type for the last 30 days."""
    langfuse = Langfuse()

    # Fetch all generations from the last 30 days
    from_date = datetime.now() - timedelta(days=30)
    generations = langfuse.get_generations(
        from_timestamp=from_date,
        limit=10000,
    )

    # Group by agent type (from metadata)
    costs_by_agent: dict[str, dict] = {}
    for gen in generations.data:
        agent_type = gen.metadata.get("agent_type", "unknown")
        cost = gen.calculated_total_cost or 0.0

        if agent_type not in costs_by_agent:
            costs_by_agent[agent_type] = {
                "agent_type": agent_type,
                "total_cost": 0.0,
                "call_count": 0,
                "total_input_tokens": 0,
                "total_output_tokens": 0,
                "cache_hits": 0,
            }

        stats = costs_by_agent[agent_type]
        stats["total_cost"] += cost
        stats["call_count"] += 1
        stats["total_input_tokens"] += gen.usage.input or 0
        stats["total_output_tokens"] += gen.usage.output or 0
        if gen.metadata.get("cache_hit"):
            stats["cache_hits"] += 1

    # Calculate averages
    results = []
    for stats in costs_by_agent.values():
        stats["avg_cost_per_call"] = stats["total_cost"] / stats["call_count"]
        stats["cache_hit_rate"] = stats["cache_hits"] / stats["call_count"]
        results.append(stats)

    # Sort by total cost, descending
    results.sort(key=lambda x: x["total_cost"], reverse=True)
    return results
```
Cost Attribution by Agent Type
```
# Langfuse dashboard query:
#   GROUP BY metadata.agent_type
#   SUM(total_cost) AS cost_per_agent
#
# Example results:
#   security_auditor:       $12.45 (35% cache hit rate)
#   implementation_planner:  $8.23 (42% cache hit rate)
#   tech_comparator:         $5.67 (58% cache hit rate)
```
Cache Effectiveness Analysis
```python
from datetime import datetime, timedelta

from langfuse import Langfuse

langfuse = Langfuse()

# Query all generations with cache metadata from the last 7 days
generations = langfuse.get_generations(
    limit=1000,
    from_timestamp=datetime.now() - timedelta(days=7),
)

cache_hits = 0
cache_misses = 0
total_cost = 0.0

for gen in generations.data:
    metadata = gen.metadata or {}
    if metadata.get("cache_hit", False):
        cache_hits += 1
    else:
        cache_misses += 1
        total_cost += gen.calculated_total_cost or 0

# Estimate savings: each cache hit avoided roughly one average-priced miss
avg_miss_cost = total_cost / max(cache_misses, 1)
cost_saved = cache_hits * avg_miss_cost

hit_rate = cache_hits / max(cache_hits + cache_misses, 1)
print(f"Cache Hit Rate: {hit_rate:.1%}")
print(f"Cost Saved: ${cost_saved:.2f}")
print(f"Total Cost: ${total_cost:.2f}")
print(f"Savings Rate: {cost_saved / max(cost_saved + total_cost, 1e-9):.1%}")
```
Model Pricing Registry
```python
from dataclasses import dataclass


@dataclass
class ModelInfo:
    """Model configuration with pricing."""

    model_id: str
    display_name: str
    max_tokens: int
    input_cost_per_1m: float   # USD per 1M input tokens
    output_cost_per_1m: float  # USD per 1M output tokens

    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate total cost for token usage."""
        input_cost = (input_tokens / 1_000_000) * self.input_cost_per_1m
        output_cost = (output_tokens / 1_000_000) * self.output_cost_per_1m
        return input_cost + output_cost


# Pricing snapshot (updated March 2025)
MODEL_REGISTRY = {
    "claude-3-5-sonnet-20241022": ModelInfo(
        model_id="claude-3-5-sonnet-20241022",
        display_name="Claude 3.5 Sonnet (New)",
        max_tokens=8192,
        input_cost_per_1m=3.00,    # $3 per 1M tokens
        output_cost_per_1m=15.00,  # $15 per 1M tokens
    ),
    "gpt-4-turbo-2024-04-09": ModelInfo(
        model_id="gpt-4-turbo-2024-04-09",
        display_name="GPT-4 Turbo",
        max_tokens=4096,
        input_cost_per_1m=10.00,
        output_cost_per_1m=30.00,
    ),
}
```
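Worked example with the registry above, using the token counts from the cost calculation section:

```python
model = MODEL_REGISTRY["claude-3-5-sonnet-20241022"]
cost = model.calculate_cost(input_tokens=18_000, output_tokens=1_200)
# (18_000 / 1e6) * 3.00 + (1_200 / 1e6) * 15.00 = 0.054 + 0.018
print(f"${cost:.3f}")  # $0.072
```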
Langfuse Dashboard Views
Access cost insights at http://localhost:3000:
Cost Dashboard:
- Total cost by day/week/month
- Cost breakdown by model
- Cost attribution by agent type
- Cache hit rate impact on costs
- Top 10 most expensive traces
Cache Effectiveness:
- L1/L2/L3 hit rates over time
- Cost savings from semantic cache
- Cost savings from prompt cache
- False positive rate (wrong cache hits)
Agent Performance:
- Average cost per agent invocation
- Token usage distribution
- Cache hit rate by agent type
- Quality score vs. cost correlation
RedisInsight Dashboard
Access Redis cache visualization at http://localhost:8001:
- View cache entries
- Monitor vector similarity distributions
- Track hit/miss rates by agent type
- Analyze quality score distributions
- Identify hot keys
Local Model Considerations (Ollama)
When using local models via Ollama, the caching calculus changes:
Cost Impact:
| Provider | Caching Value | Reason |
|---|---|---|
| Cloud APIs | Critical | $3-15 per MTok |
| Ollama Local | Optional | Tokens cost $0 (local compute) |
When to still cache with Ollama:
- Latency reduction: Cache provides ~1-10ms vs ~50-200ms for local inference
- Memory pressure: Avoid loading multiple models for repeated queries
- Batch CI runs: Same queries across test runs benefit from L1 cache
Simplified Cache Strategy for Local:
```python
# With Ollama, the L1 (LRU) cache is usually sufficient;
# skip L2 (Redis semantic) unless latency-critical.
async def get_local_llm_response(query: str) -> str:
    # L1: Exact match only (sufficient for local)
    cache_key = hash_content(query)
    if cache_key in lru_cache:
        return lru_cache[cache_key]  # ~1ms

    # Direct local inference (free, fast enough)
    response = await ollama_provider.ainvoke(query)  # ~50-200ms

    # Store in L1 only
    lru_cache[cache_key] = response.content
    return response.content
```
Best Practice: Use factory pattern to apply full caching hierarchy only for cloud APIs:
```python
if settings.OLLAMA_ENABLED:
    # Minimal caching for local models
    return LocalCacheStrategy(l1_only=True)
else:
    # Full L1/L2/L3 caching for cloud APIs
    return CloudCacheStrategy(l1=True, l2=True, l3=True)
```
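The strategy classes referenced above are not defined in this skill; a minimal sketch, assuming they are simple flag holders consumed by the factory:

```python
from dataclasses import dataclass

@dataclass
class LocalCacheStrategy:
    l1_only: bool = True  # exact-match LRU only

@dataclass
class CloudCacheStrategy:
    l1: bool = True  # in-memory exact match
    l2: bool = True  # Redis semantic cache
    l3: bool = True  # provider-native prompt caching
```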
See ai-native-development skill section "10. Local LLM Inference with Ollama" for provider setup.
References
- Redis Blog: Prompt Caching vs Semantic Caching (Dec 2025)
- Redis Blog: 10 Techniques for Semantic Cache Optimization
- RedisVL Docs: SemanticCache Guide
- LangChain: RedisSemanticCache
- Anthropic: Prompt Caching Guide (March 2025: cache reads free!)
Integration Examples
See:
- references/redis-setup.md - Docker Compose + RedisVL setup
- references/cache-hierarchy.md - Multi-level cache implementation
- references/cost-optimization.md - ROI calculations and benchmarks
- templates/semantic-cache-service.py - Production-ready service
- templates/prompt-cache-wrapper.py - Claude caching wrapper
- examples/project-integration.md - Project-specific integration patterns
Skill Version: 1.3.0 | Last Updated: 2025-12-28 | Maintained by: this project AI Agent Hub
Changelog
v1.3.0 (2025-12-28)
- Added "Local Model Considerations (Ollama)" section
- Added cost comparison table for cloud vs local caching value
- Added simplified caching strategy for local models
- Added factory pattern example for adaptive caching
- Cross-referenced ai-native-development skill for Ollama setup
v1.2.0 (2025-12-27)
- Added hierarchical trace pattern for multi-agent cost rollup
- Added `session_id` linking pattern for cost attribution to parent analysis
- Added cost rollup query patterns with Langfuse API
- Added daily cost breakdown by agent type example
- Updated automatic cost tracking with custom trace ID support
- Added this project-specific multi-agent workflow cost tracking pattern
v1.1.0 (2025-12-27)
- Added comprehensive Langfuse cost tracking section
- Added automatic cost tracking with `@observe` decorator
- Added cost attribution by agent type patterns
- Added cache effectiveness analysis with Langfuse API
- Added model pricing registry with `calculate_cost()` method
- Added Langfuse dashboard views for cost insights
- Updated monitoring section with cost tracking best practices
v1.0.0 (2025-12-14)
- Initial skill with double caching architecture (L1/L2/L3/L4)
- Redis semantic cache implementation with RedisVL
- Claude prompt caching patterns
- Cache warming strategies
- Similarity threshold tuning guidelines
- Optimization techniques (reranking, metadata filtering, quality-based eviction)