| name | fulfillment-optimization |
| description | Optimize order fulfillment operations for speed and cost efficiency. Use this skill when optimizing pick-pack-ship workflows, reducing fulfillment time, managing carrier selection, or improving order processing throughput. |
# Fulfillment Optimization
Optimize order fulfillment operations for speed, cost, and service quality.
## Installation
pip install numpy pandas pulp
## Quick Start
import numpy as np
import pandas as pd
# Sample order pool: one dict per order awaiting fulfillment.
orders = [
    {
        'id': 'ORD001',
        'items': 3,
        'priority': 'express',
        'destination': 'NYC',
    },
    {
        'id': 'ORD002',
        'items': 1,
        'priority': 'standard',
        'destination': 'LA',
    },
    {
        'id': 'ORD003',
        'items': 5,
        'priority': 'express',
        'destination': 'CHI',
    },
]

# Resources on hand: picker count, pack-station count, and the shipping
# cutoff written as an 'HH:MM' string.
pickers, pack_stations = 5, 3
shipping_cutoff = '16:00'
## Fulfillment Workflow Optimization
### Order Sequencing
def optimize_order_sequence(orders, constraints):
    """Return ``orders`` sorted from highest to lowest processing score.

    Each order is a dict. Its score is the sum of:

    * priority: 100 for ``'express'``, 50 for ``'priority'``, 10 for
      ``'standard'``, and 0 for any other or missing value;
    * cutoff urgency (only when ``order['cutoff_time']`` is truthy): +200
      when the remaining time is below 2, +100 when it is below 4;
    * +20 when the order has exactly one item;
    * +50 when ``order['customer_tier'] == 'VIP'``.

    Orders with equal scores keep their relative input order.

    Args:
        orders: Iterable of order dicts. ``'priority'`` and ``'items'`` are
            optional; an order lacking them simply earns no points for them.
        constraints: Accepted for interface compatibility; the current
            scoring rules do not read it.

    Returns:
        A new list containing the same order dicts, highest score first.
    """
    # Built once instead of once per order.
    priority_scores = {'express': 100, 'priority': 50, 'standard': 10}

    def score(order):
        total = priority_scores.get(order.get('priority'), 0)

        cutoff = order.get('cutoff_time')
        if cutoff:
            # NOTE(review): calculate_time_remaining is not defined in this
            # document; it is assumed to return the hours left before the
            # cutoff -- confirm against the real helper.
            remaining = calculate_time_remaining(cutoff)
            if remaining < 2:
                total += 200
            elif remaining < 4:
                total += 100

        # Single-item orders are faster to process, so nudge them forward.
        if order.get('items') == 1:
            total += 20

        if order.get('customer_tier') == 'VIP':
            total += 50
        return total

    # sorted() is stable even with reverse=True, so ties keep input order.
    return sorted(orders, key=score, reverse=True)
def batch_for_efficiency(orders, batch_size=20):
    """Split ``orders`` into consecutive batches of up to ``batch_size``.

    Input order is preserved. Every batch except possibly the last holds
    ``batch_size`` orders; a shorter trailing batch is kept, and no empty
    batch is ever emitted.

    Args:
        orders: Iterable of orders.
        batch_size: Number of orders at which a batch is closed.

    Returns:
        A list of lists of orders.
    """
    grouped = []
    pending = []
    for item in orders:
        pending.append(item)
        if len(pending) < batch_size:
            continue
        # Batch is full: close it and start a fresh one.
        grouped.append(pending)
        pending = []

    # Keep whatever is left over as a final, shorter batch.
    if pending:
        grouped.append(pending)
    return grouped
