Claude Code Plugins

Community-maintained marketplace

Feedback

supply-chain-analytics

@benchflow-ai/skillsbench
15
0

Analyze supply chain performance and identify optimization opportunities. Use this skill when measuring supply chain KPIs, performing cost analysis, evaluating network efficiency, or generating insights from logistics data.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: supply-chain-analytics
description: Analyze supply chain performance and identify optimization opportunities. Use this skill when measuring supply chain KPIs, performing cost analysis, evaluating network efficiency, or generating insights from logistics data.

Supply Chain Analytics

Analyze supply chain performance and identify optimization opportunities.

Installation

pip install numpy pandas scipy matplotlib seaborn

Quick Start

import pandas as pd
import numpy as np

# Read the three source tables (orders, shipments, inventory) from CSV
orders, shipments, inventory = map(
    pd.read_csv, ('orders.csv', 'shipments.csv', 'inventory.csv')
)

# Headline metrics: fulfilled share of orders and mean shipment lead time
fulfilled_orders = orders['fulfilled'].sum()
fill_rate = fulfilled_orders / len(orders)
avg_lead_time = shipments['lead_time_days'].mean()

Key Performance Indicators

class SupplyChainKPIs:
    """Compute service, inventory and financial KPIs from supply chain tables.

    Columns read by the methods of this class:

    * ``orders``: ``complete``, ``on_time``, ``undamaged``, ``cost``,
      ``order_date``, ``delivery_date``
    * ``inventory``: ``value``

    ``shipments`` is stored on the instance but not read by any method here.
    """

    def __init__(self, orders_df, shipments_df, inventory_df, *,
                 dso=None, dpo=None):
        """Store the input tables and the optional receivables/payables figures.

        Args:
            orders_df: Orders table (pandas DataFrame).
            shipments_df: Shipments table (pandas DataFrame).
            inventory_df: Inventory table (pandas DataFrame).
            dso: Days Sales Outstanding, in days. Optional: none of the
                three tables is read for receivables data, so the figure
                has to be supplied by the caller.
            dpo: Days Payables Outstanding, in days. Optional, for the same
                reason as ``dso``.
        """
        self.orders = orders_df
        self.shipments = shipments_df
        self.inventory = inventory_df
        self.dso = dso
        self.dpo = dpo

    def _share_of_orders(self, flag_columns):
        """Return the percentage of orders where every flag column is True."""
        total = len(self.orders)
        if total == 0:
            # No orders: report 0% instead of dividing by zero.
            return 0.0
        # ``eq(True)`` keeps the original ``== True`` comparison semantics
        # (missing values count as "not satisfied").
        hits = self.orders[list(flag_columns)].eq(True).all(axis=1)
        return int(hits.sum()) / total * 100

    def calculate_fill_rate(self):
        """Perfect order fill rate.

        Returns:
            Percentage (0-100) of orders that are complete, on time and
            undamaged; 0.0 when there are no orders.
        """
        return self._share_of_orders(('complete', 'on_time', 'undamaged'))

    def calculate_otif(self):
        """On-Time In-Full delivery rate.

        Returns:
            Percentage (0-100) of orders that are both on time and
            complete; 0.0 when there are no orders.
        """
        return self._share_of_orders(('on_time', 'complete'))

    def calculate_inventory_turnover(self):
        """Inventory turnover ratio.

        Returns:
            Sum of order ``cost`` divided by the mean inventory ``value``,
            or 0 when the mean inventory value is not positive.
        """
        cogs = self.orders['cost'].sum()
        avg_inventory = self.inventory['value'].mean()
        return cogs / avg_inventory if avg_inventory > 0 else 0

    def calculate_dsi(self):
        """Days Sales of Inventory.

        Returns:
            ``365 / turnover``, or ``float('inf')`` when turnover is not
            positive.
        """
        turnover = self.calculate_inventory_turnover()
        return 365 / turnover if turnover > 0 else float('inf')

    def calculate_order_cycle_time(self):
        """Average order cycle time.

        Returns:
            Mean number of whole days between ``order_date`` and
            ``delivery_date``.

        Note:
            The per-order values are also written to a ``cycle_time``
            column on ``self.orders``.
        """
        self.orders['cycle_time'] = (
            pd.to_datetime(self.orders['delivery_date']) -
            pd.to_datetime(self.orders['order_date'])
        ).dt.days
        return self.orders['cycle_time'].mean()

    def calculate_dso(self):
        """Days Sales Outstanding.

        Returns:
            The ``dso`` value given at construction, or NaN when none was
            supplied.
        """
        return float('nan') if self.dso is None else self.dso

    def calculate_dpo(self):
        """Days Payables Outstanding.

        Returns:
            The ``dpo`` value given at construction, or NaN when none was
            supplied.
        """
        return float('nan') if self.dpo is None else self.dpo

    def calculate_cash_to_cash(self):
        """Cash-to-cash cycle time.

        Returns:
            ``DSI + DSO - DPO`` in days. The result is NaN when ``dso`` or
            ``dpo`` was not supplied at construction.
        """
        dsi = self.calculate_dsi()
        dso = self.calculate_dso()  # Days Sales Outstanding
        dpo = self.calculate_dpo()  # Days Payables Outstanding
        return dsi + dso - dpo


