Claude Code Plugins

Community-maintained marketplace

automl-pipeline-setup

@dengineproblem/agents-monorepo

AutoML expert. Use for automated machine learning, hyperparameter tuning, and model selection.

Install Skill

1. Download skill
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name automl-pipeline-setup
description AutoML expert. Use for automated machine learning, hyperparameter tuning, and model selection.

AutoML Pipeline Setup Expert

An expert in designing and implementing automated machine learning systems.

Pipeline Architecture

Modular components

Data Ingestion → Validation → Feature Engineering → Model Training → Evaluation → Deployment
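
A minimal sketch of how these stages can be composed as small single-responsibility functions; all function names and the early-exit logic are illustrative assumptions, not part of the skill:

import pandas as pd

# Hypothetical stage functions: each stage takes and returns a DataFrame,
# so stages can be swapped, tested, or rerun in isolation.
def ingest(source: str) -> pd.DataFrame:
    return pd.read_parquet(source)

def validate(df: pd.DataFrame, null_threshold: float = 0.1) -> pd.DataFrame:
    if df.isna().mean().max() > null_threshold:
        raise ValueError("Null ratio exceeds the configured threshold")
    return df.drop_duplicates()

def run_pipeline(source: str) -> pd.DataFrame:
    df = ingest(source)    # Data Ingestion
    df = validate(df)      # Validation
    # Feature Engineering, Model Training, Evaluation and Deployment
    # plug in here as further stage functions.
    return df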

YAML-based configuration

pipeline:
  name: customer_churn_prediction
  version: "1.0"

data:
  source: "s3://bucket/data.parquet"
  validation:
    null_threshold: 0.1
    duplicate_check: true

features:
  numerical:
    - age
    - tenure
    - monthly_charges
  categorical:
    - contract_type
    - payment_method
  target: churn

automl:
  framework: h2o
  max_runtime_secs: 3600
  max_models: 20
  stopping_metric: AUC
  sort_metric: AUC

deployment:
  platform: mlflow
  model_registry: true
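
A short loader for this configuration, assuming it is saved as config.yaml (the file name is an assumption) and read with PyYAML:

import yaml

def load_config(path: str = "config.yaml") -> dict:
    # Parse the pipeline configuration shown above into a plain dict.
    with open(path) as f:
        return yaml.safe_load(f)

config = load_config()
print(config["automl"]["max_runtime_secs"])  # 3600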

Data Validation with Great Expectations

import great_expectations as gx

def validate_data(df, expectation_suite_name="default"):
    context = gx.get_context()

    # Create the expectation suite
    context.add_expectation_suite(expectation_suite_name)

    # Register the in-memory DataFrame and build a batch request
    # (fluent pandas API; exact calls differ between Great Expectations versions)
    datasource = context.sources.add_pandas("pandas_datasource")
    data_asset = datasource.add_dataframe_asset(name="training_data")
    batch_request = data_asset.build_batch_request(dataframe=df)

    # Define the expectations on a validator
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=expectation_suite_name
    )

    # Data quality checks
    validator.expect_column_values_to_not_be_null("customer_id")
    validator.expect_column_values_to_be_between("age", min_value=18, max_value=100)
    validator.expect_column_values_to_be_in_set(
        "contract_type",
        ["month-to-month", "one_year", "two_year"]
    )

    # Run validation
    results = validator.validate()
    return results.success
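
A hedged usage example, assuming the data has already been loaded into a pandas DataFrame from the source declared in the YAML config:

import pandas as pd

df = pd.read_parquet("s3://bucket/data.parquet")  # source path from the config above
if not validate_data(df, expectation_suite_name="churn_suite"):
    raise ValueError("Data validation failed; aborting before feature engineering")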

Feature Engineering Pipeline

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from feature_engine.selection import DropCorrelatedFeatures

def create_feature_pipeline(numerical_cols, categorical_cols):
    numerical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])

    preprocessor = ColumnTransformer([
        ('num', numerical_transformer, numerical_cols),
        ('cat', categorical_transformer, categorical_cols)
    ])

    feature_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('drop_correlated', DropCorrelatedFeatures(threshold=0.95))
    ])

    return feature_pipeline
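
An illustrative fit on the columns declared in the config; the DataFrame df and the target name churn are assumptions carried over from the earlier examples:

numerical_cols = ["age", "tenure", "monthly_charges"]
categorical_cols = ["contract_type", "payment_method"]

feature_pipeline = create_feature_pipeline(numerical_cols, categorical_cols)
# Fit on the predictors only; the target stays untouched for training later.
X_transformed = feature_pipeline.fit_transform(df.drop(columns=["churn"]))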

H2O AutoML

import h2o
from h2o.automl import H2OAutoML

def train_automl_model(train_df, target_col, config):
    h2o.init()

    # Convert to an H2O Frame
    h2o_train = h2o.H2OFrame(train_df)

    # Define column types (the target must be a factor for classification)
    h2o_train[target_col] = h2o_train[target_col].asfactor()

    # Predictors
    predictors = [col for col in h2o_train.columns if col != target_col]

    # AutoML
    aml = H2OAutoML(
        max_runtime_secs=config.get('max_runtime_secs', 3600),
        max_models=config.get('max_models', 20),
        stopping_metric=config.get('stopping_metric', 'AUC'),
        sort_metric=config.get('sort_metric', 'AUC'),
        seed=42,
        exclude_algos=['DeepLearning'],  # Optionally exclude specific algorithms
        nfolds=5
    )

    aml.train(
        x=predictors,
        y=target_col,
        training_frame=h2o_train
    )

    # Leaderboard
    leaderboard = aml.leaderboard.as_data_frame()
    print(leaderboard)

    return aml.leader
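
An illustrative call that wires in the automl section of the YAML config; train_df and the cross-validated AUC check are assumptions:

best_model = train_automl_model(train_df, target_col="churn", config=config["automl"])

# Cross-validated performance of the leader model (nfolds=5 was set above)
print(best_model.auc(xval=True))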

MLflow Experiment Tracking

import mlflow
from mlflow.tracking import MlflowClient

class ExperimentTracker:
    def __init__(self, experiment_name):
        mlflow.set_experiment(experiment_name)
        self.client = MlflowClient()

    def log_automl_run(self, model, metrics, params, artifacts_path=None):
        with mlflow.start_run():
            # Log parameters
            for key, value in params.items():
                mlflow.log_param(key, value)

            # Log metrics
            for key, value in metrics.items():
                mlflow.log_metric(key, value)

            # Log the model (H2O flavor)
            mlflow.h2o.log_model(model, "model")

            # Log artifacts
            if artifacts_path:
                mlflow.log_artifacts(artifacts_path)

            run_id = mlflow.active_run().info.run_id
            return run_id

    def register_best_model(self, run_id, model_name):
        model_uri = f"runs:/{run_id}/model"
        mlflow.register_model(model_uri, model_name)
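
A hypothetical end-to-end use of the tracker with the H2O leader model from the previous section; the experiment and registry names are made up for illustration:

tracker = ExperimentTracker("customer_churn_automl")

run_id = tracker.log_automl_run(
    model=best_model,
    metrics={"auc_xval": best_model.auc(xval=True)},
    params={"framework": "h2o", "max_models": 20, "max_runtime_secs": 3600},
)
tracker.register_best_model(run_id, "churn_classifier")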

Optuna for Hyperparameter Tuning

import optuna
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

def objective(trial, X, y):
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 500),
        'max_depth': trial.suggest_int('max_depth', 3, 20),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
        'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
    }

    model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
    scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')

    return scores.mean()

def run_optimization(X, y, n_trials=100):
    study = optuna.create_study(direction='maximize')
    study.optimize(lambda trial: objective(trial, X, y), n_trials=n_trials)

    print(f"Best trial: {study.best_trial.value}")
    print(f"Best params: {study.best_params}")

    return study.best_params
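
A short follow-up showing how the tuned parameters would typically be used to refit a final model on the full training data (X and y are assumed from the surrounding pipeline):

best_params = run_optimization(X, y, n_trials=50)

# Refit a single model on all training data with the tuned hyperparameters.
final_model = RandomForestClassifier(**best_params, random_state=42, n_jobs=-1)
final_model.fit(X, y)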

Airflow DAG for Orchestration

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'automl_pipeline',
    default_args=default_args,
    description='AutoML Training Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)

validate_data_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)

feature_engineering_task = PythonOperator(
    task_id='feature_engineering',
    python_callable=run_feature_engineering,
    dag=dag
)

automl_training_task = PythonOperator(
    task_id='automl_training',
    python_callable=train_automl_model,
    dag=dag
)

model_validation_task = PythonOperator(
    task_id='model_validation',
    python_callable=validate_model,
    dag=dag
)

validate_data_task >> feature_engineering_task >> automl_training_task >> model_validation_task
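
The callables above take arguments (for example validate_data(df)), so in practice each task usually wraps its step in a thin callable that loads its own inputs. A sketch for the first task; the wrapper name and data path are illustrative assumptions:

def _validate_data_callable(**context):
    # Thin wrapper so the task loads its own input; the path here is illustrative.
    import pandas as pd
    df = pd.read_parquet("s3://bucket/data.parquet")
    if not validate_data(df):
        raise ValueError("Data validation failed")

The operator would then take python_callable=_validate_data_callable instead of the bare validate_data.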

Framework Recommendations

Scenario | Recommendation
Enterprise, tabular data | H2O.ai
Cloud-native | Google Vertex AI, AWS SageMaker
Rapid prototyping | AutoGluon, FLAML (see the sketch below)
Customization | MLflow + Optuna
Deep learning | AutoKeras, Neural Architecture Search
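
For the rapid-prototyping row, a hedged FLAML sketch; the 120-second budget and the roc_auc metric are arbitrary choices for illustration, and X, y are the usual training arrays:

from flaml import AutoML

automl = AutoML()
# Search across FLAML's default estimators under a fixed time budget.
automl.fit(X_train=X, y_train=y, task="classification", time_budget=120, metric="roc_auc")
print(automl.best_estimator, automl.best_config)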

Best Practices

  1. Data sampling: subsample data to speed up experiments
  2. Early stopping: terminate unpromising models early
  3. Resource management: set memory and CPU limits
  4. Distributed training: Ray Tune or Dask for scaling
  5. Model versioning: track every experiment
  6. Reproducibility: fix random seeds (see the sketch below)
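
A minimal sketch for point 6; framework-specific seeds (H2OAutoML(seed=...), optuna.samplers.TPESampler(seed=...)) are set where those objects are created:

import random

import numpy as np

def set_seeds(seed: int = 42) -> None:
    # Fix the Python and NumPy RNGs used by sklearn and the feature engineering steps.
    random.seed(seed)
    np.random.seed(seed)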