Claude Code Plugins

Community-maintained marketplace

Feedback

production-monitoring-and-alerting

@tachyon-beep/skillpacks
1
0

Monitor production models with performance metrics, drift detection, and alerting.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name production-monitoring-and-alerting
description Monitor production models with performance metrics, drift detection, and alerting.

Production Monitoring and Alerting

Overview

Comprehensive production monitoring and alerting for ML systems. Implements performance metrics (RED), model quality tracking, drift detection, dashboard design, alert rules, and SLAs/SLOs.

Core Principle: You can't improve what you don't measure. Monitoring is non-negotiable for production ML - deploy with observability or don't deploy.

Section 1: Performance Metrics (RED Metrics)

Foundation: Rate, Errors, Duration

Every ML service must track:

from prometheus_client import Counter, Histogram, Gauge
import time
import functools

# REQUEST RATE (R)
REQUEST_COUNT = Counter(
    'ml_requests_total',
    'Total ML inference requests',
    ['model_name', 'endpoint', 'model_version']
)

# ERROR RATE (E)
ERROR_COUNT = Counter(
    'ml_errors_total',
    'Total ML inference errors',
    ['model_name', 'endpoint', 'error_type']
)

# DURATION (D) - Latency
REQUEST_LATENCY = Histogram(
    'ml_request_duration_seconds',
    'ML inference request latency',
    ['model_name', 'endpoint'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]  # Customize for your SLO
)

# Additional: In-flight requests (for load monitoring)
IN_PROGRESS = Gauge(
    'ml_requests_in_progress',
    'ML inference requests currently being processed',
    ['model_name']
)

