| name | scheduling-algorithms |
| description | Implement scheduling algorithms for logistics operations. Use this skill when sequencing tasks, allocating time slots, resolving resource conflicts, or optimizing job shop and flow shop scheduling problems. |
Scheduling Algorithms
Implement scheduling algorithms for logistics and operations optimization.
Installation
pip install ortools numpy
Quick Start
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime, timedelta
@dataclass
class Task:
    """A unit of work to be placed on a timeline within a time window."""

    # Unique identifier; used as the key in the schedules built from tasks.
    id: str
    # Processing time in minutes.
    duration: int  # minutes
    # The task may not start before this instant.
    earliest_start: datetime
    # The task must be finished by this instant.
    latest_end: datetime
    # Higher values are scheduled first by the greedy scheduler.
    priority: int = 1
    # Name of the resource the task needs; None means no resource is required.
    resource_required: Optional[str] = None
def schedule_tasks_greedy(tasks: List["Task"]) -> dict:
    """Simple greedy scheduling by priority and earliest start.

    Tasks are processed one after another on a single timeline, ordered by
    descending priority and then by earliest start.

    Args:
        tasks: Tasks exposing ``id``, ``duration`` (minutes), ``earliest_start``,
            ``latest_end`` and ``priority``.

    Returns:
        A dict keyed by task id. Scheduled tasks map to
        ``{'start', 'end', 'status': 'scheduled'}``; tasks that cannot finish
        inside their window map to
        ``{'status': 'infeasible', 'reason': 'time_window'}``.
        An empty task list yields an empty dict.
    """
    if not tasks:
        # min() below would raise on an empty sequence.
        return {}

    # Sort by priority (descending) then earliest start
    sorted_tasks = sorted(tasks, key=lambda t: (-t.priority, t.earliest_start))

    schedule = {}
    current_time = min(t.earliest_start for t in tasks)

    for task in sorted_tasks:
        start = max(current_time, task.earliest_start)
        end = start + timedelta(minutes=task.duration)

        if end <= task.latest_end:
            schedule[task.id] = {'start': start, 'end': end, 'status': 'scheduled'}
            # Only a task that was actually placed occupies the timeline.
            current_time = end
        else:
            # A rejected task consumes no time, so the clock is left untouched
            # and later tasks are not pushed out of their windows.
            schedule[task.id] = {'status': 'infeasible', 'reason': 'time_window'}

    return schedule
Common Patterns
Constraint-Based Scheduling
from ortools.sat.python import cp_model
def schedule_with_constraints(tasks: List[dict], horizon: int) -> Optional[dict]:
    """
    Schedule tasks with constraints using CP-SAT.

    Args:
        tasks: [{'id': str, 'duration': int, 'predecessors': list, 'resource': str}]
            'predecessors' and 'resource' are optional. Predecessor ids that
            do not match any task in ``tasks`` are ignored.
        horizon: maximum time units; upper bound for every start and end.

    Returns:
        ``{task_id: {'start': int, 'end': int}}`` for a makespan-minimizing
        schedule, ``{}`` when ``tasks`` is empty, or ``None`` when the solver
        reports neither an optimal nor a feasible solution.
    """
    if not tasks:
        # Nothing to schedule; also avoids a max-equality over zero variables.
        return {}

    model = cp_model.CpModel()

    # Create task variables
    task_vars = {}
    for task in tasks:
        task_id = task['id']
        start = model.NewIntVar(0, horizon, f"{task_id}_start")
        end = model.NewIntVar(0, horizon, f"{task_id}_end")
        interval = model.NewIntervalVar(start, task['duration'], end, f"{task_id}_interval")
        task_vars[task_id] = {'start': start, 'end': end, 'interval': interval}

    # Add precedence constraints
    for task in tasks:
        for pred_id in task.get('predecessors', []):
            if pred_id in task_vars:
                model.Add(task_vars[task['id']]['start'] >= task_vars[pred_id]['end'])

    # Add resource constraints (no overlap for same resource)
    resources = {}
    for task in tasks:
        res = task.get('resource')
        if res:
            resources.setdefault(res, []).append(task_vars[task['id']]['interval'])

    for intervals in resources.values():
        model.AddNoOverlap(intervals)

    # Minimize makespan
    makespan = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(makespan, [v['end'] for v in task_vars.values()])
    model.Minimize(makespan)

    # Solve
    solver = cp_model.CpSolver()
    status = solver.Solve(model)

    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        return {
            task_id: {
                'start': solver.Value(task_var['start']),
                'end': solver.Value(task_var['end'])
            }
            for task_id, task_var in task_vars.items()
        }

    return None