### Resource Allocation
from pulp import LpProblem, LpMaximize, LpVariable, lpSum
def allocate_fulfillment_resources(orders, pickers, pack_stations, time_horizon):
    """Assign orders to pickers and pack stations with a binary program.

    The model maximizes the priority-weighted number of picked orders
    (weights: express 3, priority 2, standard 1, anything else 1), subject to:

    * each order goes to at most one picker;
    * an order gets exactly one pack station if it is picked and none
      otherwise, so the packing plan always mirrors the picking plan;
    * each picker's total pick time (30 seconds per item) fits within
      ``time_horizon`` hours.

    Args:
        orders: List of order dicts with ``'id'``, ``'items'`` and
            ``'priority'`` keys.
        pickers: Number of pickers; they are indexed ``0..pickers-1``.
        pack_stations: Number of pack stations, indexed the same way.
        time_horizon: Planning window in hours.

    Returns:
        ``{'picking': {order_id: picker_index},
        'packing': {order_id: station_index}}``. Orders the solver left
        unassigned are absent from both mappings.
    """
    prob = LpProblem("ResourceAllocation", LpMaximize)
    picker_ids = range(pickers)
    station_ids = range(pack_stations)

    # x[o, p] = 1 if order o is assigned to picker p
    x = {(o['id'], p): LpVariable(f"pick_{o['id']}_{p}", cat='Binary')
         for o in orders for p in picker_ids}

    # y[o, s] = 1 if order o is assigned to pack station s
    y = {(o['id'], s): LpVariable(f"pack_{o['id']}_{s}", cat='Binary')
         for o in orders for s in station_ids}

    # Objective: maximize orders picked, weighted by priority.
    priority_weights = {'express': 3, 'priority': 2, 'standard': 1}
    prob += lpSum([
        priority_weights.get(o['priority'], 1) *
        lpSum([x[o['id'], p] for p in picker_ids])
        for o in orders
    ])

    for o in orders:
        picked = lpSum([x[o['id'], p] for p in picker_ids])
        packed = lpSum([y[o['id'], s] for s in station_ids])
        # At most one picker per order.
        prob += picked <= 1
        # Tie packing to picking. Without this link the y variables are
        # unconstrained by the objective and the solver leaves the packing
        # plan arbitrary (typically empty). Equality also caps packing at
        # one station per order.
        prob += packed == picked

    # Picker capacity: total pick seconds must fit in the horizon.
    # NOTE(review): no pack-station capacity is modeled, so the choice of
    # station is arbitrary among the available ones.
    pick_time_per_item = 30  # seconds
    for p in picker_ids:
        prob += lpSum([
            o['items'] * pick_time_per_item * x[o['id'], p]
            for o in orders
        ]) <= time_horizon * 3600

    prob.solve()

    assignments = {
        'picking': {},
        'packing': {}
    }
    for o in orders:
        oid = o['id']
        for p in picker_ids:
            # value() is None when the solver produced no value for the
            # variable; treat that as "not assigned".
            if (x[oid, p].value() or 0) > 0.5:
                assignments['picking'][oid] = p
                break
        for s in station_ids:
            if (y[oid, s].value() or 0) > 0.5:
                assignments['packing'][oid] = s
                break
    return assignments
## Carrier Selection
def optimize_carrier_selection(shipments, carriers, rates, transit_times):
    """Pick the cheapest eligible carrier for each shipment.

    A carrier is eligible for a shipment when both a rate and a transit time
    exist for the ``(carrier, origin, destination)`` lane and, if the
    shipment has a truthy ``'delivery_deadline'``, the computed arrival is
    not later than that deadline.

    Args:
        shipments: Iterable of dicts with ``'id'``, ``'origin'``,
            ``'destination'`` and ``'weight'``; ``'delivery_deadline'`` is
            optional.
        carriers: Iterable of carrier identifiers.
        rates: Dict keyed by ``(carrier, origin, destination)`` whose values
            are rate dicts understood by ``calculate_shipping_cost``.
        transit_times: Dict keyed the same way, giving the transit time.

    Returns:
        One dict per shipment, in input order. It holds ``'shipment_id'``,
        ``'selected_carrier'``, ``'cost'`` and ``'transit_days'`` when a
        carrier was found, otherwise ``'shipment_id'`` and
        ``'status': 'no_valid_carrier'``. When costs tie, the carrier that
        comes first in ``carriers`` wins.
    """
    selections = []

    for shipment in shipments:
        origin = shipment['origin']
        dest = shipment['destination']
        weight = shipment['weight']
        deadline = shipment.get('delivery_deadline')

        candidates = []
        for carrier in carriers:
            lane = (carrier, origin, dest)
            rate = rates.get(lane)
            transit = transit_times.get(lane)

            # A lane with no rate entry used to be priced from an empty rate
            # dict, i.e. at cost 0, and therefore always won the "cheapest"
            # selection. An unpriced lane is not a usable option.
            if rate is None or transit is None:
                continue

            if deadline:
                # NOTE(review): calculate_arrival is not defined in this
                # document; it is assumed to turn the transit time into a
                # value comparable with the deadline -- confirm.
                arrival = calculate_arrival(transit)
                if arrival > deadline:
                    continue

            candidates.append({
                'carrier': carrier,
                'cost': calculate_shipping_cost(weight, rate),
                'transit_days': transit,
                'service_level': rate.get('service_level', 'standard')
            })

        if not candidates:
            selections.append({
                'shipment_id': shipment['id'],
                'status': 'no_valid_carrier'
            })
            continue

        # Cheapest option among those that meet the requirements.
        best = min(candidates, key=lambda option: option['cost'])
        selections.append({
            'shipment_id': shipment['id'],
            'selected_carrier': best['carrier'],
            'cost': best['cost'],
            'transit_days': best['transit_days']
        })

    return selections
