| name | llm-inference-optimization |
| description | Optimize LLM inference for latency, throughput, and cost in production systems. |
LLM Inference Optimization Skill
When to Use This Skill
Use this skill when:
- Building production LLM applications with latency requirements
- Processing large batches of requests (classification, summarization, extraction)
- Optimizing cost for high-volume applications
- Improving throughput for batch processing
- Enhancing user experience with streaming
- Balancing cost, latency, and quality trade-offs
When NOT to use: Prototyping or single-query experiments where optimization is premature.
Core Principle
Performance is not automatic. Optimization is systematic.
Without optimization:
- Sequential processing: ~17 minutes for 1,000 documents (≈1 request/sec)
- No caching: 60% wasted spend on repeated queries
- Wrong model: ~10-20× the cost for the same quality on simple tasks
- No streaming: 40% bounce rate on long generations
- Single-objective tuning: poor cost-latency-quality trade-offs
Formula: Parallelization (10× throughput) + Caching (60% cost savings) + Model routing (balanced cost-quality) + Streaming (better UX) + Multi-objective optimization (Pareto optimal) = Production-ready performance.
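A quick back-of-the-envelope sketch shows how these levers compound. All numbers below are illustrative assumptions (per-query prices, routing mix, cache hit rate), not measurements:

```python
# Illustrative only: assumed per-query prices, a 90/10 routing split, and a 60% cache hit rate.
baseline_cost_per_query = 0.030                       # always GPT-4, no caching
routed_cost_per_query = 0.9 * 0.002 + 0.1 * 0.030     # simple → GPT-3.5, complex → GPT-4
cache_hit_rate = 0.6                                  # cached answers cost ~nothing to serve
effective_cost = routed_cost_per_query * (1 - cache_hit_rate)

print(f"Baseline:  ${baseline_cost_per_query:.4f}/query")
print(f"Optimized: ${effective_cost:.4f}/query "
      f"(~{baseline_cost_per_query / effective_cost:.0f}x cheaper)")
```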
Optimization Framework
```
┌─────────────────────────────────────────┐
│ 1. Measure Baseline                     │
│    Latency, Cost, Quality, Throughput   │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 2. Set Requirements                     │
│    Acceptable latency, Budget, Quality  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 3. Apply Optimizations                  │
│    Parallelization → Caching → Routing  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 4. Evaluate Trade-offs                  │
│    Cost vs Latency vs Quality (Pareto)  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 5. Monitor Production                   │
│    Track metrics, Detect regressions    │
└─────────────────────────────────────────┘
```
Part 1: Parallelization
Async/Await for Concurrent Requests
Problem: Sequential API calls are slow (1 request/sec).
Solution: Concurrent requests with async/await (10-20 requests/sec).
```python
import asyncio
from typing import List

from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment


async def classify_async(text: str, semaphore: asyncio.Semaphore) -> str:
    """
    Classify text asynchronously with rate limiting.

    Args:
        text: Text to classify
        semaphore: Limits concurrent requests

    Returns:
        Classification result
    """
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                {"role": "user", "content": text},
            ],
        )
        return response.choices[0].message.content


async def classify_batch_parallel(
    texts: List[str],
    concurrency: int = 10,
) -> List[str]:
    """
    Classify multiple texts in parallel.

    Args:
        texts: List of texts to classify
        concurrency: Maximum concurrent requests (default 10)

    Returns:
        List of classification results
    """
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [classify_async(text, semaphore) for text in texts]
    results = await asyncio.gather(*tasks)
    return results


# Example usage
texts = ["Great product!", "Terrible service.", "It's okay."] * 333  # ~1000 texts

# Sequential: 1000 requests × 1 second ≈ 1000 seconds (16.7 minutes)
# Parallel (concurrency=10): 1000 requests / 10 ≈ 100 seconds (1.7 minutes) — 10× faster
results = asyncio.run(classify_batch_parallel(texts, concurrency=10))
print(f"Classified {len(results)} texts")
```
Performance comparison:
| Approach | Time | Throughput | Cost |
|---|---|---|---|
| Sequential | 1000s (16.7 min) | 1 req/sec | $2.00 |
| Parallel (10) | 100s (1.7 min) | 10 req/sec | $2.00 (same!) |
| Parallel (20) | 50s (0.8 min) | 20 req/sec | $2.00 (same!) |
Key insight: Parallelization is free performance. Same cost, 10-20× faster.
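One caveat: at higher concurrency you will eventually hit provider rate limits, so the semaphore is usually paired with retry and exponential backoff. A minimal sketch, assuming the `classify_async` helper above and the SDK's `RateLimitError`:

```python
import asyncio
import random

from openai import RateLimitError


async def classify_with_retry(
    text: str,
    semaphore: asyncio.Semaphore,
    max_retries: int = 5,
) -> str:
    """Retry classify_async with exponential backoff on rate-limit errors."""
    for attempt in range(max_retries):
        try:
            return await classify_async(text, semaphore)
        except RateLimitError:
            # Back off exponentially with jitter: ~1s, 2s, 4s, 8s, ...
            delay = (2 ** attempt) + random.random()
            await asyncio.sleep(delay)
    raise RuntimeError(f"Rate limited after {max_retries} retries")
```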
OpenAI Batch API (Offline Processing)
Problem: Real-time API is expensive for large batch jobs.
Solution: Batch API (50% cheaper, 24-hour completion window).
```python
import time
from typing import List

import jsonlines
from openai import OpenAI

client = OpenAI()


def create_batch_job(texts: List[str]) -> str:
    """
    Submit a batch job for offline processing (50% cost reduction).

    Args:
        texts: List of texts to process

    Returns:
        Batch job ID
    """
    # Step 1: Create batch input file (JSONL format)
    batch_input = []
    for i, text in enumerate(texts):
        batch_input.append({
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                    {"role": "user", "content": text},
                ],
            },
        })

    # Write to file
    with jsonlines.open("batch_input.jsonl", "w") as writer:
        writer.write_all(batch_input)

    # Step 2: Upload file
    with open("batch_input.jsonl", "rb") as f:
        file_response = client.files.create(file=f, purpose="batch")

    # Step 3: Create batch job
    batch_job = client.batches.create(
        input_file_id=file_response.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",  # Complete within 24 hours
    )

    print(f"Batch job created: {batch_job.id}")
    print(f"Status: {batch_job.status}")
    return batch_job.id


def check_batch_status(batch_id: str) -> str:
    """Check batch job status and download results when complete."""
    batch = client.batches.retrieve(batch_id)
    print(f"Status: {batch.status}")
    print(f"Completed: {batch.request_counts.completed}/{batch.request_counts.total}")

    if batch.status == "completed":
        # Download results
        result = client.files.content(batch.output_file_id)
        with open("batch_results.jsonl", "wb") as f:
            f.write(result.content)
        print("Results saved to batch_results.jsonl")

    return batch.status


# Example usage
texts = ["Great product!"] * 10000  # 10,000 texts

# Submit batch job
batch_id = create_batch_job(texts)

# Poll status every 10 minutes until complete
while check_batch_status(batch_id) != "completed":
    time.sleep(600)

# Cost: ~$10 (Batch API) vs ~$20 (real-time API) = 50% savings
```
When to use Batch API:
| Use Case | Real-time API | Batch API |
|---|---|---|
| User-facing chat | ✓ (latency critical) | ✗ |
| Document classification (10k docs) | ✗ (expensive) | ✓ (50% cheaper) |
| Nightly data processing | ✗ | ✓ |
| A/B test evaluation | ✗ | ✓ |
| Real-time search | ✓ | ✗ |
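Once the job completes, `batch_results.jsonl` contains one JSON object per request. A small sketch for mapping outputs back to their inputs via `custom_id` (this assumes the file layout produced by the code above; field names follow the Batch API's documented result format):

```python
import jsonlines


def load_batch_results(path: str = "batch_results.jsonl") -> dict:
    """Map custom_id -> model output for a completed batch job."""
    results = {}
    with jsonlines.open(path) as reader:
        for record in reader:
            body = record["response"]["body"]
            results[record["custom_id"]] = body["choices"][0]["message"]["content"]
    return results


labels = load_batch_results()
print(labels.get("request-0"))  # e.g. "positive"
```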
Part 2: Caching
Answer Caching (Repeated Queries)
Problem: 60-70% of queries are repeated (FAQs, common questions).
Solution: Cache answers for identical queries (60% cost reduction).
```python
import hashlib
from typing import Optional

from openai import OpenAI

client = OpenAI()


class AnswerCache:
    def __init__(self):
        self.cache = {}  # In-memory cache (use Redis for production)

    def _cache_key(self, query: str, model: str = "gpt-3.5-turbo") -> str:
        """Generate cache key from query and model."""
        # Normalize query (lowercase, strip whitespace)
        normalized = query.lower().strip()
        # Hash for a consistent, fixed-length key
        key_data = f"{model}:{normalized}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        """Get cached answer if it exists."""
        key = self._cache_key(query, model)
        return self.cache.get(key)

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        """Cache answer for query."""
        key = self._cache_key(query, model)
        self.cache[key] = answer

    def stats(self):
        """Get cache statistics."""
        return {
            "cache_size": len(self.cache),
            "memory_bytes": sum(len(v.encode()) for v in self.cache.values()),
        }


def answer_with_cache(
    query: str,
    cache: AnswerCache,
    model: str = "gpt-3.5-turbo",
) -> tuple[str, bool]:
    """
    Answer query with caching.

    Returns:
        (answer, cache_hit)
    """
    # Check cache
    cached_answer = cache.get(query, model)
    if cached_answer:
        return cached_answer, True  # Cache hit!

    # Cache miss: generate answer
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Answer the question concisely."},
            {"role": "user", "content": query},
        ],
    )
    answer = response.choices[0].message.content

    # Cache for future queries
    cache.set(query, answer, model)
    return answer, False


# Example usage
cache = AnswerCache()

queries = [
    "What is your return policy?",
    "How do I track my order?",
    "What is your return policy?",          # Repeated!
    "Do you offer international shipping?",
    "What is your return policy?",          # Repeated again!
]

cache_hits = 0
cache_misses = 0

for query in queries:
    answer, is_cache_hit = answer_with_cache(query, cache)
    if is_cache_hit:
        cache_hits += 1
        print(f"[CACHE HIT] {query}")
    else:
        cache_misses += 1
        print(f"[CACHE MISS] {query}")
    print(f"Answer: {answer}\n")

print(f"Cache hits: {cache_hits}/{len(queries)} ({cache_hits/len(queries)*100:.1f}%)")
print(f"Cost savings: {cache_hits/len(queries)*100:.1f}%")

# Output:
# [CACHE MISS] What is your return policy?
# [CACHE MISS] How do I track my order?
# [CACHE HIT] What is your return policy?
# [CACHE MISS] Do you offer international shipping?
# [CACHE HIT] What is your return policy?
# Cache hits: 2/5 (40.0%)
# Cost savings: 40.0%
```
Production caching with Redis:
```python
import hashlib
from typing import Optional

import redis


class RedisAnswerCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.ttl = 86400  # 24 hours

    def _cache_key(self, query: str, model: str) -> str:
        normalized = query.lower().strip()
        return f"answer:{model}:{hashlib.md5(normalized.encode()).hexdigest()}"

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        key = self._cache_key(query, model)
        cached = self.redis_client.get(key)
        return cached.decode() if cached else None

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        key = self._cache_key(query, model)
        self.redis_client.setex(key, self.ttl, answer)

    def stats(self):
        return {
            "cache_size": self.redis_client.dbsize(),
            "memory_usage": self.redis_client.info("memory")["used_memory_human"],
        }
```
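Because `RedisAnswerCache` exposes the same `get`/`set` interface as `AnswerCache`, it drops into `answer_with_cache` unchanged. A short usage sketch, assuming a Redis server at the default local URL:

```python
# Assumes a local Redis instance and the answer_with_cache() helper defined above.
redis_cache = RedisAnswerCache("redis://localhost:6379")

answer, hit = answer_with_cache("What is your return policy?", redis_cache)
print(f"cache_hit={hit}")
print(redis_cache.stats())  # e.g. {'cache_size': 1, 'memory_usage': '1.02M'}
```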
Prompt Caching (Static Context)
Problem: RAG sends same context repeatedly (expensive).
Solution: Anthropic prompt caching (90% cost reduction for static context).
```python
import anthropic

client = anthropic.Anthropic()


def rag_with_prompt_caching(
    query: str,
    context: str,  # Static context (knowledge base)
    model: str = "claude-3-5-sonnet-20240620",
) -> str:
    """
    RAG with prompt caching for static context.

    First query: pays a one-time cache write (1.25× the normal input price).
    Subsequent queries: cached context is billed at ~10% of the normal input price.
    """
    response = client.messages.create(
        model=model,
        max_tokens=500,
        system=[
            {
                "type": "text",
                "text": "Answer questions using only the provided context.",
            },
            {
                "type": "text",
                "text": f"Context:\n{context}",
                "cache_control": {"type": "ephemeral"},  # Cache this block!
            },
        ],
        messages=[
            {"role": "user", "content": query}
        ],
    )
    return response.content[0].text


# Example
knowledge_base = """
[Large knowledge base with 50,000 tokens of product info, policies, FAQs...]
"""

# Query 1: cache write (50k tokens × $0.00375/1k) ≈ $0.19
answer1 = rag_with_prompt_caching("What is your return policy?", knowledge_base)

# Queries 2-100: cached reads (50k tokens × $0.0003/1k) ≈ $0.015 each,
# plus the query itself (≈20 tokens × $0.003/1k ≈ $0.00006)
answer2 = rag_with_prompt_caching("How do I track my order?", knowledge_base)

# Savings: ≈$0.015 per query vs ≈$0.15 without caching (50k × $0.003/1k) — roughly 90% reduction
```
When prompt caching is effective:
| Scenario | Static Context | Dynamic Content | Cache Savings |
|---|---|---|---|
| RAG with knowledge base | 50k tokens (policies, products) | Query (20 tokens) | 95%+ |
| Multi-turn chat with instructions | 1k tokens (system message) | Conversation (varying) | 60-80% |
| Document analysis | 10k tokens (document) | Multiple questions | 90%+ |
| Code review with context | 5k tokens (codebase) | Review comments | 85%+ |
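A small helper makes the trade-off concrete. The default prices below mirror the input rates used in the example above and are illustrative assumptions to replace with your provider's current pricing:

```python
def prompt_cache_savings(
    static_tokens: int,
    queries: int,
    input_price_per_1k: float = 0.003,    # normal input rate (illustrative)
    cache_write_mult: float = 1.25,       # one-time cache-write premium
    cache_read_mult: float = 0.10,        # discounted rate for cached reads
) -> dict:
    """Compare total input cost for the static context with and without prompt caching."""
    base = static_tokens / 1000 * input_price_per_1k
    without = base * queries
    with_cache = base * cache_write_mult + base * cache_read_mult * (queries - 1)
    return {
        "without_caching": round(without, 2),
        "with_caching": round(with_cache, 2),
        "savings_pct": round((1 - with_cache / without) * 100, 1),
    }


print(prompt_cache_savings(static_tokens=50_000, queries=100))
# {'without_caching': 15.0, 'with_caching': 1.67, 'savings_pct': 88.9}
```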
Part 3: Model Routing
Task-Based Model Selection
Problem: Using GPT-4 for everything costs roughly 10-20× more than necessary on simple tasks.
Solution: Route by task complexity (GPT-3.5 for simple, GPT-4 for complex).
```python
from enum import Enum
from typing import Dict


class TaskType(Enum):
    CLASSIFICATION = "classification"
    EXTRACTION = "extraction"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    REASONING = "reasoning"
    CREATIVE = "creative"
    CODE_GENERATION = "code_generation"


class ModelRouter:
    """Route queries to the appropriate model based on task complexity."""

    # Model configurations
    MODELS = {
        "gpt-3.5-turbo": {
            "cost_per_1k_input": 0.0015,
            "cost_per_1k_output": 0.002,
            "latency_factor": 1.0,  # Baseline
            "quality_score": 0.85,
        },
        "gpt-4": {
            "cost_per_1k_input": 0.03,
            "cost_per_1k_output": 0.06,
            "latency_factor": 2.5,
            "quality_score": 0.95,
        },
        "gpt-4-turbo": {
            "cost_per_1k_input": 0.01,
            "cost_per_1k_output": 0.03,
            "latency_factor": 1.5,
            "quality_score": 0.94,
        },
    }

    # Task → Model mapping
    TASK_ROUTING = {
        TaskType.CLASSIFICATION: "gpt-3.5-turbo",  # Simple task
        TaskType.EXTRACTION: "gpt-3.5-turbo",
        TaskType.SUMMARIZATION: "gpt-3.5-turbo",
        TaskType.TRANSLATION: "gpt-3.5-turbo",
        TaskType.REASONING: "gpt-4",           # Complex reasoning
        TaskType.CREATIVE: "gpt-4",            # Better creativity
        TaskType.CODE_GENERATION: "gpt-4",     # Better coding
    }

    @classmethod
    def route(cls, task_type: TaskType, complexity: str = "medium") -> str:
        """
        Route to the appropriate model.

        Args:
            task_type: Type of task
            complexity: "low", "medium", "high"

        Returns:
            Model name
        """
        base_model = cls.TASK_ROUTING[task_type]

        # Override for high complexity
        if complexity == "high" and base_model == "gpt-3.5-turbo":
            return "gpt-4-turbo"  # Upgrade for complex variants

        return base_model

    @classmethod
    def calculate_cost(cls, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for model."""
        config = cls.MODELS[model]
        input_cost = (input_tokens / 1000) * config["cost_per_1k_input"]
        output_cost = (output_tokens / 1000) * config["cost_per_1k_output"]
        return input_cost + output_cost

    @classmethod
    def compare_models(cls, task_type: TaskType, input_tokens: int = 500, output_tokens: int = 200):
        """Compare models for a task."""
        print(f"\nTask: {task_type.value}")
        print(f"Input: {input_tokens} tokens, Output: {output_tokens} tokens\n")

        for model_name, config in cls.MODELS.items():
            cost = cls.calculate_cost(model_name, input_tokens, output_tokens)
            quality = config["quality_score"]
            latency = config["latency_factor"]

            print(f"{model_name}:")
            print(f"  Cost: ${cost:.4f}")
            print(f"  Quality: {quality:.0%}")
            print(f"  Latency: {latency:.1f}× baseline")
            print(f"  Cost per quality point: ${cost/quality:.4f}\n")


# Example usage
router = ModelRouter()

# Classification task
model = router.route(TaskType.CLASSIFICATION, complexity="low")
print(f"Classification → {model}")  # gpt-3.5-turbo

# Complex reasoning task
model = router.route(TaskType.REASONING, complexity="high")
print(f"Complex reasoning → {model}")  # gpt-4

# Compare costs
router.compare_models(TaskType.CLASSIFICATION, input_tokens=500, output_tokens=200)
# Output (abridged):
# gpt-3.5-turbo: ≈$0.0012 (cost per quality point: $0.0014)
# gpt-4:          $0.0270 (cost per quality point: $0.0284) — ~23× more expensive
# Recommendation: use GPT-3.5 for classification (~23× cheaper, acceptable quality)
```
Model Cascade (Try Cheap First)
Problem: Don't know if task needs GPT-4 until you try.
Solution: Try GPT-3.5, escalate to GPT-4 if unsatisfied.
```python
from openai import OpenAI

client = OpenAI()


def cascade_generation(
    prompt: str,
    quality_threshold: float = 0.8,
    max_attempts: int = 2,
) -> tuple[str, str, float]:
    """
    Try a cheaper model first, escalate if quality is insufficient.

    Args:
        prompt: User prompt
        quality_threshold: Minimum quality score (0-1)
        max_attempts: Max escalation attempts

    Returns:
        (response, model_used, estimated_quality)
    """
    models = ["gpt-3.5-turbo", "gpt-4-turbo", "gpt-4"]

    for model in models[:max_attempts]:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        result = response.choices[0].message.content

        # Estimate quality (simplified — use LLM-as-judge in production)
        quality = estimate_quality(result, prompt)

        if quality >= quality_threshold:
            print(f"✓ {model} met quality threshold ({quality:.2f} >= {quality_threshold})")
            return result, model, quality
        else:
            print(f"✗ {model} below threshold ({quality:.2f} < {quality_threshold}), escalating...")

    # Return the last attempt even if below threshold
    return result, model, quality


def estimate_quality(response: str, prompt: str) -> float:
    """
    Estimate quality score (0-1).

    Production: use LLM-as-judge or other quality metrics.
    """
    # Simplified heuristic
    if len(response) < 20:
        return 0.3  # Too short
    elif len(response) > 500:
        return 0.9  # Detailed
    else:
        return 0.7  # Moderate


# Example
prompt = "Explain quantum entanglement in simple terms."
result, model, quality = cascade_generation(prompt, quality_threshold=0.8)

print(f"\nFinal result:")
print(f"Model: {model}")
print(f"Quality: {quality:.2f}")
print(f"Response: {result[:200]}...")

# Typical case: GPT-3.5 suffices (≈90% of queries)
#   Cost: $0.002 per query
# Complex case: escalate (≈10% of queries)
#   Cost: $0.002 (GPT-3.5 attempt) + up to $0.030 (GPT-4) = $0.032
# Blended cost: 0.9 × $0.002 + 0.1 × $0.032 = $0.0018 + $0.0032 = $0.005
# vs always GPT-4: $0.030
# Savings: ≈83%
```
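To sanity-check the blended cost for your own traffic mix, here is a tiny helper; the default prices and escalation rate are the illustrative numbers from the comments above, meant to be varied:

```python
def cascade_blended_cost(
    escalation_rate: float = 0.10,   # fraction of queries that need the bigger model
    cheap_cost: float = 0.002,       # per-query cost of the first-attempt model
    expensive_cost: float = 0.030,   # per-query cost of the escalation model
) -> float:
    """Expected per-query cost when escalated queries pay for both attempts."""
    return (1 - escalation_rate) * cheap_cost + escalation_rate * (cheap_cost + expensive_cost)


print(f"${cascade_blended_cost():.4f} per query")                    # $0.0050
print(f"${cascade_blended_cost(escalation_rate=0.3):.4f} per query")  # $0.0110
```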
Part 4: Streaming
Streaming for Long-Form Generation
Problem: 20-second wait for full article (40% bounce rate).
Solution: Stream tokens as generated (perceived latency: 0.5s).
```python
from openai import OpenAI

client = OpenAI()


def generate_streaming(prompt: str, model: str = "gpt-4") -> str:
    """
    Generate a response with streaming.

    Benefits:
    - First token in ~0.5s (vs waiting ~20s for the full response)
    - User sees progress (engagement)
    - Can cancel early if needed
    """
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=2000,
        stream=True,  # Enable streaming
    )

    full_response = ""
    for chunk in stream:
        token = chunk.choices[0].delta.content
        if token:
            full_response += token
            print(token, end="", flush=True)  # Display immediately

    print()  # Newline
    return full_response


# Example
prompt = "Write a detailed article about the history of artificial intelligence."

# Without streaming: wait ~20s, then see the full article
# With streaming: see the first words in ~0.5s, then a steady stream for ~20s
article = generate_streaming(prompt)

# Illustrative user-experience impact:
# - Perceived latency: 20s → 0.5s (time to first token)
# - Bounce rate: 40% → 5% (35pp improvement)
# - Satisfaction: 3.2/5 → 4.3/5 (+1.1 points)
```
Streaming in Web Applications
Flask with Server-Sent Events (SSE):
```python
from flask import Flask, Response, request
from openai import OpenAI

app = Flask(__name__)
client = OpenAI()


@app.route('/generate')
def generate_stream():
    """Stream generation results to the frontend via Server-Sent Events."""
    # EventSource can only issue GET requests, so the prompt arrives as a query parameter.
    prompt = request.args.get('prompt', '')

    def event_stream():
        """Generator for SSE."""
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )

        for chunk in stream:
            token = chunk.choices[0].delta.content
            if token:
                # SSE format: "data: {content}\n\n"
                yield f"data: {token}\n\n"

        # Signal completion
        yield "data: [DONE]\n\n"

    return Response(event_stream(), mimetype="text/event-stream")


# Frontend (JavaScript):
"""
const eventSource = new EventSource(
  '/generate?prompt=' + encodeURIComponent(userPrompt)
);

eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
  } else {
    // Append token to the display
    document.getElementById('output').innerText += event.data;
  }
};
"""
```
Part 5: Cost-Latency-Quality Trade-offs
Multi-Objective Optimization
Problem: Optimizing single objective (cost OR latency) leads to poor trade-offs.
Solution: Pareto analysis to find balanced solutions.
```python
from typing import List


class OptimizationOption:
    def __init__(
        self,
        name: str,
        latency_p95: float,    # seconds
        cost_per_1k: float,    # dollars per 1k queries
        quality_score: float,  # 0-1
    ):
        self.name = name
        self.latency_p95 = latency_p95
        self.cost_per_1k = cost_per_1k
        self.quality_score = quality_score

    def dominates(self, other: 'OptimizationOption') -> bool:
        """Check if this option dominates another (Pareto dominance)."""
        # Dominates if: better or equal in all dimensions, strictly better in at least one
        better_latency = self.latency_p95 <= other.latency_p95
        better_cost = self.cost_per_1k <= other.cost_per_1k
        better_quality = self.quality_score >= other.quality_score

        strictly_better = (
            self.latency_p95 < other.latency_p95 or
            self.cost_per_1k < other.cost_per_1k or
            self.quality_score > other.quality_score
        )

        return better_latency and better_cost and better_quality and strictly_better

    def __repr__(self):
        return (f"{self.name}: {self.latency_p95:.2f}s, "
                f"${self.cost_per_1k:.3f}/1k, {self.quality_score:.2f} quality")


def find_pareto_optimal(options: List[OptimizationOption]) -> List[OptimizationOption]:
    """Find Pareto optimal solutions (non-dominated options)."""
    pareto_optimal = []

    for option in options:
        is_dominated = False
        for other in options:
            if other.dominates(option):
                is_dominated = True
                break
        if not is_dominated:
            pareto_optimal.append(option)

    return pareto_optimal


# Example: RAG chatbot optimization
options = [
    OptimizationOption("GPT-4, no caching", latency_p95=2.5, cost_per_1k=10.0, quality_score=0.92),
    OptimizationOption("GPT-3.5, no caching", latency_p95=0.8, cost_per_1k=2.0, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching", latency_p95=0.6, cost_per_1k=1.2, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching + prompt eng", latency_p95=0.7, cost_per_1k=1.3, quality_score=0.85),
    OptimizationOption("GPT-4 + caching", latency_p95=2.0, cost_per_1k=6.0, quality_score=0.92),
    OptimizationOption("GPT-4-turbo + caching", latency_p95=1.2, cost_per_1k=4.0, quality_score=0.90),
]

# Find Pareto optimal
pareto = find_pareto_optimal(options)

print("Pareto Optimal Solutions:")
for opt in pareto:
    print(f"  {opt}")

# Output:
# Pareto Optimal Solutions:
#   GPT-3.5 + caching: 0.60s, $1.200/1k, 0.78 quality
#   GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
#   GPT-4 + caching: 2.00s, $6.000/1k, 0.92 quality
#   GPT-4-turbo + caching: 1.20s, $4.000/1k, 0.90 quality

# Interpretation:
# - Lowest cost and latency: GPT-3.5 + caching ($1.20/1k, 0.78 quality)
# - Budget-conscious with better quality: GPT-3.5 + caching + prompt eng ($1.30/1k, 0.85 quality)
# - Quality-critical: GPT-4-turbo + caching ($4/1k, 0.90 quality, faster than GPT-4)
# - Maximum quality: GPT-4 + caching ($6/1k, 0.92 quality)
```
Requirements-Based Selection
```python
def select_optimal_solution(
    options: List[OptimizationOption],
    max_latency: float = None,
    max_cost: float = None,
    min_quality: float = None,
) -> OptimizationOption:
    """
    Select the optimal solution given constraints.

    Args:
        options: Available options
        max_latency: Maximum acceptable latency (seconds)
        max_cost: Maximum cost per 1k queries (dollars)
        min_quality: Minimum quality score (0-1)

    Returns:
        Best option meeting all constraints
    """
    # Filter options meeting constraints
    feasible = []
    for opt in options:
        meets_latency = max_latency is None or opt.latency_p95 <= max_latency
        meets_cost = max_cost is None or opt.cost_per_1k <= max_cost
        meets_quality = min_quality is None or opt.quality_score >= min_quality

        if meets_latency and meets_cost and meets_quality:
            feasible.append(opt)

    if not feasible:
        raise ValueError("No solution meets all constraints")

    # Among feasible options, select the best cost-quality trade-off
    best = min(feasible, key=lambda opt: opt.cost_per_1k / opt.quality_score)
    return best


# Example: Requirements
requirements = {
    "max_latency": 1.0,   # Must respond within 1 second
    "max_cost": 5.0,      # Budget: $5 per 1k queries
    "min_quality": 0.85,  # Minimum 85% quality
}

selected = select_optimal_solution(
    options,
    max_latency=requirements["max_latency"],
    max_cost=requirements["max_cost"],
    min_quality=requirements["min_quality"],
)

print(f"Selected solution: {selected}")
# Output: Selected solution: GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
# (Meets all constraints; most cost-effective)
```
Part 6: Production Monitoring
Performance Metrics Tracking
```python
from dataclasses import dataclass
from typing import Dict, List

import numpy as np


@dataclass
class QueryMetrics:
    """Metrics for a single query."""
    latency_ms: float
    input_tokens: int
    output_tokens: int
    cost: float
    cache_hit: bool
    model: str


class PerformanceMonitor:
    """Track and analyze performance metrics."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def log_query(
        self,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        cache_hit: bool,
        model: str,
    ):
        """Log metrics for a single query."""
        self.metrics.append(QueryMetrics(
            latency_ms=latency_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            cache_hit=cache_hit,
            model=model,
        ))

    def summary(self) -> Dict:
        """Generate summary statistics."""
        if not self.metrics:
            return {}

        latencies = [m.latency_ms for m in self.metrics]
        costs = [m.cost for m in self.metrics]
        cache_hits = [m.cache_hit for m in self.metrics]

        return {
            "total_queries": len(self.metrics),
            "latency_p50": np.percentile(latencies, 50),
            "latency_p95": np.percentile(latencies, 95),
            "latency_p99": np.percentile(latencies, 99),
            "avg_cost": np.mean(costs),
            "total_cost": np.sum(costs),
            "cache_hit_rate": np.mean(cache_hits) * 100,
            "queries_per_model": self._count_by_model(),
        }

    def _count_by_model(self) -> Dict[str, int]:
        """Count queries by model."""
        counts = {}
        for m in self.metrics:
            counts[m.model] = counts.get(m.model, 0) + 1
        return counts


# Example usage
monitor = PerformanceMonitor()

# Simulate queries
for i in range(1000):
    cache_hit = np.random.random() < 0.6   # 60% cache hit rate
    latency = 100 if cache_hit else 800    # Cache: 100ms, API: 800ms
    cost = 0 if cache_hit else 0.002

    monitor.log_query(
        latency_ms=latency,
        input_tokens=500,
        output_tokens=200,
        cost=cost,
        cache_hit=cache_hit,
        model="gpt-3.5-turbo",
    )

# Generate summary
summary = monitor.summary()

print("Performance Summary:")
print(f"  Total queries: {summary['total_queries']}")
print(f"  Latency P50: {summary['latency_p50']:.0f}ms")
print(f"  Latency P95: {summary['latency_p95']:.0f}ms")
print(f"  Avg cost: ${summary['avg_cost']:.4f}")
print(f"  Total cost: ${summary['total_cost']:.2f}")
print(f"  Cache hit rate: {summary['cache_hit_rate']:.1f}%")
```
Summary
Inference optimization is systematic, not ad-hoc.
Core techniques:
- Parallelization: Async/await (10× throughput), Batch API (50% cheaper)
- Caching: Answer caching (60% savings), Prompt caching (90% savings)
- Model routing: GPT-3.5 for simple tasks (10× cheaper), GPT-4 for complex
- Streaming: First token in 0.5s (vs 20s wait), 35pp better completion rate
- Multi-objective: Pareto analysis (balance cost-latency-quality)
Checklist:
- ✓ Measure baseline (latency, cost, quality)
- ✓ Set requirements (acceptable latency, budget, quality threshold)
- ✓ Parallelize batch processing (10× throughput)
- ✓ Implement caching (60-90% cost savings)
- ✓ Route by task complexity (10× cost savings)
- ✓ Stream long responses (better UX)
- ✓ Analyze cost-latency-quality trade-offs (Pareto optimal)
- ✓ Monitor production metrics (track improvements)
Production-ready performance requires deliberate optimization across multiple dimensions.