# MLOps

Production machine learning systems with MLflow, model versioning, and deployment pipelines.

## Quick Start
```python
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# Configure MLflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer-churn-prediction")

# Training with experiment tracking
# (assumes X_train, X_test, y_train, y_test are already loaded)
with mlflow.start_run(run_name="rf-baseline"):
    # Log parameters
    params = {"n_estimators": 100, "max_depth": 10, "random_state": 42}
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate and log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted"),
    }
    mlflow.log_metrics(metrics)

    # Log model to registry; infer the signature from training
    # inputs paired with the model's outputs on those same inputs
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="churn-classifier",
        signature=mlflow.models.infer_signature(X_train, model.predict(X_train)),
    )

    print(f"Run ID: {mlflow.active_run().info.run_id}")
```
## Core Concepts

### 1. Model Registry & Versioning
```python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Promote model to production
# (stages are deprecated in recent MLflow releases in favor of aliases, but still work)
client.transition_model_version_stage(
    name="churn-classifier",
    version=3,
    stage="Production"
)

# Archive old version
client.transition_model_version_stage(
    name="churn-classifier",
    version=2,
    stage="Archived"
)

# Load production model
model_uri = "models:/churn-classifier/Production"
model = mlflow.sklearn.load_model(model_uri)

# Model comparison: pull the logged metrics for each registered version
def compare_model_versions(model_name: str, versions: list[int]) -> dict:
    results = {}
    for version in versions:
        run_id = client.get_model_version(model_name, str(version)).run_id
        run = client.get_run(run_id)
        results[version] = run.data.metrics
    return results
```
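For example, to compare the metrics logged for the archived and production versions side by side:

```python
metrics_by_version = compare_model_versions("churn-classifier", [2, 3])
for version, metrics in metrics_by_version.items():
    print(version, metrics)  # e.g. 3 {"accuracy": ..., "f1_score": ...}
```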
### 2. Feature Store Pattern
```python
from feast import FeatureStore

# Load the feature store (assumes a configured Feast repo at feature_repo/)
store = FeatureStore(repo_path="feature_repo/")

# Get training features (entity_df holds entity keys plus event timestamps)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_features:total_purchases",
        "customer_features:days_since_last_order",
        "customer_features:avg_order_value"
    ]
).to_df()

# Get online features for inference
feature_vector = store.get_online_features(
    features=[
        "customer_features:total_purchases",
        "customer_features:days_since_last_order"
    ],
    entity_rows=[{"customer_id": "12345"}]
).to_dict()
```
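The `customer_features` view referenced above has to be defined in the Feast repo. A minimal sketch of what that definition might look like; the parquet path and timestamp column are assumptions for illustration:

```python
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Entity keyed by customer_id, matching the entity_rows used for online lookup
customer = Entity(name="customer_id", join_keys=["customer_id"])

# Offline source; path and timestamp field are assumptions for this sketch
source = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="event_timestamp",
)

customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="days_since_last_order", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
    ],
    source=source,
)
```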
### 3. Model Serving with FastAPI
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow
from mlflow.tracking import MlflowClient
import numpy as np

app = FastAPI()

MODEL_NAME = "churn-classifier"

# Load model at startup and resolve which version backs the Production stage,
# so responses report the real version rather than a hardcoded string
model = mlflow.sklearn.load_model(f"models:/{MODEL_NAME}/Production")
model_version = MlflowClient().get_latest_versions(
    MODEL_NAME, stages=["Production"]
)[0].version

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        X = np.array(request.features).reshape(1, -1)
        prediction = model.predict(X)[0]
        probability = model.predict_proba(X)[0].max()
        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability),
            model_version=f"v{model_version}"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}
```
### 4. CI/CD for ML
```yaml
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths:
      - 'src/**'
      - 'data/**'

jobs:
  train-and-evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run tests
        run: pytest tests/
      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
        run: python src/train.py
      - name: Evaluate model
        run: python src/evaluate.py --threshold 0.85
      - name: Register model
        if: success()
        run: python src/register_model.py

  deploy:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to production
        run: |
          kubectl set image deployment/model-server \
            model-server=gcr.io/$PROJECT/model:${{ github.sha }}
```
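The `src/evaluate.py` gate above is not shown in this repo. A minimal sketch of what it might look like, assuming metrics were logged to the MLflow experiment used in the Quick Start (the script name, metric key, and experiment name mirror the earlier examples):

```python
# src/evaluate.py (hypothetical quality gate for the CI pipeline above)
import argparse
import sys

import mlflow

parser = argparse.ArgumentParser()
parser.add_argument("--threshold", type=float, default=0.85)
args = parser.parse_args()

# Fetch the most recent run from the experiment used in train.py
runs = mlflow.search_runs(
    experiment_names=["customer-churn-prediction"],
    order_by=["start_time DESC"],
    max_results=1,
)
if runs.empty:
    sys.exit("No runs found to evaluate")

accuracy = runs.iloc[0]["metrics.accuracy"]
print(f"Latest run accuracy: {accuracy:.4f} (threshold {args.threshold})")

# A non-zero exit fails the CI job, blocking registration and deployment
if accuracy < args.threshold:
    sys.exit(f"Accuracy {accuracy:.4f} below threshold {args.threshold}")
```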
Tools & Technologies
| Tool |
Purpose |
Version (2025) |
| MLflow |
Experiment tracking |
2.10+ |
| Feast |
Feature store |
0.36+ |
| BentoML |
Model serving |
1.2+ |
| Seldon |
K8s model serving |
1.17+ |
| DVC |
Data versioning |
3.40+ |
| Weights & Biases |
Experiment tracking |
Latest |
| Evidently |
Model monitoring |
0.4+ |
## Troubleshooting Guide

| Issue | Symptoms | Root Cause | Fix |
|-------|----------|------------|-----|
| Model drift | Accuracy drops | Data distribution change | Monitor, retrain |
| Slow inference | High latency | Large model, no optimization | Quantize, distill |
| Version mismatch | Prediction errors | Wrong model version | Pin versions |
| Feature skew | Train/serve mismatch | Different preprocessing | Use feature store |
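For the drift case, Evidently (listed above) can compare production inputs against training data. A minimal sketch, assuming `reference_df` (training data) and `current_df` (recent production inputs) are pandas DataFrames with the same columns:

```python
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Compare recent production data against the training reference
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=reference_df, current_data=current_df)

# Save an HTML report for review, or inspect report.as_dict() programmatically
report.save_html("drift_report.html")
```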
## Best Practices

```python
# ✅ DO: Version everything
mlflow.log_artifact("data/train.csv")
mlflow.log_params({"data_version": "v2.3"})

# ✅ DO: Test model before deployment
def test_model_performance(model, threshold=0.85):
    score = evaluate_model(model)  # evaluate_model: project-specific helper
    assert score >= threshold, f"Model score {score} below threshold {threshold}"

# ✅ DO: Monitor in production
# ✅ DO: A/B test new models (see the sketch below)

# ❌ DON'T: Deploy without validation
# ❌ DON'T: Skip rollback strategy
```
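A sketch of what a simple A/B split might look like in the serving layer, routing a fraction of traffic to a challenger model; the 10% split and the version numbers in the model URIs are assumptions:

```python
import random

import mlflow

# Champion/challenger loaded from the registry; version numbers are illustrative
champion = mlflow.sklearn.load_model("models:/churn-classifier/3")
challenger = mlflow.sklearn.load_model("models:/churn-classifier/4")

CHALLENGER_TRAFFIC = 0.10  # fraction of requests routed to the new model

def predict_ab(X):
    """Route a random slice of traffic to the challenger and tag the variant."""
    if random.random() < CHALLENGER_TRAFFIC:
        return {"prediction": challenger.predict(X)[0], "variant": "challenger"}
    return {"prediction": champion.predict(X)[0], "variant": "champion"}
```

Tagging each response with its variant lets downstream monitoring attribute outcomes to the right model before promoting or rolling back the challenger.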
## Resources

**Skill Certification Checklist:**