Priority-Based Scheduling
def priority_scheduling(tasks: List[dict], resources: dict) -> dict:
    """
    Schedule tasks based on priority with resource availability.

    Tasks are taken in order of descending 'priority', then ascending
    'earliest_start', then input order. Each task starts as soon as both its
    own 'earliest_start' and its resource (if any) allow.

    Args:
        tasks: dicts with 'id', 'priority', 'earliest_start' and 'duration';
            'resource' and 'latest_end' are optional. Times are numeric.
        resources: iterable of resource names (for a dict, its keys); every
            resource starts out free at time 0.

    Returns:
        A dict keyed by task id. Placed tasks map to
        ``{'start', 'end', 'resource'}``; tasks that would finish after their
        'latest_end' map to ``{'status': 'failed', 'reason': 'deadline_miss'}``.
    """
    import heapq

    # Priority queue: (priority, earliest_start, input index, task).
    # The index breaks ties so heapq never falls through to comparing the
    # task dicts themselves, which would raise TypeError.
    queue = [
        (-t['priority'], t['earliest_start'], index, t)
        for index, t in enumerate(tasks)
    ]
    heapq.heapify(queue)

    resource_available = {r: 0 for r in resources}  # Time when resource becomes free
    schedule = {}

    while queue:
        _, _, _, task = heapq.heappop(queue)

        resource = task.get('resource')
        resource_free = resource_available.get(resource, 0) if resource else 0

        start_time = max(task['earliest_start'], resource_free)
        end_time = start_time + task['duration']

        if end_time <= task.get('latest_end', float('inf')):
            schedule[task['id']] = {
                'start': start_time,
                'end': end_time,
                'resource': resource
            }
            if resource:
                resource_available[resource] = end_time
        else:
            schedule[task['id']] = {'status': 'failed', 'reason': 'deadline_miss'}

    return schedule
Job Shop Scheduling
def job_shop_scheduling(jobs: List[List[dict]], machines: List[str]) -> Optional[dict]:
    """
    Solve job shop scheduling problem.

    Args:
        jobs: List of jobs, each job is a list of operations with 'machine'
            and 'duration'. Operations of a job run in list order.
        machines: Names of the machines; each runs one operation at a time.
            An operation naming a machine not listed here raises KeyError.

    Returns:
        A dict with one ``'job{j}_op{i}'`` entry per operation
        (``{'start', 'end', 'machine'}``) plus the overall ``'makespan'``;
        ``{'makespan': 0}`` when there are no operations at all; ``None`` when
        the solver reports neither an optimal nor a feasible solution.
    """
    if not any(jobs):
        # No operations anywhere: nothing to solve.
        return {'makespan': 0}

    model = cp_model.CpModel()

    # Running every operation back to back always fits within this bound.
    horizon = sum(op['duration'] for job in jobs for op in job)

    # Create operation variables
    all_operations = {}
    machine_intervals = {m: [] for m in machines}

    for job_id, job in enumerate(jobs):
        for op_idx, op in enumerate(job):
            start = model.NewIntVar(0, horizon, f'start_{job_id}_{op_idx}')
            end = model.NewIntVar(0, horizon, f'end_{job_id}_{op_idx}')
            interval = model.NewIntervalVar(
                start, op['duration'], end, f'interval_{job_id}_{op_idx}'
            )

            all_operations[(job_id, op_idx)] = {
                'start': start,
                'end': end,
                'interval': interval,
                'machine': op['machine']
            }
            machine_intervals[op['machine']].append(interval)

    # Precedence within jobs
    for job_id, job in enumerate(jobs):
        for op_idx in range(len(job) - 1):
            current = all_operations[(job_id, op_idx)]
            next_op = all_operations[(job_id, op_idx + 1)]
            model.Add(next_op['start'] >= current['end'])

    # No overlap on machines
    for intervals in machine_intervals.values():
        model.AddNoOverlap(intervals)

    # Minimize makespan: the latest end among the LAST operation of EACH job.
    # The key must use the loop's job_id; empty jobs have no last operation.
    makespan = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(makespan, [
        all_operations[(job_id, len(job) - 1)]['end']
        for job_id, job in enumerate(jobs)
        if job
    ])
    model.Minimize(makespan)

    solver = cp_model.CpSolver()
    status = solver.Solve(model)

    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        result = {}
        for (job_id, op_idx), op in all_operations.items():
            result[f'job{job_id}_op{op_idx}'] = {
                'start': solver.Value(op['start']),
                'end': solver.Value(op['end']),
                'machine': op['machine']
            }
        result['makespan'] = solver.Value(makespan)
        return result

    return None
