Claude Code Plugins

Community-maintained marketplace

Feedback

scheduling-algorithms

@benchflow-ai/skillsbench
18
0

Implement scheduling algorithms for logistics operations. Use this skill when sequencing tasks, allocating time slots, resolving resource conflicts, or optimizing job shop and flow shop scheduling problems.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name scheduling-algorithms
description Implement scheduling algorithms for logistics operations. Use this skill when sequencing tasks, allocating time slots, resolving resource conflicts, or optimizing job shop and flow shop scheduling problems.

Scheduling Algorithms

Implement scheduling algorithms for logistics and operations optimization.

Installation

pip install ortools numpy

Quick Start

from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime, timedelta

@dataclass
class Task:
    id: str
    duration: int  # minutes
    earliest_start: datetime
    latest_end: datetime
    priority: int = 1
    resource_required: str = None

def schedule_tasks_greedy(tasks: List[Task]) -> dict:
    """Simple greedy scheduling by priority and earliest start."""
    # Sort by priority (descending) then earliest start
    sorted_tasks = sorted(tasks, key=lambda t: (-t.priority, t.earliest_start))

    schedule = {}
    current_time = min(t.earliest_start for t in tasks)

    for task in sorted_tasks:
        start = max(current_time, task.earliest_start)
        end = start + timedelta(minutes=task.duration)

        if end <= task.latest_end:
            schedule[task.id] = {'start': start, 'end': end, 'status': 'scheduled'}
        else:
            schedule[task.id] = {'status': 'infeasible', 'reason': 'time_window'}

        current_time = end

    return schedule

Common Patterns

Constraint-Based Scheduling

from ortools.sat.python import cp_model

def schedule_with_constraints(tasks: List[dict], horizon: int) -> dict:
    """
    Schedule tasks with constraints using CP-SAT.
    tasks: [{'id': str, 'duration': int, 'predecessors': list, 'resource': str}]
    horizon: maximum time units
    """
    model = cp_model.CpModel()

    # Create task variables
    task_vars = {}
    for task in tasks:
        start = model.NewIntVar(0, horizon, f"{task['id']}_start")
        end = model.NewIntVar(0, horizon, f"{task['id']}_end")
        interval = model.NewIntervalVar(start, task['duration'], end, f"{task['id']}_interval")
        task_vars[task['id']] = {'start': start, 'end': end, 'interval': interval}

    # Add precedence constraints
    for task in tasks:
        for pred_id in task.get('predecessors', []):
            if pred_id in task_vars:
                model.Add(task_vars[task['id']]['start'] >= task_vars[pred_id]['end'])

    # Add resource constraints (no overlap for same resource)
    resources = {}
    for task in tasks:
        res = task.get('resource')
        if res:
            if res not in resources:
                resources[res] = []
            resources[res].append(task_vars[task['id']]['interval'])

    for res, intervals in resources.items():
        model.AddNoOverlap(intervals)

    # Minimize makespan
    makespan = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(makespan, [v['end'] for v in task_vars.values()])
    model.Minimize(makespan)

    # Solve
    solver = cp_model.CpSolver()
    status = solver.Solve(model)

    if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
        return {
            task_id: {
                'start': solver.Value(vars['start']),
                'end': solver.Value(vars['end'])
            }
            for task_id, vars in task_vars.items()
        }
    return None

Priority-Based Scheduling

def priority_scheduling(tasks: List[dict], resources: dict) -> dict:
    """
    Schedule tasks based on priority with resource availability.
    """
    import heapq

    # Priority queue: (priority, earliest_start, task)
    queue = [(-t['priority'], t['earliest_start'], t) for t in tasks]
    heapq.heapify(queue)

    resource_available = {r: 0 for r in resources}  # Time when resource becomes free
    schedule = {}

    while queue:
        _, _, task = heapq.heappop(queue)

        resource = task.get('resource')
        resource_free = resource_available.get(resource, 0) if resource else 0

        start_time = max(task['earliest_start'], resource_free)
        end_time = start_time + task['duration']

        if end_time <= task.get('latest_end', float('inf')):
            schedule[task['id']] = {
                'start': start_time,
                'end': end_time,
                'resource': resource
            }
            if resource:
                resource_available[resource] = end_time
        else:
            schedule[task['id']] = {'status': 'failed', 'reason': 'deadline_miss'}

    return schedule

