| name | Model Monitoring |
| description | Monitor model performance, detect data drift, concept drift, and anomalies in production using Prometheus, Grafana, and MLflow |
Model Monitoring
Monitoring deployed machine learning models ensures they continue to perform well in production by detecting data drift, concept drift, and performance degradation early.
Monitoring Components
- Performance Metrics: Accuracy, latency, throughput
- Data Drift: Distribution changes in input features (the input distribution P(X) shifts)
- Concept Drift: Changes in the relationship between features and target (P(y|X) shifts); the two are contrasted in the sketch after this list
- Output Drift: Changes in prediction distribution
- Feature Drift: Individual feature distribution changes
- Anomaly Detection: Unusual samples in production
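The distinction between data drift and concept drift is easiest to see on synthetic data. The snippet below is a minimal, self-contained sketch (the data and labeling rules are illustrative assumptions, not part of the system described later): data drift changes the input distribution P(X) while the labeling rule stays fixed; concept drift keeps P(X) the same but changes the rule that maps features to the target.
import numpy as np

rng = np.random.default_rng(0)

# Baseline: X ~ N(0, 1), labeling rule y = 1 when x0 + x1 > 0
X_ref = rng.normal(0, 1, size=(1000, 2))
y_ref = (X_ref[:, 0] + X_ref[:, 1] > 0).astype(int)

# Data drift: the input distribution shifts, the labeling rule is unchanged
X_data_drift = rng.normal(1.5, 1, size=(1000, 2))
y_data_drift = (X_data_drift[:, 0] + X_data_drift[:, 1] > 0).astype(int)

# Concept drift: inputs look the same, but the labeling rule has changed
X_concept_drift = rng.normal(0, 1, size=(1000, 2))
y_concept_drift = (X_concept_drift[:, 0] - X_concept_drift[:, 1] > 0).astype(int)

print("P(y=1) baseline:      ", y_ref.mean())
print("P(y=1) data drift:    ", y_data_drift.mean())     # moves with P(X)
print("P(y=1) concept drift: ", y_concept_drift.mean())  # stays near 0.5, yet a model trained on the old rule now fails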
Monitoring Tools
- Prometheus: Metrics collection and storage
- Grafana: Visualization and dashboarding
- MLflow: Model tracking and registry
- TensorFlow Data Validation: Data statistics and schema validation
- Evidently: Drift detection and monitoring reports (see the sketch after this list)
- Great Expectations: Data quality assertions
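As a concrete example of how one of the tools above is wired in, here is a minimal Evidently sketch. It assumes Evidently's Report/DataDriftPreset API (circa versions 0.3-0.4; check your installed version) and placeholder CSV paths for the reference and production feature tables:
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

ref_df = pd.read_csv("baseline_features.csv")     # placeholder: reference/training features
cur_df = pd.read_csv("production_features.csv")   # placeholder: recent production features

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=ref_df, current_data=cur_df)
report.save_html("data_drift_report.html")        # or report.as_dict() for programmatic checks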
Python Implementation
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from scipy import stats
import json
from datetime import datetime, timedelta
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("=== 1. Production Monitoring System ===")
class ModelMonitoringSystem:
    def __init__(self, model, scaler, baseline_data, baseline_targets):
        self.model = model
        self.scaler = scaler
        self.baseline_data = baseline_data
        self.baseline_targets = baseline_targets
        self.baseline_mean = baseline_data.mean(axis=0)
        self.baseline_std = baseline_data.std(axis=0)
        self.baseline_predictions = model.predict(baseline_data)
        self.metrics_history = []
        self.drift_alerts = []
        self.performance_history = []

    def log_predictions(self, X, y_true, y_pred):
        """Log predictions and compute metrics"""
        timestamp = datetime.now()
        # Compute metrics
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
        metric_record = {
            'timestamp': timestamp,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'n_samples': len(X)
        }
        self.metrics_history.append(metric_record)
        return metric_record
    def detect_data_drift(self, X_new):
        """Detect data drift using Kolmogorov-Smirnov test"""
        drift_detected = False
        drift_features = []
        for feature_idx in range(X_new.shape[1]):
            baseline_feature = self.baseline_data[:, feature_idx]
            new_feature = X_new[:, feature_idx]
            # KS Test
            ks_statistic, p_value = stats.ks_2samp(baseline_feature, new_feature)
            if p_value < 0.05:  # Significant drift detected
                drift_detected = True
                drift_features.append({
                    'feature_index': feature_idx,
                    'ks_statistic': float(ks_statistic),
                    'p_value': float(p_value)
                })
        if drift_detected:
            alert = {
                'timestamp': datetime.now(),
                'type': 'data_drift',
                'severity': 'high',
                'drifted_features': drift_features,
                'n_drifted': len(drift_features)
            }
            self.drift_alerts.append(alert)
            logger.warning(f"Data drift detected in {len(drift_features)} features")
        return drift_detected, drift_features
    def detect_output_drift(self, y_pred_new):
        """Detect drift in model predictions"""
        baseline_pred_dist = pd.Series(self.baseline_predictions).value_counts(normalize=True)
        new_pred_dist = pd.Series(y_pred_new).value_counts(normalize=True)
        # Compare class proportions with a chi-square-style statistic
        # (computed on proportions, so this is an approximation rather than a formal chi-square test)
        classes = set(baseline_pred_dist.index) | set(new_pred_dist.index)
        chi2_stat = 0
        for cls in classes:
            exp = baseline_pred_dist.get(cls, 0.01)
            obs = new_pred_dist.get(cls, 0.01)
            chi2_stat += (obs - exp) ** 2 / max(exp, 0.01)
        p_value = 1 - stats.chi2.cdf(chi2_stat, len(classes) - 1)
        if p_value < 0.05:
            alert = {
                'timestamp': datetime.now(),
                'type': 'output_drift',
                'severity': 'medium',
                'chi2_statistic': float(chi2_stat),
                'p_value': float(p_value)
            }
            self.drift_alerts.append(alert)
            logger.warning("Output drift detected in predictions")
            return True
        return False
    def detect_performance_degradation(self, y_true, y_pred):
        """Detect if model performance has degraded"""
        current_accuracy = accuracy_score(y_true, y_pred)
        # Note: the baseline here is accuracy on the training data; a held-out
        # validation set gives a less optimistic reference point.
        baseline_accuracy = accuracy_score(self.baseline_targets, self.baseline_predictions)
        degradation_threshold = 0.05  # 5% drop
        degradation = baseline_accuracy - current_accuracy
        if degradation > degradation_threshold:
            alert = {
                'timestamp': datetime.now(),
                'type': 'performance_degradation',
                'severity': 'critical',
                'baseline_accuracy': float(baseline_accuracy),
                'current_accuracy': float(current_accuracy),
                'degradation': float(degradation)
            }
            self.drift_alerts.append(alert)
            logger.error("Performance degradation detected")
            return True
        return False
    def get_monitoring_report(self):
        """Generate monitoring report"""
        if not self.metrics_history:
            return {}
        metrics_df = pd.DataFrame(self.metrics_history)
        return {
            'monitoring_period': {
                'start': self.metrics_history[0]['timestamp'].isoformat(),
                'end': self.metrics_history[-1]['timestamp'].isoformat(),
                'n_batches': len(self.metrics_history)
            },
            'performance_summary': {
                'avg_accuracy': float(metrics_df['accuracy'].mean()),
                'min_accuracy': float(metrics_df['accuracy'].min()),
                'max_accuracy': float(metrics_df['accuracy'].max()),
                'std_accuracy': float(metrics_df['accuracy'].std()),
                'avg_f1': float(metrics_df['f1'].mean())
            },
            'drift_summary': {
                'total_alerts': len(self.drift_alerts),
                'data_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'data_drift'),
                'output_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'output_drift'),
                'performance_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'performance_degradation')
            },
            'alerts': self.drift_alerts[-10:]  # Last 10 alerts
        }
print("Monitoring system initialized")
# 2. Create baseline model
print("\n=== 2. Train Baseline Model ===")
from sklearn.datasets import make_classification
# Create baseline data
X_baseline, y_baseline = make_classification(n_samples=1000, n_features=20,
                                             n_informative=15, random_state=42)
scaler = StandardScaler()
X_baseline_scaled = scaler.fit_transform(X_baseline)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_baseline_scaled, y_baseline)
print(f"Baseline model trained on {len(X_baseline)} samples")
# 3. Initialize monitoring
monitor = ModelMonitoringSystem(model, scaler, X_baseline_scaled, y_baseline)
# 4. Simulate production data and monitoring
print("\n=== 3. Production Monitoring Simulation ===")
# Simulate a "normal" production batch; ground-truth labels are random placeholders for the demo
X_prod_normal = np.random.randn(500, 20) * 0.5
X_prod_normal_scaled = scaler.transform(X_prod_normal)
y_pred_normal = model.predict(X_prod_normal_scaled)
y_true_normal = np.random.randint(0, 2, 500)
metrics_normal = monitor.log_predictions(X_prod_normal_scaled, y_true_normal, y_pred_normal)
print(f"Normal production batch - Accuracy: {metrics_normal['accuracy']:.4f}")
# Simulate drifted data
X_prod_drift = np.random.randn(500, 20) * 2.0 # Different distribution
X_prod_drift_scaled = scaler.transform(X_prod_drift)
y_pred_drift = model.predict(X_prod_drift_scaled)
y_true_drift = np.random.randint(0, 2, 500)
metrics_drift = monitor.log_predictions(X_prod_drift_scaled, y_true_drift, y_pred_drift)
drift_detected, drift_features = monitor.detect_data_drift(X_prod_drift_scaled)
print(f"Drifted production batch - Accuracy: {metrics_drift['accuracy']:.4f}")
print(f"Data drift detected: {drift_detected}")
# Check performance degradation
perf_degradation = monitor.detect_performance_degradation(y_true_drift, y_pred_drift)
print(f"Performance degradation: {perf_degradation}")
# 5. Prometheus metrics export
print("\n=== 4. Prometheus Metrics Format ===")
prometheus_metrics = f"""
# HELP model_accuracy Model accuracy score
# TYPE model_accuracy gauge
model_accuracy{{timestamp="2024-01-01"}} {metrics_normal['accuracy']:.4f}
# HELP model_f1_score Model F1 score
# TYPE model_f1_score gauge
model_f1_score{{timestamp="2024-01-01"}} {metrics_normal['f1']:.4f}
# HELP model_predictions_total Total predictions made
# TYPE model_predictions_total counter
model_predictions_total 5000
# HELP model_drift_detected Data drift detection flag
# TYPE model_drift_detected gauge
model_drift_detected {int(drift_detected)}
# HELP model_latency_seconds Prediction latency in seconds
# TYPE model_latency_seconds histogram
model_latency_seconds_bucket{{le="0.01"}} 100
model_latency_seconds_bucket{{le="0.05"}} 450
model_latency_seconds_bucket{{le="0.1"}} 500
"""
print("Prometheus metrics:")
print(prometheus_metrics)
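# Hedged alternative to the hand-written exposition text above: the official prometheus_client
# library can build the same metrics and render them (requires `pip install prometheus-client`;
# metric names mirror the ones used above).
try:
    from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest

    registry = CollectorRegistry()
    acc_gauge = Gauge('model_accuracy', 'Model accuracy score', registry=registry)
    drift_gauge = Gauge('model_drift_detected', 'Data drift detection flag', registry=registry)
    pred_counter = Counter('model_predictions_total', 'Total predictions made', registry=registry)

    acc_gauge.set(metrics_normal['accuracy'])
    drift_gauge.set(int(drift_detected))
    pred_counter.inc(len(y_pred_normal) + len(y_pred_drift))

    print(generate_latest(registry).decode())  # same exposition format, generated by the client
except ImportError:
    print("prometheus_client not installed; skipping client-based export")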
# 6. Grafana dashboard JSON
print("\n=== 5. Grafana Dashboard Configuration ===")
grafana_dashboard = {
    'title': 'ML Model Monitoring Dashboard',
    'panels': [
        {
            'title': 'Model Accuracy',
            'targets': [{'metric': 'model_accuracy'}],
            'type': 'graph'
        },
        {
            'title': 'Data Drift Alerts',
            'targets': [{'metric': 'model_drift_detected'}],
            'type': 'stat'
        },
        {
            'title': 'Prediction Latency',
            'targets': [{'metric': 'model_latency_seconds'}],
            'type': 'graph'
        },
        {
            'title': 'Feature Distributions',
            'targets': [{'metric': 'feature_distribution_change'}],
            'type': 'heatmap'
        }
    ]
}
print("Grafana dashboard configured with 4 panels")
# 7. Visualization
print("\n=== 6. Monitoring Visualization ===")
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# Performance over time
metrics_df = pd.DataFrame(monitor.metrics_history)
axes[0, 0].plot(range(len(metrics_df)), metrics_df['accuracy'], marker='o', label='Accuracy')
axes[0, 0].axhline(y=metrics_df['accuracy'].mean(), color='r', linestyle='--', label='Mean')
axes[0, 0].set_xlabel('Batch')
axes[0, 0].set_ylabel('Accuracy')
axes[0, 0].set_title('Model Accuracy Over Time')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Precision, Recall, F1
axes[0, 1].plot(range(len(metrics_df)), metrics_df['precision'], label='Precision', marker='s')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['recall'], label='Recall', marker='^')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['f1'], label='F1', marker='d')
axes[0, 1].set_xlabel('Batch')
axes[0, 1].set_ylabel('Score')
axes[0, 1].set_title('Performance Metrics Over Time')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Feature distributions (baseline vs current)
feature_stats = pd.DataFrame({
    'Baseline Mean': np.mean(X_baseline, axis=0)[:10],
    'Current Mean': np.mean(X_prod_drift, axis=0)[:10]
})
x = np.arange(len(feature_stats))
width = 0.4  # side-by-side bars so baseline and current means are comparable
axes[1, 0].bar(x - width / 2, feature_stats['Baseline Mean'], width=width, alpha=0.7, label='Baseline')
axes[1, 0].bar(x + width / 2, feature_stats['Current Mean'], width=width, alpha=0.7, label='Current')
axes[1, 0].set_xlabel('Feature')
axes[1, 0].set_ylabel('Mean Value')
axes[1, 0].set_title('Feature Distribution Drift (First 10)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3, axis='y')
# Alerts over time
alert_types = [a['type'] for a in monitor.drift_alerts]
alert_counts = pd.Series(alert_types).value_counts()
axes[1, 1].barh(alert_counts.index, alert_counts.values, color=['red', 'orange', 'yellow'])
axes[1, 1].set_xlabel('Count')
axes[1, 1].set_title('Alert Types Distribution')
axes[1, 1].grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.savefig('model_monitoring_dashboard.png', dpi=100, bbox_inches='tight')
print("\nMonitoring dashboard saved as 'model_monitoring_dashboard.png'")
# 8. Report generation
report = monitor.get_monitoring_report()
print("\n=== 7. Monitoring Report ===")
print(json.dumps(report, indent=2, default=str))
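# Hedged sketch: the same batch metrics can also be logged to MLflow (listed under monitoring
# tools above) so they live alongside the model in the tracking server. Requires
# `pip install mlflow`; the experiment and run names are placeholders.
try:
    import mlflow

    mlflow.set_experiment("model-monitoring-demo")
    with mlflow.start_run(run_name="production-monitoring"):
        for step, record in enumerate(monitor.metrics_history):
            mlflow.log_metric("accuracy", record['accuracy'], step=step)
            mlflow.log_metric("f1", record['f1'], step=step)
        mlflow.log_text(json.dumps(report, indent=2, default=str), "monitoring_report.json")
except ImportError:
    print("mlflow not installed; skipping MLflow logging")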
print("\nModel monitoring system setup completed!")
Drift Detection Techniques
- Kolmogorov-Smirnov Test: Statistical test for distribution change
- Population Stability Index (PSI): Measures feature distribution shifts (see the sketch after this list)
- Chi-square Test: Categorical feature drift
- Wasserstein Distance: Optimal transport-based distance
- Jensen-Shannon Divergence: Information-theoretic metric
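A minimal sketch of three of these measures on a single feature, using only NumPy and SciPy; the quantile binning, the synthetic data, and the rule-of-thumb PSI thresholds (~0.1 moderate, ~0.25 major shift) are conventions rather than part of any library API:
import numpy as np
from scipy.stats import wasserstein_distance
from scipy.spatial.distance import jensenshannon

def population_stability_index(baseline, current, n_bins=10):
    """PSI over quantile bins of the baseline distribution."""
    edges = np.quantile(baseline, np.linspace(0, 1, n_bins + 1))
    base_pct = np.histogram(np.clip(baseline, edges[0], edges[-1]), bins=edges)[0] / len(baseline)
    curr_pct = np.histogram(np.clip(current, edges[0], edges[-1]), bins=edges)[0] / len(current)
    base_pct = np.clip(base_pct, 1e-6, None)  # avoid log(0) and division by zero
    curr_pct = np.clip(curr_pct, 1e-6, None)
    return float(np.sum((curr_pct - base_pct) * np.log(curr_pct / base_pct)))

rng = np.random.default_rng(42)
baseline = rng.normal(0, 1, 5000)
current = rng.normal(0.5, 1.2, 5000)  # shifted and widened relative to baseline

print("PSI:", population_stability_index(baseline, current))
print("Wasserstein distance:", wasserstein_distance(baseline, current))

# Jensen-Shannon distance operates on discrete probability vectors; reuse quantile bins of the baseline
edges = np.quantile(baseline, np.linspace(0, 1, 11))
p = np.histogram(np.clip(baseline, edges[0], edges[-1]), bins=edges)[0] / len(baseline)
q = np.histogram(np.clip(current, edges[0], edges[-1]), bins=edges)[0] / len(current)
print("Jensen-Shannon distance:", jensenshannon(p, q))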
Alert Thresholds
- Critical: >5% accuracy drop, immediate action (a severity-routing sketch follows this list)
- High: Data drift in 3+ features, investigation needed
- Medium: Output drift, monitoring increased
- Low: Single feature drift, log and track
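These thresholds can be encoded directly in the alerting logic. A minimal sketch (the threshold values mirror the list above; the function name and signature are illustrative):
def classify_alert(accuracy_drop, n_drifted_features, output_drift):
    """Map monitoring signals to a severity level using the thresholds above."""
    if accuracy_drop > 0.05:
        return 'critical'   # >5% accuracy drop: immediate action
    if n_drifted_features >= 3:
        return 'high'       # data drift in 3+ features: investigation needed
    if output_drift:
        return 'medium'     # output drift: increase monitoring
    if n_drifted_features >= 1:
        return 'low'        # single feature drift: log and track
    return 'ok'

print(classify_alert(accuracy_drop=0.08, n_drifted_features=2, output_drift=False))  # -> 'critical'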
Deliverables
- Monitoring dashboards
- Alert configuration
- Drift detection reports
- Performance degradation analysis
- Retraining recommendations
- Action playbook