Claude Code Plugins

Community-maintained marketplace

Feedback

Monitor model performance, detect data drift, concept drift, and anomalies in production using Prometheus, Grafana, and MLflow

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name Model Monitoring
description Monitor model performance, detect data drift, concept drift, and anomalies in production using Prometheus, Grafana, and MLflow

Model Monitoring

Monitoring deployed machine learning models ensures they continue to perform well in production, detecting data drift, concept drift, and performance degradation.

Monitoring Components

  • Performance Metrics: Accuracy, latency, throughput
  • Data Drift: Distribution changes in input features
  • Concept Drift: Changes in target variable relationships
  • Output Drift: Changes in prediction distribution
  • Feature Drift: Individual feature distribution changes
  • Anomaly Detection: Unusual samples in production

Monitoring Tools

  • Prometheus: Metrics collection and storage
  • Grafana: Visualization and dashboarding
  • MLflow: Model tracking and registry
  • TensorFlow Data Validation: Data statistics
  • Evidently: Drift detection and monitoring
  • Great Expectations: Data quality assertions

Python Implementation

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from scipy import stats
import json
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Production Monitoring System ===")

class ModelMonitoringSystem:
    def __init__(self, model, scaler, baseline_data, baseline_targets):
        self.model = model
        self.scaler = scaler
        self.baseline_data = baseline_data
        self.baseline_targets = baseline_targets
        self.baseline_mean = baseline_data.mean(axis=0)
        self.baseline_std = baseline_data.std(axis=0)
        self.baseline_predictions = model.predict(baseline_data)

        self.metrics_history = []
        self.drift_alerts = []
        self.performance_history = []

    def log_predictions(self, X, y_true, y_pred):
        """Log predictions and compute metrics"""
        timestamp = datetime.now()

        # Compute metrics
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

        metric_record = {
            'timestamp': timestamp,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'n_samples': len(X)
        }

        self.metrics_history.append(metric_record)
        return metric_record

    def detect_data_drift(self, X_new):
        """Detect data drift using Kolmogorov-Smirnov test"""
        drift_detected = False
        drift_features = []

        for feature_idx in range(X_new.shape[1]):
            baseline_feature = self.baseline_data[:, feature_idx]
            new_feature = X_new[:, feature_idx]

            # KS Test
            ks_statistic, p_value = stats.ks_2samp(baseline_feature, new_feature)

            if p_value < 0.05:  # Significant drift detected
                drift_detected = True
                drift_features.append({
                    'feature_index': feature_idx,
                    'ks_statistic': float(ks_statistic),
                    'p_value': float(p_value)
                })

        if drift_detected:
            alert = {
                'timestamp': datetime.now(),
                'type': 'data_drift',
                'severity': 'high',
                'drifted_features': drift_features,
                'n_drifted': len(drift_features)
            }
            self.drift_alerts.append(alert)
            logger.warning(f"Data drift detected in {len(drift_features)} features")

        return drift_detected, drift_features

    def detect_output_drift(self, y_pred_new):
        """Detect drift in model predictions"""
        baseline_pred_dist = pd.Series(self.baseline_predictions).value_counts(normalize=True)
        new_pred_dist = pd.Series(y_pred_new).value_counts(normalize=True)

        # Compare distributions
        classes = set(baseline_pred_dist.index) | set(new_pred_dist.index)
        chi2_stat = 0

        for cls in classes:
            exp = baseline_pred_dist.get(cls, 0.01)
            obs = new_pred_dist.get(cls, 0.01)
            chi2_stat += (obs - exp) ** 2 / max(exp, 0.01)

        p_value = 1 - stats.chi2.cdf(chi2_stat, len(classes) - 1)

        if p_value < 0.05:
            alert = {
                'timestamp': datetime.now(),
                'type': 'output_drift',
                'severity': 'medium',
                'chi2_statistic': float(chi2_stat),
                'p_value': float(p_value)
            }
            self.drift_alerts.append(alert)
            logger.warning("Output drift detected in predictions")
            return True

        return False

    def detect_performance_degradation(self, y_true, y_pred):
        """Detect if model performance has degraded"""
        current_accuracy = accuracy_score(y_true, y_pred)
        baseline_accuracy = accuracy_score(self.baseline_targets, self.baseline_predictions)

        degradation_threshold = 0.05  # 5% drop
        degradation = baseline_accuracy - current_accuracy

        if degradation > degradation_threshold:
            alert = {
                'timestamp': datetime.now(),
                'type': 'performance_degradation',
                'severity': 'critical',
                'baseline_accuracy': float(baseline_accuracy),
                'current_accuracy': float(current_accuracy),
                'degradation': float(degradation)
            }
            self.drift_alerts.append(alert)
            logger.error("Performance degradation detected")
            return True

        return False

    def get_monitoring_report(self):
        """Generate monitoring report"""
        if not self.metrics_history:
            return {}

        metrics_df = pd.DataFrame(self.metrics_history)

        return {
            'monitoring_period': {
                'start': self.metrics_history[0]['timestamp'].isoformat(),
                'end': self.metrics_history[-1]['timestamp'].isoformat(),
                'n_batches': len(self.metrics_history)
            },
            'performance_summary': {
                'avg_accuracy': float(metrics_df['accuracy'].mean()),
                'min_accuracy': float(metrics_df['accuracy'].min()),
                'max_accuracy': float(metrics_df['accuracy'].max()),
                'std_accuracy': float(metrics_df['accuracy'].std()),
                'avg_f1': float(metrics_df['f1'].mean())
            },
            'drift_summary': {
                'total_alerts': len(self.drift_alerts),
                'data_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'data_drift'),
                'output_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'output_drift'),
                'performance_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'performance_degradation')
            },
            'alerts': self.drift_alerts[-10:]  # Last 10 alerts
        }