def monitor_ml_endpoint(model_name: str, endpoint: str):
    """Decorator to monitor any ML endpoint"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            REQUEST_COUNT.labels(
                model_name=model_name,
                endpoint=endpoint,
                model_version=get_model_version()
            ).inc()

            IN_PROGRESS.labels(model_name=model_name).inc()
            start_time = time.time()

            try:
                result = func(*args, **kwargs)
                REQUEST_LATENCY.labels(
                    model_name=model_name,
                    endpoint=endpoint
                ).observe(time.time() - start_time)
                return result

            except Exception as e:
                ERROR_COUNT.labels(
                    model_name=model_name,
                    endpoint=endpoint,
                    error_type=type(e).__name__
                ).inc()
                raise

            finally:
                IN_PROGRESS.labels(model_name=model_name).dec()

        return wrapper
    return decorator

# Usage example
@monitor_ml_endpoint(model_name="sentiment_classifier", endpoint="/predict")
def predict_sentiment(text: str):
    result = model.predict(text)
    return result

Latency Percentiles (P50, P95, P99)

# Prometheus automatically calculates percentiles from Histogram
# Query in Prometheus:
#   P50: histogram_quantile(0.50, rate(ml_request_duration_seconds_bucket[5m]))
#   P95: histogram_quantile(0.95, rate(ml_request_duration_seconds_bucket[5m]))
#   P99: histogram_quantile(0.99, rate(ml_request_duration_seconds_bucket[5m]))

# For custom tracking:
import numpy as np
from collections import deque

class LatencyTracker:
    def __init__(self, window_size=1000):
        self.latencies = deque(maxlen=window_size)

    def record(self, latency_seconds):
        self.latencies.append(latency_seconds)

    def get_percentiles(self):
        if not self.latencies:
            return None
        arr = np.array(self.latencies)
        return {
            "p50": np.percentile(arr, 50),
            "p95": np.percentile(arr, 95),
            "p99": np.percentile(arr, 99),
            "mean": np.mean(arr),
            "max": np.max(arr)
        }

Throughput Tracking

THROUGHPUT_GAUGE = Gauge(
    'ml_throughput_requests_per_second',
    'Current requests per second',
    ['model_name']
)

class ThroughputMonitor:
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.request_times = deque()

    def record_request(self):
        now = time.time()
        self.request_times.append(now)

        # Keep only last 60 seconds
        cutoff = now - 60
        while self.request_times and self.request_times[0] < cutoff:
            self.request_times.popleft()

        # Update gauge
        throughput = len(self.request_times) / 60.0
        THROUGHPUT_GAUGE.labels(model_name=self.model_name).set(throughput)

Section 2: Model Quality Metrics

Prediction Distribution Tracking

from prometheus_client import Counter

PREDICTION_COUNT = Counter(
    'ml_predictions_by_class',
    'Total predictions by class label',
    ['model_name', 'predicted_class']
)

def track_prediction(model_name: str, prediction: str):
    PREDICTION_COUNT.labels(
        model_name=model_name,
        predicted_class=prediction
    ).inc()

# Example: Sentiment classifier
result = model.predict("Great product!")  # Returns "positive"
track_prediction("sentiment_classifier", result)

# Dashboard query: Check if prediction distribution is shifting
# rate(ml_predictions_by_class{predicted_class="positive"}[1h])

Confidence Distribution Tracking

CONFIDENCE_HISTOGRAM = Histogram(
    'ml_prediction_confidence',
    'Model prediction confidence scores',
    ['model_name'],
    buckets=[0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)

LOW_CONFIDENCE_COUNT = Counter(
    'ml_low_confidence_predictions',
    'Predictions below confidence threshold',
    ['model_name', 'threshold']
)

def track_confidence(model_name: str, confidence: float, threshold: float = 0.7):
    CONFIDENCE_HISTOGRAM.labels(model_name=model_name).observe(confidence)

    if confidence < threshold:
        LOW_CONFIDENCE_COUNT.labels(
            model_name=model_name,
            threshold=str(threshold)
        ).inc()

# Alert if low confidence predictions increase (model uncertainty rising)

Per-Segment Performance

SEGMENT_ACCURACY_GAUGE = Gauge(
    'ml_accuracy_by_segment',
    'Model accuracy for different data segments',
    ['model_name', 'segment']
)

class SegmentPerformanceTracker:
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.segments = {}  # segment -> {"correct": X, "total": Y}

    def record_prediction(self, segment: str, is_correct: bool):
        if segment not in self.segments:
            self.segments[segment] = {"correct": 0, "total": 0}

        self.segments[segment]["total"] += 1
        if is_correct:
            self.segments[segment]["correct"] += 1

        # Update gauge
        accuracy = self.segments[segment]["correct"] / self.segments[segment]["total"]
        SEGMENT_ACCURACY_GAUGE.labels(
            model_name=self.model_name,
            segment=segment
        ).set(accuracy)

# Example: E-commerce recommendations
tracker = SegmentPerformanceTracker("recommender")
tracker.record_prediction(segment="electronics", is_correct=True)
tracker.record_prediction(segment="clothing", is_correct=False)

# Alert if accuracy drops for specific segment (targeted debugging)

Ground Truth Sampling

import random
from typing import Optional

class GroundTruthSampler:
    def __init__(self, model_name: str, sampling_rate: float = 0.1):
        """
        sampling_rate: Fraction of predictions to send for human review (0.0-1.0)
        """
        self.model_name = model_name
        self.sampling_rate = sampling_rate
        self.predictions = []
        self.ground_truths = []

    def sample_prediction(self, request_id: str, prediction: dict) -> bool:
        """
        Returns True if prediction should be sent for human review
        """
        if random.random() < self.sampling_rate:
            self.predictions.append({
                "request_id": request_id,
                "prediction": prediction,
                "timestamp": time.time()
            })
            # Send to review queue (e.g., Label Studio, human review dashboard)
            send_to_review_queue(request_id, prediction)
            return True
        return False

    def add_ground_truth(self, request_id: str, ground_truth: str):
        """Human reviewer provides true label"""
        self.ground_truths.append({
            "request_id": request_id,
            "ground_truth": ground_truth,
            "timestamp": time.time()
        })

        # Calculate rolling accuracy
        if len(self.ground_truths) >= 100:
            self.calculate_accuracy()

    def calculate_accuracy(self):
        """Calculate accuracy on last N samples"""
        recent = self.ground_truths[-100:]
        pred_map = {p["request_id"]: p["prediction"] for p in self.predictions}

        correct = sum(
            1 for gt in recent
            if pred_map.get(gt["request_id"]) == gt["ground_truth"]
        )

        accuracy = correct / len(recent)

        SEGMENT_ACCURACY_GAUGE.labels(
            model_name=self.model_name,
            segment="ground_truth_sample"
        ).set(accuracy)

        return accuracy

# Usage
sampler = GroundTruthSampler("sentiment_classifier", sampling_rate=0.1)

@app.post("/predict")
def predict(text: str):
    result = model.predict(text)
    request_id = generate_request_id()

    # Sample for human review
    sampler.sample_prediction(request_id, result)

    return {"request_id": request_id, "result": result}

# Later: Human reviewer provides label
@app.post("/feedback")
def feedback(request_id: str, true_label: str):
    sampler.add_ground_truth(request_id, true_label)
    return {"status": "recorded"}

Section 3: Data Drift Detection

Kolmogorov-Smirnov Test (Distribution Comparison)

from scipy.stats import ks_2samp
import numpy as np
from prometheus_client import Gauge

DRIFT_SCORE_GAUGE = Gauge(
    'ml_data_drift_score',
    'KS test D-statistic for data drift',
    ['model_name', 'feature_name']
)

DRIFT_ALERT = Counter(
    'ml_data_drift_alerts',
    'Data drift alerts triggered',
    ['model_name', 'feature_name', 'severity']
)

class DataDriftDetector:
    def __init__(self, model_name: str, reference_data: dict, window_size: int = 1000):
        """
        reference_data: Dict of feature_name -> np.array of training data values
        window_size: Number of production samples before checking drift
        """
        self.model_name = model_name
        self.reference_data = reference_data
        self.window_size = window_size
        self.current_window = {feature: [] for feature in reference_data.keys()}

        # Drift thresholds
        self.thresholds = {
            "info": 0.1,      # Slight shift (log only)
            "warning": 0.15,  # Moderate shift (investigate)
            "critical": 0.25  # Severe shift (retrain needed)
        }

    def add_sample(self, features: dict):
        """Add new production sample"""
        for feature_name, value in features.items():
            if feature_name in self.current_window:
                self.current_window[feature_name].append(value)

        # Check drift when window full
        if len(self.current_window[list(self.current_window.keys())[0]]) >= self.window_size:
            self.check_drift()
            # Reset window
            self.current_window = {feature: [] for feature in self.reference_data.keys()}

    def check_drift(self):
        """Compare current window to reference using KS test"""
        results = {}

        for feature_name in self.reference_data.keys():
            reference = self.reference_data[feature_name]
            current = np.array(self.current_window[feature_name])

            # Kolmogorov-Smirnov test
            statistic, p_value = ks_2samp(reference, current)

            results[feature_name] = {
                "ks_statistic": statistic,
                "p_value": p_value
            }

            # Update Prometheus gauge
            DRIFT_SCORE_GAUGE.labels(
                model_name=self.model_name,
                feature_name=feature_name
            ).set(statistic)

            # Alert if drift detected
            severity = self._get_severity(statistic)
            if severity:
                DRIFT_ALERT.labels(
                    model_name=self.model_name,
                    feature_name=feature_name,
                    severity=severity
                ).inc()
                self._send_alert(feature_name, statistic, p_value, severity)

        return results

    def _get_severity(self, ks_statistic: float) -> Optional[str]:
        """Determine alert severity based on KS statistic"""
        if ks_statistic >= self.thresholds["critical"]:
            return "critical"
        elif ks_statistic >= self.thresholds["warning"]:
            return "warning"
        elif ks_statistic >= self.thresholds["info"]:
            return "info"
        return None

    def _send_alert(self, feature_name: str, ks_stat: float, p_value: float, severity: str):
        """Send drift alert to monitoring system"""
        message = f"""
