
llm-inference-optimization

@tachyon-beep/skillpacks

Optimize LLM inference for latency, throughput, and cost in production systems.

Install Skill

1. Download the skill.
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section.
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file.

Note: Verify the skill by reading through its instructions before using it.

SKILL.md

name: llm-inference-optimization
description: Optimize LLM inference for latency, throughput, and cost in production systems.

LLM Inference Optimization Skill

When to Use This Skill

Use this skill when:

  • Building production LLM applications with latency requirements
  • Processing large batches of requests (classification, summarization, extraction)
  • Optimizing cost for high-volume applications
  • Improving throughput for batch processing
  • Enhancing user experience with streaming
  • Balancing cost, latency, and quality trade-offs

When NOT to use: Prototyping or single-query experiments where optimization is premature.

Core Principle

Performance is not automatic. Optimization is systematic.

Without optimization:

  • Sequential processing: 16 minutes for 1000 documents (≈1 request/sec)
  • No caching: 60% wasted cost on repeated queries
  • Wrong model: 10× more expensive for the same quality
  • No streaming: 40% bounce rate on long generations
  • Single-objective: Poor cost-latency-quality trade-offs

Formula: Parallelization (10× throughput) + Caching (60% cost savings) + Model routing (balanced cost-quality) + Streaming (better UX) + Multi-objective optimization (Pareto optimal) = Production-ready performance.

Optimization Framework

┌─────────────────────────────────────────┐
│      1. Measure Baseline                │
│  Latency, Cost, Quality, Throughput     │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│      2. Set Requirements                │
│  Acceptable latency, Budget, Quality    │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│      3. Apply Optimizations             │
│  Parallelization → Caching → Routing    │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│      4. Evaluate Trade-offs             │
│  Cost vs Latency vs Quality (Pareto)    │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│      5. Monitor Production              │
│  Track metrics, Detect regressions      │
└─────────────────────────────────────────┘
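Step 1 is worth making concrete before changing anything. Below is a minimal baseline-measurement sketch; the model name, sample prompts, and per-1k prices are placeholders, so substitute your own workload and pricing.

import statistics
import time

from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from the environment

def measure_baseline(prompts, model="gpt-3.5-turbo", price_in=0.0015, price_out=0.002):
    """Run a sample of real prompts sequentially and record latency and cost per query."""
    latencies, costs = [], []
    for prompt in prompts:
        start = time.perf_counter()
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        latencies.append(time.perf_counter() - start)
        usage = response.usage
        costs.append(
            usage.prompt_tokens / 1000 * price_in
            + usage.completion_tokens / 1000 * price_out
        )
    return {
        "latency_p50_s": statistics.median(latencies),
        "latency_max_s": max(latencies),
        "avg_cost_usd": statistics.mean(costs),
        "throughput_rps": len(prompts) / sum(latencies),  # Sequential baseline
    }

print(measure_baseline(["What is your return policy?"] * 20))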

Part 1: Parallelization

Async/Await for Concurrent Requests

Problem: Sequential API calls are slow (1 request/sec).

Solution: Concurrent requests with async/await (10-20 requests/sec).

import asyncio
from typing import List

from openai import AsyncOpenAI

client = AsyncOpenAI()  # Reads OPENAI_API_KEY from the environment

async def classify_async(text: str, semaphore: asyncio.Semaphore) -> str:
    """
    Classify text asynchronously with rate limiting.

    Args:
        text: Text to classify
        semaphore: Limits concurrent requests

    Returns:
        Classification result
    """
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                {"role": "user", "content": text}
            ]
        )
        return response.choices[0].message.content

async def classify_batch_parallel(
    texts: List[str],
    concurrency: int = 10
) -> List[str]:
    """
    Classify multiple texts in parallel.

    Args:
        texts: List of texts to classify
        concurrency: Maximum concurrent requests (default 10)

    Returns:
        List of classification results
    """
    semaphore = asyncio.Semaphore(concurrency)

    tasks = [classify_async(text, semaphore) for text in texts]
    results = await asyncio.gather(*tasks)

    return results

# Example usage
texts = ["Great product!", "Terrible service.", "It's okay."] * 333  # 1000 texts

# Sequential: 1000 requests × 1 second = 1000 seconds (16.7 minutes)
# Parallel (concurrency=10): 1000 requests / 10 = 100 seconds (1.7 minutes) - 10× FASTER!

results = asyncio.run(classify_batch_parallel(texts, concurrency=10))
print(f"Classified {len(results)} texts")

Performance comparison:

| Approach      | Time             | Throughput | Cost          |
|---------------|------------------|------------|---------------|
| Sequential    | 1000s (16.7 min) | 1 req/sec  | $2.00         |
| Parallel (10) | 100s (1.7 min)   | 10 req/sec | $2.00 (same!) |
| Parallel (20) | 50s (0.8 min)    | 20 req/sec | $2.00 (same!) |

Key insight: Parallelization is free performance. Same cost, 10-20× faster.
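Higher concurrency will eventually hit provider rate limits (HTTP 429). Below is a minimal retry sketch with exponential backoff and jitter around the same chat call, assuming the openai SDK's `RateLimitError` exception:

import asyncio
import random

from openai import AsyncOpenAI, RateLimitError

client = AsyncOpenAI()

async def classify_with_retry(text: str, max_retries: int = 5) -> str:
    """Retry on rate-limit errors with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            response = await client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                    {"role": "user", "content": text},
                ],
            )
            return response.choices[0].message.content
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Back off: 1s, 2s, 4s, ... plus jitter to avoid a thundering herd
            await asyncio.sleep(2 ** attempt + random.random())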

OpenAI Batch API (Offline Processing)

Problem: Real-time API is expensive for large batch jobs.

Solution: Batch API (50% cheaper, 24-hour completion window).

import time
from typing import List

import jsonlines
from openai import OpenAI

client = OpenAI()

def create_batch_job(texts: List[str], output_file: str = "batch_results.jsonl"):
    """
    Submit batch job for offline processing (50% cost reduction).

    Args:
        texts: List of texts to process
        output_file: File to save results

    Returns:
        Batch job ID
    """
    # Step 1: Create batch input file (JSONL format)
    batch_input = []
    for i, text in enumerate(texts):
        batch_input.append({
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                    {"role": "user", "content": text}
                ]
            }
        })

    # Write to file
    with jsonlines.open("batch_input.jsonl", "w") as writer:
        writer.write_all(batch_input)

    # Step 2: Upload file
    with open("batch_input.jsonl", "rb") as f:
        file_response = client.files.create(file=f, purpose="batch")

    # Step 3: Create batch job
    batch_job = client.batches.create(
        input_file_id=file_response.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"  # Complete within 24 hours
    )

    print(f"Batch job created: {batch_job.id}")
    print(f"Status: {batch_job.status}")

    return batch_job.id

def check_batch_status(batch_id: str):
    """Check batch job status."""
    batch = client.batches.retrieve(batch_id)

    print(f"Status: {batch.status}")
    print(f"Completed: {batch.request_counts.completed}/{batch.request_counts.total}")

    if batch.status == "completed":
        # Download results
        result_file_id = batch.output_file_id
        result = client.files.content(result_file_id)

        with open("batch_results.jsonl", "wb") as f:
            f.write(result.content)

        print("Results saved to batch_results.jsonl")

    return batch.status

# Example usage
texts = ["Great product!"] * 10000  # 10,000 texts

# Submit batch job
batch_id = create_batch_job(texts)

# Check status (poll every 10 minutes)
while True:
    status = check_batch_status(batch_id)
    if status == "completed":
        break
    time.sleep(600)  # Check every 10 minutes

# Cost: $10 (batch API) vs $20 (real-time API) = 50% savings!

When to use Batch API:

| Use Case                           | Real-time API        | Batch API       |
|------------------------------------|----------------------|-----------------|
| User-facing chat                   | ✓ (latency critical) | ✗               |
| Document classification (10k docs) | ✗ (expensive)        | ✓ (50% cheaper) |
| Nightly data processing            | ✗                    | ✓               |
| A/B test evaluation                | ✗                    | ✓               |
| Real-time search                   | ✓                    | ✗               |
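Once a job completes, the downloaded JSONL can be joined back to the original inputs via `custom_id`. Below is a minimal parsing sketch, assuming the documented Batch API output layout (one JSON object per line with `custom_id`, `response.body`, and `error` fields); adjust if your results differ.

import jsonlines

def load_batch_results(path: str = "batch_results.jsonl") -> dict:
    """Map custom_id -> model output for a completed batch job."""
    results = {}
    with jsonlines.open(path) as reader:
        for line in reader:
            custom_id = line["custom_id"]
            if line.get("error"):
                results[custom_id] = None  # Request failed; inspect line["error"]
            else:
                body = line["response"]["body"]
                results[custom_id] = body["choices"][0]["message"]["content"]
    return results

# Example: recover the label for the 42nd input text
labels = load_batch_results()
print(labels.get("request-42"))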

Part 2: Caching

Answer Caching (Repeated Queries)

Problem: 60-70% of queries are repeated (FAQs, common questions).

Solution: Cache answers for identical queries (60% cost reduction).

import hashlib
from typing import Optional

from openai import OpenAI

client = OpenAI()

class AnswerCache:
    def __init__(self):
        self.cache = {}  # In-memory cache (use Redis for production)

    def _cache_key(self, query: str, model: str = "gpt-3.5-turbo") -> str:
        """Generate cache key from query and model."""
        # Normalize query (lowercase, strip whitespace)
        normalized = query.lower().strip()

        # Hash for consistent key
        key_data = f"{model}:{normalized}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        """Get cached answer if exists."""
        key = self._cache_key(query, model)
        return self.cache.get(key)

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        """Cache answer for query."""
        key = self._cache_key(query, model)
        self.cache[key] = answer

    def stats(self):
        """Get cache statistics."""
        return {
            "cache_size": len(self.cache),
            "memory_bytes": sum(len(v.encode()) for v in self.cache.values())
        }

def answer_with_cache(
    query: str,
    cache: AnswerCache,
    model: str = "gpt-3.5-turbo"
) -> tuple[str, bool]:
    """
    Answer query with caching.

    Returns:
        (answer, cache_hit)
    """
    # Check cache
    cached_answer = cache.get(query, model)
    if cached_answer:
        return cached_answer, True  # Cache hit!

    # Cache miss: Generate answer
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Answer the question concisely."},
            {"role": "user", "content": query}
        ]
    )

    answer = response.choices[0].message.content

    # Cache for future queries
    cache.set(query, answer, model)

    return answer, False

# Example usage
cache = AnswerCache()

queries = [
    "What is your return policy?",
    "How do I track my order?",
    "What is your return policy?",  # Repeated!
    "Do you offer international shipping?",
    "What is your return policy?",  # Repeated again!
]

cache_hits = 0
cache_misses = 0

for query in queries:
    answer, is_cache_hit = answer_with_cache(query, cache)

    if is_cache_hit:
        cache_hits += 1
        print(f"[CACHE HIT] {query}")
    else:
        cache_misses += 1
        print(f"[CACHE MISS] {query}")

    print(f"Answer: {answer}\n")

print(f"Cache hits: {cache_hits}/{len(queries)} ({cache_hits/len(queries)*100:.1f}%)")
print(f"Cost savings: {cache_hits/len(queries)*100:.1f}%")

# Output:
# [CACHE MISS] What is your return policy?
# [CACHE MISS] How do I track my order?
# [CACHE HIT] What is your return policy?
# [CACHE MISS] Do you offer international shipping?
# [CACHE HIT] What is your return policy?
# Cache hits: 2/5 (40%)
# Cost savings: 40%

Production caching with Redis:

import hashlib
from typing import Optional

import redis

class RedisAnswerCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.ttl = 86400  # 24 hours

    def _cache_key(self, query: str, model: str) -> str:
        normalized = query.lower().strip()
        return f"answer:{model}:{hashlib.md5(normalized.encode()).hexdigest()}"

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        key = self._cache_key(query, model)
        cached = self.redis_client.get(key)
        return cached.decode() if cached else None

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        key = self._cache_key(query, model)
        self.redis_client.setex(key, self.ttl, answer)

    def stats(self):
        return {
            "cache_size": self.redis_client.dbsize(),
            "memory_usage": self.redis_client.info("memory")["used_memory_human"]
        }
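Because `RedisAnswerCache` exposes the same `get`/`set` interface as `AnswerCache`, it can be dropped into `answer_with_cache` unchanged. A short usage sketch, assuming a Redis instance on localhost:6379:

cache = RedisAnswerCache("redis://localhost:6379")

answer, hit = answer_with_cache("What is your return policy?", cache)
print("cache hit" if hit else "cache miss", "-", answer)
print(cache.stats())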

Prompt Caching (Static Context)

Problem: RAG sends same context repeatedly (expensive).

Solution: Anthropic prompt caching (90% cost reduction for static context).

import anthropic

def rag_with_prompt_caching(
    query: str,
    context: str,  # Static context (knowledge base)
    model: str = "claude-3-5-sonnet-20241022"  # Prompt caching requires a supported model
):
    """
    RAG with prompt caching for static context.

    First query: Full cost (e.g., $0.01)
    Subsequent queries: 90% discount on cached context (e.g., $0.001)
    """
    client = anthropic.Anthropic()

    response = client.messages.create(
        model=model,
        max_tokens=500,
        system=[
            {
                "type": "text",
                "text": "Answer questions using only the provided context.",
            },
            {
                "type": "text",
                "text": f"Context:\n{context}",
                "cache_control": {"type": "ephemeral"}  # Cache this!
            }
        ],
        messages=[
            {"role": "user", "content": query}
        ]
    )

    return response.content[0].text

# Example
knowledge_base = """
[Large knowledge base with 50,000 tokens of product info, policies, FAQs...]
"""

# Query 1: Full cost (write context to cache)
answer1 = rag_with_prompt_caching("What is your return policy?", knowledge_base)
# Cost: Cache write (50k tokens × $0.00375/1k) ≈ $0.19 (vs ≈$0.15 for the same input uncached)

# Query 2-100: 90% discount on cached context!
answer2 = rag_with_prompt_caching("How do I track my order?", knowledge_base)
# Cost: Cached input (50k × $0.0003/1k) + Query (20 tokens × $0.003/1k) = $0.015 + $0.00006 = $0.015

# Savings: queries 2-100 cost ≈$0.015 each vs ≈$0.15 uncached = 90% reduction per query!

When prompt caching is effective:

| Scenario                          | Static Context                  | Dynamic Content        | Cache Savings |
|-----------------------------------|---------------------------------|------------------------|---------------|
| RAG with knowledge base           | 50k tokens (policies, products) | Query (20 tokens)      | 95%+          |
| Multi-turn chat with instructions | 1k tokens (system message)      | Conversation (varying) | 60-80%        |
| Document analysis                 | 10k tokens (document)           | Multiple questions     | 90%+          |
| Code review with context          | 5k tokens (codebase)            | Review comments        | 85%+          |
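To confirm caching is actually firing, inspect the usage block on each response; with prompt caching the Anthropic API reports cache writes and reads separately. A minimal check, reusing `knowledge_base` from the example above (field names as documented for prompt caching; treat this as a sketch):

import anthropic

client = anthropic.Anthropic()

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=500,
    system=[
        {"type": "text", "text": "Answer questions using only the provided context."},
        {"type": "text", "text": f"Context:\n{knowledge_base}", "cache_control": {"type": "ephemeral"}},
    ],
    messages=[{"role": "user", "content": "What is your return policy?"}],
)

usage = response.usage
# First call: expect cache_creation_input_tokens > 0 (cache write, 25% premium).
# Repeat calls: expect cache_read_input_tokens > 0 (90% cheaper than normal input).
print(f"cache writes: {usage.cache_creation_input_tokens}")
print(f"cache reads:  {usage.cache_read_input_tokens}")
print(f"uncached in:  {usage.input_tokens}, output: {usage.output_tokens}")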

Part 3: Model Routing

Task-Based Model Selection

Problem: Using GPT-4 for everything is 10× more expensive.

Solution: Route by task complexity (GPT-3.5 for simple, GPT-4 for complex).

from enum import Enum
from typing import Dict

class TaskType(Enum):
    CLASSIFICATION = "classification"
    EXTRACTION = "extraction"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    REASONING = "reasoning"
    CREATIVE = "creative"
    CODE_GENERATION = "code_generation"

class ModelRouter:
    """Route queries to appropriate model based on task complexity."""

    # Model configurations
    MODELS = {
        "gpt-3.5-turbo": {
            "cost_per_1k_input": 0.0015,
            "cost_per_1k_output": 0.002,
            "latency_factor": 1.0,  # Baseline
            "quality_score": 0.85
        },
        "gpt-4": {
            "cost_per_1k_input": 0.03,
            "cost_per_1k_output": 0.06,
            "latency_factor": 2.5,
            "quality_score": 0.95
        },
        "gpt-4-turbo": {
            "cost_per_1k_input": 0.01,
            "cost_per_1k_output": 0.03,
            "latency_factor": 1.5,
            "quality_score": 0.94
        }
    }

    # Task → Model mapping
    TASK_ROUTING = {
        TaskType.CLASSIFICATION: "gpt-3.5-turbo",  # Simple task
        TaskType.EXTRACTION: "gpt-3.5-turbo",
        TaskType.SUMMARIZATION: "gpt-3.5-turbo",
        TaskType.TRANSLATION: "gpt-3.5-turbo",
        TaskType.REASONING: "gpt-4",  # Complex reasoning
        TaskType.CREATIVE: "gpt-4",  # Better creativity
        TaskType.CODE_GENERATION: "gpt-4"  # Better coding
    }

    @classmethod
    def route(cls, task_type: TaskType, complexity: str = "medium") -> str:
        """
        Route to appropriate model.

        Args:
            task_type: Type of task
            complexity: "low", "medium", "high"

        Returns:
            Model name
        """
        base_model = cls.TASK_ROUTING[task_type]

        # Override for high complexity
        if complexity == "high" and base_model == "gpt-3.5-turbo":
            return "gpt-4-turbo"  # Upgrade for complex variants

        return base_model

    @classmethod
    def calculate_cost(cls, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for model."""
        config = cls.MODELS[model]
        input_cost = (input_tokens / 1000) * config["cost_per_1k_input"]
        output_cost = (output_tokens / 1000) * config["cost_per_1k_output"]
        return input_cost + output_cost

    @classmethod
    def compare_models(cls, task_type: TaskType, input_tokens: int = 500, output_tokens: int = 200):
        """Compare models for a task."""
        print(f"\nTask: {task_type.value}")
        print(f"Input: {input_tokens} tokens, Output: {output_tokens} tokens\n")

        for model_name, config in cls.MODELS.items():
            cost = cls.calculate_cost(model_name, input_tokens, output_tokens)
            quality = config["quality_score"]
            latency = config["latency_factor"]

            print(f"{model_name}:")
            print(f"  Cost: ${cost:.4f}")
            print(f"  Quality: {quality:.0%}")
            print(f"  Latency: {latency:.1f}× baseline")
            print(f"  Cost per quality point: ${cost/quality:.4f}\n")

# Example usage
router = ModelRouter()

# Classification task
model = router.route(TaskType.CLASSIFICATION, complexity="low")
print(f"Classification → {model}")  # gpt-3.5-turbo

# Complex reasoning task
model = router.route(TaskType.REASONING, complexity="high")
print(f"Complex reasoning → {model}")  # gpt-4

# Compare costs
router.compare_models(TaskType.CLASSIFICATION, input_tokens=500, output_tokens=200)
# Output (abridged):
# gpt-3.5-turbo: $0.0012 (cost per quality point: $0.0014)
# gpt-4: $0.0270 (cost per quality point: $0.0284) - roughly 23× more expensive!
# Recommendation: Use GPT-3.5 for classification (~23× cheaper, acceptable quality)

Model Cascade (Try Cheap First)

Problem: Don't know if task needs GPT-4 until you try.

Solution: Try GPT-3.5, escalate to GPT-4 if unsatisfied.

from openai import OpenAI

client = OpenAI()

def cascade_generation(
    prompt: str,
    quality_threshold: float = 0.8,
    max_attempts: int = 2
) -> tuple[str, str, float]:
    """
    Try cheaper model first, escalate if quality insufficient.

    Args:
        prompt: User prompt
        quality_threshold: Minimum quality score (0-1)
        max_attempts: Max escalation attempts

    Returns:
        (response, model_used, estimated_quality)
    """
    models = ["gpt-3.5-turbo", "gpt-4-turbo", "gpt-4"]

    for model in models[:max_attempts]:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )

        result = response.choices[0].message.content

        # Estimate quality (simplified - use LLM-as-judge in production)
        quality = estimate_quality(result, prompt)

        if quality >= quality_threshold:
            print(f"✓ {model} met quality threshold ({quality:.2f} >= {quality_threshold})")
            return result, model, quality
        else:
            print(f"✗ {model} below threshold ({quality:.2f} < {quality_threshold}), escalating...")

    # Return best attempt even if below threshold
    return result, models[max_attempts-1], quality

def estimate_quality(response: str, prompt: str) -> float:
    """
    Estimate quality score (0-1).

    Production: Use LLM-as-judge or other quality metrics.
    """
    # Simplified heuristic
    if len(response) < 20:
        return 0.3  # Too short
    elif len(response) > 500:
        return 0.9  # Detailed
    else:
        return 0.7  # Moderate

# Example
prompt = "Explain quantum entanglement in simple terms."

result, model, quality = cascade_generation(prompt, quality_threshold=0.8)

print(f"\nFinal result:")
print(f"Model: {model}")
print(f"Quality: {quality:.2f}")
print(f"Response: {result[:200]}...")

# Average case: GPT-3.5 suffices (90% of queries)
# Cost: $0.002 per query

# Complex case: Escalate to GPT-4 (10% of queries)
# Cost: $0.002 (GPT-3.5 attempt) + $0.030 (GPT-4) = $0.032

# Overall cost: 0.9 × $0.002 + 0.1 × $0.032 = $0.0018 + $0.0032 = $0.005
# vs Always GPT-4: $0.030
# Savings: 83%!

Part 4: Streaming

Streaming for Long-Form Generation

Problem: 20-second wait for full article (40% bounce rate).

Solution: Stream tokens as generated (perceived latency: 0.5s).

from openai import OpenAI

client = OpenAI()

def generate_streaming(prompt: str, model: str = "gpt-4"):
    """
    Generate response with streaming.

    Benefits:
    - First token in 0.5s (vs 20s wait)
    - User sees progress (engagement)
    - Can cancel early if needed
    """
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=2000,
        stream=True  # Enable streaming
    )

    full_response = ""

    for chunk in stream:
        token = chunk.choices[0].delta.content
        if token:
            full_response += token
            print(token, end="", flush=True)  # Display immediately

    print()  # Newline
    return full_response

# Example
prompt = "Write a detailed article about the history of artificial intelligence."

# Without streaming: Wait 20s, then see full article
# With streaming: See first words in 0.5s, smooth streaming for 20s
article = generate_streaming(prompt)

# User experience improvement:
# - Perceived latency: 20s → 0.5s (40× better!)
# - Bounce rate: 40% → 5% (35pp improvement!)
# - Satisfaction: 3.2/5 → 4.3/5 (+1.1 points!)

Streaming in Web Applications

Flask with Server-Sent Events (SSE):

from flask import Flask, Response, request
from openai import OpenAI

app = Flask(__name__)
client = OpenAI()

@app.route('/generate', methods=['POST'])
def generate_stream():
    """Stream generation results to frontend."""
    prompt = request.json.get('prompt')

    def event_stream():
        """Generator for SSE."""
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        for chunk in stream:
            token = chunk.choices[0].delta.content
            if token:
                # SSE format: "data: {content}\n\n"
                yield f"data: {token}\n\n"

        # Signal completion
        yield "data: [DONE]\n\n"

    return Response(event_stream(), mimetype="text/event-stream")

# Frontend (JavaScript). Note: EventSource only supports GET requests,
# so read the POST response body directly with fetch:
"""
const response = await fetch('/generate', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({prompt: userPrompt})
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const {done, value} = await reader.read();
    if (done) break;

    // Strip the SSE "data: " framing before appending to the page
    for (const line of decoder.decode(value).split('\n\n')) {
        const token = line.replace(/^data: /, '');
        if (token && token !== '[DONE]') {
            document.getElementById('output').innerText += token;
        }
    }
}
"""

Part 5: Cost-Latency-Quality Trade-offs

Multi-Objective Optimization

Problem: Optimizing single objective (cost OR latency) leads to poor trade-offs.

Solution: Pareto analysis to find balanced solutions.

from typing import List

class OptimizationOption:
    def __init__(
        self,
        name: str,
        latency_p95: float,  # seconds
        cost_per_1k: float,  # dollars
        quality_score: float  # 0-1
    ):
        self.name = name
        self.latency_p95 = latency_p95
        self.cost_per_1k = cost_per_1k
        self.quality_score = quality_score

    def dominates(self, other: 'OptimizationOption') -> bool:
        """Check if this option dominates another (Pareto dominance)."""
        # Dominate if: better or equal in all dimensions, strictly better in at least one
        better_latency = self.latency_p95 <= other.latency_p95
        better_cost = self.cost_per_1k <= other.cost_per_1k
        better_quality = self.quality_score >= other.quality_score

        strictly_better = (
            self.latency_p95 < other.latency_p95 or
            self.cost_per_1k < other.cost_per_1k or
            self.quality_score > other.quality_score
        )

        return better_latency and better_cost and better_quality and strictly_better

    def __repr__(self):
        return f"{self.name}: {self.latency_p95:.2f}s, ${self.cost_per_1k:.3f}/1k, {self.quality_score:.2f} quality"

def find_pareto_optimal(options: List[OptimizationOption]) -> List[OptimizationOption]:
    """Find Pareto optimal solutions (non-dominated options)."""
    pareto_optimal = []

    for option in options:
        is_dominated = False
        for other in options:
            if other.dominates(option):
                is_dominated = True
                break

        if not is_dominated:
            pareto_optimal.append(option)

    return pareto_optimal

# Example: RAG chatbot optimization
options = [
    OptimizationOption("GPT-4, no caching", latency_p95=2.5, cost_per_1k=10.0, quality_score=0.92),
    OptimizationOption("GPT-3.5, no caching", latency_p95=0.8, cost_per_1k=2.0, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching", latency_p95=0.6, cost_per_1k=1.2, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching + prompt eng", latency_p95=0.7, cost_per_1k=1.3, quality_score=0.85),
    OptimizationOption("GPT-4 + caching", latency_p95=2.0, cost_per_1k=6.0, quality_score=0.92),
    OptimizationOption("GPT-4-turbo + caching", latency_p95=1.2, cost_per_1k=4.0, quality_score=0.90),
]

# Find Pareto optimal
pareto = find_pareto_optimal(options)

print("Pareto Optimal Solutions:")
for opt in pareto:
    print(f"  {opt}")

# Output:
# Pareto Optimal Solutions:
#   GPT-3.5 + caching: 0.60s, $1.200/1k, 0.78 quality
#   GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
#   GPT-4 + caching: 2.00s, $6.000/1k, 0.92 quality
#   GPT-4-turbo + caching: 1.20s, $4.000/1k, 0.90 quality

# Interpretation:
# - If 0.78 quality is acceptable: GPT-3.5 + caching (cheapest and fastest)
# - If budget-conscious: GPT-3.5 + caching + prompt eng ($1.30/1k, 0.85 quality)
# - If quality-critical: GPT-4-turbo + caching ($4/1k, 0.90 quality, faster than GPT-4)
# - If maximum quality needed: GPT-4 + caching ($6/1k, 0.92 quality)
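A quick way to sanity-check the frontier is to plot cost against quality and highlight the non-dominated points. A minimal sketch, assuming matplotlib is installed and reusing `options` and `pareto` from above:

import matplotlib.pyplot as plt

# All candidate configurations
plt.scatter(
    [o.cost_per_1k for o in options],
    [o.quality_score for o in options],
    label="All options", color="gray"
)

# Pareto-optimal configurations
plt.scatter(
    [o.cost_per_1k for o in pareto],
    [o.quality_score for o in pareto],
    label="Pareto optimal", color="red"
)

for o in pareto:
    plt.annotate(o.name, (o.cost_per_1k, o.quality_score), fontsize=8)

plt.xlabel("Cost per 1k queries ($)")
plt.ylabel("Quality score")
plt.legend()
plt.savefig("pareto_frontier.png")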

Requirements-Based Selection

def select_optimal_solution(
    options: List[OptimizationOption],
    max_latency: float = None,
    max_cost: float = None,
    min_quality: float = None
) -> OptimizationOption:
    """
    Select optimal solution given constraints.

    Args:
        options: Available options
        max_latency: Maximum acceptable latency (seconds)
        max_cost: Maximum cost per 1k queries (dollars)
        min_quality: Minimum quality score (0-1)

    Returns:
        Best option meeting all constraints
    """
    # Filter options meeting constraints
    feasible = []
    for opt in options:
        meets_latency = max_latency is None or opt.latency_p95 <= max_latency
        meets_cost = max_cost is None or opt.cost_per_1k <= max_cost
        meets_quality = min_quality is None or opt.quality_score >= min_quality

        if meets_latency and meets_cost and meets_quality:
            feasible.append(opt)

    if not feasible:
        raise ValueError("No solution meets all constraints")

    # Among feasible, select best cost-quality trade-off
    best = min(feasible, key=lambda opt: opt.cost_per_1k / opt.quality_score)

    return best

# Example: Requirements
requirements = {
    "max_latency": 1.0,  # Must respond within 1 second
    "max_cost": 5.0,     # Budget: $5 per 1k queries
    "min_quality": 0.85  # Minimum 85% quality
}

selected = select_optimal_solution(
    options,
    max_latency=requirements["max_latency"],
    max_cost=requirements["max_cost"],
    min_quality=requirements["min_quality"]
)

print(f"Selected solution: {selected}")
# Output: GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
# (Meets all constraints, most cost-effective)

Part 6: Production Monitoring

Performance Metrics Tracking

from dataclasses import dataclass
from typing import Dict, List

import numpy as np

@dataclass
class QueryMetrics:
    """Metrics for a single query."""
    latency_ms: float
    input_tokens: int
    output_tokens: int
    cost: float
    cache_hit: bool
    model: str

class PerformanceMonitor:
    """Track and analyze performance metrics."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def log_query(
        self,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        cache_hit: bool,
        model: str
    ):
        """Log query metrics."""
        self.metrics.append(QueryMetrics(
            latency_ms=latency_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            cache_hit=cache_hit,
            model=model
        ))

    def summary(self) -> Dict:
        """Generate summary statistics."""
        if not self.metrics:
            return {}

        latencies = [m.latency_ms for m in self.metrics]
        costs = [m.cost for m in self.metrics]
        cache_hits = [m.cache_hit for m in self.metrics]

        return {
            "total_queries": len(self.metrics),
            "latency_p50": np.percentile(latencies, 50),
            "latency_p95": np.percentile(latencies, 95),
            "latency_p99": np.percentile(latencies, 99),
            "avg_cost": np.mean(costs),
            "total_cost": np.sum(costs),
            "cache_hit_rate": np.mean(cache_hits) * 100,
            "queries_per_model": self._count_by_model()
        }

    def _count_by_model(self) -> Dict[str, int]:
        """Count queries by model."""
        counts = {}
        for m in self.metrics:
            counts[m.model] = counts.get(m.model, 0) + 1
        return counts

# Example usage
monitor = PerformanceMonitor()

# Simulate queries
for i in range(1000):
    cache_hit = np.random.random() < 0.6  # 60% cache hit rate
    latency = 100 if cache_hit else 800  # Cache: 100ms, API: 800ms
    cost = 0 if cache_hit else 0.002

    monitor.log_query(
        latency_ms=latency,
        input_tokens=500,
        output_tokens=200,
        cost=cost,
        cache_hit=cache_hit,
        model="gpt-3.5-turbo"
    )

# Generate summary
summary = monitor.summary()

print("Performance Summary:")
print(f"  Total queries: {summary['total_queries']}")
print(f"  Latency P50: {summary['latency_p50']:.0f}ms")
print(f"  Latency P95: {summary['latency_p95']:.0f}ms")
print(f"  Avg cost: ${summary['avg_cost']:.4f}")
print(f"  Total cost: ${summary['total_cost']:.2f}")
print(f"  Cache hit rate: {summary['cache_hit_rate']:.1f}%")

Summary

Inference optimization is systematic, not ad-hoc.

Core techniques:

  1. Parallelization: Async/await (10× throughput), Batch API (50% cheaper)
  2. Caching: Answer caching (60% savings), Prompt caching (90% savings)
  3. Model routing: GPT-3.5 for simple tasks (10× cheaper), GPT-4 for complex
  4. Streaming: First token in 0.5s (vs 20s wait), 35pp better completion rate
  5. Multi-objective: Pareto analysis (balance cost-latency-quality)

Checklist:

  1. ✓ Measure baseline (latency, cost, quality)
  2. ✓ Set requirements (acceptable latency, budget, quality threshold)
  3. ✓ Parallelize batch processing (10× throughput)
  4. ✓ Implement caching (60-90% cost savings)
  5. ✓ Route by task complexity (10× cost savings)
  6. ✓ Stream long responses (better UX)
  7. ✓ Analyze cost-latency-quality trade-offs (Pareto optimal)
  8. ✓ Monitor production metrics (track improvements)

Production-ready performance requires deliberate optimization across multiple dimensions.