Claude Code Plugins

Community-maintained marketplace

Feedback

Plan and manage picking waves for warehouse order fulfillment. Use this skill when organizing orders into waves, scheduling wave releases, balancing workload across waves, or coordinating wave-based picking operations.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name wave-planning
description Plan and manage picking waves for warehouse order fulfillment. Use this skill when organizing orders into waves, scheduling wave releases, balancing workload across waves, or coordinating wave-based picking operations.

Wave Planning

Plan and manage picking waves for efficient warehouse order fulfillment.

Installation

pip install numpy pandas datetime

Quick Start

from datetime import datetime, timedelta

# Orders to process
orders = [
    {'id': 'ORD001', 'priority': 'express', 'items': 5, 'cutoff': '14:00'},
    {'id': 'ORD002', 'priority': 'standard', 'items': 3, 'cutoff': '18:00'},
    {'id': 'ORD003', 'priority': 'express', 'items': 8, 'cutoff': '14:00'}
]

# Create waves
waves = create_waves(orders, wave_capacity=20, num_pickers=4)

Wave Configuration

class Wave:
    def __init__(self, wave_id, start_time, capacity):
        self.wave_id = wave_id
        self.start_time = start_time
        self.capacity = capacity
        self.orders = []
        self.total_items = 0
        self.status = 'planning'

    def can_add_order(self, order):
        """Check if order fits in wave capacity."""
        return self.total_items + order['items'] <= self.capacity

    def add_order(self, order):
        """Add order to wave."""
        if self.can_add_order(order):
            self.orders.append(order)
            self.total_items += order['items']
            return True
        return False

    def utilization(self):
        """Calculate wave capacity utilization."""
        return self.total_items / self.capacity if self.capacity > 0 else 0


class WavePlanner:
    def __init__(self, wave_duration_minutes, capacity_per_wave):
        self.wave_duration = timedelta(minutes=wave_duration_minutes)
        self.capacity = capacity_per_wave
        self.waves = []

    def create_wave(self, start_time):
        """Create a new wave."""
        wave_id = f"WAVE_{len(self.waves) + 1:03d}"
        wave = Wave(wave_id, start_time, self.capacity)
        self.waves.append(wave)
        return wave

    def get_current_wave(self, time):
        """Get the wave for a given time."""
        for wave in self.waves:
            wave_end = wave.start_time + self.wave_duration
            if wave.start_time <= time < wave_end:
                return wave
        return None

Wave Creation Strategies

Time-Based Waves

def create_time_based_waves(orders, wave_interval_minutes, capacity):
    """Create waves at fixed time intervals."""
    waves = []
    current_wave = None

    # Sort orders by cutoff time
    sorted_orders = sorted(orders, key=lambda o: o['cutoff'])

    for order in sorted_orders:
        order_time = datetime.strptime(order['cutoff'], '%H:%M')

        if current_wave is None or not current_wave.can_add_order(order):
            # Start new wave
            wave_start = order_time - timedelta(minutes=wave_interval_minutes)
            current_wave = Wave(f"WAVE_{len(waves)+1}", wave_start, capacity)
            waves.append(current_wave)

        current_wave.add_order(order)

    return waves

Priority-Based Waves

def create_priority_waves(orders, capacity):
    """Create separate waves for different priority levels."""
    priority_order = ['express', 'priority', 'standard']
    waves = []

    for priority in priority_order:
        priority_orders = [o for o in orders if o['priority'] == priority]

        if not priority_orders:
            continue

        current_wave = None
        for order in priority_orders:
            if current_wave is None or not current_wave.can_add_order(order):
                wave_id = f"WAVE_{priority.upper()}_{len(waves)+1}"
                current_wave = Wave(wave_id, datetime.now(), capacity)
                current_wave.priority = priority
                waves.append(current_wave)

            current_wave.add_order(order)

    return waves

Carrier-Based Waves

def create_carrier_waves(orders, carrier_cutoffs, capacity):
    """
    Create waves based on carrier pickup times.
    carrier_cutoffs: dict of {carrier: cutoff_time}
    """
    waves = []

    for carrier, cutoff in sorted(carrier_cutoffs.items(),
                                   key=lambda x: x[1]):
        carrier_orders = [o for o in orders if o.get('carrier') == carrier]

        if not carrier_orders:
            continue

        current_wave = None
        for order in carrier_orders:
            if current_wave is None or not current_wave.can_add_order(order):
                wave = Wave(f"WAVE_{carrier}", cutoff, capacity)
                wave.carrier = carrier
                waves.append(wave)
                current_wave = wave

            current_wave.add_order(order)

    return waves