def calculate_shipping_cost(weight, rate):
    """Price a shipment from its weight and a rate dict.

    The cost is ``base_rate + weight * per_lb``, increased by
    ``fuel_surcharge_pct`` percent and rounded to two decimals. Any of the
    three rate keys that is missing counts as 0.

    Args:
        weight: Shipment weight, multiplied by the ``'per_lb'`` rate.
        rate: Dict with optional ``'base_rate'``, ``'per_lb'`` and
            ``'fuel_surcharge_pct'`` entries.

    Returns:
        The total cost rounded to 2 decimal places.
    """
    weight_charge = weight * rate.get('per_lb', 0)
    before_surcharge = rate.get('base_rate', 0) + weight_charge
    surcharge_multiplier = 1 + rate.get('fuel_surcharge_pct', 0) / 100
    return round(before_surcharge * surcharge_multiplier, 2)
## Throughput Optimization
class FulfillmentThroughputOptimizer:
    """Estimate and tune throughput of a pick -> pack -> ship line.

    Picking is driven by item count; packing and shipping are driven by
    order count. The slowest stage determines overall throughput.
    """

    # Stage names in line order; also the tie-break order for the bottleneck.
    _STAGES = ('picking', 'packing', 'shipping')

    def __init__(self, capacity_params):
        """Store per-resource hourly rates.

        Args:
            capacity_params: Dict with ``'picks_per_hour'``,
                ``'packs_per_hour'`` and ``'ships_per_hour'``.
        """
        self.pick_rate = capacity_params['picks_per_hour']
        self.pack_rate = capacity_params['packs_per_hour']
        self.ship_rate = capacity_params['ships_per_hour']

    @staticmethod
    def _hours_needed(workload, capacity):
        """Hours to clear ``workload`` at ``capacity`` units per hour."""
        return workload / capacity if capacity > 0 else float('inf')

    def calculate_throughput(self, orders, resources):
        """Calculate expected throughput for ``orders`` given ``resources``.

        Args:
            orders: Sized collection of order dicts with an ``'items'`` count.
            resources: Dict with ``'pickers'``, ``'pack_stations'`` and
                ``'ship_lanes'`` counts.

        Returns:
            Dict with:

            * ``'orders_per_hour'``: order count divided by the time of the
              slowest stage (0 when there are no orders or a stage has no
              capacity);
            * ``'bottleneck'``: ``'picking'``, ``'packing'`` or
              ``'shipping'``, with ties resolved in that order, or ``None``
              when there are no orders;
            * ``'utilization'``: each stage's time as a percentage of the
              bottleneck stage's time.
        """
        total_picks = sum(o['items'] for o in orders)
        total_orders = len(orders)

        # No work: every stage time would be 0 and the utilization ratios
        # would divide by zero, so report an idle line instead.
        if total_orders == 0:
            return {
                'orders_per_hour': 0,
                'bottleneck': None,
                'utilization': {stage: 0.0 for stage in self._STAGES},
            }

        stage_hours = {
            'picking': self._hours_needed(
                total_picks, resources['pickers'] * self.pick_rate),
            'packing': self._hours_needed(
                total_orders, resources['pack_stations'] * self.pack_rate),
            'shipping': self._hours_needed(
                total_orders, resources['ship_lanes'] * self.ship_rate),
        }

        # The slowest stage determines actual throughput.
        bottleneck_time = max(stage_hours.values())
        bottleneck = next(
            stage for stage in self._STAGES
            if stage_hours[stage] == bottleneck_time
        )

        # A stage that matches the bottleneck is at 100% by definition; the
        # explicit branch avoids inf / inf = NaN when a stage has no capacity.
        utilization = {
            stage: 100.0 if hours == bottleneck_time
            else hours / bottleneck_time * 100
            for stage, hours in stage_hours.items()
        }

        return {
            'orders_per_hour': (
                total_orders / bottleneck_time if bottleneck_time > 0 else 0
            ),
            'bottleneck': bottleneck,
            'utilization': utilization,
        }

    def optimize_resource_mix(self, orders, total_resources, cost_per_resource):
        """Find the split of ``total_resources`` that maximizes throughput.

        Every split that gives at least one resource to each of picking,
        packing and shipping is evaluated with ``calculate_throughput``.

        Args:
            orders: Orders to evaluate each split against.
            total_resources: Integer number of resources to distribute.
            cost_per_resource: Accepted for interface compatibility; the
                search does not currently use it.

        Returns:
            ``(best_allocation, best_throughput)``. ``best_allocation`` is a
            dict with ``'pickers'``, ``'pack_stations'`` and ``'ship_lanes'``,
            or ``None`` (with throughput 0) when no split achieves a positive
            throughput, e.g. fewer than 3 resources or no orders. On ties
            the first split found wins.
        """
        best_throughput = 0
        best_allocation = None

        # Loop bounds guarantee at least one ship lane remains for every
        # (pickers, packers) pair, so no separate feasibility check is needed.
        for pickers in range(1, total_resources - 1):
            for packers in range(1, total_resources - pickers):
                resources = {
                    'pickers': pickers,
                    'pack_stations': packers,
                    'ship_lanes': total_resources - pickers - packers
                }
                result = self.calculate_throughput(orders, resources)

                if result['orders_per_hour'] > best_throughput:
                    best_throughput = result['orders_per_hour']
                    best_allocation = resources

        return best_allocation, best_throughput
