Install Skill

  1. Download the skill
  2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
  3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Verify the skill by reading through its instructions before using it.

SKILL.md

name: openrouter-usage-analytics
description: Track and analyze OpenRouter usage patterns. Use when optimizing costs or understanding usage. Trigger with phrases like 'openrouter analytics', 'openrouter usage', 'openrouter metrics', 'track openrouter'.
allowed-tools: Read, Write, Edit, Grep
version: 1.0.0
license: MIT
author: Jeremy Longshore <jeremy@intentsolutions.io>

OpenRouter Usage Analytics

Overview

This skill covers implementing usage tracking, building dashboards, and analyzing patterns to optimize your OpenRouter usage.

Prerequisites

  • OpenRouter integration
  • Analytics/metrics infrastructure (optional)

Instructions

Follow these steps to implement this skill:

  1. Verify Prerequisites: Ensure all prerequisites listed above are met
  2. Review the Implementation: Study the code examples and patterns below
  3. Adapt to Your Environment: Modify configuration values for your setup
  4. Test the Integration: Run the verification steps to confirm functionality
  5. Monitor in Production: Set up appropriate logging and monitoring


Dashboard Activity

Access OpenRouter Dashboard

The activity page at openrouter.ai/activity shows:
- Request history
- Token usage
- Cost breakdown
- Model distribution
- Daily/weekly/monthly views

API Usage Information

# Check current key stats
curl https://openrouter.ai/api/v1/auth/key \
  -H "Authorization: Bearer $OPENROUTER_API_KEY"

Custom Analytics Implementation

Usage Logger

from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict
import json

@dataclass
class UsageRecord:
    timestamp: datetime
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float
    cost: float
    user_id: str | None = None
    tags: list | None = None

class UsageAnalytics:
    def __init__(self):
        self.records: list[UsageRecord] = []

    def record(
        self,
        response,
        model: str,
        latency_ms: float,
        user_id: str | None = None,
        tags: list | None = None
    ):
        cost = self._calculate_cost(
            model,
            response.usage.prompt_tokens,
            response.usage.completion_tokens
        )

        record = UsageRecord(
            timestamp=datetime.now(),
            model=model,
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            latency_ms=latency_ms,
            cost=cost,
            user_id=user_id,
            tags=tags or []
        )

        self.records.append(record)
        return record

    def _calculate_cost(
        self,
        model: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> float:
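        # Prices are USD per 1M tokens: (prompt, completion).
        # Unknown models fall back to GPT-4 Turbo pricing as a conservative default.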
        prices = {
            "openai/gpt-4-turbo": (10.0, 30.0),
            "anthropic/claude-3.5-sonnet": (3.0, 15.0),
            "anthropic/claude-3-haiku": (0.25, 1.25),
        }
        p_price, c_price = prices.get(model, (10.0, 30.0))
        return (
            prompt_tokens * p_price / 1_000_000 +
            completion_tokens * c_price / 1_000_000
        )

    def get_summary(
        self,
        start: datetime | None = None,
        end: datetime | None = None
    ) -> dict:
        """Get usage summary for time period."""
        filtered = self._filter_by_time(start, end)

        if not filtered:
            return {"total_requests": 0}

        total_cost = sum(r.cost for r in filtered)
        total_tokens = sum(r.prompt_tokens + r.completion_tokens for r in filtered)

        return {
            "total_requests": len(filtered),
            "total_tokens": total_tokens,
            "total_cost": total_cost,
            "avg_latency_ms": sum(r.latency_ms for r in filtered) / len(filtered),
            "avg_cost_per_request": total_cost / len(filtered),
        }

    def _filter_by_time(
        self,
        start: datetime | None = None,
        end: datetime | None = None
    ) -> list[UsageRecord]:
        filtered = self.records
        if start:
            filtered = [r for r in filtered if r.timestamp >= start]
        if end:
            filtered = [r for r in filtered if r.timestamp <= end]
        return filtered

analytics = UsageAnalytics()
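
To populate the analytics, time each OpenRouter call and pass the response to record(). A minimal sketch assuming the OpenAI Python SDK pointed at OpenRouter's base URL; the model, user_id, and tags are illustrative:

import os
import time
from openai import OpenAI

client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
)

start = time.perf_counter()
response = client.chat.completions.create(
    model="anthropic/claude-3-haiku",
    messages=[{"role": "user", "content": "Summarize this ticket."}],
)
analytics.record(
    response,
    model="anthropic/claude-3-haiku",
    latency_ms=(time.perf_counter() - start) * 1000,
    user_id="user-123",  # illustrative
    tags=["support"],
)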

Model Analytics

The methods in this and the following sections are additional UsageAnalytics methods, shown individually for readability.

def get_model_breakdown(self) -> dict:
    """Analyze usage by model."""
    by_model = defaultdict(lambda: {
        "requests": 0,
        "tokens": 0,
        "cost": 0.0,
        "avg_latency": []
    })

    for record in self.records:
        by_model[record.model]["requests"] += 1
        by_model[record.model]["tokens"] += (
            record.prompt_tokens + record.completion_tokens
        )
        by_model[record.model]["cost"] += record.cost
        by_model[record.model]["avg_latency"].append(record.latency_ms)

    # Calculate averages
    result = {}
    for model, data in by_model.items():
        result[model] = {
            "requests": data["requests"],
            "tokens": data["tokens"],
            "cost": data["cost"],
            "avg_latency_ms": (
                sum(data["avg_latency"]) / len(data["avg_latency"])
                if data["avg_latency"] else 0
            ),
            "cost_per_request": data["cost"] / data["requests"]
        }

    return result

Time Series Analytics

def get_daily_stats(self, days: int = 30) -> list[dict]:
    """Get daily statistics."""
    end = datetime.now()
    start = end - timedelta(days=days)

    daily = defaultdict(lambda: {
        "requests": 0,
        "tokens": 0,
        "cost": 0.0
    })

    for record in self._filter_by_time(start, end):
        day = record.timestamp.date().isoformat()
        daily[day]["requests"] += 1
        daily[day]["tokens"] += record.prompt_tokens + record.completion_tokens
        daily[day]["cost"] += record.cost

    # Fill in missing days
    result = []
    current = start.date()
    while current <= end.date():
        day_str = current.isoformat()
        result.append({
            "date": day_str,
            **daily.get(day_str, {"requests": 0, "tokens": 0, "cost": 0.0})
        })
        current += timedelta(days=1)

    return result

def get_hourly_distribution(self) -> dict:
    """Get request distribution by hour."""
    hourly = defaultdict(int)

    for record in self.records:
        hour = record.timestamp.hour
        hourly[hour] += 1

    return {str(h).zfill(2): hourly.get(h, 0) for h in range(24)}

User Analytics

def get_user_stats(self) -> dict:
    """Analyze usage by user."""
    by_user = defaultdict(lambda: {
        "requests": 0,
        "tokens": 0,
        "cost": 0.0,
        "models_used": set()
    })

    for record in self.records:
        user_id = record.user_id or "anonymous"
        by_user[user_id]["requests"] += 1
        by_user[user_id]["tokens"] += (
            record.prompt_tokens + record.completion_tokens
        )
        by_user[user_id]["cost"] += record.cost
        by_user[user_id]["models_used"].add(record.model)

    # Convert sets to lists for JSON serialization
    return {
        user_id: {
            "requests": data["requests"],
            "tokens": data["tokens"],
            "cost": data["cost"],
            "models_used": list(data["models_used"])
        }
        for user_id, data in by_user.items()
    }

def get_top_users(self, limit: int = 10) -> list[dict]:
    """Get top users by cost."""
    user_stats = self.get_user_stats()
    sorted_users = sorted(
        user_stats.items(),
        key=lambda x: x[1]["cost"],
        reverse=True
    )
    return [
        {"user_id": uid, **stats}
        for uid, stats in sorted_users[:limit]
    ]

Performance Analytics

Latency Analysis

import statistics

def get_latency_stats(self, model: str | None = None) -> dict:
    """Get latency statistics."""
    if model:
        latencies = [r.latency_ms for r in self.records if r.model == model]
    else:
        latencies = [r.latency_ms for r in self.records]

    if not latencies:
        return {"error": "No data"}

    return {
        "count": len(latencies),
        "mean": statistics.mean(latencies),
        "median": statistics.median(latencies),
        "stdev": statistics.stdev(latencies) if len(latencies) > 1 else 0,
        "min": min(latencies),
        "max": max(latencies),
        "p50": statistics.quantiles(latencies, n=100)[49] if len(latencies) >= 2 else latencies[0],
        "p95": statistics.quantiles(latencies, n=100)[94] if len(latencies) >= 2 else latencies[0],
        "p99": statistics.quantiles(latencies, n=100)[98] if len(latencies) >= 2 else latencies[0],
    }

def get_latency_by_model(self) -> dict:
    """Compare latency across models."""
    models = set(r.model for r in self.records)
    return {model: self.get_latency_stats(model) for model in models}

Error Rate Tracking

class ErrorTracker:
    def __init__(self):
        self.errors = []
        self.successes = 0

    def record_success(self, model: str):
        self.successes += 1

    def record_error(self, model: str, error_type: str, error_message: str):
        self.errors.append({
            "timestamp": datetime.now().isoformat(),
            "model": model,
            "error_type": error_type,
            "error_message": error_message
        })

    def get_error_rate(self) -> float:
        total = self.successes + len(self.errors)
        if total == 0:
            return 0.0
        return len(self.errors) / total

    def get_errors_by_type(self) -> dict:
        by_type = defaultdict(int)
        for error in self.errors:
            by_type[error["error_type"]] += 1
        return dict(by_type)

    def get_errors_by_model(self) -> dict:
        by_model = defaultdict(int)
        for error in self.errors:
            by_model[error["model"]] += 1
        return dict(by_model)

error_tracker = ErrorTracker()
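
Wiring the tracker into a call site might look like the sketch below, reusing the client from the recording example; in practice, catch your SDK's specific exception types rather than bare Exception:

try:
    response = client.chat.completions.create(
        model="anthropic/claude-3-haiku",
        messages=[{"role": "user", "content": "Hello"}],
    )
    error_tracker.record_success("anthropic/claude-3-haiku")
except Exception as exc:
    error_tracker.record_error("anthropic/claude-3-haiku", type(exc).__name__, str(exc))
    raise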

Dashboard Export

Generate Report

def generate_analytics_report(analytics: UsageAnalytics) -> dict:
    """Generate comprehensive analytics report."""
    now = datetime.now()
    last_24h = now - timedelta(hours=24)
    last_7d = now - timedelta(days=7)
    last_30d = now - timedelta(days=30)

    return {
        "generated_at": now.isoformat(),
        "summary": {
            "last_24h": analytics.get_summary(start=last_24h),
            "last_7d": analytics.get_summary(start=last_7d),
            "last_30d": analytics.get_summary(start=last_30d),
            "all_time": analytics.get_summary(),
        },
        "model_breakdown": analytics.get_model_breakdown(),
        "daily_trend": analytics.get_daily_stats(30),
        "hourly_distribution": analytics.get_hourly_distribution(),
        "latency_stats": analytics.get_latency_by_model(),
        "top_users": analytics.get_top_users(10),
    }

def export_to_json(analytics: UsageAnalytics, filepath: str):
    """Export report to JSON file."""
    report = generate_analytics_report(analytics)
    with open(filepath, 'w') as f:
        json.dump(report, f, indent=2, default=str)

CSV Export

import csv

def export_to_csv(analytics: UsageAnalytics, filepath: str):
    """Export raw records to CSV."""
    with open(filepath, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([
            "timestamp", "model", "prompt_tokens", "completion_tokens",
            "latency_ms", "cost", "user_id"
        ])

        for record in analytics.records:
            writer.writerow([
                record.timestamp.isoformat(),
                record.model,
                record.prompt_tokens,
                record.completion_tokens,
                record.latency_ms,
                record.cost,
                record.user_id or ""
            ])
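
Both exporters can run on a schedule or at session end; the file names here are illustrative:

export_to_json(analytics, "openrouter_report.json")
export_to_csv(analytics, "openrouter_usage.csv")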

Visualization

Terminal Dashboard

def print_dashboard(analytics: UsageAnalytics):
    """Print simple terminal dashboard."""
    summary = analytics.get_summary()
    if summary["total_requests"] == 0:
        # get_summary returns only {"total_requests": 0} when no records exist
        print("No OpenRouter usage recorded yet.")
        return
    models = analytics.get_model_breakdown()

    print("=" * 60)
    print("OpenRouter Usage Dashboard")
    print("=" * 60)

    print(f"\nTotal Requests: {summary['total_requests']}")
    print(f"Total Tokens: {summary['total_tokens']:,}")
    print(f"Total Cost: ${summary['total_cost']:.4f}")
    print(f"Avg Latency: {summary['avg_latency_ms']:.0f}ms")

    print("\nBy Model:")
    print("-" * 60)
    for model, stats in sorted(models.items(), key=lambda x: -x[1]["cost"]):
        print(f"  {model}")
        print(f"    Requests: {stats['requests']}, "
              f"Cost: ${stats['cost']:.4f}, "
              f"Latency: {stats['avg_latency_ms']:.0f}ms")

print_dashboard(analytics)

Output

Successful execution produces:

  • Usage records captured for every OpenRouter request
  • Summary, per-model, time-series, and per-user analytics
  • Exportable JSON/CSV reports and a terminal dashboard

Error Handling

Common errors and solutions:

  1. 401 Unauthorized: Check API key format (must start with sk-or-)
  2. 429 Rate Limited: Implement exponential backoff (see the sketch after this list)
  3. 500 Server Error: Retry with backoff, check OpenRouter status page
  4. Model Not Found: Verify model ID includes provider prefix
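
A minimal retry wrapper for items 2 and 3 is sketched below; the status_code attribute check is an assumption and should be adapted to your HTTP client or SDK's exception types:

import random
import time

def with_backoff(call, max_retries: int = 5):
    """Retry a callable on rate-limit and server errors with jittered exponential backoff."""
    for attempt in range(max_retries):
        try:
            return call()
        except Exception as exc:
            status = getattr(exc, "status_code", None)  # assumption: exception exposes HTTP status
            if status not in (429, 500, 502, 503) or attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt + random.random())  # 1s, 2s, 4s, ... plus jitter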

Examples

See code examples in sections above for complete, runnable implementations.
