Claude Code Plugins

Community-maintained marketplace

Feedback

fulfillment-optimization

@benchflow-ai/skillsbench
15
0

Optimize order fulfillment operations for speed and cost efficiency. Use this skill when optimizing pick-pack-ship workflows, reducing fulfillment time, managing carrier selection, or improving order processing throughput.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name fulfillment-optimization
description Optimize order fulfillment operations for speed and cost efficiency. Use this skill when optimizing pick-pack-ship workflows, reducing fulfillment time, managing carrier selection, or improving order processing throughput.

Fulfillment Optimization

Optimize order fulfillment operations for speed, cost, and service quality.

Installation

pip install numpy pandas pulp

Quick Start

import numpy as np
import pandas as pd

# Order pool to fulfill
orders = [
    {'id': 'ORD001', 'items': 3, 'priority': 'express', 'destination': 'NYC'},
    {'id': 'ORD002', 'items': 1, 'priority': 'standard', 'destination': 'LA'},
    {'id': 'ORD003', 'items': 5, 'priority': 'express', 'destination': 'CHI'}
]

# Available resources
pickers = 5
pack_stations = 3
shipping_cutoff = '16:00'

Fulfillment Workflow Optimization

Order Sequencing

def optimize_order_sequence(orders, constraints):
    """Optimize the sequence of orders to process."""
    # Score each order based on priority and constraints
    scored_orders = []

    for order in orders:
        score = 0

        # Priority scoring
        priority_scores = {'express': 100, 'priority': 50, 'standard': 10}
        score += priority_scores.get(order['priority'], 0)

        # Cutoff urgency
        if order.get('cutoff_time'):
            time_to_cutoff = calculate_time_remaining(order['cutoff_time'])
            if time_to_cutoff < 2:  # Less than 2 hours
                score += 200
            elif time_to_cutoff < 4:
                score += 100

        # Single-item orders (faster to process)
        if order['items'] == 1:
            score += 20

        # VIP customers
        if order.get('customer_tier') == 'VIP':
            score += 50

        scored_orders.append((score, order))

    # Sort by score (highest first)
    scored_orders.sort(reverse=True, key=lambda x: x[0])

    return [order for _, order in scored_orders]

def batch_for_efficiency(orders, batch_size=20):
    """Group orders into efficient batches."""
    batches = []
    current_batch = []

    for order in orders:
        current_batch.append(order)

        if len(current_batch) >= batch_size:
            batches.append(current_batch)
            current_batch = []

    if current_batch:
        batches.append(current_batch)

    return batches

Resource Allocation

from pulp import LpProblem, LpMaximize, LpVariable, lpSum

def allocate_fulfillment_resources(orders, pickers, pack_stations, time_horizon):
    """Allocate pickers and pack stations to orders."""
    prob = LpProblem("ResourceAllocation", LpMaximize)

    # Decision variables
    # x[o,p] = 1 if order o assigned to picker p
    x = {(o['id'], p): LpVariable(f"pick_{o['id']}_{p}", cat='Binary')
         for o in orders for p in range(pickers)}

    # y[o,s] = 1 if order o assigned to pack station s
    y = {(o['id'], s): LpVariable(f"pack_{o['id']}_{s}", cat='Binary')
         for o in orders for s in range(pack_stations)}

    # Objective: maximize orders fulfilled (weighted by priority)
    priority_weights = {'express': 3, 'priority': 2, 'standard': 1}
    prob += lpSum([
        priority_weights.get(o['priority'], 1) *
        lpSum([x[o['id'], p] for p in range(pickers)])
        for o in orders
    ])

    # Each order assigned to at most one picker
    for o in orders:
        prob += lpSum([x[o['id'], p] for p in range(pickers)]) <= 1

    # Each order assigned to at most one pack station
    for o in orders:
        prob += lpSum([y[o['id'], s] for s in range(pack_stations)]) <= 1

    # Picker capacity (time-based)
    pick_time_per_item = 30  # seconds
    for p in range(pickers):
        prob += lpSum([
            o['items'] * pick_time_per_item * x[o['id'], p]
            for o in orders
        ]) <= time_horizon * 3600

    prob.solve()

    assignments = {
        'picking': {},
        'packing': {}
    }

    for o in orders:
        for p in range(pickers):
            if x[o['id'], p].value() > 0.5:
                assignments['picking'][o['id']] = p

        for s in range(pack_stations):
            if y[o['id'], s].value() > 0.5:
                assignments['packing'][o['id']] = s

    return assignments

Carrier Selection

