Claude Code Plugins

Community-maintained marketplace


MLOps practices including CI/CD for ML, experiment tracking, model monitoring, pipeline orchestration, and production ML operations.

Install Skill

1. Download the skill
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Please review the skill's instructions and verify its behavior before using it.

SKILL.md

name: mlops
description: MLOps practices including CI/CD for ML, experiment tracking, model monitoring, pipeline orchestration, and production ML operations.

MLOps

Production ML operations and automation.

MLOps Maturity Model

┌─────────────────────────────────────────────────────────────┐
│                   MLOPS MATURITY LEVELS                      │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  LEVEL 0           LEVEL 1           LEVEL 2                │
│  Manual            ML Pipeline       CI/CD for ML           │
│  ───────           ──────────        ──────────             │
│  Notebooks         Automated         Automated              │
│  Manual deploy     training          retraining             │
│  No monitoring     Basic pipeline    Full automation        │
│                                                              │
│  Components:                                                 │
│  ├── Version Control (Git, DVC)                             │
│  ├── Experiment Tracking (MLflow, W&B)                      │
│  ├── Feature Store (Feast, Tecton)                          │
│  ├── Model Registry (MLflow, Sagemaker)                     │
│  ├── Orchestration (Airflow, Kubeflow)                      │
│  └── Monitoring (Prometheus, Evidently)                     │
│                                                              │
└─────────────────────────────────────────────────────────────┘
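Model Monitoring

Monitoring appears in the component list above but has no section of its own in this skill. As a minimal sketch, a serving process could expose prediction counts and latency with the prometheus_client library; the metric names, label values, and port below are illustrative assumptions, not part of this skill.

from prometheus_client import Counter, Histogram, start_http_server

# Illustrative metric names; adapt to your serving stack
PREDICTIONS = Counter("model_predictions_total", "Predictions served", ["model_version"])
LATENCY = Histogram("model_prediction_latency_seconds", "Prediction latency in seconds")

@LATENCY.time()
def predict(features, model, model_version="churn_predictor/3"):  # placeholder version label
    PREDICTIONS.labels(model_version=model_version).inc()
    return model.predict([features])

if __name__ == "__main__":
    start_http_server(8000)  # exposes /metrics for Prometheus to scrape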

Experiment Tracking

MLflow Integration

import mlflow
from mlflow.tracking import MlflowClient

# Set tracking server
mlflow.set_tracking_uri("http://mlflow.example.com:5000")
mlflow.set_experiment("churn_prediction")

# Start run with context manager
with mlflow.start_run(run_name="xgboost_v2") as run:
    # Log parameters
    params = {
        "model_type": "xgboost",
        "learning_rate": 0.1,
        "max_depth": 6,
        "n_estimators": 100
    }
    mlflow.log_params(params)

    # Train model with the logged parameters
    model = train_model(params)

    # Log metrics
    mlflow.log_metrics({
        "accuracy": 0.92,
        "f1_score": 0.89,
        "auc_roc": 0.95
    })

    # Log artifacts
    mlflow.log_artifact("feature_importance.png")
    mlflow.log_artifact("confusion_matrix.png")

    # Log model
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="churn_predictor"
    )

    # Log a metric over time (train_loss stands in for the value computed each epoch)
    for epoch in range(100):
        mlflow.log_metric("loss", train_loss, step=epoch)

# Compare runs
client = MlflowClient()
runs = client.search_runs(
    experiment_ids=["1"],
    filter_string="metrics.f1_score > 0.85",
    order_by=["metrics.f1_score DESC"]
)
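
The filtered runs can feed directly into model selection. A small follow-on sketch, assuming the search above returned at least one run:

# Pick the best run from the sorted results and load its model
best_run = runs[0]
print(best_run.info.run_id, best_run.data.metrics["f1_score"])

best_model = mlflow.sklearn.load_model(f"runs:/{best_run.info.run_id}/model")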

Weights & Biases

import wandb

wandb.init(
    project="ml-project",
    config={
        "learning_rate": 0.001,
        "architecture": "ResNet50",
        "epochs": 100
    }
)

# Log metrics
for epoch in range(100):
    wandb.log({
        "epoch": epoch,
        "loss": train_loss,
        "val_loss": val_loss,
        "accuracy": accuracy
    })

# Log images
wandb.log({"examples": [wandb.Image(img, caption=label) for img, label in samples]})

# Log model
wandb.save("model.pt")

# Hyperparameter sweeps
sweep_config = {
    "method": "bayes",
    "metric": {"name": "val_loss", "goal": "minimize"},
    "parameters": {
        "learning_rate": {"min": 0.0001, "max": 0.1},
        "batch_size": {"values": [16, 32, 64]}
    }
}
sweep_id = wandb.sweep(sweep_config)
wandb.agent(sweep_id, train_function, count=50)
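
wandb.agent calls the training function once per trial, and the sampled hyperparameters arrive via wandb.config. train_function is not defined above; one possible sketch, where build_model and train_one_epoch are placeholder helpers:

def train_function():
    # Each sweep trial starts its own run; wandb injects the sampled config
    with wandb.init():
        config = wandb.config
        model = build_model(lr=config.learning_rate)  # placeholder model builder
        for epoch in range(10):
            val_loss = train_one_epoch(model, batch_size=config.batch_size)  # placeholder step
            wandb.log({"epoch": epoch, "val_loss": val_loss})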

Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register model (run_id comes from a completed training run)
model_uri = f"runs:/{run_id}/model"
result = mlflow.register_model(model_uri, "production_model")

# Transition stages
client.transition_model_version_stage(
    name="production_model",
    version=result.version,
    stage="Staging"
)

# Add description and tags
client.update_model_version(
    name="production_model",
    version=result.version,
    description="XGBoost model trained on Q4 data"
)

client.set_model_version_tag(
    name="production_model",
    version=result.version,
    key="validation_status",
    value="passed"
)

# Load production model
model = mlflow.pyfunc.load_model("models:/production_model/Production")

# Compare versions
def compare_model_versions(model_name, version_a, version_b, test_data):
    model_a = mlflow.pyfunc.load_model(f"models:/{model_name}/{version_a}")
    model_b = mlflow.pyfunc.load_model(f"models:/{model_name}/{version_b}")

    metrics_a = evaluate(model_a, test_data)
    metrics_b = evaluate(model_b, test_data)

    return {
        "version_a": {"version": version_a, **metrics_a},
        "version_b": {"version": version_b, **metrics_b}
    }
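
One way to use this helper is as a promotion gate, reusing the registry client from above; the versions, metric key, and test_df below are illustrative:

comparison = compare_model_versions("production_model", version_a=3, version_b=4, test_data=test_df)

# Promote the candidate only if it beats the incumbent (illustrative gate: any improvement)
if comparison["version_b"]["f1_score"] > comparison["version_a"]["f1_score"]:
    client.transition_model_version_stage(
        name="production_model",
        version=4,
        stage="Production"
    )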

Pipeline Orchestration

Airflow DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Tasks
extract_data = PythonOperator(
    task_id='extract_data',
    python_callable=extract_training_data,
    dag=dag
)

validate_data = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data_quality,
    dag=dag
)

train_model = PythonOperator(
    task_id='train_model',
    python_callable=train_and_log_model,
    dag=dag
)

evaluate_model = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model_performance,
    dag=dag
)

deploy_model = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_to_production,
    dag=dag
)

# Dependencies
extract_data >> validate_data >> train_model >> evaluate_model >> deploy_model
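
The task callables above live elsewhere in the project. As a rough sketch, train_and_log_model could wrap the MLflow pattern from the experiment-tracking section; the data path, column names, and model choice here are placeholders:

def train_and_log_model(**context):
    import mlflow
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier

    mlflow.set_experiment("churn_prediction")
    with mlflow.start_run(run_name=f"airflow_{context['ds']}"):
        df = pd.read_parquet("/data/processed.parquet")  # placeholder path
        X, y = df.drop(columns=["label"]), df["label"]   # placeholder schema

        params = {"learning_rate": 0.1, "max_depth": 6}
        mlflow.log_params(params)

        model = GradientBoostingClassifier(**params).fit(X, y)
        mlflow.log_metric("train_accuracy", model.score(X, y))
        mlflow.sklearn.log_model(model, "model", registered_model_name="churn_predictor")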

Kubeflow Pipeline

from kfp import dsl
from kfp.components import InputPath, OutputPath, create_component_from_func

@create_component_from_func
def preprocess_data(input_path: str, output_path: OutputPath('Parquet')):
    import pandas as pd
    df = pd.read_csv(input_path)
    # Preprocessing logic
    df.to_parquet(output_path)

@create_component_from_func
def train_model(data_path: InputPath('Parquet'), model_path: OutputPath(), hyperparameters: dict):
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    # Training logic
    model = RandomForestClassifier(**hyperparameters)
    joblib.dump(model, model_path)

@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training pipeline'
)
def ml_pipeline(input_data: str, hyperparameters: dict):
    preprocess_op = preprocess_data(input_data)

    train_op = train_model(
        preprocess_op.output,
        hyperparameters
    )

    # Add GPU resources
    train_op.set_gpu_limit(1)
    train_op.set_memory_limit('8Gi')
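
To actually run the pipeline, it can be compiled and submitted with the same KFP SDK; the endpoint URL and argument values below are placeholders:

import kfp
from kfp import compiler

# Compile the pipeline function to a workflow spec
compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')

# Or submit it directly against a Kubeflow Pipelines endpoint
client = kfp.Client(host='http://kubeflow.example.com/pipeline')  # placeholder endpoint
client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={
        'input_data': 's3://my-bucket/raw.csv',  # placeholder input
        'hyperparameters': {'n_estimators': 100, 'max_depth': 8}
    }
)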

CI/CD for ML

# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths:
      - 'src/**'
      - 'data/**'
  schedule:
    - cron: '0 0 * * 0'  # Weekly retraining

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Validate data
        run: |
          dvc pull
          python -m pytest tests/data_validation/
          great_expectations checkpoint run data_quality

  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v3
      - name: Train model
        run: |
          python train.py --config configs/production.yaml
          mlflow run . -P epochs=100

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Evaluate model
        run: |
          python evaluate.py --model-version ${{ github.sha }}
          python check_performance_regression.py

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to staging
        run: |
          kubectl apply -f k8s/staging/
          python smoke_test.py --env staging

      - name: Deploy to production
        run: |
          kubectl apply -f k8s/production/
          python smoke_test.py --env production
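
The workflow above uses check_performance_regression.py as a quality gate, but the script itself is not shown. One possible sketch, comparing the Staging and Production versions in the MLflow registry; the metric key and tolerance are assumptions:

# check_performance_regression.py (illustrative sketch)
import sys
from mlflow.tracking import MlflowClient

TOLERANCE = 0.01  # assumed acceptable drop in F1
client = MlflowClient()

def f1_for_stage(name: str, stage: str) -> float:
    # Look up the latest version in the given stage and read its run's metric
    version = client.get_latest_versions(name, stages=[stage])[0]
    return client.get_run(version.run_id).data.metrics["f1_score"]

candidate = f1_for_stage("production_model", "Staging")
current = f1_for_stage("production_model", "Production")

if candidate < current - TOLERANCE:
    print(f"Regression: staging f1={candidate:.3f} vs production f1={current:.3f}")
    sys.exit(1)
print("No performance regression detected")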

Data Version Control

# Initialize DVC
dvc init
dvc remote add -d storage s3://my-bucket/dvc-storage

# Track data files
dvc add data/training.csv
git add data/training.csv.dvc data/.gitignore
git commit -m "Add training data"

# Push data
dvc push

# Create pipeline
dvc run -n preprocess \
  -d src/preprocess.py -d data/raw.csv \
  -o data/processed.csv \
  python src/preprocess.py

dvc run -n train \
  -d src/train.py -d data/processed.csv \
  -o models/model.pkl \
  -M metrics.json \
  python src/train.py

# Reproduce pipeline
dvc repro

# Compare experiments
dvc exp run --set-param train.lr=0.001
dvc exp show
dvc exp diff
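
DVC-tracked data can also be read directly from training code through the dvc.api module; the file path and revision tag below are illustrative:

import dvc.api
import pandas as pd

# Open the DVC-tracked file at a specific Git revision (tag, branch, or commit)
with dvc.api.open("data/training.csv", rev="v1.0") as f:
    df = pd.read_csv(f)

# Resolve the remote storage URL for the same artifact
url = dvc.api.get_url("data/training.csv", rev="v1.0")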

Commands

  • /omgops:pipeline - Pipeline management
  • /omgops:registry - Model registry
  • /omgops:monitor - System monitoring
  • /omgml:status - Project status

Best Practices

  1. Version everything (code, data, models)
  2. Automate training pipelines
  3. Implement quality gates
  4. Track all experiments
  5. Use feature stores for consistency