print("Monitoring system initialized")

# 2. Create baseline model
print("\n=== 2. Train Baseline Model ===")

from sklearn.datasets import make_classification

# Create baseline data
X_baseline, y_baseline = make_classification(n_samples=1000, n_features=20,
                                            n_informative=15, random_state=42)
scaler = StandardScaler()
X_baseline_scaled = scaler.fit_transform(X_baseline)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_baseline_scaled, y_baseline)

print(f"Baseline model trained on {len(X_baseline)} samples")

# 3. Initialize monitoring
monitor = ModelMonitoringSystem(model, scaler, X_baseline_scaled, y_baseline)

# 4. Simulate production data and monitoring
print("\n=== 3. Production Monitoring Simulation ===")

# Simulate normal production data
X_prod_normal = np.random.randn(500, 20) * 0.5
X_prod_normal_scaled = scaler.transform(X_prod_normal)
y_pred_normal = model.predict(X_prod_normal_scaled)
y_true_normal = np.random.randint(0, 2, 500)

metrics_normal = monitor.log_predictions(X_prod_normal_scaled, y_true_normal, y_pred_normal)
print(f"Normal production batch - Accuracy: {metrics_normal['accuracy']:.4f}")

# Simulate drifted data
X_prod_drift = np.random.randn(500, 20) * 2.0  # Different distribution
X_prod_drift_scaled = scaler.transform(X_prod_drift)
y_pred_drift = model.predict(X_prod_drift_scaled)
y_true_drift = np.random.randint(0, 2, 500)

metrics_drift = monitor.log_predictions(X_prod_drift_scaled, y_true_drift, y_pred_drift)
drift_detected, drift_features = monitor.detect_data_drift(X_prod_drift_scaled)
print(f"Drifted production batch - Accuracy: {metrics_drift['accuracy']:.4f}")
print(f"Data drift detected: {drift_detected}")

# Check performance degradation
perf_degradation = monitor.detect_performance_degradation(y_true_drift, y_pred_drift)
print(f"Performance degradation: {perf_degradation}")

# 5. Prometheus metrics export
print("\n=== 4. Prometheus Metrics Format ===")

prometheus_metrics = f"""
# HELP model_accuracy Model accuracy score
# TYPE model_accuracy gauge
model_accuracy{"{timestamp='2024-01-01'}"} {metrics_normal['accuracy']:.4f}

# HELP model_f1_score Model F1 score
# TYPE model_f1_score gauge
model_f1_score{"{timestamp='2024-01-01'}"} {metrics_normal['f1']:.4f}

# HELP model_predictions_total Total predictions made
# TYPE model_predictions_total counter
model_predictions_total 5000

# HELP model_drift_detected Data drift detection flag
# TYPE model_drift_detected gauge
model_drift_detected {int(drift_detected)}

# HELP model_latency_seconds Prediction latency in seconds
# TYPE model_latency_seconds histogram
model_latency_seconds_bucket{{le="0.01"}} 100
model_latency_seconds_bucket{{le="0.05"}} 450
model_latency_seconds_bucket{{le="0.1"}} 500
"""