Job Shop Scheduling

def job_shop_scheduling(jobs: List[List[dict]], machines: List[str]) -> dict:
    """
    Solve job shop scheduling problem.
    jobs: List of jobs, each job is a list of operations with machine and duration
    """
    model = cp_model.CpModel()
    horizon = sum(sum(op['duration'] for op in job) for job in jobs)

    # Create operation variables
    all_operations = {}
    machine_intervals = {m: [] for m in machines}

    for job_id, job in enumerate(jobs):
        for op_idx, op in enumerate(job):
            key = (job_id, op_idx)
            start = model.NewIntVar(0, horizon, f'start_{job_id}_{op_idx}')
            end = model.NewIntVar(0, horizon, f'end_{job_id}_{op_idx}')
            interval = model.NewIntervalVar(
                start, op['duration'], end, f'interval_{job_id}_{op_idx}'
            )

            all_operations[key] = {
                'start': start,
                'end': end,
                'interval': interval,
                'machine': op['machine']
            }

            machine_intervals[op['machine']].append(interval)

    # Precedence within jobs
    for job_id, job in enumerate(jobs):
        for op_idx in range(len(job) - 1):
            current = all_operations[(job_id, op_idx)]
            next_op = all_operations[(job_id, op_idx + 1)]
            model.Add(next_op['start'] >= current['end'])

    # No overlap on machines
    for machine, intervals in machine_intervals.items():
        model.AddNoOverlap(intervals)

    # Minimize makespan
    makespan = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(makespan, [
        all_operations[(len(jobs)-1, len(job)-1)]['end']
        for job_id, job in enumerate(jobs)
    ])
    model.Minimize(makespan)

    solver = cp_model.CpSolver()
    status = solver.Solve(model)

    if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
        result = {}
        for (job_id, op_idx), op in all_operations.items():
            result[f'job{job_id}_op{op_idx}'] = {
                'start': solver.Value(op['start']),
                'end': solver.Value(op['end']),
                'machine': op['machine']
            }
        result['makespan'] = solver.Value(makespan)
        return result
    return None

List Scheduling Heuristics

def longest_processing_time(tasks: List[dict], num_machines: int) -> dict:
    """
    LPT heuristic for parallel machine scheduling.
    """
    # Sort by duration descending
    sorted_tasks = sorted(tasks, key=lambda t: -t['duration'])

    machine_loads = [0] * num_machines
    assignments = {}

    for task in sorted_tasks:
        # Assign to least loaded machine
        min_machine = min(range(num_machines), key=lambda m: machine_loads[m])

        start = machine_loads[min_machine]
        end = start + task['duration']

        assignments[task['id']] = {
            'machine': min_machine,
            'start': start,
            'end': end
        }

        machine_loads[min_machine] = end

    return {
        'assignments': assignments,
        'makespan': max(machine_loads),
        'machine_loads': machine_loads
    }

Schedule Metrics

def calculate_schedule_metrics(schedule: dict, tasks: dict) -> dict:
    """
    Calculate key metrics for a schedule.
    """
    starts = [s['start'] for s in schedule.values() if 'start' in s]
    ends = [s['end'] for s in schedule.values() if 'end' in s]

    makespan = max(ends) - min(starts) if ends else 0

    # Calculate tardiness
    total_tardiness = 0
    tardy_count = 0
    for task_id, sched in schedule.items():
        if 'end' in sched and task_id in tasks:
            deadline = tasks[task_id].get('deadline')
            if deadline and sched['end'] > deadline:
                total_tardiness += sched['end'] - deadline
                tardy_count += 1

    return {
        'makespan': makespan,
        'total_tardiness': total_tardiness,
        'tardy_count': tardy_count,
        'completion_rate': (len(schedule) - tardy_count) / len(schedule) if schedule else 0,
        'avg_flow_time': sum(ends) / len(ends) - min(starts) if ends else 0
    }