DATA DRIFT DETECTED

Model: {self.model_name}
Feature: {feature_name}
Severity: {severity.upper()}

KS Statistic: {ks_stat:.3f}
P-value: {p_value:.4f}

Interpretation:
- KS < 0.1: No significant drift
- KS 0.1-0.15: Slight shift (monitor)
- KS 0.15-0.25: Moderate drift (investigate)
- KS > 0.25: Severe drift (retrain recommended)

Action:
1. Review recent input examples
2. Check for data source changes
3. Compare distributions visually
4. Consider retraining if accuracy dropping
        """
        send_alert_to_slack(message)  # Or PagerDuty, email, etc.

# Usage example
# Training data statistics
reference_features = {
    "text_length": np.random.normal(100, 20, 10000),  # Mean 100, std 20
    "sentiment_score": np.random.normal(0.5, 0.2, 10000),  # Mean 0.5, std 0.2
}

drift_detector = DataDriftDetector("sentiment_classifier", reference_features)

@app.post("/predict")
def predict(text: str):
    # Extract features
    features = {
        "text_length": len(text),
        "sentiment_score": get_sentiment_score(text)
    }

    # Track for drift detection
    drift_detector.add_sample(features)

    result = model.predict(text)
    return result

Population Stability Index (PSI) for Concept Drift

import numpy as np

PSI_GAUGE = Gauge(
    'ml_concept_drift_psi',
    'Population Stability Index for concept drift',
    ['model_name']
)

class ConceptDriftDetector:
    def __init__(self, model_name: str, num_bins: int = 10):
        """
        num_bins: Number of bins for PSI calculation
        """
        self.model_name = model_name
        self.num_bins = num_bins
        self.baseline_distribution = None
        self.current_predictions = []
        self.window_size = 1000

        # PSI thresholds
        self.thresholds = {
            "info": 0.1,      # Slight shift
            "warning": 0.2,   # Moderate shift (investigate)
            "critical": 0.25  # Severe shift (model behavior changed)
        }

    def set_baseline(self, predictions: list):
        """Set baseline prediction distribution (from first week of production)"""
        self.baseline_distribution = self._calculate_distribution(predictions)

    def track_prediction(self, prediction: float):
        """Track new prediction (probability or class)"""
        self.current_predictions.append(prediction)

        # Check concept drift when window full
        if len(self.current_predictions) >= self.window_size:
            self.check_concept_drift()
            self.current_predictions = []

    def _calculate_distribution(self, values: list) -> np.ndarray:
        """Calculate binned distribution"""
        hist, _ = np.histogram(values, bins=self.num_bins, range=(0, 1))
        # Convert to proportions
        return hist / len(values)

    def calculate_psi(self, expected: np.ndarray, actual: np.ndarray) -> float:
        """
        Calculate Population Stability Index (PSI)

        PSI = sum((actual% - expected%) * ln(actual% / expected%))

        Interpretation:
        - PSI < 0.1: No significant change
        - PSI 0.1-0.2: Slight change (monitor)
        - PSI > 0.2: Significant change (investigate/retrain)
        """
        # Avoid division by zero
        expected = np.where(expected == 0, 0.0001, expected)
        actual = np.where(actual == 0, 0.0001, actual)

        psi = np.sum((actual - expected) * np.log(actual / expected))
        return psi

    def check_concept_drift(self):
        """Check if model behavior has changed"""
        if self.baseline_distribution is None:
            # Set first window as baseline
            self.baseline_distribution = self._calculate_distribution(self.current_predictions)
            return None

        current_distribution = self._calculate_distribution(self.current_predictions)
        psi = self.calculate_psi(self.baseline_distribution, current_distribution)

        # Update Prometheus gauge
        PSI_GAUGE.labels(model_name=self.model_name).set(psi)

        # Alert if concept drift detected
        severity = self._get_severity(psi)
        if severity:
            self._send_alert(psi, severity)

        return psi

    def _get_severity(self, psi: float) -> Optional[str]:
        if psi >= self.thresholds["critical"]:
            return "critical"
        elif psi >= self.thresholds["warning"]:
            return "warning"
        elif psi >= self.thresholds["info"]:
            return "info"
        return None

    def _send_alert(self, psi: float, severity: str):
        message = f"""