Wave Balancing

def balance_waves(waves, target_utilization=0.85):
    """Redistribute orders to balance wave utilization."""
    # Sort waves by utilization
    sorted_waves = sorted(waves, key=lambda w: w.utilization())

    # Move orders from over-utilized to under-utilized waves
    for i, under_wave in enumerate(sorted_waves):
        if under_wave.utilization() >= target_utilization:
            break

        for over_wave in reversed(sorted_waves):
            if over_wave.utilization() <= target_utilization:
                break

            # Try to move orders
            for order in over_wave.orders[:]:
                if under_wave.can_add_order(order):
                    over_wave.orders.remove(order)
                    over_wave.total_items -= order['items']
                    under_wave.add_order(order)

                    if under_wave.utilization() >= target_utilization:
                        break

    return waves

def calculate_wave_balance_metrics(waves):
    """Calculate metrics for wave balance."""
    utilizations = [w.utilization() for w in waves]

    return {
        'num_waves': len(waves),
        'avg_utilization': np.mean(utilizations),
        'min_utilization': min(utilizations),
        'max_utilization': max(utilizations),
        'std_utilization': np.std(utilizations),
        'total_orders': sum(len(w.orders) for w in waves),
        'total_items': sum(w.total_items for w in waves)
    }

Wave Scheduling

class WaveScheduler:
    def __init__(self, num_pickers, items_per_picker_hour):
        self.num_pickers = num_pickers
        self.productivity = items_per_picker_hour

    def estimate_wave_duration(self, wave):
        """Estimate how long a wave will take to complete."""
        items_per_hour = self.num_pickers * self.productivity
        hours = wave.total_items / items_per_hour
        return timedelta(hours=hours)

    def schedule_waves(self, waves, start_time):
        """Schedule waves sequentially."""
        current_time = start_time
        schedule = []

        for wave in waves:
            duration = self.estimate_wave_duration(wave)

            schedule.append({
                'wave_id': wave.wave_id,
                'start_time': current_time,
                'end_time': current_time + duration,
                'duration_minutes': duration.total_seconds() / 60,
                'orders': len(wave.orders),
                'items': wave.total_items
            })

            current_time = current_time + duration

        return schedule

    def validate_cutoffs(self, schedule, waves):
        """Check if all orders will meet their cutoff times."""
        violations = []

        for i, wave in enumerate(waves):
            wave_end = schedule[i]['end_time']

            for order in wave.orders:
                cutoff = datetime.strptime(order['cutoff'], '%H:%M')
                cutoff = cutoff.replace(
                    year=wave_end.year,
                    month=wave_end.month,
                    day=wave_end.day
                )

                if wave_end > cutoff:
                    violations.append({
                        'order_id': order['id'],
                        'cutoff': order['cutoff'],
                        'wave_end': wave_end.strftime('%H:%M'),
                        'delay_minutes': (wave_end - cutoff).seconds / 60
                    })

        return violations

Wave Release Management

class WaveReleaseManager:
    def __init__(self):
        self.released_waves = []
        self.completed_waves = []

    def release_wave(self, wave):
        """Release wave for picking."""
        wave.status = 'released'
        wave.release_time = datetime.now()
        self.released_waves.append(wave)
        return wave

    def complete_wave(self, wave):
        """Mark wave as completed."""
        wave.status = 'completed'
        wave.completion_time = datetime.now()
        self.released_waves.remove(wave)
        self.completed_waves.append(wave)

    def get_wave_progress(self, wave, picked_items):
        """Calculate wave completion progress."""
        return {
            'wave_id': wave.wave_id,
            'total_items': wave.total_items,
            'picked_items': picked_items,
            'progress_pct': picked_items / wave.total_items * 100,
            'remaining_items': wave.total_items - picked_items
        }

Wave Performance Metrics

def analyze_wave_performance(wave, actual_duration, planned_duration):
    """Analyze performance of a completed wave."""
    variance = (actual_duration - planned_duration).total_seconds() / 60

    return {
        'wave_id': wave.wave_id,
        'planned_duration_min': planned_duration.total_seconds() / 60,
        'actual_duration_min': actual_duration.total_seconds() / 60,
        'variance_min': variance,
        'variance_pct': variance / (planned_duration.total_seconds() / 60) * 100,
        'items_picked': wave.total_items,
        'orders_completed': len(wave.orders),
        'items_per_hour': wave.total_items / (actual_duration.total_seconds() / 3600)
    }