List Scheduling Heuristics
def longest_processing_time(tasks: List[dict], num_machines: int) -> dict:
    """
    LPT heuristic for parallel machine scheduling.

    Tasks are taken longest first and each one is appended to whichever
    machine currently finishes earliest (lowest index wins on ties).

    Returns a dict with the per-task 'assignments' (machine index, start,
    end), the resulting 'makespan', and the final 'machine_loads'.
    """
    loads = [0] * num_machines
    placement = {}

    # Longest jobs first; equal durations keep their input order.
    for job in sorted(tasks, key=lambda job: -job['duration']):
        # Index of the machine that frees up first.
        target = min(range(num_machines), key=loads.__getitem__)

        begin = loads[target]
        finish = begin + job['duration']
        placement[job['id']] = {'machine': target, 'start': begin, 'end': finish}
        loads[target] = finish

    return {
        'assignments': placement,
        'makespan': max(loads),
        'machine_loads': loads,
    }
Schedule Metrics
def calculate_schedule_metrics(schedule: dict, tasks: dict) -> dict:
    """
    Calculate key metrics for a schedule.

    Args:
        schedule: ``{task_id: entry}``. Entries with 'start'/'end' are placed
            tasks; entries without 'end' (e.g. failed or infeasible ones) are
            treated as not completed.
        tasks: ``{task_id: {'deadline': ...}}``; tasks with no 'deadline'
            (or a deadline of None) are never tardy.

    Returns:
        A dict with:
            'makespan': latest end minus earliest start (0 if nothing placed)
            'total_tardiness': summed lateness past the deadlines
            'tardy_count': number of tasks finishing after their deadline
            'completion_rate': share of all schedule entries that finished
                and were not tardy (0 for an empty schedule)
            'avg_flow_time': mean end time minus the earliest start
    """
    starts = [s['start'] for s in schedule.values() if 'start' in s]
    ends = [s['end'] for s in schedule.values() if 'end' in s]

    # Both lists are needed below; guarding on ends alone would let
    # min(starts) raise when entries carry 'end' but no 'start'.
    has_span = bool(starts) and bool(ends)
    makespan = max(ends) - min(starts) if has_span else 0

    # Calculate tardiness
    total_tardiness = 0
    tardy_count = 0
    completed_count = 0

    for task_id, sched in schedule.items():
        if 'end' not in sched:
            continue
        completed_count += 1

        if task_id not in tasks:
            continue
        deadline = tasks[task_id].get('deadline')
        # Compare against None explicitly: a deadline of 0 is a real deadline.
        if deadline is not None and sched['end'] > deadline:
            total_tardiness += sched['end'] - deadline
            tardy_count += 1

    return {
        'makespan': makespan,
        'total_tardiness': total_tardiness,
        'tardy_count': tardy_count,
        # Entries that never finished must not count as completed.
        'completion_rate': (completed_count - tardy_count) / len(schedule) if schedule else 0,
        'avg_flow_time': sum(ends) / len(ends) - min(starts) if has_span else 0
    }