## Fulfillment Metrics
def calculate_fulfillment_metrics(completed_orders, time_period_hours):
    """Calculate key fulfillment performance metrics.

    Args:
        completed_orders: List of order dicts. Each needs ``'id'``,
            ``'items'``, ``'priority'`` and ``'fulfillment_time_min'``;
            ``'accurate'`` and ``'on_time'`` are optional columns.
        time_period_hours: Length of the measured period in hours.

    Returns:
        ``{}`` when ``completed_orders`` is empty. Otherwise a dict with:

        * ``'total_orders'``, ``'total_units'``;
        * ``'orders_per_hour'``, ``'units_per_hour'``: ``None`` when
          ``time_period_hours`` is not positive, since no rate is defined;
        * ``'avg_fulfillment_time'``, ``'median_fulfillment_time'``;
        * ``'accuracy_rate'``, ``'on_time_rate'``: percentages, or ``None``
          when the corresponding column is absent;
        * ``'by_priority'``: ``{'id': {priority: order_count},
          'fulfillment_time_min': {priority: mean_time}}``.
    """
    if not completed_orders:
        return {}

    df = pd.DataFrame(completed_orders)
    total_orders = len(df)
    total_units = df['items'].sum()
    fulfillment_time = df['fulfillment_time_min']

    # A zero-length period used to raise ZeroDivisionError; report the rates
    # as unavailable, matching how the optional metrics below behave.
    has_period = time_period_hours > 0

    def percent_true(column):
        """Share of truthy values in ``column`` as a percentage, if present."""
        return df[column].mean() * 100 if column in df else None

    return {
        'total_orders': total_orders,
        'total_units': total_units,
        'orders_per_hour': total_orders / time_period_hours if has_period else None,
        'units_per_hour': total_units / time_period_hours if has_period else None,

        # Time metrics
        'avg_fulfillment_time': fulfillment_time.mean(),
        'median_fulfillment_time': fulfillment_time.median(),

        # Accuracy
        'accuracy_rate': percent_true('accurate'),

        # On-time performance
        'on_time_rate': percent_true('on_time'),

        # By priority: order count (under 'id') and mean fulfillment time.
        'by_priority': df.groupby('priority').agg({
            'id': 'count',
            'fulfillment_time_min': 'mean'
        }).to_dict()
    }
def track_order_lifecycle(order_events):
    """Collect stage timestamps per order and time each stage transition.

    Events are grouped by ``'order_id'``; for a repeated stage the last
    event's timestamp is kept. For every pair of consecutive stages in
    received -> allocated -> picking -> picked -> packing -> packed ->
    shipped where both timestamps are known, the elapsed seconds are
    recorded under ``'<stage>_to_<next_stage>'``.

    Args:
        order_events: Iterable of dicts with ``'order_id'``, ``'stage'`` and
            ``'timestamp'``. Timestamps must support subtraction yielding an
            object with ``total_seconds()``.

    Returns:
        Dict mapping each order id to ``{stage: timestamp, ...,
        'durations': {transition_name: seconds}}``.
    """
    stage_order = ['received', 'allocated', 'picking', 'picked', 'packing', 'packed', 'shipped']
    # Consecutive (from_stage, to_stage) pairs along the pipeline.
    transitions = list(zip(stage_order, stage_order[1:]))

    lifecycle = {}
    for event in order_events:
        order_id = event['order_id']
        seen = lifecycle.setdefault(order_id, {})
        seen[event['stage']] = event['timestamp']

    # Only transitions with both endpoints recorded get a duration.
    for seen in lifecycle.values():
        seen['durations'] = {
            f"{start}_to_{end}": (seen[end] - seen[start]).total_seconds()
            for start, end in transitions
            if start in seen and end in seen
        }

    return lifecycle