| name | model-optimization |
| description | Model optimization techniques including hyperparameter tuning, architecture search, training optimization, and performance profiling for ML systems. |
Model Optimization
Techniques for optimizing ML model performance.
Hyperparameter Optimization
Optuna Integration
import optuna
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler

def objective(trial):
    # Suggest hyperparameters
    params = {
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-1, log=True),
        'batch_size': trial.suggest_categorical('batch_size', [16, 32, 64, 128]),
        'num_layers': trial.suggest_int('num_layers', 2, 8),
        'hidden_dim': trial.suggest_int('hidden_dim', 64, 512, step=64),
        'dropout': trial.suggest_float('dropout', 0.1, 0.5),
        'optimizer': trial.suggest_categorical('optimizer', ['adam', 'sgd', 'adamw']),
        'weight_decay': trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True),
    }
    model = build_model(params)
    for epoch in range(max_epochs):
        train_loss = train_epoch(model, train_loader)
        val_loss = validate(model, val_loader)

        # Report intermediate value so the pruner can stop unpromising trials early
        trial.report(val_loss, epoch)
        if trial.should_prune():
            raise optuna.TrialPruned()
    return val_loss

# Create study with pruning
study = optuna.create_study(
    direction='minimize',
    sampler=TPESampler(seed=42),
    pruner=MedianPruner(n_startup_trials=5, n_warmup_steps=10)
)
study.optimize(objective, n_trials=100, n_jobs=4)

print(f"Best params: {study.best_params}")
print(f"Best value: {study.best_value:.4f}")
Hyperparameter Search Strategies
# Grid Search
from sklearn.model_selection import GridSearchCV

param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [3, 5, 7, 10],
    'learning_rate': [0.01, 0.1, 0.3]
}
grid = GridSearchCV(model, param_grid, cv=5, scoring='f1_macro', n_jobs=-1)
grid.fit(X_train, y_train)

# Random Search
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint

param_distributions = {
    'n_estimators': randint(50, 500),
    'max_depth': randint(2, 15),
    'learning_rate': uniform(0.001, 0.5)
}
random_search = RandomizedSearchCV(
    model, param_distributions, n_iter=100, cv=5, random_state=42
)
random_search.fit(X_train, y_train)

# Bayesian Optimization with scikit-optimize
from skopt import BayesSearchCV
from skopt.space import Real, Integer, Categorical

search_spaces = {
    'learning_rate': Real(1e-5, 1e-1, prior='log-uniform'),
    'num_layers': Integer(2, 10),
    'activation': Categorical(['relu', 'gelu', 'swish'])
}
bayes_search = BayesSearchCV(
    model, search_spaces, n_iter=50, cv=5, random_state=42
)
bayes_search.fit(X_train, y_train)
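Each fitted search object exposes the winning configuration through the usual scikit-learn attributes, for example:
# Inspect results from the fitted searches (same attributes for grid, random, and Bayes search)
print(f"Grid search best params: {grid.best_params_} (F1-macro: {grid.best_score_:.4f})")
print(f"Bayes search best params: {bayes_search.best_params_}")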
Training Optimization
Learning Rate Scheduling
import math
import torch.optim.lr_scheduler as lr_scheduler

# Warmup + Cosine Annealing
class WarmupCosineScheduler:
    def __init__(self, optimizer, warmup_steps, total_steps):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.current_step = 0
        self.base_lr = optimizer.param_groups[0]['lr']

    def step(self):
        self.current_step += 1
        if self.current_step < self.warmup_steps:
            # Linear warmup from 0 to base_lr
            lr = self.base_lr * self.current_step / self.warmup_steps
        else:
            # Cosine decay from base_lr to 0 over the remaining steps
            progress = (self.current_step - self.warmup_steps) / (self.total_steps - self.warmup_steps)
            lr = self.base_lr * 0.5 * (1 + math.cos(math.pi * progress))
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr

# One Cycle Policy
scheduler = lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=0.01,
    total_steps=total_steps,
    pct_start=0.3,
    anneal_strategy='cos'
)

# Reduce on Plateau
scheduler = lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5
)
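A minimal wiring sketch for the custom WarmupCosineScheduler above; the scheduler is stepped once per optimizer update rather than once per epoch (model, criterion, train_loader, and num_epochs are placeholders):
import torch

# Hypothetical training loop using WarmupCosineScheduler (placeholder model/data names)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
total_steps = len(train_loader) * num_epochs
scheduler = WarmupCosineScheduler(optimizer, warmup_steps=500, total_steps=total_steps)

for epoch in range(num_epochs):
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()
        optimizer.step()
        scheduler.step()  # update the learning rate after every optimizer step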
Mixed Precision Training
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()

for epoch in range(epochs):
    for inputs, targets in train_loader:
        optimizer.zero_grad()

        # Run the forward pass in mixed precision
        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, targets)

        # Scale the loss to avoid fp16 gradient underflow
        scaler.scale(loss).backward()

        # Unscale before clipping so the norm is computed on true gradients
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        scaler.step(optimizer)
        scaler.update()
Gradient Accumulation
accumulation_steps = 4

optimizer.zero_grad()
for i, (inputs, targets) in enumerate(train_loader):
    outputs = model(inputs)
    # Scale the loss so the accumulated gradient matches a full-size batch
    loss = criterion(outputs, targets) / accumulation_steps
    loss.backward()

    # Step only every `accumulation_steps` mini-batches
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
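Gradient accumulation combines naturally with the mixed-precision pattern above; a sketch assuming the same placeholder model, criterion, and loader:
from torch.cuda.amp import autocast, GradScaler

# Gradient accumulation with mixed precision (sketch)
accumulation_steps = 4
scaler = GradScaler()

optimizer.zero_grad()
for i, (inputs, targets) in enumerate(train_loader):
    with autocast():
        loss = criterion(model(inputs), targets) / accumulation_steps
    scaler.scale(loss).backward()

    if (i + 1) % accumulation_steps == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()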
Architecture Optimization
Neural Architecture Search
import nni
import torch.nn as nn
from nni.nas.pytorch import mutables

class SearchableBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Candidate operations the NAS algorithm chooses between
        self.op_choice = mutables.LayerChoice([
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.Conv2d(in_channels, out_channels, 5, padding=2),
            DepthSeparableConv(in_channels, out_channels),  # user-defined depthwise-separable conv
            nn.Identity() if in_channels == out_channels else nn.Conv2d(in_channels, out_channels, 1)
        ])

    def forward(self, x):
        return self.op_choice(x)
# AutoML with Ray Tune
from ray import tune
from ray.tune.schedulers import ASHAScheduler

def train_model(config):
    model = build_model(config)
    for epoch in range(config['epochs']):
        train_loss = train(model)
        val_acc = validate(model)
        # Report metrics so ASHA can stop poorly performing trials early
        tune.report(loss=train_loss, accuracy=val_acc)

analysis = tune.run(
    train_model,
    config={
        "lr": tune.loguniform(1e-5, 1e-1),
        "layers": tune.choice([2, 4, 6, 8]),
        "hidden": tune.choice([128, 256, 512]),
        "epochs": 20,
    },
    scheduler=ASHAScheduler(metric="accuracy", mode="max"),
    num_samples=100
)
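The analysis object returned by tune.run exposes the best configuration found; for example:
# Retrieve the winning configuration from the search
best_config = analysis.get_best_config(metric="accuracy", mode="max")
print(f"Best config: {best_config}")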
Memory Optimization
Gradient Checkpointing
import torch.nn as nn
from torch.utils.checkpoint import checkpoint

class CheckpointedModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            TransformerBlock(d_model=512) for _ in range(24)
        ])

    def forward(self, x):
        # Recompute activations during backward instead of storing them
        for layer in self.layers:
            x = checkpoint(layer, x, use_reentrant=False)
        return x
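A quick way to see the effect is to check peak GPU memory during a forward/backward pass; a rough sketch, assuming TransformerBlock takes (batch, seq, d_model) inputs:
import torch

# Rough peak-memory check for the checkpointed model (illustrative shapes)
model = CheckpointedModel().cuda()
x = torch.randn(8, 128, 512, device="cuda")

torch.cuda.reset_peak_memory_stats()
model(x).sum().backward()
print(f"Peak memory: {torch.cuda.max_memory_allocated() / 1e6:.1f} MB")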
Memory-Efficient Attention
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

# Flash Attention (via xformers or native)
from xformers.ops import memory_efficient_attention

class EfficientAttention(nn.Module):
    def forward(self, q, k, v, mask=None):
        return memory_efficient_attention(q, k, v, attn_bias=mask)

# Sliding Window Attention (chunked local attention)
class SlidingWindowAttention(nn.Module):
    def __init__(self, window_size=256):
        super().__init__()
        self.window_size = window_size

    def forward(self, q, k, v):
        seq_len = q.size(1)
        outputs = []
        for i in range(0, seq_len, self.window_size):
            # Each query chunk attends only to a local neighbourhood of keys/values
            start = max(0, i - self.window_size // 2)
            end = min(seq_len, i + self.window_size)
            q_chunk = q[:, i:min(i + self.window_size, seq_len)]
            k_chunk = k[:, start:end]
            v_chunk = v[:, start:end]
            attn = torch.matmul(q_chunk, k_chunk.transpose(-2, -1))
            attn = F.softmax(attn / math.sqrt(q.size(-1)), dim=-1)
            outputs.append(torch.matmul(attn, v_chunk))
        return torch.cat(outputs, dim=1)
Performance Profiling
import torch.profiler as profiler

with profiler.profile(
    activities=[
        profiler.ProfilerActivity.CPU,
        profiler.ProfilerActivity.CUDA,
    ],
    schedule=profiler.schedule(wait=1, warmup=1, active=3, repeat=1),
    on_trace_ready=profiler.tensorboard_trace_handler('./logs'),
    record_shapes=True,
    profile_memory=True,
    with_stack=True
) as prof:
    for step, batch in enumerate(train_loader):
        if step >= 5:
            break
        train_step(model, batch)
        prof.step()

# Print summary
print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))
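The same prof object can be sliced differently, e.g. by memory usage or input shapes, to spot allocation and shape-dependent hot spots:
# Memory-centric view (requires profile_memory=True, set above)
print(prof.key_averages().table(sort_by="self_cuda_memory_usage", row_limit=10))

# Group operators by input shape to find the expensive configurations
print(prof.key_averages(group_by_input_shape=True).table(sort_by="cuda_time_total", row_limit=10))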
Distributed Training
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

def train_ddp(rank, world_size):
    setup(rank, world_size)
    model = Model().to(rank)
    model = DDP(model, device_ids=[rank])

    # Each process sees a distinct shard of the dataset
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, sampler=sampler, batch_size=32)

    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # reshuffle shards each epoch
        for batch in loader:
            train_step(model, batch)

    dist.destroy_process_group()

# Launch with torchrun
# torchrun --nproc_per_node=4 train.py
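With torchrun, the rank and world size arrive via environment variables; a minimal single-node entry-point sketch for the train_ddp function above (hypothetical train.py):
import os

# torchrun sets LOCAL_RANK, RANK, and WORLD_SIZE for each spawned process
if __name__ == "__main__":
    rank = int(os.environ["LOCAL_RANK"])  # single-node: local rank == global rank
    world_size = int(os.environ["WORLD_SIZE"])
    train_ddp(rank, world_size)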
Commands
- /omgtrain:tune - Hyperparameter tuning
- /omgoptim:profile - Profile model
- /omgtrain:train - Train with optimizations
Best Practices
- Profile before optimizing
- Use mixed precision by default
- Start with proven architectures
- Tune learning rate first
- Use distributed training for scale