def generate_kpi_dashboard(kpi_calculator):
    """Collect every KPI from ``kpi_calculator`` into a nested dashboard dict.

    Args:
        kpi_calculator: Object exposing the ``calculate_*`` methods named
            in the layout below.

    Returns:
        dict with ``service_metrics``, ``inventory_metrics`` and
        ``financial_metrics`` sections, each mapping a metric name to the
        value returned by the corresponding calculator method.
    """
    # (section, ((metric name, calculator method), ...)) in output order.
    layout = (
        ('service_metrics', (
            ('fill_rate', 'calculate_fill_rate'),
            ('otif', 'calculate_otif'),
            ('order_cycle_time', 'calculate_order_cycle_time'),
        )),
        ('inventory_metrics', (
            ('turnover', 'calculate_inventory_turnover'),
            ('days_of_supply', 'calculate_dsi'),
        )),
        ('financial_metrics', (
            ('cash_to_cash', 'calculate_cash_to_cash'),
        )),
    )

    dashboard = {}
    for section, metrics in layout:
        values = {}
        for metric, method_name in metrics:
            values[metric] = getattr(kpi_calculator, method_name)()
        dashboard[section] = values
    return dashboard

Cost Analysis

def total_cost_of_ownership(costs_data):
    """Calculate total cost of ownership for supply chain.

    Args:
        costs_data: Mapping from cost category to an iterable of amounts.
            The keys read are ``procurement``, ``transportation``,
            ``warehousing``, ``inventory_carrying``, ``order_processing``
            and ``returns``; a missing key counts as zero.

    Returns:
        dict holding the summed amount per category (``returns`` is
        reported as ``returns_handling``), a ``total`` entry, and one
        ``<category>_pct`` entry per category with its share of the total
        in percent. All shares are 0.0 when the total is zero.
    """
    # Output category name -> key looked up in ``costs_data``.
    sources = {
        'procurement': 'procurement',
        'transportation': 'transportation',
        'warehousing': 'warehousing',
        'inventory_carrying': 'inventory_carrying',
        'order_processing': 'order_processing',
        'returns_handling': 'returns',
    }

    tco = {name: sum(costs_data.get(key, [])) for name, key in sources.items()}
    total = sum(tco.values())
    tco['total'] = total

    # Loop over the fixed category names rather than over ``tco`` itself:
    # adding the ``*_pct`` keys while iterating the dict raises RuntimeError.
    for name in sources:
        tco[f'{name}_pct'] = tco[name] / total * 100 if total else 0.0

    return tco

def landed_cost_calculation(product_cost, shipping_cost, duties, insurance,
                            handling_fees, other_costs=0):
    """Calculate landed cost for imported goods.

    Args:
        product_cost: Purchase cost of the goods.
        shipping_cost: Freight cost.
        duties: Customs duties.
        insurance: Insurance cost.
        handling_fees: Handling fees.
        other_costs: Any further costs to include (default 0).

    Returns:
        dict echoing each input, plus ``total_landed_cost`` (the sum of all
        inputs) and ``markup_pct`` (the added costs as a percentage of
        ``product_cost``). When ``product_cost`` is 0 the percentage is
        undefined: ``markup_pct`` is ``float('inf')`` if any cost was
        added and 0.0 otherwise.
    """
    landed_cost = (
        product_cost +
        shipping_cost +
        duties +
        insurance +
        handling_fees +
        other_costs
    )

    if product_cost == 0:
        # Avoid ZeroDivisionError for free-of-charge goods.
        markup_pct = float('inf') if landed_cost > 0 else 0.0
    else:
        markup_pct = (landed_cost - product_cost) / product_cost * 100

    return {
        'product_cost': product_cost,
        'shipping_cost': shipping_cost,
        'duties': duties,
        'insurance': insurance,
        'handling_fees': handling_fees,
        'other_costs': other_costs,
        'total_landed_cost': landed_cost,
        'markup_pct': markup_pct
    }

