| name | supply-chain-analytics |
| description | Analyze supply chain performance and identify optimization opportunities. Use this skill when measuring supply chain KPIs, performing cost analysis, evaluating network efficiency, or generating insights from logistics data. |
Supply Chain Analytics
Analyze supply chain performance and identify optimization opportunities.
Installation
pip install numpy pandas scipy matplotlib seaborn
Quick Start
import pandas as pd
import numpy as np
# Load supply chain data
orders = pd.read_csv('orders.csv')
shipments = pd.read_csv('shipments.csv')
inventory = pd.read_csv('inventory.csv')
# Calculate key metrics
fill_rate = orders['fulfilled'].sum() / len(orders)
avg_lead_time = shipments['lead_time_days'].mean()
Key Performance Indicators
class SupplyChainKPIs:
def __init__(self, orders_df, shipments_df, inventory_df):
self.orders = orders_df
self.shipments = shipments_df
self.inventory = inventory_df
def calculate_fill_rate(self):
"""Perfect order fill rate."""
perfect = self.orders[
(self.orders['complete'] == True) &
(self.orders['on_time'] == True) &
(self.orders['undamaged'] == True)
]
return len(perfect) / len(self.orders) * 100
def calculate_otif(self):
"""On-Time In-Full delivery rate."""
otif = self.orders[
(self.orders['on_time'] == True) &
(self.orders['complete'] == True)
]
return len(otif) / len(self.orders) * 100
def calculate_inventory_turnover(self):
"""Inventory turnover ratio."""
cogs = self.orders['cost'].sum()
avg_inventory = self.inventory['value'].mean()
return cogs / avg_inventory if avg_inventory > 0 else 0
def calculate_dsi(self):
"""Days Sales of Inventory."""
turnover = self.calculate_inventory_turnover()
return 365 / turnover if turnover > 0 else float('inf')
def calculate_order_cycle_time(self):
"""Average order cycle time."""
self.orders['cycle_time'] = (
pd.to_datetime(self.orders['delivery_date']) -
pd.to_datetime(self.orders['order_date'])
).dt.days
return self.orders['cycle_time'].mean()
def calculate_cash_to_cash(self):
"""Cash-to-cash cycle time."""
dsi = self.calculate_dsi()
dso = self.calculate_dso() # Days Sales Outstanding
dpo = self.calculate_dpo() # Days Payables Outstanding
return dsi + dso - dpo
def generate_kpi_dashboard(kpi_calculator):
"""Generate comprehensive KPI dashboard."""
return {
'service_metrics': {
'fill_rate': kpi_calculator.calculate_fill_rate(),
'otif': kpi_calculator.calculate_otif(),
'order_cycle_time': kpi_calculator.calculate_order_cycle_time()
},
'inventory_metrics': {
'turnover': kpi_calculator.calculate_inventory_turnover(),
'days_of_supply': kpi_calculator.calculate_dsi()
},
'financial_metrics': {
'cash_to_cash': kpi_calculator.calculate_cash_to_cash()
}
}
Cost Analysis
def total_cost_of_ownership(costs_data):
"""Calculate total cost of ownership for supply chain."""
tco = {
'procurement': sum(costs_data.get('procurement', [])),
'transportation': sum(costs_data.get('transportation', [])),
'warehousing': sum(costs_data.get('warehousing', [])),
'inventory_carrying': sum(costs_data.get('inventory_carrying', [])),
'order_processing': sum(costs_data.get('order_processing', [])),
'returns_handling': sum(costs_data.get('returns', []))
}
tco['total'] = sum(tco.values())
# Calculate percentages
for key in tco:
if key != 'total':
tco[f'{key}_pct'] = tco[key] / tco['total'] * 100
return tco
def landed_cost_calculation(product_cost, shipping_cost, duties, insurance,
handling_fees, other_costs=0):
"""Calculate landed cost for imported goods."""
landed_cost = (
product_cost +
shipping_cost +
duties +
insurance +
handling_fees +
other_costs
)
return {
'product_cost': product_cost,
'shipping_cost': shipping_cost,
'duties': duties,
'insurance': insurance,
'handling_fees': handling_fees,
'other_costs': other_costs,
'total_landed_cost': landed_cost,
'markup_pct': (landed_cost - product_cost) / product_cost * 100
}
def cost_to_serve_analysis(customers_df, transactions_df):
"""Analyze cost to serve different customer segments."""
# Merge data
merged = transactions_df.merge(customers_df, on='customer_id')
# Calculate cost to serve by customer
cost_to_serve = merged.groupby('customer_id').agg({
'order_processing_cost': 'sum',
'shipping_cost': 'sum',
'returns_cost': 'sum',
'revenue': 'sum'
}).reset_index()
cost_to_serve['total_cost'] = (
cost_to_serve['order_processing_cost'] +
cost_to_serve['shipping_cost'] +
cost_to_serve['returns_cost']
)
cost_to_serve['profit'] = cost_to_serve['revenue'] - cost_to_serve['total_cost']
cost_to_serve['margin_pct'] = cost_to_serve['profit'] / cost_to_serve['revenue'] * 100
return cost_to_serve
Network Analysis
def analyze_network_efficiency(nodes_df, flows_df):
"""Analyze efficiency of supply chain network."""
analysis = {}
# Throughput analysis
analysis['throughput'] = {
'total_volume': flows_df['quantity'].sum(),
'by_node': flows_df.groupby('destination')['quantity'].sum().to_dict()
}
# Utilization analysis
nodes_with_capacity = nodes_df[nodes_df['capacity'] > 0]
for _, node in nodes_with_capacity.iterrows():
node_flows = flows_df[flows_df['destination'] == node['node_id']]
utilization = node_flows['quantity'].sum() / node['capacity']
analysis.setdefault('utilization', {})[node['node_id']] = utilization
# Flow balance
for node_id in nodes_df['node_id']:
inflow = flows_df[flows_df['destination'] == node_id]['quantity'].sum()
outflow = flows_df[flows_df['origin'] == node_id]['quantity'].sum()
analysis.setdefault('flow_balance', {})[node_id] = {
'inflow': inflow,
'outflow': outflow,
'net': inflow - outflow
}
return analysis
def identify_bottlenecks(network_data, threshold=0.9):
"""Identify bottlenecks in the supply chain network."""
bottlenecks = []
for node in network_data['nodes']:
if node.get('utilization', 0) >= threshold:
bottlenecks.append({
'node': node['id'],
'type': 'capacity',
'utilization': node['utilization'],
'severity': 'high' if node['utilization'] >= 0.95 else 'medium'
})
for edge in network_data['edges']:
if edge.get('congestion', 0) >= threshold:
bottlenecks.append({
'edge': (edge['from'], edge['to']),
'type': 'flow',
'congestion': edge['congestion'],
'severity': 'high' if edge['congestion'] >= 0.95 else 'medium'
})
return bottlenecks
Demand Analysis
def demand_pattern_analysis(demand_df):
"""Analyze demand patterns for planning."""
analysis = {}
# Basic statistics
analysis['statistics'] = {
'mean': demand_df['quantity'].mean(),
'std': demand_df['quantity'].std(),
'cv': demand_df['quantity'].std() / demand_df['quantity'].mean(),
'min': demand_df['quantity'].min(),
'max': demand_df['quantity'].max()
}
# Seasonality detection
if 'date' in demand_df.columns:
demand_df['month'] = pd.to_datetime(demand_df['date']).dt.month
monthly_avg = demand_df.groupby('month')['quantity'].mean()
overall_avg = demand_df['quantity'].mean()
analysis['seasonality'] = {
'monthly_indices': (monthly_avg / overall_avg).to_dict(),
'peak_month': monthly_avg.idxmax(),
'low_month': monthly_avg.idxmin(),
'seasonal_range': monthly_avg.max() / monthly_avg.min()
}
# Trend analysis
if len(demand_df) > 12:
from scipy import stats
x = np.arange(len(demand_df))
slope, intercept, r_value, p_value, std_err = stats.linregress(
x, demand_df['quantity']
)
analysis['trend'] = {
'slope': slope,
'r_squared': r_value ** 2,
'direction': 'increasing' if slope > 0 else 'decreasing'
}
return analysis
def abc_xyz_analysis(products_df):
"""Perform ABC-XYZ analysis for inventory classification."""
# ABC based on revenue/value
products_df = products_df.sort_values('revenue', ascending=False)
products_df['cumulative_revenue'] = products_df['revenue'].cumsum()
products_df['cumulative_pct'] = (
products_df['cumulative_revenue'] / products_df['revenue'].sum()
)
products_df['abc_class'] = 'C'
products_df.loc[products_df['cumulative_pct'] <= 0.8, 'abc_class'] = 'A'
products_df.loc[
(products_df['cumulative_pct'] > 0.8) &
(products_df['cumulative_pct'] <= 0.95),
'abc_class'
] = 'B'
# XYZ based on demand variability
products_df['xyz_class'] = 'Z'
products_df.loc[products_df['cv'] <= 0.5, 'xyz_class'] = 'X'
products_df.loc[
(products_df['cv'] > 0.5) & (products_df['cv'] <= 1.0),
'xyz_class'
] = 'Y'
products_df['combined_class'] = products_df['abc_class'] + products_df['xyz_class']
return products_df
Reporting
def generate_supply_chain_report(data, period):
"""Generate comprehensive supply chain report."""
report = {
'period': period,
'generated_at': pd.Timestamp.now().isoformat(),
'sections': {}
}
# Executive summary
report['sections']['executive_summary'] = {
'total_orders': len(data['orders']),
'total_revenue': data['orders']['revenue'].sum(),
'fill_rate': calculate_fill_rate(data['orders']),
'avg_lead_time': data['shipments']['lead_time'].mean()
}
# Performance trends
report['sections']['trends'] = {
'vs_prior_period': calculate_period_comparison(data, period),
'vs_target': calculate_vs_target(data)
}
# Issues and risks
report['sections']['issues'] = identify_issues(data)
# Recommendations
report['sections']['recommendations'] = generate_recommendations(data)
return report