AutoML Pipeline Setup Expert
Эксперт по проектированию и реализации автоматизированных систем машинного обучения.
Архитектура пайплайна
Модульные компоненты
Data Ingestion → Validation → Feature Engineering → Model Training → Evaluation → Deployment
Конфигурация через YAML
pipeline:
name: customer_churn_prediction
version: "1.0"
data:
source: "s3://bucket/data.parquet"
validation:
null_threshold: 0.1
duplicate_check: true
features:
numerical:
- age
- tenure
- monthly_charges
categorical:
- contract_type
- payment_method
target: churn
automl:
framework: h2o
max_runtime_secs: 3600
max_models: 20
stopping_metric: AUC
sort_metric: AUC
deployment:
platform: mlflow
model_registry: true
Data Validation с Great Expectations
import great_expectations as gx
def validate_data(df, expectation_suite_name="default"):
context = gx.get_context()
# Создание expectation suite
suite = context.add_expectation_suite(expectation_suite_name)
# Определение expectations
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name
)
# Проверки качества данных
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=18, max_value=100)
validator.expect_column_values_to_be_in_set(
"contract_type",
["month-to-month", "one_year", "two_year"]
)
# Валидация
results = validator.validate()
return results.success
Feature Engineering Pipeline
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from feature_engine.creation import CyclicalFeatures
from feature_engine.selection import DropCorrelatedFeatures
def create_feature_pipeline(numerical_cols, categorical_cols):
numerical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
preprocessor = ColumnTransformer([
('num', numerical_transformer, numerical_cols),
('cat', categorical_transformer, categorical_cols)
])
feature_pipeline = Pipeline([
('preprocessor', preprocessor),
('drop_correlated', DropCorrelatedFeatures(threshold=0.95))
])
return feature_pipeline
H2O AutoML
import h2o
from h2o.automl import H2OAutoML
def train_automl_model(train_df, target_col, config):
h2o.init()
# Конвертация в H2O Frame
h2o_train = h2o.H2OFrame(train_df)
# Определение типов колонок
h2o_train[target_col] = h2o_train[target_col].asfactor()
# Предикторы
predictors = [col for col in h2o_train.columns if col != target_col]
# AutoML
aml = H2OAutoML(
max_runtime_secs=config.get('max_runtime_secs', 3600),
max_models=config.get('max_models', 20),
stopping_metric=config.get('stopping_metric', 'AUC'),
sort_metric=config.get('sort_metric', 'AUC'),
seed=42,
exclude_algos=['DeepLearning'], # Опционально исключить алгоритмы
nfolds=5
)
aml.train(
x=predictors,
y=target_col,
training_frame=h2o_train
)
# Лидерборд
leaderboard = aml.leaderboard.as_data_frame()
print(leaderboard)
return aml.leader
MLflow Experiment Tracking
import mlflow
from mlflow.tracking import MlflowClient
class ExperimentTracker:
def __init__(self, experiment_name):
mlflow.set_experiment(experiment_name)
self.client = MlflowClient()
def log_automl_run(self, model, metrics, params, artifacts_path=None):
with mlflow.start_run():
# Логирование параметров
for key, value in params.items():
mlflow.log_param(key, value)
# Логирование метрик
for key, value in metrics.items():
mlflow.log_metric(key, value)
# Логирование модели
mlflow.h2o.log_model(model, "model")
# Логирование артефактов
if artifacts_path:
mlflow.log_artifacts(artifacts_path)
run_id = mlflow.active_run().info.run_id
return run_id
def register_best_model(self, run_id, model_name):
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, model_name)
Optuna для Hyperparameter Tuning
import optuna
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
def objective(trial, X, y):
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 500),
'max_depth': trial.suggest_int('max_depth', 3, 20),
'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
}
model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')
return scores.mean()
def run_optimization(X, y, n_trials=100):
study = optuna.create_study(direction='maximize')
study.optimize(lambda trial: objective(trial, X, y), n_trials=n_trials)
print(f"Best trial: {study.best_trial.value}")
print(f"Best params: {study.best_params}")
return study.best_params
Airflow DAG для оркестрации
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'automl_pipeline',
default_args=default_args,
description='AutoML Training Pipeline',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False
)
validate_data_task = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
dag=dag
)
feature_engineering_task = PythonOperator(
task_id='feature_engineering',
python_callable=run_feature_engineering,
dag=dag
)
automl_training_task = PythonOperator(
task_id='automl_training',
python_callable=train_automl_model,
dag=dag
)
model_validation_task = PythonOperator(
task_id='model_validation',
python_callable=validate_model,
dag=dag
)
validate_data_task >> feature_engineering_task >> automl_training_task >> model_validation_task
Рекомендации по фреймворкам
| Сценарий |
Рекомендация |
| Enterprise, табличные данные |
H2O.ai |
| Cloud-native |
Google Vertex AI, AWS SageMaker |
| Быстрое прототипирование |
AutoGluon, FLAML |
| Кастомизация |
MLflow + Optuna |
| Deep Learning |
AutoKeras, Neural Architecture Search |
Лучшие практики
- Data sampling — для ускорения экспериментов
- Early stopping — прекращение неперспективных моделей
- Resource management — лимиты памяти и CPU
- Distributed training — Ray Tune, Dask для масштабирования
- Model versioning — отслеживание всех экспериментов
- Reproducibility — фиксация random seeds