| name | mlops |
| description | MLOps practices including CI/CD for ML, experiment tracking, model monitoring, pipeline orchestration, and production ML operations. |
MLOps
Production ML operations and automation.
MLOps Maturity Model
┌─────────────────────────────────────────────────────────────┐
│                    MLOPS MATURITY LEVELS                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   LEVEL 0          LEVEL 1            LEVEL 2               │
│   Manual           ML Pipeline        CI/CD for ML          │
│   ───────          ──────────         ──────────            │
│   Notebooks        Automated          Automated             │
│   Manual deploy    training           retraining            │
│   No monitoring    Basic pipeline     Full automation       │
│                                                             │
│   Components:                                               │
│   ├── Version Control (Git, DVC)                            │
│   ├── Experiment Tracking (MLflow, W&B)                     │
│   ├── Feature Store (Feast, Tecton)                         │
│   ├── Model Registry (MLflow, SageMaker)                    │
│   ├── Orchestration (Airflow, Kubeflow)                     │
│   └── Monitoring (Prometheus, Evidently)                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
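The monitoring component listed above has no dedicated example later in this section. A minimal, tool-agnostic sketch of feature-drift detection follows; it uses a per-feature two-sample Kolmogorov-Smirnov test rather than Evidently's or Prometheus's own APIs, and the 0.05 significance threshold and numeric-only column handling are illustrative assumptions.

import pandas as pd
from scipy.stats import ks_2samp

def detect_feature_drift(reference: pd.DataFrame, current: pd.DataFrame, alpha: float = 0.05) -> dict:
    """Flag numeric features whose live distribution differs from the
    training-time reference (two-sample KS test per column)."""
    drifted = {}
    for col in reference.select_dtypes(include="number").columns:
        if col not in current.columns:
            continue
        stat, p_value = ks_2samp(reference[col].dropna(), current[col].dropna())
        if p_value < alpha:
            drifted[col] = {"ks_statistic": round(stat, 4), "p_value": round(p_value, 6)}
    return drifted

In production such a check would typically run on a schedule (for example inside the Airflow DAG shown later) and feed an alerting backend such as Prometheus.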
Experiment Tracking
MLflow Integration
import mlflow
from mlflow.tracking import MlflowClient

# Set tracking server
mlflow.set_tracking_uri("http://mlflow.example.com:5000")
mlflow.set_experiment("churn_prediction")

params = {
    "model_type": "xgboost",
    "learning_rate": 0.1,
    "max_depth": 6,
    "n_estimators": 100
}

# Start run with context manager
with mlflow.start_run(run_name="xgboost_v2") as run:
    # Log parameters
    mlflow.log_params(params)

    # Train model
    model = train_model(params)

    # Log metrics
    mlflow.log_metrics({
        "accuracy": 0.92,
        "f1_score": 0.89,
        "auc_roc": 0.95
    })

    # Log artifacts
    mlflow.log_artifact("feature_importance.png")
    mlflow.log_artifact("confusion_matrix.png")

    # Log model and register it in one step
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="churn_predictor"
    )

    # Log custom metrics over time (train_loss produced by your training loop)
    for epoch in range(100):
        mlflow.log_metric("loss", train_loss, step=epoch)

# Compare runs
client = MlflowClient()
runs = client.search_runs(
    experiment_ids=["1"],
    filter_string="metrics.f1_score > 0.85",
    order_by=["metrics.f1_score DESC"]
)
Weights & Biases
import wandb

wandb.init(
    project="ml-project",
    config={
        "learning_rate": 0.001,
        "architecture": "ResNet50",
        "epochs": 100
    }
)

# Log metrics
for epoch in range(100):
    wandb.log({
        "epoch": epoch,
        "loss": train_loss,
        "val_loss": val_loss,
        "accuracy": accuracy
    })

# Log images
wandb.log({"examples": [wandb.Image(img, caption=label) for img, label in samples]})

# Log model
wandb.save("model.pt")

# Hyperparameter sweeps
sweep_config = {
    "method": "bayes",
    "metric": {"name": "val_loss", "goal": "minimize"},
    "parameters": {
        "learning_rate": {"min": 0.0001, "max": 0.1},
        "batch_size": {"values": [16, 32, 64]}
    }
}
sweep_id = wandb.sweep(sweep_config, project="ml-project")
wandb.agent(sweep_id, train_function, count=50)
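wandb.agent expects a train_function that reads the sampled hyperparameters from wandb.config. A minimal sketch follows; build_model and train_one_epoch are hypothetical helpers standing in for your own training code.

def train_function():
    # The sweep controller injects the sampled hyperparameters into wandb.config
    with wandb.init() as run:
        config = wandb.config
        model = build_model(learning_rate=config.learning_rate)    # hypothetical helper
        for epoch in range(10):
            val_loss = train_one_epoch(model, config.batch_size)   # hypothetical helper
            wandb.log({"epoch": epoch, "val_loss": val_loss})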
Model Registry
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register model (run_id comes from the training run you want to promote)
model_uri = f"runs:/{run_id}/model"
result = mlflow.register_model(model_uri, "production_model")

# Transition stages
client.transition_model_version_stage(
    name="production_model",
    version=result.version,
    stage="Staging"
)

# Add description and tags
client.update_model_version(
    name="production_model",
    version=result.version,
    description="XGBoost model trained on Q4 data"
)
client.set_model_version_tag(
    name="production_model",
    version=result.version,
    key="validation_status",
    value="passed"
)

# Load production model
model = mlflow.pyfunc.load_model("models:/production_model/Production")

# Compare versions
def compare_model_versions(model_name, version_a, version_b, test_data):
    model_a = mlflow.pyfunc.load_model(f"models:/{model_name}/{version_a}")
    model_b = mlflow.pyfunc.load_model(f"models:/{model_name}/{version_b}")
    metrics_a = evaluate(model_a, test_data)
    metrics_b = evaluate(model_b, test_data)
    return {
        "version_a": {"version": version_a, **metrics_a},
        "version_b": {"version": version_b, **metrics_b}
    }
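compare_model_versions relies on an evaluate helper that is not defined above. One possible shape, assuming test_data is an (X_test, y_test) pair and a binary classifier; the metric choice and version numbers are illustrative.

from sklearn.metrics import accuracy_score, f1_score

def evaluate(model, test_data):
    X_test, y_test = test_data
    preds = model.predict(X_test)
    return {
        "accuracy": accuracy_score(y_test, preds),
        "f1_score": f1_score(y_test, preds)
    }

# Example: compare registry versions 3 and 4 on a held-out test set
report = compare_model_versions("production_model", "3", "4", (X_test, y_test))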
Pipeline Orchestration
Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Tasks
extract_data = PythonOperator(
    task_id='extract_data',
    python_callable=extract_training_data,
    dag=dag
)

validate_data = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data_quality,
    dag=dag
)

train_model = PythonOperator(
    task_id='train_model',
    python_callable=train_and_log_model,
    dag=dag
)

evaluate_model = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model_performance,
    dag=dag
)

deploy_model = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_to_production,
    dag=dag
)

# Dependencies
extract_data >> validate_data >> train_model >> evaluate_model >> deploy_model
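One way to put a quality gate between evaluation and deployment is Airflow's ShortCircuitOperator, which skips downstream tasks when its callable returns False. This is a sketch: it assumes evaluate_model_performance pushes an "f1_score" XCom, and the 0.85 threshold is an example value, not part of the DAG above.

from airflow.operators.python import ShortCircuitOperator

def passes_quality_gate(**context):
    # Assumes evaluate_model_performance did xcom_push(key="f1_score", value=...)
    f1 = context["ti"].xcom_pull(task_ids="evaluate_model", key="f1_score")
    return f1 is not None and f1 >= 0.85

quality_gate = ShortCircuitOperator(
    task_id="quality_gate",
    python_callable=passes_quality_gate,
    dag=dag
)

# With the gate, the dependency chain above becomes:
# extract_data >> validate_data >> train_model >> evaluate_model >> quality_gate >> deploy_model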
Kubeflow Pipeline
from kfp import dsl
from kfp.components import create_component_from_func, InputPath, OutputPath

# Lightweight Python components (KFP v1 SDK): each function runs in its own
# container, so imports live inside the function body.
@create_component_from_func
def preprocess_data(input_path: str, output_path: OutputPath()):
    import pandas as pd
    df = pd.read_csv(input_path)
    # Preprocessing logic
    df.to_parquet(output_path)

@create_component_from_func
def train_model(data_path: InputPath(), model_path: OutputPath(), hyperparameters: str):
    import json
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    df = pd.read_parquet(data_path)
    # Training logic (fit the model on df here)
    model = RandomForestClassifier(**json.loads(hyperparameters))
    joblib.dump(model, model_path)

@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training pipeline'
)
def ml_pipeline(input_data: str, hyperparameters: str = '{"n_estimators": 100}'):
    preprocess_op = preprocess_data(input_data)
    train_op = train_model(preprocess_op.output, hyperparameters)
    # Add GPU resources
    train_op.set_gpu_limit(1)
    train_op.set_memory_limit('8Gi')
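The pipeline can be compiled to a workflow spec or submitted directly. A short sketch using the KFP v1 client; the host URL, bucket path, and argument values are placeholders.

import kfp

# Compile to a reusable workflow definition
kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')

# Or submit a run straight to the cluster
client = kfp.Client(host='http://kubeflow.example.com/pipeline')
client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={
        'input_data': 's3://my-bucket/raw.csv',
        'hyperparameters': '{"n_estimators": 200, "max_depth": 8}'
    }
)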
CI/CD for ML
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths:
      - 'src/**'
      - 'data/**'
  schedule:
    - cron: '0 0 * * 0'  # Weekly retraining

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Validate data
        run: |
          dvc pull
          python -m pytest tests/data_validation/
          great_expectations checkpoint run data_quality

  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v3
      - name: Train model
        run: |
          python train.py --config configs/production.yaml
          mlflow run . -P epochs=100

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Evaluate model
        run: |
          python evaluate.py --model-version ${{ github.sha }}
          python check_performance_regression.py

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Deploy to staging
        run: |
          kubectl apply -f k8s/staging/
          python smoke_test.py --env staging
      - name: Deploy to production
        run: |
          kubectl apply -f k8s/production/
          python smoke_test.py --env production
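check_performance_regression.py is referenced but not defined above. A minimal sketch of what it might do, reusing the MLflow registry from earlier; the model name, metric, and tolerance are illustrative assumptions.

# check_performance_regression.py (illustrative)
import sys
import mlflow
from mlflow.tracking import MlflowClient

mlflow.set_tracking_uri("http://mlflow.example.com:5000")
client = MlflowClient()

# Baseline: the current Production version; candidate: a run id passed by CI
baseline_version = client.get_latest_versions("production_model", stages=["Production"])[0]
baseline_f1 = client.get_run(baseline_version.run_id).data.metrics["f1_score"]
candidate_f1 = client.get_run(sys.argv[1]).data.metrics["f1_score"]

if candidate_f1 < baseline_f1 - 0.01:  # allow a small tolerance
    raise SystemExit(
        f"Regression: candidate f1 {candidate_f1:.3f} < baseline {baseline_f1:.3f}"
    )
print("No performance regression detected")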
Data Version Control
# Initialize DVC
dvc init
dvc remote add -d storage s3://my-bucket/dvc-storage
# Track data files
dvc add data/training.csv
git add data/training.csv.dvc data/.gitignore
git commit -m "Add training data"
# Push data
dvc push
# Create pipeline
dvc run -n preprocess \
    -d src/preprocess.py -d data/raw.csv \
    -o data/processed.csv \
    python src/preprocess.py

dvc run -n train \
    -d src/train.py -d data/processed.csv \
    -o models/model.pkl \
    -M metrics.json \
    python src/train.py
# Reproduce pipeline
dvc repro
# Compare experiments
dvc exp run --set-param train.lr=0.001
dvc exp show
dvc exp diff
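DVC-tracked data can also be consumed from training code through its Python API. A short sketch; the repo URL and revision tag are placeholders.

import pandas as pd
import dvc.api

# Stream a specific version of the tracked file straight from remote storage
with dvc.api.open(
    'data/training.csv',
    repo='https://github.com/example/ml-repo',
    rev='v1.0'
) as f:
    df = pd.read_csv(f)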
Commands
/omgops:pipeline - Pipeline management
/omgops:registry - Model registry
/omgops:monitor - System monitoring
/omgml:status - Project status
Best Practices
- Version everything (code, data, models)
- Automate training pipelines
- Implement quality gates
- Track all experiments
- Use feature stores for consistency (see the sketch below)
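A minimal sketch of the feature-store practice using Feast; the feature view name, feature names, and entity key are assumptions about your feature repository, not part of this document.

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# The same feature definitions serve offline training and online inference,
# which is what keeps training and serving features consistent.
features = store.get_online_features(
    features=[
        "customer_features:tenure_months",
        "customer_features:avg_monthly_spend"
    ],
    entity_rows=[{"customer_id": 1001}]
).to_dict()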