print("Prometheus metrics:")
print(prometheus_metrics)

# 6. Grafana dashboard JSON
print("\n=== 5. Grafana Dashboard Configuration ===")

grafana_dashboard = {
    'title': 'ML Model Monitoring Dashboard',
    'panels': [
        {
            'title': 'Model Accuracy',
            'targets': [{'metric': 'model_accuracy'}],
            'type': 'graph'
        },
        {
            'title': 'Data Drift Alerts',
            'targets': [{'metric': 'model_drift_detected'}],
            'type': 'stat'
        },
        {
            'title': 'Prediction Latency',
            'targets': [{'metric': 'model_latency_seconds'}],
            'type': 'graph'
        },
        {
            'title': 'Feature Distributions',
            'targets': [{'metric': 'feature_distribution_change'}],
            'type': 'heatmap'
        }
    ]
}

print("Grafana dashboard configured with 4 panels")

# 7. Visualization
print("\n=== 6. Monitoring Visualization ===")

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Performance over time
metrics_df = pd.DataFrame(monitor.metrics_history)
axes[0, 0].plot(range(len(metrics_df)), metrics_df['accuracy'], marker='o', label='Accuracy')
axes[0, 0].axhline(y=metrics_df['accuracy'].mean(), color='r', linestyle='--', label='Mean')
axes[0, 0].set_xlabel('Batch')
axes[0, 0].set_ylabel('Accuracy')
axes[0, 0].set_title('Model Accuracy Over Time')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Precision, Recall, F1
axes[0, 1].plot(range(len(metrics_df)), metrics_df['precision'], label='Precision', marker='s')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['recall'], label='Recall', marker='^')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['f1'], label='F1', marker='d')
axes[0, 1].set_xlabel('Batch')
axes[0, 1].set_ylabel('Score')
axes[0, 1].set_title('Performance Metrics Over Time')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Feature distributions (baseline vs current)
feature_stats = pd.DataFrame({
    'Baseline Mean': np.mean(X_baseline, axis=0)[:10],
    'Current Mean': np.mean(X_prod_drift, axis=0)[:10]
})
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Baseline Mean'], alpha=0.7, label='Baseline')
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Current Mean'], alpha=0.7, label='Current')
axes[1, 0].set_xlabel('Feature')
axes[1, 0].set_ylabel('Mean Value')
axes[1, 0].set_title('Feature Distribution Drift (First 10)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3, axis='y')

# Alerts over time
alert_types = [a['type'] for a in monitor.drift_alerts]
alert_counts = pd.Series(alert_types).value_counts()
axes[1, 1].barh(alert_counts.index, alert_counts.values, color=['red', 'orange', 'yellow'])
axes[1, 1].set_xlabel('Count')
axes[1, 1].set_title('Alert Types Distribution')
axes[1, 1].grid(True, alpha=0.3, axis='x')

plt.tight_layout()
plt.savefig('model_monitoring_dashboard.png', dpi=100, bbox_inches='tight')
print("\nMonitoring dashboard saved as 'model_monitoring_dashboard.png'")

# 8. Report generation
report = monitor.get_monitoring_report()
print("\n=== 7. Monitoring Report ===")
print(json.dumps(report, indent=2, default=str))

print("\nModel monitoring system setup completed!")

Drift Detection Techniques

  • Kolmogorov-Smirnov Test: Statistical test for distribution change
  • Population Stability Index: Measures feature distribution shifts
  • Chi-square Test: Categorical feature drift
  • Wasserstein Distance: Optimal transport-based distance
  • Jensen-Shannon Divergence: Information-theoretic metric

Alert Thresholds

  • Critical: >5% accuracy drop, immediate action
  • High: Data drift in 3+ features, investigation needed
  • Medium: Output drift, monitoring increased
  • Low: Single feature drift, log and track

Deliverables

  • Monitoring dashboards
  • Alert configuration
  • Drift detection reports
  • Performance degradation analysis
  • Retraining recommendations
  • Action playbook