ml-engineering-workflows

@tachyon-beep/skillpacks

Experiment tracking, reproducibility, MLflow, Hydra configuration, model lifecycle, metrics and logging, production monitoring, ML project structure

Install Skill

1. Download skill
2. Enable skills in Claude
   Open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude
   Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: ml-engineering-workflows
description: Experiment tracking, reproducibility, MLflow, Hydra configuration, model lifecycle, metrics and logging, production monitoring, ML project structure

ML Engineering Workflows

Overview

Core Principle: Experiments must be reproducible. Track everything: code, data, parameters, metrics, environment. Without reproducibility, ML experiments are just random number generation.

ML engineering is about systematic experimentation and production deployment. Track experiments with MLflow/Weights & Biases, manage configuration with Hydra, ensure reproducible data splits, monitor models in production. The biggest mistake: running experiments without tracking parameters or random seeds.
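
A minimal sketch of tracking the code version alongside a run, assuming the project is a git checkout (the tag names below are illustrative; recent MLflow versions also record the commit automatically as mlflow.source.git.commit when available):

import subprocess
import mlflow

with mlflow.start_run():
    # Record the exact code version the run was produced with
    commit = subprocess.run(
        ["git", "rev-parse", "HEAD"], capture_output=True, text=True, check=True
    ).stdout.strip()
    mlflow.set_tag("git_commit", commit)

    # Flag uncommitted changes so a "dirty" run can be treated with suspicion
    dirty = subprocess.run(
        ["git", "status", "--porcelain"], capture_output=True, text=True
    ).stdout.strip() != ""
    mlflow.set_tag("git_dirty", str(dirty))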

When to Use

Use this skill when:

  • "Track ML experiments"
  • "MLflow setup"
  • "Reproducible ML"
  • "Model lifecycle"
  • "Hyperparameter management"
  • "ML monitoring"
  • "ML project structure"
  • "Experiment comparison"

Don't use when:

  • Setting up Python project (use project-structure-and-tooling first)
  • NumPy/pandas optimization (use scientific-computing-foundations)
  • Profiling ML code (use debugging-and-profiling)

Symptoms triggering this skill:

  • Can't reproduce results
  • Lost track of which parameters produced which metrics
  • Need to compare many experiments
  • Deploying model to production

Experiment Tracking with MLflow

Basic MLflow Setup

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# ❌ WRONG: Not tracking experiments
model = RandomForestClassifier(n_estimators=100, max_depth=10)
model.fit(X_train, y_train)
accuracy = accuracy_score(y_test, model.predict(X_test))
print(f"Accuracy: {accuracy}")  # Lost forever after terminal closes

# ✅ CORRECT: Track with MLflow
mlflow.set_experiment("my_experiment")

with mlflow.start_run():
    # Log parameters
    params = {"n_estimators": 100, "max_depth": 10}
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))
    mlflow.log_metric("f1_score", f1_score(y_test, y_pred, average='weighted'))

    # Log model
    mlflow.sklearn.log_model(model, "model")

# ✅ CORRECT: Log artifacts (plots, confusion matrix)
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

with mlflow.start_run():
    mlflow.log_params(params)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Create and save confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(cm)
    disp.plot()
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")
    plt.close()

    mlflow.log_metrics({
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average='weighted')
    })

Why this matters: MLflow records every run's parameters and metrics, so you can compare experiments, reproduce results, and deploy the best model.
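
To make run comparison concrete, completed runs can be pulled back as a DataFrame and sorted by a metric; a short sketch, assuming a recent MLflow version and the experiment name used above:

import mlflow

runs = mlflow.search_runs(experiment_names=["my_experiment"])

# Columns follow the "params.<name>" / "metrics.<name>" convention
best = runs.sort_values("metrics.accuracy", ascending=False).head(5)
print(best[["run_id", "params.n_estimators", "params.max_depth", "metrics.accuracy"]])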

Nested Runs for Cross-Validation

# ❌ WRONG: CV results not tracked properly
from sklearn.model_selection import cross_val_score
scores = cross_val_score(model, X, y, cv=5)
print(f"Mean: {scores.mean()}")  # Lost context

# ✅ CORRECT: Track CV with nested runs
from sklearn.model_selection import KFold
import numpy as np

with mlflow.start_run(run_name="rf_cv_experiment") as parent_run:
    mlflow.log_params(params)

    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        with mlflow.start_run(run_name=f"fold_{fold}", nested=True):
            X_train_fold, X_val_fold = X[train_idx], X[val_idx]
            y_train_fold, y_val_fold = y[train_idx], y[val_idx]

            model = RandomForestClassifier(**params, random_state=42)
            model.fit(X_train_fold, y_train_fold)

            score = accuracy_score(y_val_fold, model.predict(X_val_fold))
            cv_scores.append(score)

            mlflow.log_metric("accuracy", score)
            mlflow.log_metric("fold", fold)

    # Log aggregate metrics in parent run
    mlflow.log_metric("cv_mean_accuracy", np.mean(cv_scores))
    mlflow.log_metric("cv_std_accuracy", np.std(cv_scores))

Hyperparameter Tuning with Tracking

from sklearn.model_selection import GridSearchCV

# ❌ WRONG: GridSearchCV without tracking
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [5, 10, 20]
}
grid_search = GridSearchCV(RandomForestClassifier(), param_grid, cv=5)
grid_search.fit(X_train, y_train)
print(grid_search.best_params_)  # Only get best, lose all other trials

# ✅ CORRECT: Track all hyperparameter trials
with mlflow.start_run(run_name="grid_search"):
    for n_est in [50, 100, 200]:
        for max_d in [5, 10, 20]:
            with mlflow.start_run(nested=True):
                params = {"n_estimators": n_est, "max_depth": max_d}
                mlflow.log_params(params)

                model = RandomForestClassifier(**params, random_state=42)
                model.fit(X_train, y_train)

                score = accuracy_score(y_test, model.predict(X_test))
                mlflow.log_metric("accuracy", score)

# ✅ BETTER: Use MLflow with Optuna for Bayesian optimization
import optuna
from optuna.integration.mlflow import MLflowCallback

def objective(trial):
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 200),
        "max_depth": trial.suggest_int("max_depth", 5, 20),
    }

    model = RandomForestClassifier(**params, random_state=42)
    model.fit(X_train, y_train)
    return accuracy_score(y_test, model.predict(X_test))

mlflc = MLflowCallback(tracking_uri="mlruns", metric_name="accuracy")
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50, callbacks=[mlflc])

Why this matters: Hyperparameter tuning generates many experiments. Tracking all trials enables comparison and understanding of parameter importance.
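
Once the Optuna study finishes, it can also report which hyperparameters mattered most; a short sketch continuing the study above:

# Best trial found by the search
print(study.best_params, study.best_value)

# Rough, model-based estimate of each hyperparameter's influence on the objective
importances = optuna.importance.get_param_importances(study)
for name, importance in importances.items():
    print(f"{name}: {importance:.3f}")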


Configuration Management with Hydra

Basic Hydra Configuration

# ❌ WRONG: Hardcoded parameters
def train():
    learning_rate = 0.001
    batch_size = 32
    epochs = 100
    # What if we want to try different values? Edit code each time?

# ✅ CORRECT: Hydra configuration
# File: config.yaml
"""
model:
  learning_rate: 0.001
  batch_size: 32
  epochs: 100

data:
  train_path: data/train.csv
  test_path: data/test.csv
"""

# File: train.py
import hydra
from omegaconf import DictConfig

@hydra.main(config_path=".", config_name="config", version_base=None)
def train(cfg: DictConfig):
    print(f"Learning rate: {cfg.model.learning_rate}")
    print(f"Batch size: {cfg.model.batch_size}")

    # Access config values
    model = create_model(
        lr=cfg.model.learning_rate,
        batch_size=cfg.model.batch_size
    )

if __name__ == "__main__":
    train()

# Run with overrides:
# python train.py model.learning_rate=0.01 model.batch_size=64

Structured Configs with Dataclasses

# ✅ CORRECT: Type-safe configs with dataclasses
from dataclasses import dataclass, field
from hydra.core.config_store import ConfigStore

@dataclass
class ModelConfig:
    learning_rate: float = 0.001
    batch_size: int = 32
    epochs: int = 100
    hidden_dim: int = 256

@dataclass
class DataConfig:
    train_path: str = "data/train.csv"
    test_path: str = "data/test.csv"
    val_split: float = 0.2

@dataclass
class Config:
    # Nested dataclass defaults need default_factory (mutable defaults are rejected)
    model: ModelConfig = field(default_factory=ModelConfig)
    data: DataConfig = field(default_factory=DataConfig)

cs = ConfigStore.instance()
cs.store(name="config", node=Config)

@hydra.main(config_path=None, config_name="config", version_base=None)
def train(cfg: Config):
    # Type hints work!
    lr: float = cfg.model.learning_rate
    batch_size: int = cfg.model.batch_size

if __name__ == "__main__":
    train()

Why this matters: Hydra enables command-line overrides without code changes. Structured configs provide type safety and IDE autocomplete.
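
Config groups take this further: alternative model configs live in separate files and are selected by name from the command line. A sketch assuming the configs/model/rf.yaml and configs/model/xgboost.yaml layout shown in the project structure section below:

# File: configs/config.yaml
"""
defaults:
  - model: rf          # loads configs/model/rf.yaml by default
  - _self_

data:
  train_path: data/train.csv
"""

# File: configs/model/rf.yaml
"""
name: random_forest
n_estimators: 100
max_depth: 10
"""

# Swap models without touching code:
# python train.py model=xgboost
# python train.py model=rf model.n_estimators=300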

Multi-Run Sweeps

# ✅ CORRECT: Hydra multirun for hyperparameter sweeps
# config.yaml
"""
defaults:
  - override hydra/launcher: basic

model:
  learning_rate: 0.001
  batch_size: 32
"""

# Run multiple experiments:
# python train.py -m model.learning_rate=0.001,0.01,0.1 model.batch_size=32,64,128
# Creates 9 runs (3 x 3)

@hydra.main(config_path=".", config_name="config", version_base=None)
def train(cfg: DictConfig):
    with mlflow.start_run():
        mlflow.log_params({
            "learning_rate": cfg.model.learning_rate,
            "batch_size": cfg.model.batch_size
        })

        model = train_model(cfg)
        metrics = evaluate_model(model, test_data)
        mlflow.log_metrics(metrics)

Reproducibility Best Practices

Random Seed Management

import random
import numpy as np
import torch

# ❌ WRONG: No random seed
model = create_model()
model.fit(X_train, y_train)
# Different results every run!

# ✅ CORRECT: Set all random seeds
def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # For full reproducibility with CUDA
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# In training script
set_seed(42)
model = create_model()
model.fit(X_train, y_train)

# ✅ CORRECT: Track seed in MLflow
with mlflow.start_run():
    seed = 42
    mlflow.log_param("random_seed", seed)
    set_seed(seed)
    # ... training code ...

Reproducible Data Splits

# ❌ WRONG: Non-reproducible split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# Different split every time!

# ✅ CORRECT: Fixed random seed for splits
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# ✅ BETTER: Hash-based deterministic split (stable across runs)
import hashlib

def hash_split(df, test_size=0.2, id_column='id'):
    """Deterministic split based on ID hash."""
    def test_set_check(identifier, test_size):
        hash_val = int(hashlib.md5(str(identifier).encode()).hexdigest(), 16)
        return hash_val % 100 < test_size * 100

    is_test = df[id_column].apply(lambda x: test_set_check(x, test_size))
    return df[~is_test], df[is_test]

train_df, test_df = hash_split(df, test_size=0.2, id_column='user_id')
# Same split even if data order changes or new rows added

Environment Reproducibility

# ✅ CORRECT: Log environment info
import mlflow
import sys
import os
import platform

with mlflow.start_run():
    # Log Python version
    mlflow.log_param("python_version", sys.version)

    # Log package versions
    import sklearn
    import pandas
    import numpy
    mlflow.log_params({
        "sklearn_version": sklearn.__version__,
        "pandas_version": pandas.__version__,
        "numpy_version": numpy.__version__,
    })

    # Log system info
    mlflow.log_params({
        "platform": platform.platform(),
        "cpu_count": os.cpu_count()
    })

# ✅ BETTER: Use conda/docker for full reproducibility
# conda env export > environment.yml
# Log environment file as artifact
with mlflow.start_run():
    mlflow.log_artifact("environment.yml")

Why this matters: Reproducibility requires controlling all randomness sources. Different package versions or Python versions can produce different results.
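
Beyond a few hand-picked libraries, the full installed-package list can be captured and attached to the run; a sketch using importlib.metadata (Python 3.8+) and mlflow.log_text (available in recent MLflow versions):

from importlib.metadata import distributions
import mlflow

with mlflow.start_run():
    # Snapshot of every installed package, similar to `pip freeze`
    packages = sorted(
        f"{dist.metadata['Name']}=={dist.version}" for dist in distributions()
    )
    mlflow.log_text("\n".join(packages), "requirements_snapshot.txt")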


Data Versioning and Lineage

Data Versioning with DVC

# Initialize DVC
dvc init

# Track large data files
dvc add data/train.csv
git add data/train.csv.dvc data/.gitignore
git commit -m "Track training data"

# Configure remote storage (S3, GCS, Azure, etc.)
dvc remote add -d myremote s3://mybucket/dvcstore
dvc push

# Retrieve specific version
git checkout v1.0
dvc pull
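
Data tracked this way can also be read programmatically at a specific revision; a sketch using dvc.api (the repo URL and tag below are placeholders):

import pandas as pd
import dvc.api

# Stream data/train.csv exactly as it existed at tag v1.0, without a full checkout
with dvc.api.open(
    "data/train.csv",
    repo="https://github.com/org/repo",  # hypothetical repository URL
    rev="v1.0",
) as f:
    train_df = pd.read_csv(f)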

Logging Data Info in MLflow

# ✅ CORRECT: Log data characteristics
import hashlib
import pandas as pd
import mlflow

with mlflow.start_run():
    # Load data
    df = pd.read_csv("data/train.csv")

    # Log data info
    mlflow.log_params({
        "n_samples": len(df),
        "n_features": len(df.columns),
        "class_balance": df['target'].value_counts().to_dict(),
        "data_version": "v1.0",  # Track data version
        "data_hash": hashlib.md5(df.to_csv(index=False).encode()).hexdigest()
    })

    # Log sample of data
    df.head(100).to_csv("data_sample.csv", index=False)
    mlflow.log_artifact("data_sample.csv")

Feature Engineering Pipeline Tracking

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# ✅ CORRECT: Track entire preprocessing pipeline
with mlflow.start_run():
    # Define pipeline
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=50)),
        ('classifier', RandomForestClassifier(n_estimators=100))
    ])

    # Log pipeline parameters
    mlflow.log_params({
        "scaler": "StandardScaler",
        "pca_components": 50,
        "classifier": "RandomForestClassifier",
        "n_estimators": 100
    })

    # Fit pipeline
    pipeline.fit(X_train, y_train)

    # Log entire pipeline
    mlflow.sklearn.log_model(pipeline, "model_pipeline")

    # Evaluate
    score = pipeline.score(X_test, y_test)
    mlflow.log_metric("accuracy", score)

Model Lifecycle Management

Model Registry

# ✅ CORRECT: Register model in MLflow
with mlflow.start_run() as run:
    model = train_model(X_train, y_train)
    mlflow.sklearn.log_model(model, "model")

    # Register model
    model_uri = f"runs:/{run.info.run_id}/model"
    mlflow.register_model(model_uri, "my_model")

# ✅ CORRECT: Promote model to production
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Get latest version
latest_version = client.get_latest_versions("my_model", stages=["None"])[0]

# Transition to staging
client.transition_model_version_stage(
    name="my_model",
    version=latest_version.version,
    stage="Staging"
)

# After testing, promote to production
client.transition_model_version_stage(
    name="my_model",
    version=latest_version.version,
    stage="Production"
)

# ✅ CORRECT: Load production model
model = mlflow.pyfunc.load_model(
    model_uri="models:/my_model/Production"
)
predictions = model.predict(X_new)

Model Metadata and Tags

# ✅ CORRECT: Add tags for searchability
with mlflow.start_run() as run:
    mlflow.set_tag("model_type", "random_forest")
    mlflow.set_tag("task", "classification")
    mlflow.set_tag("dataset", "customer_churn")
    mlflow.set_tag("owner", "data_science_team")

    # Train and log model
    model = train_model(X_train, y_train)
    mlflow.sklearn.log_model(model, "model")

    # Add version tag
    mlflow.set_tag("version", "v2.1.0")

# Search for runs with tags
from mlflow.tracking import MlflowClient
client = MlflowClient()
runs = client.search_runs(
    experiment_ids=["0"],
    filter_string="tags.model_type = 'random_forest' AND metrics.accuracy > 0.85"
)

Metrics and Logging

Structured Logging

import logging
import mlflow

# ✅ CORRECT: Structured logging with MLflow
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

with mlflow.start_run():
    logger.info("Starting training")
    mlflow.log_param("learning_rate", 0.001)

    # Early-stopping state (num_epochs, model, and the loaders are assumed to be defined)
    best_val_loss = float("inf")
    patience = 5
    patience_counter = 0

    for epoch in range(num_epochs):
        train_loss = train_epoch(model, train_loader)
        val_loss = validate(model, val_loader)

        # Log metrics per epoch
        mlflow.log_metrics({
            "train_loss": train_loss,
            "val_loss": val_loss
        }, step=epoch)

        logger.info(f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}")

        # Early stopping check: reset patience on improvement, otherwise count down
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter > patience:
                logger.info(f"Early stopping at epoch {epoch}")
                mlflow.set_tag("early_stopped", "true")
                mlflow.log_param("stopped_epoch", epoch)
                break

Custom Metrics

from sklearn.metrics import make_scorer

# ✅ CORRECT: Define and log custom metrics
def business_metric(y_true, y_pred):
    """Custom metric: cost of false positives vs false negatives."""
    fp_cost = 10  # Cost of false positive
    fn_cost = 100  # Cost of false negative

    fp = ((y_pred == 1) & (y_true == 0)).sum()
    fn = ((y_pred == 0) & (y_true == 1)).sum()

    return fp * fp_cost + fn * fn_cost

with mlflow.start_run():
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Log standard metrics
    mlflow.log_metrics({
        "accuracy": accuracy_score(y_test, y_pred),
        "f1": f1_score(y_test, y_pred)
    })

    # Log custom business metric
    cost = business_metric(y_test, y_pred)
    mlflow.log_metric("business_cost", cost)

Metric Visualization

# ✅ CORRECT: Log plots and visualizations
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

with mlflow.start_run():
    # Train model
    model.fit(X_train, y_train)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    # ROC curve
    fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
    roc_auc = auc(fpr, tpr)

    plt.figure()
    plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend()
    plt.savefig("roc_curve.png")
    mlflow.log_artifact("roc_curve.png")
    plt.close()

    # Feature importance
    importances = model.feature_importances_
    indices = np.argsort(importances)[::-1]

    plt.figure(figsize=(10, 6))
    plt.bar(range(len(importances)), importances[indices])
    plt.xlabel('Feature Index')
    plt.ylabel('Importance')
    plt.title('Feature Importances')
    plt.savefig("feature_importance.png")
    mlflow.log_artifact("feature_importance.png")
    plt.close()

    mlflow.log_metric("roc_auc", roc_auc)

Production Monitoring

Model Performance Monitoring

# ✅ CORRECT: Monitor model performance in production
import mlflow
from datetime import datetime

class ModelMonitor:
    def __init__(self, model_name: str, model_version: str):
        self.model_name = model_name
        self.model_version = model_version
        self.mlflow_client = MlflowClient()

    def log_prediction(self, features, prediction, actual=None):
        """Log prediction for monitoring."""
        with mlflow.start_run(run_name=f"prediction_{datetime.now().isoformat()}"):
            mlflow.log_param("model_name", self.model_name)
            mlflow.log_param("model_version", self.model_version)

            # Log feature statistics
            mlflow.log_params({
                f"feature_{i}_mean": float(features[:, i].mean())
                for i in range(features.shape[1])
            })

            # Log prediction
            mlflow.log_metric("prediction", float(prediction))

            # If actual available (for online evaluation)
            if actual is not None:
                mlflow.log_metric("actual", float(actual))
                mlflow.log_metric("error", abs(float(prediction - actual)))

    def check_data_drift(self, current_data, reference_data):
        """Detect data drift using KS test."""
        from scipy.stats import ks_2samp

        drift_detected = False
        drift_features = []

        with mlflow.start_run(run_name="drift_check"):
            for i in range(current_data.shape[1]):
                stat, p_value = ks_2samp(
                    reference_data[:, i],
                    current_data[:, i]
                )

                mlflow.log_metric(f"feature_{i}_ks_stat", stat)
                mlflow.log_metric(f"feature_{i}_p_value", p_value)

                if p_value < 0.05:  # Significant drift
                    drift_detected = True
                    drift_features.append(i)

            mlflow.log_param("drift_detected", drift_detected)
            mlflow.log_param("drift_features", drift_features)

        return drift_detected, drift_features

Alerting and Anomaly Detection

# ✅ CORRECT: Monitor for anomalies in predictions
class PredictionMonitor:
    def __init__(self, threshold_std: float = 3.0):
        self.threshold_std = threshold_std
        self.recent_predictions = []
        self.window_size = 1000

    def check_anomaly(self, prediction: float) -> bool:
        """Check if prediction is anomalous."""
        self.recent_predictions.append(prediction)

        if len(self.recent_predictions) > self.window_size:
            self.recent_predictions.pop(0)

        if len(self.recent_predictions) < 100:
            return False  # Not enough data

        mean = np.mean(self.recent_predictions)
        std = np.std(self.recent_predictions)

        z_score = abs(prediction - mean) / std

        is_anomaly = z_score > self.threshold_std

        # Log to MLflow
        mlflow.log_metrics({
            "prediction": prediction,
            "rolling_mean": mean,
            "rolling_std": std,
            "z_score": z_score,
            "is_anomaly": int(is_anomaly)
        })

        return is_anomaly

ML Project Structure

Standard Project Layout

ml_project/
├── data/
│   ├── raw/              # Original immutable data
│   ├── processed/        # Cleaned, transformed data
│   └── features/         # Engineered features
├── notebooks/            # Exploratory notebooks
│   └── eda.ipynb
├── src/
│   ├── __init__.py
│   ├── data/
│   │   ├── __init__.py
│   │   ├── load.py       # Data loading
│   │   └── preprocess.py # Preprocessing
│   ├── features/
│   │   ├── __init__.py
│   │   └── build.py      # Feature engineering
│   ├── models/
│   │   ├── __init__.py
│   │   ├── train.py      # Training
│   │   ├── predict.py    # Inference
│   │   └── evaluate.py   # Evaluation
│   └── utils/
│       ├── __init__.py
│       └── config.py     # Configuration
├── tests/
│   ├── test_data.py
│   ├── test_features.py
│   └── test_models.py
├── configs/              # Hydra configs
│   ├── config.yaml
│   ├── model/
│   │   ├── rf.yaml
│   │   └── xgboost.yaml
│   └── data/
│       └── default.yaml
├── mlruns/              # MLflow tracking
├── outputs/             # Hydra outputs
├── requirements.txt
├── setup.py
└── README.md

Makefile for Common Tasks

# ✅ CORRECT: Makefile for reproducible workflows
.PHONY: data features train evaluate clean test lint

data:
	python src/data/load.py
	python src/data/preprocess.py

features: data
	python src/features/build.py

train: features
	python src/models/train.py

evaluate: train
	python src/models/evaluate.py

clean:
	rm -rf data/processed/*
	rm -rf mlruns/*

test:
	pytest tests/

lint:
	ruff check src/
	mypy src/

Integration Patterns

MLflow + Hydra Integration

# ✅ CORRECT: Combine MLflow tracking with Hydra config
import hydra
from omegaconf import DictConfig, OmegaConf
import mlflow

@hydra.main(config_path="configs", config_name="config", version_base=None)
def train(cfg: DictConfig):
    # Set MLflow experiment
    mlflow.set_experiment(cfg.experiment_name)

    with mlflow.start_run():
        # Log all Hydra config as parameters
        mlflow.log_params(OmegaConf.to_container(cfg, resolve=True))

        # Log Hydra config file as artifact
        config_path = ".hydra/config.yaml"
        mlflow.log_artifact(config_path)

        # Train model
        model = create_model(cfg.model)
        model.fit(X_train, y_train)

        # Log metrics
        metrics = evaluate_model(model, X_test, y_test)
        mlflow.log_metrics(metrics)

        # Log model
        mlflow.sklearn.log_model(model, "model")

if __name__ == "__main__":
    train()

Continuous Training Pipeline

# ✅ CORRECT: Automated retraining pipeline
from datetime import datetime
import mlflow

def continuous_training_pipeline():
    """Retrain model if performance degrades."""
    # Load production model
    prod_model = mlflow.pyfunc.load_model("models:/my_model/Production")

    # Load recent data
    recent_data = load_recent_data()

    # Evaluate production model on recent data
    prod_metrics = evaluate_model(prod_model, recent_data)

    # Check if retraining needed
    if prod_metrics['accuracy'] < 0.85:  # Threshold
        print("Performance degraded, retraining...")

        with mlflow.start_run(run_name=f"retrain_{datetime.now().isoformat()}"):
            # Log why retraining
            mlflow.set_tag("retrain_reason", "accuracy_below_threshold")
            mlflow.log_metric("prod_accuracy", prod_metrics['accuracy'])

            # Train new model
            new_model = train_model(load_training_data())

            # Evaluate new model
            new_metrics = evaluate_model(new_model, recent_data)
            mlflow.log_metrics(new_metrics)

            # If better, register and promote
            if new_metrics['accuracy'] > prod_metrics['accuracy']:
                mlflow.sklearn.log_model(new_model, "model")

                # Register new version
                model_uri = f"runs:/{mlflow.active_run().info.run_id}/model"
                model_version = mlflow.register_model(model_uri, "my_model")

                # Promote to production
                client = MlflowClient()
                client.transition_model_version_stage(
                    name="my_model",
                    version=model_version.version,
                    stage="Production",
                    archive_existing_versions=True
                )

Anti-Patterns

Not Tracking Experiments

# ❌ WRONG: No tracking
for lr in [0.001, 0.01, 0.1]:
    model = train_model(lr)
    print(f"LR={lr}, Accuracy={evaluate(model)}")
# Which LR was best? Lost after terminal closes.

# ✅ CORRECT: Track everything
for lr in [0.001, 0.01, 0.1]:
    with mlflow.start_run():
        mlflow.log_param("learning_rate", lr)
        model = train_model(lr)
        acc = evaluate(model)
        mlflow.log_metric("accuracy", acc)
        mlflow.sklearn.log_model(model, "model")

Non-Reproducible Data Splits

# ❌ WRONG: Random split without seed
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# Different split every run!

# ✅ CORRECT: Fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

Hardcoded Paths and Parameters

# ❌ WRONG: Hardcoded values
data = pd.read_csv("/home/user/data/train.csv")
model = RandomForestClassifier(n_estimators=100, max_depth=10)

# ✅ CORRECT: Config-driven
@hydra.main(config_path=".", config_name="config", version_base=None)
def train(cfg: DictConfig):
    data = pd.read_csv(cfg.data.train_path)
    model = RandomForestClassifier(
        n_estimators=cfg.model.n_estimators,
        max_depth=cfg.model.max_depth
    )

Decision Trees

Should I Track This Experiment?

Is this a throwaway experiment?
├─ Yes (just testing) → Maybe skip tracking
└─ No → ALWAYS TRACK
    ├─ Comparing models → Track
    ├─ Tuning hyperparameters → Track
    ├─ Production candidate → Track
    └─ Debugging → Track (helps identify issues)

When to Register a Model?

Is model for production use?
├─ Yes → Register in model registry
│   ├─ Test in staging first
│   └─ Promote to production after validation
└─ No (experiment only) → Log but don't register

Integration with Other Skills

After using this skill:

  • If profiling ML code → See @debugging-and-profiling
  • If optimizing data processing → See @scientific-computing-foundations
  • If setting up CI/CD → See @project-structure-and-tooling

Before using this skill:

  • If setting up project → Use @project-structure-and-tooling first
  • If data processing slow → Use @scientific-computing-foundations to optimize

Quick Reference

MLflow Essential Commands

# Start run
with mlflow.start_run():
    mlflow.log_param("param_name", value)
    mlflow.log_metric("metric_name", value)
    mlflow.log_artifact("file.png")
    mlflow.sklearn.log_model(model, "model")

# Register model
mlflow.register_model("runs:/<run_id>/model", "model_name")

# Load model
model = mlflow.pyfunc.load_model("models:/model_name/Production")

Hydra Essential Patterns

# Basic config
@hydra.main(config_path=".", config_name="config", version_base=None)
def main(cfg: DictConfig):
    print(cfg.param)

# Override from CLI
# python script.py param=value

# Multirun
# python script.py -m param=1,2,3

Reproducibility Checklist

  • Set random seeds (Python, NumPy, PyTorch)
  • Use fixed random_state in train_test_split
  • Track data version/hash
  • Log package versions
  • Track preprocessing steps
  • Version control code
  • Use config files (don't hardcode)