CONCEPT DRIFT DETECTED

Model: {self.model_name}
Severity: {severity.upper()}

PSI: {psi:.3f}

Interpretation:
- PSI < 0.1: No significant change
- PSI 0.1-0.2: Slight change (model behavior shifting)
- PSI > 0.2: Significant change (model may need retraining)

Action:
1. Compare current vs baseline prediction distributions
2. Check if input distribution also changed (data drift?)
3. Validate accuracy on recent samples
4. Consider retraining if accuracy dropping
        """
        send_alert_to_slack(message)

# Usage
concept_drift_detector = ConceptDriftDetector("sentiment_classifier")

@app.post("/predict")
def predict(text: str):
    result = model.predict(text)
    confidence = result["confidence"]

    # Track prediction for concept drift
    concept_drift_detector.track_prediction(confidence)

    return result

Section 4: Dashboard Design

Tiered Dashboard Structure

Dashboard Hierarchy:

Page 1 - SYSTEM HEALTH (single pane of glass):
  Purpose: Answer "Is the system healthy?" in 5 seconds
  Metrics:
    - Request rate (current vs normal)
    - Error rate (% and count)
    - Latency P95 (current vs SLO)
    - Model accuracy (ground truth sample)
  Layout: 4 large panels, color-coded (green/yellow/red)

Page 2 - MODEL QUALITY:
  Purpose: Deep dive into model performance
  Metrics:
    - Prediction distribution (over time)
    - Confidence distribution (histogram)
    - Per-segment accuracy (if applicable)
    - Ground truth accuracy (rolling window)
  Layout: Time series + histograms

Page 3 - DRIFT DETECTION:
  Purpose: Detect model degradation early
  Metrics:
    - Data drift (KS test per feature)
    - Concept drift (PSI over time)
    - Feature distributions (current vs baseline)
  Layout: Time series + distribution comparisons

Page 4 - RESOURCES (only check when alerted):
  Purpose: Debug resource issues
  Metrics:
    - CPU utilization
    - Memory usage (RSS)
    - GPU utilization/memory (if applicable)
    - Disk I/O
  Layout: System resource graphs

Grafana Dashboard Example (JSON)

{
  "dashboard": {
    "title": "ML Model Monitoring - Sentiment Classifier",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(ml_requests_total{model_name=\"sentiment_classifier\"}[5m])",
            "legendFormat": "{{endpoint}}"
          }
        ],
        "type": "graph",
        "gridPos": {"x": 0, "y": 0, "w": 6, "h": 8}
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(ml_errors_total{model_name=\"sentiment_classifier\"}[5m]) / rate(ml_requests_total{model_name=\"sentiment_classifier\"}[5m])",
            "legendFormat": "Error %"
          }
        ],
        "type": "graph",
        "gridPos": {"x": 6, "y": 0, "w": 6, "h": 8}
      },
      {
        "title": "Latency P95",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(ml_request_duration_seconds_bucket{model_name=\"sentiment_classifier\"}[5m]))",
            "legendFormat": "P95"
          }
        ],
        "type": "graph",
        "gridPos": {"x": 12, "y": 0, "w": 6, "h": 8},
        "alert": {
          "conditions": [
            {
              "query": "A",
              "reducer": "avg",
              "evaluator": {"params": [0.5], "type": "gt"}
            }
          ],
          "message": "Latency P95 above 500ms SLO"
        }
      },
      {
        "title": "Prediction Distribution",
        "targets": [
          {
            "expr": "rate(ml_predictions_by_class{model_name=\"sentiment_classifier\"}[1h])",
            "legendFormat": "{{predicted_class}}"
          }
        ],
        "type": "graph",
        "gridPos": {"x": 0, "y": 8, "w": 12, "h": 8}
      },
      {
        "title": "Data Drift (KS Test)",
        "targets": [
          {
            "expr": "ml_data_drift_score{model_name=\"sentiment_classifier\"}",
            "legendFormat": "{{feature_name}}"
          }
        ],
        "type": "graph",
        "gridPos": {"x": 12, "y": 8, "w": 12, "h": 8},
        "thresholds": [
          {"value": 0.15, "color": "yellow"},
          {"value": 0.25, "color": "red"}
        ]
      }
    ]
  }
}

Section 5: Alert Rules (Actionable, Not Noisy)

Severity-Based Alerting

Alert Severity Levels:

CRITICAL (page immediately, wake up on-call):
  - Error rate > 5% for 5 minutes
  - Latency P95 > 2× SLO for 10 minutes
  - Service down (health check fails)
  - Model accuracy < 60% (catastrophic failure)
  Response time: 15 minutes
  Escalation: Page backup if no ack in 15 min

WARNING (notify, but don't wake up):
  - Error rate > 2% for 10 minutes
  - Latency P95 > 1.5× SLO for 15 minutes
  - Data drift KS > 0.15 (moderate)
  - Low confidence predictions > 20%
  Response time: 1 hour
  Escalation: Slack notification

INFO (log for review):
  - Error rate > 1%
  - Latency increasing trend
  - Data drift KS > 0.1 (slight)
  - Concept drift PSI > 0.1
  Response time: Next business day
  Escalation: Dashboard review

Prometheus Alert Rules

# prometheus_rules.yml

groups:
  - name: ml_model_alerts
    interval: 30s
    rules:

      # CRITICAL: High error rate
      - alert: HighErrorRate
        expr: |
          (
            rate(ml_errors_total[5m])
            /
            rate(ml_requests_total[5m])
          ) > 0.05
        for: 5m
        labels:
          severity: critical
          model: "{{ $labels.model_name }}"
        annotations:
          summary: "High error rate detected"
          description: |
            Model {{ $labels.model_name }} error rate is {{ $value | humanizePercentage }}
            (threshold: 5%)

            RUNBOOK:
            1. Check recent error logs: kubectl logs -l app=ml-service --since=10m | grep ERROR
            2. Check model health: curl http://service/health
            3. Check recent deployments: kubectl rollout history deployment/ml-service
            4. If model OOM: kubectl scale --replicas=5 deployment/ml-service
            5. If persistent: Rollback to previous version

      # CRITICAL: High latency
      - alert: HighLatencyP95
        expr: |
          histogram_quantile(0.95,
            rate(ml_request_duration_seconds_bucket[5m])
          ) > 1.0
        for: 10m
        labels:
          severity: critical
          model: "{{ $labels.model_name }}"
        annotations:
          summary: "Latency P95 above SLO"
          description: |
            Model {{ $labels.model_name }} latency P95 is {{ $value }}s
            (SLO: 0.5s, threshold: 1.0s = 2× SLO)

            RUNBOOK:
            1. Check current load: rate(ml_requests_total[5m])
            2. Check resource usage: CPU/memory/GPU utilization
            3. Check for slow requests: Check P99 latency
            4. Scale if needed: kubectl scale --replicas=10 deployment/ml-service
            5. Check downstream dependencies (database, cache, APIs)

      # WARNING: Moderate data drift
      - alert: DataDriftDetected
        expr: ml_data_drift_score > 0.15
        for: 1h
        labels:
          severity: warning
          model: "{{ $labels.model_name }}"
          feature: "{{ $labels.feature_name }}"
        annotations:
          summary: "Data drift detected"
          description: |
            Model {{ $labels.model_name }} feature {{ $labels.feature_name }}
            KS statistic: {{ $value }}
            (threshold: 0.15 = moderate drift)

            RUNBOOK:
            1. Compare current vs baseline distributions (Grafana dashboard)
            2. Check recent data source changes
            3. Review sample inputs for anomalies
            4. If drift severe (KS > 0.25): Plan retraining
            5. If accuracy dropping: Expedite retraining

      # WARNING: Concept drift
      - alert: ConceptDriftDetected
        expr: ml_concept_drift_psi > 0.2
        for: 1h
        labels:
          severity: warning
          model: "{{ $labels.model_name }}"
        annotations:
          summary: "Concept drift detected"
          description: |
            Model {{ $labels.model_name }} PSI: {{ $value }}
            (threshold: 0.2 = significant shift)

            Model behavior is changing (same inputs → different outputs)

            RUNBOOK:
            1. Check prediction distribution changes (Grafana)
            2. Compare with data drift (correlated?)
            3. Validate accuracy on ground truth samples
            4. If accuracy < 75%: Retraining required
            5. Investigate root cause (seasonality, new patterns, etc.)

      # CRITICAL: Low accuracy
      - alert: LowModelAccuracy
        expr: ml_accuracy_by_segment{segment="ground_truth_sample"} < 0.70
        for: 30m
        labels:
          severity: critical
          model: "{{ $labels.model_name }}"
        annotations:
          summary: "Model accuracy below threshold"
          description: |
            Model {{ $labels.model_name }} accuracy: {{ $value | humanizePercentage }}
            (threshold: 70%, baseline: 85%)

            CRITICAL: Model performance severely degraded

            RUNBOOK:
            1. IMMEDIATE: Increase ground truth sampling rate (validate more)
            2. Check for data drift (likely root cause)
            3. Review recent input examples (new patterns?)
            4. ESCALATE: Notify ML team for emergency retraining
            5. Consider rollback to previous model version

      # INFO: Increased low confidence predictions
      - alert: HighLowConfidencePredictions
        expr: |
          (
            rate(ml_low_confidence_predictions[1h])
            /
            rate(ml_requests_total[1h])
          ) > 0.2
        for: 1h
        labels:
          severity: info
          model: "{{ $labels.model_name }}"
        annotations:
          summary: "High rate of low confidence predictions"
          description: |
            Model {{ $labels.model_name }} low confidence rate: {{ $value | humanizePercentage }}
            (threshold: 20%)

            Model is uncertain about many predictions

            RUNBOOK:
            1. Review low confidence examples (what's different?)
            2. Check if correlated with drift
            3. Consider increasing confidence threshold (trade recall for precision)
            4. Monitor accuracy on low confidence predictions
            5. May indicate need for retraining or model improvement

Alert Grouping (Reduce Noise)

# AlertManager configuration
route:
  group_by: ['model_name', 'severity']
  group_wait: 30s        # Wait 30s before sending first alert (batch correlated)
  group_interval: 5m     # Send updates every 5 minutes
  repeat_interval: 4h    # Re-send if not resolved after 4 hours

  routes:
    # CRITICAL alerts: Page immediately
    - match:
        severity: critical
      receiver: pagerduty
      continue: true  # Also send to Slack

    # WARNING alerts: Slack notification
    - match:
        severity: warning
      receiver: slack_warnings

    # INFO alerts: Log only
    - match:
        severity: info
      receiver: slack_info

receivers:
  - name: pagerduty
    pagerduty_configs:
      - service_key: <YOUR_PAGERDUTY_KEY>
        description: "{{ range .Alerts }}{{ .Annotations.summary }}\n{{ end }}"

  - name: slack_warnings
    slack_configs:
      - api_url: <YOUR_SLACK_WEBHOOK>
        channel: '#ml-alerts-warnings'
        title: "⚠️ ML Warning Alert"
        text: "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}"

  - name: slack_info
    slack_configs:
      - api_url: <YOUR_SLACK_WEBHOOK>
        channel: '#ml-alerts-info'
        title: "ℹ️ ML Info Alert"
        text: "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}"

Section 6: SLAs and SLOs for ML Systems

Defining Service Level Objectives (SLOs)

Model SLOs Template:

Service: [Model Name]
Version: [Version Number]
Owner: [Team Name]

1. LATENCY
   Objective: 95% of requests complete within [X]ms
   Measurement: P95 latency from Prometheus histogram
   Target: 95% compliance (monthly)
   Current: [Track in dashboard]

   Example:
   - P50 < 100ms
   - P95 < 500ms
   - P99 < 1000ms

2. AVAILABILITY
   Objective: Service uptime > [X]%
   Measurement: Health check success rate
   Target: 99.5% uptime (monthly) = 3.6 hours downtime allowed
   Current: [Track in dashboard]

3. ERROR RATE
   Objective: < [X]% of requests fail
   Measurement: (errors / total requests) × 100
   Target: < 1% error rate
   Current: [Track in dashboard]

4. MODEL ACCURACY
   Objective: Accuracy > [X]% on ground truth sample
   Measurement: Human-labeled sample (10% of traffic)
   Target: > 85% accuracy (rolling 1000 samples)
   Current: [Track in dashboard]

5. THROUGHPUT
   Objective: Support [X] requests/second
   Measurement: Request rate from Prometheus
   Target: Handle 1000 req/s without degradation
   Current: [Track in dashboard]

6. COST
   Objective: < $[X] per 1000 requests
   Measurement: Cloud billing / request count
   Target: < $0.05 per 1000 requests
   Current: [Track in dashboard]

SLO Compliance Dashboard

from prometheus_client import Gauge

SLO_COMPLIANCE_GAUGE = Gauge(
    'ml_slo_compliance_percentage',
    'SLO compliance percentage',
    ['model_name', 'slo_type']
)

class SLOTracker:
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.slos = {
            "latency_p95": {"target": 0.5, "threshold": 0.95},  # 500ms, 95% compliance
            "error_rate": {"target": 0.01, "threshold": 0.95},  # 1% errors
            "accuracy": {"target": 0.85, "threshold": 0.95},    # 85% accuracy
            "availability": {"target": 0.995, "threshold": 1.0}  # 99.5% uptime
        }
        self.measurements = {slo: [] for slo in self.slos.keys()}

    def record_measurement(self, slo_type: str, value: float):
        """Record SLO measurement (e.g., latency, error rate)"""
        self.measurements[slo_type].append({
            "value": value,
            "timestamp": time.time(),
            "compliant": self._is_compliant(slo_type, value)
        })

        # Keep last 30 days
        cutoff = time.time() - (30 * 24 * 3600)
        self.measurements[slo_type] = [
            m for m in self.measurements[slo_type]
            if m["timestamp"] > cutoff
        ]

        # Update compliance gauge
        compliance = self.calculate_compliance(slo_type)
        SLO_COMPLIANCE_GAUGE.labels(
            model_name=self.model_name,
            slo_type=slo_type
        ).set(compliance)

    def _is_compliant(self, slo_type: str, value: float) -> bool:
        """Check if single measurement meets SLO"""
        target = self.slos[slo_type]["target"]

        if slo_type in ["latency_p95", "error_rate"]:
            return value <= target  # Lower is better
        else:  # accuracy, availability
            return value >= target  # Higher is better

    def calculate_compliance(self, slo_type: str) -> float:
        """Calculate SLO compliance percentage"""
        if not self.measurements[slo_type]:
            return 0.0

        compliant_count = sum(
            1 for m in self.measurements[slo_type]
            if m["compliant"]
        )

        return compliant_count / len(self.measurements[slo_type])

    def check_slo_status(self) -> dict:
        """Check all SLOs and return status"""
        status = {}

        for slo_type, slo_config in self.slos.items():
            compliance = self.calculate_compliance(slo_type)
            threshold = slo_config["threshold"]

            status[slo_type] = {
                "compliance": compliance,
                "threshold": threshold,
                "status": "✓ MEETING SLO" if compliance >= threshold else "✗ VIOLATING SLO"
            }

        return status

# Usage
slo_tracker = SLOTracker("sentiment_classifier")

# Record measurements periodically
slo_tracker.record_measurement("latency_p95", 0.45)  # 450ms (compliant)
slo_tracker.record_measurement("error_rate", 0.008)  # 0.8% (compliant)
slo_tracker.record_measurement("accuracy", 0.87)     # 87% (compliant)

# Check overall status
status = slo_tracker.check_slo_status()

Section 7: Monitoring Stack (Prometheus + Grafana)

Complete Setup Example

# docker-compose.yml

version: '3'

services:
  # ML Service
  ml-service:
    build: .
    ports:
      - "8000:8000"
      - "8001:8001"  # Metrics endpoint
    environment:
      - MODEL_PATH=/models/sentiment_classifier.pt
    volumes:
      - ./models:/models

  # Prometheus (metrics collection)
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus_rules.yml:/etc/prometheus/rules.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'

  # Grafana (visualization)
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana_dashboards:/etc/grafana/provisioning/dashboards
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

  # AlertManager (alert routing)
  alertmanager:
    image: prom/alertmanager:latest
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml

volumes:
  prometheus_data:
  grafana_data:
# prometheus.yml

global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - /etc/prometheus/rules.yml

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

scrape_configs:
  - job_name: 'ml-service'
    static_configs:
      - targets: ['ml-service:8001']  # Metrics endpoint
    metrics_path: /metrics
# ML Service with Prometheus metrics

from fastapi import FastAPI
from prometheus_client import make_asgi_app, Counter, Histogram
import uvicorn

app = FastAPI()

# Metrics
REQUEST_COUNT = Counter('ml_requests_total', 'Total requests', ['endpoint'])
REQUEST_LATENCY = Histogram('ml_latency_seconds', 'Request latency', ['endpoint'])

@app.post("/predict")
@REQUEST_LATENCY.labels(endpoint="/predict").time()
def predict(text: str):
    REQUEST_COUNT.labels(endpoint="/predict").inc()
    result = model.predict(text)
    return {"prediction": result}

# Mount Prometheus metrics endpoint
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

if __name__ == "__main__":
    # Main service on port 8000
    # Metrics on port 8001
    uvicorn.run(app, host="0.0.0.0", port=8000)

Section 8: Complete Example (End-to-End)

# complete_monitoring.py
# Complete production monitoring for sentiment classifier

from fastapi import FastAPI, HTTPException
from prometheus_client import Counter, Histogram, Gauge, make_asgi_app
from scipy.stats import ks_2samp
import numpy as np
import time
from typing import Optional

app = FastAPI()

# === 1. PERFORMANCE METRICS (RED) ===

REQUEST_COUNT = Counter(
    'sentiment_requests_total',
    'Total sentiment analysis requests',
    ['endpoint', 'model_version']
)

ERROR_COUNT = Counter(
    'sentiment_errors_total',
    'Total errors',
    ['error_type']
)

REQUEST_LATENCY = Histogram(
    'sentiment_latency_seconds',
    'Request latency',
    ['endpoint'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0]
)

# === 2. MODEL QUALITY METRICS ===

PREDICTION_COUNT = Counter(
    'sentiment_predictions_by_class',
    'Predictions by sentiment class',
    ['predicted_class']
)

CONFIDENCE_HISTOGRAM = Histogram(
    'sentiment_confidence',
    'Prediction confidence scores',
    buckets=[0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)

ACCURACY_GAUGE = Gauge(
    'sentiment_accuracy_ground_truth',
    'Accuracy on ground truth sample'
)

# === 3. DRIFT DETECTION ===

DRIFT_SCORE_GAUGE = Gauge(
    'sentiment_data_drift_ks',
    'KS statistic for data drift',
    ['feature']
)

PSI_GAUGE = Gauge(
    'sentiment_concept_drift_psi',
    'PSI for concept drift'
)

# === Initialize Monitoring Components ===

class SentimentMonitor:
    def __init__(self):
        # Reference data (from training)
        self.reference_text_lengths = np.random.normal(100, 30, 10000)

        # Drift detection
        self.current_text_lengths = []
        self.current_predictions = []
        self.baseline_prediction_dist = None

        # Ground truth tracking
        self.predictions = {}
        self.ground_truths = []

        # SLO tracking
        self.slo_measurements = []

    def track_request(self, text: str, prediction: dict, latency: float):
        """Track all metrics for a request"""
        # 1. Performance metrics
        REQUEST_COUNT.labels(
            endpoint="/predict",
            model_version="v1.0"
        ).inc()

        REQUEST_LATENCY.labels(endpoint="/predict").observe(latency)

        # 2. Model quality
        PREDICTION_COUNT.labels(
            predicted_class=prediction["label"]
        ).inc()

        CONFIDENCE_HISTOGRAM.observe(prediction["confidence"])

        # 3. Drift detection
        self.current_text_lengths.append(len(text))
        self.current_predictions.append(prediction["confidence"])

        # Check drift every 1000 samples
        if len(self.current_text_lengths) >= 1000:
            self.check_data_drift()
            self.check_concept_drift()
            self.current_text_lengths = []
            self.current_predictions = []

        # 4. SLO tracking
        self.slo_measurements.append({
            "latency": latency,
            "timestamp": time.time()
        })

monitor = SentimentMonitor()

# === Endpoints ===

@app.post("/predict")
def predict(text: str):
    start_time = time.time()

    try:
        # Dummy model prediction
        result = {
            "label": "positive",
            "confidence": 0.92
        }

        latency = time.time() - start_time

        # Track metrics
        monitor.track_request(text, result, latency)

        return {
            "prediction": result["label"],
            "confidence": result["confidence"],
            "latency_ms": latency * 1000
        }

    except Exception as e:
        ERROR_COUNT.labels(error_type=type(e).__name__).inc()
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/feedback")
def feedback(request_id: str, true_label: str):
    """Collect ground truth labels"""
    monitor.ground_truths.append({
        "request_id": request_id,
        "true_label": true_label,
        "timestamp": time.time()
    })

    # Calculate accuracy on last 100 samples
    if len(monitor.ground_truths) >= 100:
        recent = monitor.ground_truths[-100:]
        # Calculate accuracy (simplified)
        accuracy = 0.87  # Placeholder
        ACCURACY_GAUGE.set(accuracy)

    return {"status": "recorded"}

# Mount Prometheus metrics
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Key Takeaways

  1. Monitoring is mandatory - Instrument before deployment
  2. RED metrics first - Rate, Errors, Duration for every service
  3. Model quality matters - Track predictions, confidence, accuracy
  4. Drift detection prevents degradation - KS test + PSI
  5. Actionable alerts only - Severity-based, with runbooks
  6. SLOs define success - Quantitative targets guide optimization
  7. Dashboard = single pane of glass - Healthy or not in 5 seconds

This skill prevents all 5 RED failures by providing systematic monitoring, alerting, and observability for production ML systems.