| name | wave-planning |
| description | Plan and manage picking waves for warehouse order fulfillment. Use this skill when organizing orders into waves, scheduling wave releases, balancing workload across waves, or coordinating wave-based picking operations. |
Wave Planning
Plan and manage picking waves for efficient warehouse order fulfillment.
Installation
pip install numpy pandas datetime
Quick Start
from datetime import datetime, timedelta
# Orders to process
orders = [
{'id': 'ORD001', 'priority': 'express', 'items': 5, 'cutoff': '14:00'},
{'id': 'ORD002', 'priority': 'standard', 'items': 3, 'cutoff': '18:00'},
{'id': 'ORD003', 'priority': 'express', 'items': 8, 'cutoff': '14:00'}
]
# Create waves
waves = create_waves(orders, wave_capacity=20, num_pickers=4)
Wave Configuration
class Wave:
def __init__(self, wave_id, start_time, capacity):
self.wave_id = wave_id
self.start_time = start_time
self.capacity = capacity
self.orders = []
self.total_items = 0
self.status = 'planning'
def can_add_order(self, order):
"""Check if order fits in wave capacity."""
return self.total_items + order['items'] <= self.capacity
def add_order(self, order):
"""Add order to wave."""
if self.can_add_order(order):
self.orders.append(order)
self.total_items += order['items']
return True
return False
def utilization(self):
"""Calculate wave capacity utilization."""
return self.total_items / self.capacity if self.capacity > 0 else 0
class WavePlanner:
def __init__(self, wave_duration_minutes, capacity_per_wave):
self.wave_duration = timedelta(minutes=wave_duration_minutes)
self.capacity = capacity_per_wave
self.waves = []
def create_wave(self, start_time):
"""Create a new wave."""
wave_id = f"WAVE_{len(self.waves) + 1:03d}"
wave = Wave(wave_id, start_time, self.capacity)
self.waves.append(wave)
return wave
def get_current_wave(self, time):
"""Get the wave for a given time."""
for wave in self.waves:
wave_end = wave.start_time + self.wave_duration
if wave.start_time <= time < wave_end:
return wave
return None
Wave Creation Strategies
Time-Based Waves
def create_time_based_waves(orders, wave_interval_minutes, capacity):
"""Create waves at fixed time intervals."""
waves = []
current_wave = None
# Sort orders by cutoff time
sorted_orders = sorted(orders, key=lambda o: o['cutoff'])
for order in sorted_orders:
order_time = datetime.strptime(order['cutoff'], '%H:%M')
if current_wave is None or not current_wave.can_add_order(order):
# Start new wave
wave_start = order_time - timedelta(minutes=wave_interval_minutes)
current_wave = Wave(f"WAVE_{len(waves)+1}", wave_start, capacity)
waves.append(current_wave)
current_wave.add_order(order)
return waves
Priority-Based Waves
def create_priority_waves(orders, capacity):
"""Create separate waves for different priority levels."""
priority_order = ['express', 'priority', 'standard']
waves = []
for priority in priority_order:
priority_orders = [o for o in orders if o['priority'] == priority]
if not priority_orders:
continue
current_wave = None
for order in priority_orders:
if current_wave is None or not current_wave.can_add_order(order):
wave_id = f"WAVE_{priority.upper()}_{len(waves)+1}"
current_wave = Wave(wave_id, datetime.now(), capacity)
current_wave.priority = priority
waves.append(current_wave)
current_wave.add_order(order)
return waves
Carrier-Based Waves
def create_carrier_waves(orders, carrier_cutoffs, capacity):
"""
Create waves based on carrier pickup times.
carrier_cutoffs: dict of {carrier: cutoff_time}
"""
waves = []
for carrier, cutoff in sorted(carrier_cutoffs.items(),
key=lambda x: x[1]):
carrier_orders = [o for o in orders if o.get('carrier') == carrier]
if not carrier_orders:
continue
current_wave = None
for order in carrier_orders:
if current_wave is None or not current_wave.can_add_order(order):
wave = Wave(f"WAVE_{carrier}", cutoff, capacity)
wave.carrier = carrier
waves.append(wave)
current_wave = wave
current_wave.add_order(order)
return waves
Wave Balancing
def balance_waves(waves, target_utilization=0.85):
"""Redistribute orders to balance wave utilization."""
# Sort waves by utilization
sorted_waves = sorted(waves, key=lambda w: w.utilization())
# Move orders from over-utilized to under-utilized waves
for i, under_wave in enumerate(sorted_waves):
if under_wave.utilization() >= target_utilization:
break
for over_wave in reversed(sorted_waves):
if over_wave.utilization() <= target_utilization:
break
# Try to move orders
for order in over_wave.orders[:]:
if under_wave.can_add_order(order):
over_wave.orders.remove(order)
over_wave.total_items -= order['items']
under_wave.add_order(order)
if under_wave.utilization() >= target_utilization:
break
return waves
def calculate_wave_balance_metrics(waves):
"""Calculate metrics for wave balance."""
utilizations = [w.utilization() for w in waves]
return {
'num_waves': len(waves),
'avg_utilization': np.mean(utilizations),
'min_utilization': min(utilizations),
'max_utilization': max(utilizations),
'std_utilization': np.std(utilizations),
'total_orders': sum(len(w.orders) for w in waves),
'total_items': sum(w.total_items for w in waves)
}
Wave Scheduling
class WaveScheduler:
def __init__(self, num_pickers, items_per_picker_hour):
self.num_pickers = num_pickers
self.productivity = items_per_picker_hour
def estimate_wave_duration(self, wave):
"""Estimate how long a wave will take to complete."""
items_per_hour = self.num_pickers * self.productivity
hours = wave.total_items / items_per_hour
return timedelta(hours=hours)
def schedule_waves(self, waves, start_time):
"""Schedule waves sequentially."""
current_time = start_time
schedule = []
for wave in waves:
duration = self.estimate_wave_duration(wave)
schedule.append({
'wave_id': wave.wave_id,
'start_time': current_time,
'end_time': current_time + duration,
'duration_minutes': duration.total_seconds() / 60,
'orders': len(wave.orders),
'items': wave.total_items
})
current_time = current_time + duration
return schedule
def validate_cutoffs(self, schedule, waves):
"""Check if all orders will meet their cutoff times."""
violations = []
for i, wave in enumerate(waves):
wave_end = schedule[i]['end_time']
for order in wave.orders:
cutoff = datetime.strptime(order['cutoff'], '%H:%M')
cutoff = cutoff.replace(
year=wave_end.year,
month=wave_end.month,
day=wave_end.day
)
if wave_end > cutoff:
violations.append({
'order_id': order['id'],
'cutoff': order['cutoff'],
'wave_end': wave_end.strftime('%H:%M'),
'delay_minutes': (wave_end - cutoff).seconds / 60
})
return violations
Wave Release Management
class WaveReleaseManager:
def __init__(self):
self.released_waves = []
self.completed_waves = []
def release_wave(self, wave):
"""Release wave for picking."""
wave.status = 'released'
wave.release_time = datetime.now()
self.released_waves.append(wave)
return wave
def complete_wave(self, wave):
"""Mark wave as completed."""
wave.status = 'completed'
wave.completion_time = datetime.now()
self.released_waves.remove(wave)
self.completed_waves.append(wave)
def get_wave_progress(self, wave, picked_items):
"""Calculate wave completion progress."""
return {
'wave_id': wave.wave_id,
'total_items': wave.total_items,
'picked_items': picked_items,
'progress_pct': picked_items / wave.total_items * 100,
'remaining_items': wave.total_items - picked_items
}
Wave Performance Metrics
def analyze_wave_performance(wave, actual_duration, planned_duration):
"""Analyze performance of a completed wave."""
variance = (actual_duration - planned_duration).total_seconds() / 60
return {
'wave_id': wave.wave_id,
'planned_duration_min': planned_duration.total_seconds() / 60,
'actual_duration_min': actual_duration.total_seconds() / 60,
'variance_min': variance,
'variance_pct': variance / (planned_duration.total_seconds() / 60) * 100,
'items_picked': wave.total_items,
'orders_completed': len(wave.orders),
'items_per_hour': wave.total_items / (actual_duration.total_seconds() / 3600)
}