def cost_to_serve_analysis(customers_df, transactions_df):
    """Summarise cost, profit and margin per customer.

    Args:
        customers_df: Customer table with a ``customer_id`` column.
        transactions_df: Transaction table with ``customer_id``,
            ``order_processing_cost``, ``shipping_cost``, ``returns_cost``
            and ``revenue`` columns.

    Returns:
        DataFrame with one row per ``customer_id`` holding the summed cost
        and revenue columns plus ``total_cost``, ``profit`` and
        ``margin_pct`` (profit as a percentage of revenue).
    """
    summed_columns = (
        'order_processing_cost',
        'shipping_cost',
        'returns_cost',
        'revenue',
    )

    # Only transactions with a matching customer row take part.
    joined = transactions_df.merge(customers_df, on='customer_id')
    per_customer = (
        joined.groupby('customer_id')
        .agg(dict.fromkeys(summed_columns, 'sum'))
        .reset_index()
    )

    # Later keyword arguments may refer to columns created by earlier ones.
    return per_customer.assign(
        total_cost=lambda df: (
            df['order_processing_cost'] + df['shipping_cost'] + df['returns_cost']
        ),
        profit=lambda df: df['revenue'] - df['total_cost'],
        margin_pct=lambda df: df['profit'] / df['revenue'] * 100,
    )

Network Analysis

def analyze_network_efficiency(nodes_df, flows_df):
    """Analyze efficiency of supply chain network.

    Args:
        nodes_df: Node table with ``node_id`` and ``capacity`` columns.
        flows_df: Flow table with ``origin``, ``destination`` and
            ``quantity`` columns.

    Returns:
        dict with three sections, all always present:

        * ``throughput``: ``total_volume`` (sum of all quantities) and
          ``by_node`` (inbound quantity per destination).
        * ``utilization``: inbound quantity divided by capacity, for nodes
          whose capacity is positive.
        * ``flow_balance``: ``inflow``, ``outflow`` and ``net``
          (inflow minus outflow) for every node in ``nodes_df``.
    """
    # Aggregate each direction once instead of rescanning the flows table
    # for every node.
    inflow_by_node = flows_df.groupby('destination')['quantity'].sum().to_dict()
    outflow_by_node = flows_df.groupby('origin')['quantity'].sum().to_dict()

    analysis = {
        'throughput': {
            'total_volume': flows_df['quantity'].sum(),
            'by_node': dict(inflow_by_node),
        },
        # Created up front so callers can index these sections even when
        # no node qualifies.
        'utilization': {},
        'flow_balance': {},
    }

    # Utilization is only defined for nodes with a positive capacity.
    rated_nodes = nodes_df[nodes_df['capacity'] > 0]
    for node_id, capacity in zip(rated_nodes['node_id'], rated_nodes['capacity']):
        analysis['utilization'][node_id] = inflow_by_node.get(node_id, 0) / capacity

    for node_id in nodes_df['node_id']:
        inflow = inflow_by_node.get(node_id, 0)
        outflow = outflow_by_node.get(node_id, 0)
        analysis['flow_balance'][node_id] = {
            'inflow': inflow,
            'outflow': outflow,
            'net': inflow - outflow
        }

    return analysis

def identify_bottlenecks(network_data, threshold=0.9):
    """List the nodes and edges whose load reaches ``threshold``.

    Args:
        network_data: dict with a ``nodes`` list (dicts carrying ``id`` and
            optionally ``utilization``) and an ``edges`` list (dicts
            carrying ``from``, ``to`` and optionally ``congestion``).
        threshold: Minimum utilization or congestion that counts as a
            bottleneck (default 0.9).

    Returns:
        list of bottleneck dicts, nodes first and then edges. Each entry
        has a ``severity`` of ``'high'`` at 0.95 or above, else
        ``'medium'``.
    """
    def grade(load):
        return 'high' if load >= 0.95 else 'medium'

    # A missing utilization/congestion value is treated as 0.
    saturated_nodes = [
        {
            'node': entry['id'],
            'type': 'capacity',
            'utilization': entry['utilization'],
            'severity': grade(entry['utilization']),
        }
        for entry in network_data['nodes']
        if entry.get('utilization', 0) >= threshold
    ]

    congested_edges = [
        {
            'edge': (link['from'], link['to']),
            'type': 'flow',
            'congestion': link['congestion'],
            'severity': grade(link['congestion']),
        }
        for link in network_data['edges']
        if link.get('congestion', 0) >= threshold
    ]

    return saturated_nodes + congested_edges

Demand Analysis