def optimize_carrier_selection(shipments, carriers, rates, transit_times):
    """Select optimal carrier for each shipment."""
    selections = []

    for shipment in shipments:
        origin = shipment['origin']
        dest = shipment['destination']
        weight = shipment['weight']
        deadline = shipment.get('delivery_deadline')

        candidates = []
        for carrier in carriers:
            rate = rates.get((carrier, origin, dest), {})
            transit = transit_times.get((carrier, origin, dest))

            if transit is None:
                continue

            # Check if carrier can meet deadline
            if deadline:
                arrival = calculate_arrival(transit)
                if arrival > deadline:
                    continue

            # Calculate cost
            cost = calculate_shipping_cost(weight, rate)

            candidates.append({
                'carrier': carrier,
                'cost': cost,
                'transit_days': transit,
                'service_level': rate.get('service_level', 'standard')
            })

        if candidates:
            # Select cheapest option that meets requirements
            best = min(candidates, key=lambda x: x['cost'])
            selections.append({
                'shipment_id': shipment['id'],
                'selected_carrier': best['carrier'],
                'cost': best['cost'],
                'transit_days': best['transit_days']
            })
        else:
            selections.append({
                'shipment_id': shipment['id'],
                'status': 'no_valid_carrier'
            })

    return selections

def calculate_shipping_cost(weight, rate):
    """Calculate shipping cost based on weight and rate structure."""
    base = rate.get('base_rate', 0)
    per_lb = rate.get('per_lb', 0)
    fuel_surcharge = rate.get('fuel_surcharge_pct', 0)

    subtotal = base + (weight * per_lb)
    total = subtotal * (1 + fuel_surcharge / 100)

    return round(total, 2)

Throughput Optimization

class FulfillmentThroughputOptimizer:
    def __init__(self, capacity_params):
        self.pick_rate = capacity_params['picks_per_hour']
        self.pack_rate = capacity_params['packs_per_hour']
        self.ship_rate = capacity_params['ships_per_hour']

    def calculate_throughput(self, orders, resources):
        """Calculate expected throughput given resources."""
        total_picks = sum(o['items'] for o in orders)
        total_orders = len(orders)

        pick_capacity = resources['pickers'] * self.pick_rate
        pack_capacity = resources['pack_stations'] * self.pack_rate
        ship_capacity = resources['ship_lanes'] * self.ship_rate

        # Bottleneck determines actual throughput
        pick_time = total_picks / pick_capacity if pick_capacity > 0 else float('inf')
        pack_time = total_orders / pack_capacity if pack_capacity > 0 else float('inf')
        ship_time = total_orders / ship_capacity if ship_capacity > 0 else float('inf')

        bottleneck_time = max(pick_time, pack_time, ship_time)

        return {
            'orders_per_hour': total_orders / bottleneck_time if bottleneck_time > 0 else 0,
            'bottleneck': 'picking' if pick_time == bottleneck_time else (
                'packing' if pack_time == bottleneck_time else 'shipping'
            ),
            'utilization': {
                'picking': pick_time / bottleneck_time * 100,
                'packing': pack_time / bottleneck_time * 100,
                'shipping': ship_time / bottleneck_time * 100
            }
        }

    def optimize_resource_mix(self, orders, total_resources, cost_per_resource):
        """Find optimal resource allocation."""
        best_throughput = 0
        best_allocation = None

        # Try different allocations
        for pickers in range(1, total_resources):
            for packers in range(1, total_resources - pickers):
                shippers = total_resources - pickers - packers
                if shippers < 1:
                    continue

                resources = {
                    'pickers': pickers,
                    'pack_stations': packers,
                    'ship_lanes': shippers
                }

                result = self.calculate_throughput(orders, resources)
                if result['orders_per_hour'] > best_throughput:
                    best_throughput = result['orders_per_hour']
                    best_allocation = resources

        return best_allocation, best_throughput

Fulfillment Metrics

def calculate_fulfillment_metrics(completed_orders, time_period_hours):
    """Calculate key fulfillment performance metrics."""
    if not completed_orders:
        return {}

    df = pd.DataFrame(completed_orders)

    metrics = {
        'total_orders': len(df),
        'total_units': df['items'].sum(),
        'orders_per_hour': len(df) / time_period_hours,
        'units_per_hour': df['items'].sum() / time_period_hours,

        # Time metrics
        'avg_fulfillment_time': df['fulfillment_time_min'].mean(),
        'median_fulfillment_time': df['fulfillment_time_min'].median(),

        # Accuracy
        'accuracy_rate': df['accurate'].mean() * 100 if 'accurate' in df else None,

        # On-time performance
        'on_time_rate': df['on_time'].mean() * 100 if 'on_time' in df else None,

        # By priority
        'by_priority': df.groupby('priority').agg({
            'id': 'count',
            'fulfillment_time_min': 'mean'
        }).to_dict()
    }

    return metrics

def track_order_lifecycle(order_events):
    """Track order through fulfillment lifecycle."""
    stages = ['received', 'allocated', 'picking', 'picked', 'packing', 'packed', 'shipped']

    lifecycle = {}
    for event in order_events:
        order_id = event['order_id']
        if order_id not in lifecycle:
            lifecycle[order_id] = {}

        lifecycle[order_id][event['stage']] = event['timestamp']

    # Calculate stage durations
    for order_id, timestamps in lifecycle.items():
        durations = {}
        for i in range(len(stages) - 1):
            if stages[i] in timestamps and stages[i+1] in timestamps:
                duration = (timestamps[stages[i+1]] - timestamps[stages[i]]).total_seconds()
                durations[f"{stages[i]}_to_{stages[i+1]}"] = duration

        lifecycle[order_id]['durations'] = durations

    return lifecycle