Claude Code Plugins

Community-maintained marketplace

Feedback

anomaly-detector

@anton-abyzov/specweave
3
0

|

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name anomaly-detector
description Anomaly and outlier detection using Isolation Forest, One-Class SVM, autoencoders, and statistical methods. Activates for "anomaly detection", "outlier detection", "fraud detection", "intrusion detection", "abnormal behavior", "unusual patterns", "detect anomalies", "system monitoring". Handles supervised and unsupervised anomaly detection with SpecWeave increment integration.

Anomaly Detector

Overview

Detect unusual patterns, outliers, and anomalies in data using statistical methods, machine learning, and deep learning. Critical for fraud detection, security monitoring, quality control, and system health monitoring—all integrated with SpecWeave's increment workflow.

Why Anomaly Detection is Different

Challenge: Anomalies are rare (0.1% - 5% of data)

Standard classification doesn't work:

  • ❌ Extreme class imbalance
  • ❌ Unknown anomaly patterns
  • ❌ Expensive to label anomalies
  • ❌ Anomalies evolve over time

Anomaly detection approaches:

  • ✅ Unsupervised (no labels needed)
  • ✅ Semi-supervised (learn from normal data)
  • ✅ Statistical (deviation from expected)
  • ✅ Context-aware (what's normal for this user/time/location?)

Anomaly Detection Methods

1. Statistical Methods (Baseline)

Z-Score / Standard Deviation:

from specweave import AnomalyDetector

detector = AnomalyDetector(
    method="statistical",
    increment="0042"
)

# Flag values > 3 standard deviations from mean
anomalies = detector.detect(
    data=transaction_amounts,
    threshold=3.0
)

# Simple, fast, but assumes normal distribution

IQR (Interquartile Range):

# More robust to non-normal distributions
detector = AnomalyDetector(method="iqr")

# Flag values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
anomalies = detector.detect(data=response_times)

# Good for skewed distributions

2. Isolation Forest (Recommended)

Best for: General purpose, high-dimensional data

from specweave import IsolationForestDetector

detector = IsolationForestDetector(
    contamination=0.05,  # Expected anomaly rate (5%)
    increment="0042"
)

# Train on normal data (or mixed data)
detector.fit(X_train)

# Detect anomalies
predictions = detector.predict(X_test)
# -1 = anomaly, 1 = normal

anomaly_scores = detector.score(X_test)
# Lower score = more anomalous

# Generates:
# - Anomaly scores for all samples
# - Feature importance (which features contribute to anomaly)
# - Threshold visualization
# - Top anomalies ranked by score

Why Isolation Forest works:

  • Fast (O(n log n))
  • Handles high dimensions well
  • No assumptions about data distribution
  • Anomalies are easier to isolate (fewer splits)

3. One-Class SVM

Best for: When you have only normal data for training

from specweave import OneClassSVMDetector

# Train only on normal transactions
detector = OneClassSVMDetector(
    kernel='rbf',
    nu=0.05,  # Expected anomaly rate
    increment="0042"
)

detector.fit(X_normal)

# Detect anomalies in new data
predictions = detector.predict(X_new)
# -1 = anomaly, 1 = normal

# Good for: Clean training data of normal samples

4. Autoencoders (Deep Learning)

Best for: Complex patterns, high-dimensional data, images

from specweave import AutoencoderDetector

# Learn to reconstruct normal data
detector = AutoencoderDetector(
    encoding_dim=32,  # Compressed representation
    layers=[64, 32, 16, 32, 64],
    increment="0042"
)

# Train on normal data
detector.fit(
    X_normal,
    epochs=100,
    validation_split=0.2
)

# Anomalies have high reconstruction error
anomaly_scores = detector.score(X_test)

# Generates:
# - Reconstruction error distribution
# - Threshold recommendation
# - Top anomalies with explanations
# - Learned representations (t-SNE plot)

How autoencoders work:

Input → Encoder → Compressed → Decoder → Reconstructed

Normal data: Low reconstruction error (learned well)
Anomalies: High reconstruction error (never seen before)

5. LOF (Local Outlier Factor)

Best for: Density-based anomalies (sparse regions)

from specweave import LOFDetector

# Detects points in low-density regions
detector = LOFDetector(
    n_neighbors=20,
    contamination=0.05,
    increment="0042"
)

detector.fit(X_train)
predictions = detector.predict(X_test)

# Good for: Clustered data with sparse anomalies

Anomaly Detection Workflows

Workflow 1: Fraud Detection

from specweave import FraudDetectionPipeline

pipeline = FraudDetectionPipeline(increment="0042")

# Features: transaction amount, location, time, merchant, etc.
pipeline.fit(normal_transactions)

# Real-time fraud detection
fraud_scores = pipeline.predict_proba(new_transactions)

# For each transaction:
# - Fraud probability (0-1)
# - Anomaly score
# - Contributing features
# - Similar past cases

# Generates:
# - Precision-Recall curve (fraud is rare)
# - Cost-benefit analysis (false positives vs missed fraud)
# - Feature importance for fraud
# - Fraud patterns identified

Fraud Detection Best Practices:

# 1. Use multiple signals
pipeline.add_signals([
    'amount_vs_user_average',
    'distance_from_home',
    'merchant_risk_score',
    'velocity_24h'  # Transactions in last 24h
])

# 2. Set threshold based on cost
# False Positive cost: $5 (manual review)
# False Negative cost: $500 (fraud loss)
# Optimal threshold: Maximize (savings - review_cost)

# 3. Provide explanations
explanation = pipeline.explain_prediction(suspicious_transaction)
# "Flagged because: amount 10x user average, new merchant, foreign location"

Workflow 2: System Anomaly Detection

from specweave import SystemAnomalyPipeline

# Monitor system metrics (CPU, memory, latency, errors)
pipeline = SystemAnomalyPipeline(increment="0042")

# Train on normal system behavior
pipeline.fit(normal_metrics)

# Detect system anomalies
anomalies = pipeline.detect(current_metrics)

# For each anomaly:
# - Severity (low, medium, high, critical)
# - Affected metrics
# - Similar past incidents
# - Recommended actions

# Generates:
# - Anomaly timeline
# - Metric correlations (which metrics moved together)
# - Root cause analysis
# - Alert rules

System Monitoring Best Practices:

# 1. Use time windows
pipeline.add_time_windows([
    '5min',   # Immediate spikes
    '1hour',  # Short-term trends
    '24hour'  # Daily patterns
])

# 2. Correlate metrics
pipeline.detect_correlations([
    ('high_cpu', 'slow_response'),
    ('memory_leak', 'increasing_errors')
])

# 3. Reduce alert fatigue
pipeline.set_alert_rules(
    min_severity='medium',
    min_duration='5min',  # Ignore transient spikes
    max_alerts_per_hour=5
)

Workflow 3: Manufacturing Quality Control

from specweave import QualityControlPipeline

# Detect defective products from sensor data
pipeline = QualityControlPipeline(increment="0042")

# Train on good products
pipeline.fit(good_product_sensors)

# Detect defects in production line
defect_scores = pipeline.predict(production_line_data)

# Generates:
# - Real-time defect alerts
# - Defect rate trends
# - Most common defect patterns
# - Preventive maintenance recommendations

Workflow 4: Network Intrusion Detection

from specweave import IntrusionDetectionPipeline

# Detect malicious network traffic
pipeline = IntrusionDetectionPipeline(increment="0042")

# Features: packet size, frequency, ports, protocols, etc.
pipeline.fit(normal_network_traffic)

# Detect intrusions
intrusions = pipeline.detect(network_traffic_stream)

# Generates:
# - Attack type classification (DDoS, port scan, etc.)
# - Severity scores
# - Source IPs
# - Attack timeline

Evaluation Metrics

Anomaly detection metrics (different from classification):

from specweave import AnomalyEvaluator

evaluator = AnomalyEvaluator(increment="0042")

metrics = evaluator.evaluate(
    y_true=true_labels,  # 0=normal, 1=anomaly
    y_pred=predictions,
    y_scores=anomaly_scores
)

Key Metrics:

  1. Precision @ K - Of top K flagged anomalies, how many are real?

    precision_at_100 = evaluator.precision_at_k(k=100)
    # "Of 100 flagged transactions, 85 were actual fraud" = 85%
    
  2. Recall @ K - Of all real anomalies, how many did we catch in top K?

    recall_at_100 = evaluator.recall_at_k(k=100)
    # "We caught 78% of all fraud in top 100 flagged"
    
  3. ROC AUC - Overall discrimination ability

    roc_auc = evaluator.roc_auc(y_true, y_scores)
    # 0.95 = excellent discrimination
    
  4. PR AUC - Better for imbalanced data

    pr_auc = evaluator.pr_auc(y_true, y_scores)
    # More informative when anomalies are rare (<5%)
    

Evaluation Report:

# Anomaly Detection Evaluation

## Dataset
- Total samples: 100,000
- Anomalies: 500 (0.5%)
- Features: 25

## Method: Isolation Forest

## Performance Metrics
- ROC AUC: 0.94 ✅ (excellent)
- PR AUC: 0.78 ✅ (good for 0.5% anomaly rate)

## Precision-Recall Tradeoff
- Precision @ 100: 85% (85 true anomalies in top 100)
- Recall @ 100: 17% (caught 17% of all anomalies)
- Precision @ 500: 62% (310 true anomalies in top 500)
- Recall @ 500: 62% (caught 62% of all anomalies)

## Business Impact (Fraud Detection Example)
- Review budget: 500 transactions/day
- At Precision @ 500 = 62%:
  - True fraud caught: 310/day ($155,000 saved)
  - False positives: 190/day ($950 review cost)
  - Net benefit: $154,050/day ✅

## Recommendation
✅ DEPLOY with threshold for top 500 (62% precision)

Integration with SpecWeave

Increment Structure

.specweave/increments/0042-fraud-detection/
├── spec.md (detection requirements, business impact)
├── plan.md (method selection, threshold tuning)
├── tasks.md
├── data/
│   ├── normal_transactions.csv
│   ├── labeled_fraud.csv (if available)
│   └── schema.yaml
├── experiments/
│   ├── statistical-baseline/
│   ├── isolation-forest/
│   ├── one-class-svm/
│   └── autoencoder/
├── models/
│   ├── isolation_forest_model.pkl
│   └── threshold_config.json
├── evaluation/
│   ├── precision_recall_curve.png
│   ├── roc_curve.png
│   ├── top_anomalies.csv
│   └── evaluation_report.md
└── deployment/
    ├── real_time_api.py
    ├── monitoring_dashboard.json
    └── alert_rules.yaml

Best Practices

1. Start with Labeled Anomalies (if available)

# Use labeled data to validate unsupervised methods
detector.fit(X_train)  # Unlabeled

# Evaluate on labeled test set
metrics = evaluator.evaluate(y_true_test, detector.predict(X_test))

# Choose method with best precision @ K

2. Tune Contamination Parameter

# Try different contamination rates
for contamination in [0.01, 0.05, 0.1, 0.2]:
    detector = IsolationForestDetector(contamination=contamination)
    detector.fit(X_train)
    
    metrics = evaluator.evaluate(y_test, detector.predict(X_test))
    
# Choose contamination that maximizes business value

3. Explain Anomalies

# Don't just flag anomalies - explain why
explainer = AnomalyExplainer(detector, increment="0042")

for anomaly in top_anomalies:
    explanation = explainer.explain(anomaly)
    print(f"Anomaly: {anomaly.id}")
    print(f"Reasons:")
    print(f"  - {explanation.top_features}")
    print(f"  - Similar cases: {explanation.similar_cases}")

4. Handle Concept Drift

# Anomalies evolve over time
monitor = AnomalyMonitor(increment="0042")

# Track detection performance
monitor.track_daily_performance()

# Retrain when accuracy drops
if monitor.performance_degraded():
    detector.retrain(new_normal_data)

5. Set Business-Driven Thresholds

# Balance false positives vs false negatives
optimizer = ThresholdOptimizer(increment="0042")

optimal_threshold = optimizer.find_optimal(
    detector=detector,
    data=validation_data,
    false_positive_cost=5,    # $5 per manual review
    false_negative_cost=500   # $500 per missed fraud
)

# Use optimal threshold for deployment

Advanced Features

1. Ensemble Anomaly Detection

# Combine multiple detectors
ensemble = AnomalyEnsemble(increment="0042")

ensemble.add_detector("isolation_forest", weight=0.4)
ensemble.add_detector("one_class_svm", weight=0.3)
ensemble.add_detector("autoencoder", weight=0.3)

# Ensemble vote (more robust)
anomalies = ensemble.detect(X_test)

2. Contextual Anomaly Detection

# What's normal varies by context
detector = ContextualAnomalyDetector(increment="0042")

# Different normality for different contexts
detector.fit(data, contexts=['user_id', 'time_of_day', 'location'])

# $10 transaction: Normal for user A, anomaly for user B

3. Sequential Anomaly Detection

# Detect anomalous sequences (not just individual points)
detector = SequenceAnomalyDetector(
    method='lstm',
    window_size=10,
    increment="0042"
)

# Example: Login from unusual sequence of locations

Commands

# Train anomaly detector
/ml:train-anomaly-detector 0042

# Evaluate detector
/ml:evaluate-anomaly-detector 0042

# Explain top anomalies
/ml:explain-anomalies 0042 --top 100

Summary

Anomaly detection is critical for:

  • ✅ Fraud detection (financial transactions)
  • ✅ Security monitoring (intrusion detection)
  • ✅ Quality control (manufacturing defects)
  • ✅ System health (performance monitoring)
  • ✅ Business intelligence (unusual patterns)

This skill provides battle-tested methods integrated with SpecWeave's increment workflow, ensuring anomaly detectors are reproducible, explainable, and business-aligned.