Claude Code Plugins

Community-maintained marketplace

Feedback

Real-time monitoring and detection of adversarial attacks and model drift in production

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name continuous-monitoring
version 2.0.0
description Real-time monitoring and detection of adversarial attacks and model drift in production
sasmp_version 1.3.0
bonded_agent 05-defense-strategy-developer
bond_type SECONDARY_BOND
input_schema [object Object]
output_schema [object Object]
owasp_llm_2025 LLM10, LLM02
nist_ai_rmf Measure, Manage

Continuous Monitoring

Implement real-time detection of adversarial attacks and model degradation in production AI systems.

Quick Reference

Skill:       continuous-monitoring
Agent:       05-defense-strategy-developer
OWASP:       LLM10 (Unbounded Consumption), LLM02 (Sensitive Disclosure)
NIST:        Measure, Manage
Use Case:    Detect attacks and drift in production

Monitoring Architecture

User Input → [Input Monitor] → [Model] → [Output Monitor] → Response
                  ↓                              ↓
            [Anomaly Detection]          [Quality Check]
                  ↓                              ↓
            [Alert System] ←←←←←←←←←←←←←←←←←←←←←←
                  ↓
            [Incident Response]

Detection Categories

1. Input Anomaly Detection

Category: input_anomaly
Latency Impact: 10-20ms
Detection Rate: 85-95%
class InputAnomalyDetector:
    def __init__(self, training_distribution):
        self.mean = training_distribution.mean
        self.cov = training_distribution.covariance
        self.threshold = 3.0  # Standard deviations

    def detect(self, input_embedding):
        # Mahalanobis distance from training distribution
        diff = input_embedding - self.mean
        distance = np.sqrt(diff.T @ np.linalg.inv(self.cov) @ diff)

        if distance > self.threshold:
            return AnomalyAlert(
                type="out_of_distribution",
                score=distance,
                severity=self._classify_severity(distance)
            )
        return None

    def detect_injection(self, text_input):
        # Pattern-based injection detection
        injection_patterns = [
            r'ignore\s+(previous|all)\s+instructions',
            r'system\s*:\s*',
            r'(admin|developer)\s+mode',
        ]
        for pattern in injection_patterns:
            if re.search(pattern, text_input, re.I):
                return AnomalyAlert(type="injection_attempt", severity="HIGH")
        return None

2. Output Quality Monitoring

Category: output_quality
Metrics: [confidence, coherence, toxicity, latency]
class OutputQualityMonitor:
    def __init__(self, config):
        self.confidence_threshold = config.get('confidence', 0.5)
        self.toxicity_threshold = config.get('toxicity', 0.1)
        self.latency_threshold_ms = config.get('latency', 5000)

    def check(self, response, metadata):
        alerts = []

        # Low confidence check
        if metadata.confidence < self.confidence_threshold:
            alerts.append(Alert("low_confidence", metadata.confidence))

        # Toxicity check
        toxicity_score = self.toxicity_classifier(response)
        if toxicity_score > self.toxicity_threshold:
            alerts.append(Alert("high_toxicity", toxicity_score))

        # Latency check
        if metadata.latency_ms > self.latency_threshold_ms:
            alerts.append(Alert("high_latency", metadata.latency_ms))

        # Coherence check
        coherence = self.coherence_scorer(response)
        if coherence < 0.7:
            alerts.append(Alert("low_coherence", coherence))

        return alerts

3. Model Drift Detection

Category: model_drift
Types: [data_drift, concept_drift, prediction_drift]
class DriftDetector:
    def __init__(self, baseline_window=1000):
        self.baseline_window = baseline_window
        self.baseline_inputs = []
        self.baseline_outputs = []

    def detect_data_drift(self, current_inputs):
        """Detect drift in input distribution"""
        if len(self.baseline_inputs) < self.baseline_window:
            self.baseline_inputs.extend(current_inputs)
            return None

        # KL divergence between distributions
        baseline_dist = self._estimate_distribution(self.baseline_inputs)
        current_dist = self._estimate_distribution(current_inputs)
        kl_div = self._kl_divergence(baseline_dist, current_dist)

        if kl_div > 0.1:
            return DriftAlert("data_drift", kl_div)
        return None

    def detect_concept_drift(self, predictions, ground_truth):
        """Detect drift in model performance"""
        # Track accuracy over sliding windows
        recent_accuracy = self._compute_accuracy(predictions, ground_truth)
        baseline_accuracy = self._baseline_accuracy()

        if baseline_accuracy - recent_accuracy > 0.05:
            return DriftAlert("concept_drift", recent_accuracy)
        return None

4. Security Event Monitoring

Category: security_events
Events: [extraction_attempt, jailbreak, rate_abuse]
class SecurityMonitor:
    def __init__(self):
        self.query_history = defaultdict(list)
        self.extraction_patterns = []

    def detect_extraction(self, user_id, queries):
        """Detect model extraction attempts"""
        history = self.query_history[user_id]
        history.extend(queries)

        # Check for systematic querying patterns
        if len(history) > 1000:  # High volume
            diversity = self._query_diversity(history)
            if diversity > 0.9:  # Very diverse
                return SecurityAlert("extraction_attempt", user_id)

        return None

    def detect_abuse(self, user_id, request_timestamps):
        """Detect rate limit abuse"""
        window = 60  # 1 minute
        recent = [t for t in request_timestamps if time.time() - t < window]

        if len(recent) > 100:  # Too many requests
            return SecurityAlert("rate_abuse", user_id, len(recent))
        return None

Alert Configuration

Alert Thresholds:
  input_anomaly:
    warning: 2.5  # standard deviations
    critical: 4.0

  output_toxicity:
    warning: 0.3
    critical: 0.7

  model_drift:
    warning: 0.05  # 5% accuracy drop
    critical: 0.10

  extraction_queries:
    warning: 500/hour
    critical: 1000/hour

Dashboard Metrics

┌──────────────────────────────────────────────────────────┐
│ REAL-TIME MONITORING DASHBOARD                           │
├──────────────────────────────────────────────────────────┤
│ Input Anomalies (1hr):  ████░░░░ 12 (2.4%)              │
│ Output Toxicity (1hr):  █░░░░░░░  3 (0.6%)              │
│ Model Latency P99:      ████████ 2.3s                   │
│ Drift Score:            ██░░░░░░ 0.02 (OK)              │
│ Security Alerts:        ░░░░░░░░ 0                       │
└──────────────────────────────────────────────────────────┘

Troubleshooting

Issue: Too many false positive alerts
Solution: Tune thresholds, add allowlists, improve baseline

Issue: Missing attack detection
Solution: Expand detection patterns, lower thresholds

Issue: High monitoring latency
Solution: Use sampling, async processing, optimize detectors

Integration Points

Component Purpose
Agent 05 Configures monitoring
Agent 08 CI/CD integration
/report Monitoring reports
Prometheus/Grafana Metrics visualization

Detect attacks and drift with real-time AI monitoring.