def demand_pattern_analysis(demand_df):
    """Analyze demand patterns for planning.

    Args:
        demand_df: DataFrame with a ``quantity`` column and, optionally, a
            ``date`` column. The frame is not modified.

    Returns:
        dict with:

        * ``statistics``: mean, std, coefficient of variation (std / mean),
          min and max of ``quantity``.
        * ``seasonality`` (only when a ``date`` column exists): monthly
          index relative to the overall mean, peak and low month, and the
          ratio of the highest to the lowest monthly mean.
        * ``trend`` (only when there are more than 12 rows): slope and
          R-squared of a linear fit of ``quantity`` against row position,
          and a ``direction`` of ``'increasing'``, ``'flat'`` (slope
          exactly zero) or ``'decreasing'``.
    """
    quantity = demand_df['quantity']
    analysis = {}

    # Basic statistics
    analysis['statistics'] = {
        'mean': quantity.mean(),
        'std': quantity.std(),
        'cv': quantity.std() / quantity.mean(),
        'min': quantity.min(),
        'max': quantity.max()
    }

    # Seasonality detection
    if 'date' in demand_df.columns:
        # Group by a separate month array rather than writing a ``month``
        # column into the caller's DataFrame.
        months = pd.to_datetime(demand_df['date']).dt.month.to_numpy()
        monthly_avg = quantity.groupby(months).mean()
        overall_avg = quantity.mean()

        analysis['seasonality'] = {
            'monthly_indices': (monthly_avg / overall_avg).to_dict(),
            'peak_month': monthly_avg.idxmax(),
            'low_month': monthly_avg.idxmin(),
            'seasonal_range': monthly_avg.max() / monthly_avg.min()
        }

    # Trend analysis
    if len(demand_df) > 12:
        from scipy import stats
        x = np.arange(len(demand_df))
        slope, _intercept, r_value, _p_value, _std_err = stats.linregress(
            x, quantity
        )
        if slope > 0:
            direction = 'increasing'
        elif slope == 0:
            # A zero slope is not a decline.
            direction = 'flat'
        else:
            direction = 'decreasing'
        analysis['trend'] = {
            'slope': slope,
            'r_squared': r_value ** 2,
            'direction': direction
        }

    return analysis

def abc_xyz_analysis(products_df):
    """Classify products by revenue share (ABC) and demand variability (XYZ).

    Args:
        products_df: DataFrame with ``revenue`` and ``cv`` columns.

    Returns:
        A copy sorted by descending revenue with ``cumulative_revenue``,
        ``cumulative_pct``, ``abc_class``, ``xyz_class`` and
        ``combined_class`` columns added. ABC uses cumulative revenue share
        (A up to 0.8, B up to 0.95, C beyond); XYZ uses ``cv`` (X up to
        0.5, Y up to 1.0, Z beyond).
    """
    def label_bands(frame, column, values, cuts, labels):
        # Everything starts in the top band; rows at or below a cut are
        # then moved into the lower bands.
        lower_cut, upper_cut = cuts
        low_label, mid_label, top_label = labels
        frame[column] = top_label
        in_low = values <= lower_cut
        in_mid = ~in_low & (values <= upper_cut)
        frame.loc[in_low, column] = low_label
        frame.loc[in_mid, column] = mid_label

    # sort_values returns a new frame, so the caller's data is untouched.
    ranked = products_df.sort_values('revenue', ascending=False)
    running_revenue = ranked['revenue'].cumsum()
    ranked['cumulative_revenue'] = running_revenue
    ranked['cumulative_pct'] = running_revenue / ranked['revenue'].sum()

    label_bands(ranked, 'abc_class', ranked['cumulative_pct'],
                (0.8, 0.95), ('A', 'B', 'C'))
    label_bands(ranked, 'xyz_class', ranked['cv'],
                (0.5, 1.0), ('X', 'Y', 'Z'))

    ranked['combined_class'] = ranked['abc_class'] + ranked['xyz_class']
    return ranked

Reporting

def generate_supply_chain_report(data, period):
    """Assemble a supply chain report for ``period``.

    Args:
        data: Mapping with at least ``orders`` (needs a ``revenue`` column)
            and ``shipments`` (needs a ``lead_time`` column); it is also
            handed unchanged to the helper functions.
        period: Reporting period label, echoed back in the report.

    Returns:
        dict with ``period``, ``generated_at`` (ISO timestamp taken when
        the function starts) and ``sections`` holding
        ``executive_summary``, ``trends``, ``issues`` and
        ``recommendations``.

    Note:
        Relies on module-level helpers ``calculate_fill_rate``,
        ``calculate_period_comparison``, ``calculate_vs_target``,
        ``identify_issues`` and ``generate_recommendations``, which must be
        provided elsewhere.
    """
    created_at = pd.Timestamp.now().isoformat()

    # Executive summary
    orders = data['orders']
    summary = {
        'total_orders': len(orders),
        'total_revenue': orders['revenue'].sum(),
        'fill_rate': calculate_fill_rate(orders),
        'avg_lead_time': data['shipments']['lead_time'].mean()
    }

    # Performance trends
    trends = {
        'vs_prior_period': calculate_period_comparison(data, period),
        'vs_target': calculate_vs_target(data)
    }

    # Issues and risks, then recommendations
    issues = identify_issues(data)
    recommendations = generate_recommendations(data)

    return {
        'period': period,
        'generated_at': created_at,
        'sections': {
            'executive_summary': summary,
            'trends': trends,
            'issues': issues,
            'recommendations': recommendations
        }
    }