Claude Code Plugins

Community-maintained marketplace

Feedback

traffic-and-pathfinding

@tachyon-beep/skillpacks
1
0

A*, pathfinding algorithms, hierarchical navigation, traffic flow control

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name traffic-and-pathfinding
description A*, pathfinding algorithms, hierarchical navigation, traffic flow control

Traffic and Pathfinding

When to use this skill: When implementing navigation systems, traffic simulation, crowd movement, or any scenario involving multiple agents finding paths through an environment. Critical for city builders, RTS games, open-world navigation, and crowd simulation.

What this skill provides: Comprehensive understanding of pathfinding algorithms (A*, Dijkstra, JPS), hierarchical pathfinding for scale, traffic flow management, congestion handling, dynamic re-pathing, and performance optimization patterns for 1000+ simultaneous agents.


Core Concepts

Pathfinding Algorithms

A (A-Star)*

  • Use case: Single-source, single-destination pathfinding with heuristic guidance
  • Complexity: O(E log V) where E = edges, V = vertices
  • Strengths: Optimal paths, widely understood, efficient with good heuristics
  • Limitations: Explores many nodes for long distances, no path sharing between agents

Dijkstra's Algorithm

  • Use case: Single-source to all destinations, guaranteed shortest path
  • Complexity: O(E log V)
  • Strengths: Optimal paths, useful for pre-computing distance maps
  • Use when: Need paths from one point to ALL other points (e.g., delivery hub)

Jump Point Search (JPS)

  • Use case: Grid-based pathfinding with symmetry breaking
  • Complexity: Much faster than A* on uniform cost grids
  • Strengths: 10-40× speedup on open grids, same path quality as A*
  • Limitations: Only works on uniform-cost grids, requires preprocessing

Hierarchical Pathfinding (HPA)*

  • Use case: Long-distance navigation in large environments
  • Method: Divide map into clusters, build high-level graph, plan at multiple levels
  • Complexity: Drastically reduced search space (10-100× faster for long paths)
  • Strengths: Scales to massive maps, can cache high-level paths
  • Example: Cities Skylines uses hierarchical road network (highways → arterial → local)

Flow Fields

  • Use case: Many agents moving toward same destination (crowds, RTS unit groups)
  • Method: Pre-compute direction field across entire map from goal
  • Complexity: O(N) for field generation, O(1) per agent movement
  • Strengths: Thousands of agents with negligible per-agent cost
  • Best for: RTS games (100+ units attack-moving), crowd evacuation scenarios

Navigation Mesh (NavMesh)

  • Use case: 3D environments, non-grid spaces, complex terrain
  • Method: Polygon mesh representing walkable surfaces, pathfind on mesh
  • Strengths: Handles slopes, stairs, platforms; natural for 3D worlds
  • Tools: Recast/Detour (industry standard), Unity NavMesh

Traffic Flow Concepts

Congestion Management

  • Dynamic Cost Adjustment: Increase edge cost based on vehicle density
  • Heat Maps: Track traffic density per road segment, update every N frames
  • Spillback: Model how congestion propagates backward from blockages

Lane-Based Navigation

  • Multi-lane Roads: Represent roads as bundles of parallel lanes
  • Lane Changes: Model as higher-cost transitions between adjacent lanes
  • Turn Lanes: Dedicated lanes for turning increase intersection throughput

Intersection Management

  • Traffic Signals: Timed signals with phase plans (red/yellow/green)
  • Reservation Systems: Time-space reservations for autonomous vehicles
  • Priority Rules: Right-of-way, yield signs, stop signs

Decision Frameworks

Framework 1: Choosing the Right Pathfinding Algorithm

START: What's my navigation scenario?

├─ GRID-BASED with UNIFORM COSTS?
│  ├─ Small map (< 1000 nodes)? → Use A* (simple, fast enough)
│  ├─ Large open grids? → Use JPS (10-40× faster than A*)
│  └─ Many obstacles? → Use A* (JPS benefits diminish)
│
├─ NEED PATHS TO MULTIPLE DESTINATIONS?
│  └─ From ONE source to ALL destinations? → Use Dijkstra (single search)
│
├─ LARGE MAP (> 10,000 nodes) with LONG PATHS?
│  └─ Use Hierarchical Pathfinding (HPA*, hierarchical A*)
│     - Divide into clusters (16×16 or 32×32)
│     - Build inter-cluster graph
│     - Plan at high level, refine locally
│
├─ MANY AGENTS (> 100) moving to SAME GOAL?
│  └─ Use Flow Fields
│     - Compute once per destination
│     - All agents follow field (O(1) per agent)
│     - Update field when goal changes
│
├─ 3D ENVIRONMENT with COMPLEX TERRAIN?
│  └─ Use NavMesh (Recast/Detour, Unity NavMesh)
│     - Handles slopes, stairs, platforms
│     - Better for non-grid spaces
│
└─ DYNAMIC ENVIRONMENT with FREQUENT CHANGES?
   └─ Use D* Lite or LPA*
      - Incrementally repair paths when map changes
      - Much faster than full recalculation

Example Decision: Cities Skylines traffic

  • Large city map (100k+ nodes) → Hierarchical pathfinding
  • Multiple vehicle types → NavMesh for complex vehicle physics
  • Traffic congestion → Dynamic cost adjustment every 10 frames

Framework 2: When to Recalculate Paths

NEVER recalculate every frame (performance death)

├─ PATH BECOMES INVALID?
│  ├─ Blocked by obstacle → Recalculate immediately
│  ├─ Destination moved → Recalculate immediately
│  └─ Road closed/destroyed → Recalculate immediately
│
├─ PATH STILL VALID but SUBOPTIMAL?
│  ├─ Traffic congestion on route → Recalculate after delay (5-10 sec)
│  ├─ Found shortcut → Recalculate opportunistically (low priority)
│  └─ Better route available → Queue for background recalc
│
├─ AGENT DEVIATED FROM PATH?
│  ├─ Small deviation → Use local correction (steer back)
│  ├─ Large deviation → Recalculate from current position
│  └─ Pushed off path → Recalculate after N failed corrections
│
└─ PERIODIC REFRESH?
   └─ Recalculate every N seconds (30-60 sec typical)
      - Catch gradual cost changes
      - Spread recalc cost over time
      - Lower priority than invalid paths

Performance Budget Example:

  • 1000 vehicles at 60 FPS
  • Budget: 5ms for pathfinding per frame
  • Max paths per frame: ~10 (0.5ms each)
  • Queue remaining requests, process over multiple frames

Framework 3: Exact vs Approximate Paths

Choose path quality based on agent importance and distance

├─ DISTANT FROM CAMERA (> 100 units)?
│  └─ Use LOW-DETAIL paths
│     - Fewer waypoints (every 10th node)
│     - Straight-line segments
│     - Skip local avoidance
│     - Update less frequently
│
├─ NEAR CAMERA (< 50 units)?
│  └─ Use HIGH-DETAIL paths
│     - All waypoints
│     - Smooth curves (spline interpolation)
│     - Local steering behaviors
│     - Frequent updates
│
├─ BACKGROUND TRAFFIC?
│  └─ Use SIMPLIFIED paths
│     - Scripted routes (no pathfinding)
│     - Pre-baked traffic patterns
│     - No collision avoidance
│
└─ PLAYER-CONTROLLED or HERO UNITS?
   └─ Use EXACT paths
      - Full pathfinding
      - Smooth movement
      - Perfect collision avoidance

Level-of-Detail System:

Distance from camera:
- 0-50 units: Full pathfinding, 10 Hz updates
- 50-100 units: Reduced waypoints, 5 Hz updates
- 100-200 units: Major waypoints only, 1 Hz updates
- 200+ units: Straight-line movement, 0.2 Hz updates

Implementation Patterns

Pattern 1: Path Caching and Sharing

Problem: Recalculating identical paths wastes CPU. Multiple agents going A→B all compute separately.

Solution: Cache paths in lookup table, share between agents.

class PathCache:
    def __init__(self, max_size=10000, ttl=60.0):
        self.cache = {}  # (start, goal) -> CachedPath
        self.max_size = max_size
        self.ttl = ttl  # Time-to-live in seconds

    def get_path(self, start, goal, current_time):
        key = (start, goal)

        if key in self.cache:
            cached = self.cache[key]
            # Check if still valid
            if current_time - cached.timestamp < self.ttl:
                cached.ref_count += 1
                return cached.path
            else:
                del self.cache[key]  # Expired

        return None

    def cache_path(self, start, goal, path, current_time):
        key = (start, goal)

        # Evict oldest if cache full
        if len(self.cache) >= self.max_size:
            oldest = min(self.cache.items(), key=lambda x: x[1].timestamp)
            del self.cache[oldest[0]]

        self.cache[key] = CachedPath(path, current_time, ref_count=1)

    def invalidate_region(self, bbox):
        """Invalidate cached paths through region (for dynamic obstacles)"""
        keys_to_remove = []
        for key, cached in self.cache.items():
            if self._path_intersects_bbox(cached.path, bbox):
                keys_to_remove.append(key)

        for key in keys_to_remove:
            del self.cache[key]

class CachedPath:
    def __init__(self, path, timestamp, ref_count=0):
        self.path = path
        self.timestamp = timestamp
        self.ref_count = ref_count

Benefits:

  • 10-100× speedup when multiple agents share destinations
  • Reduced CPU for common routes (residential → downtown)
  • Automatic invalidation when map changes

When to use:

  • City builders (many cars going to popular districts)
  • RTS games (multiple units attack-moving to same location)
  • Crowd simulations (people going to exits, landmarks)

Real-world example: SimCity 4 caches paths between residential and commercial zones, updated when zoning changes.

Pattern 2: Hierarchical A* for Large Maps

Problem: A* searches too many nodes on large maps. 100×100 grid = 10,000 nodes for cross-map paths.

Solution: Divide map into clusters, create high-level graph, search at multiple abstraction levels.

class HierarchicalPathfinder:
    def __init__(self, world_map, cluster_size=16):
        self.world_map = world_map
        self.cluster_size = cluster_size

        # Build hierarchy
        self.clusters = self._build_clusters()
        self.high_level_graph = self._build_high_level_graph()

    def find_path(self, start, goal):
        # Step 1: Find which clusters contain start and goal
        start_cluster = self._get_cluster(start)
        goal_cluster = self._get_cluster(goal)

        if start_cluster == goal_cluster:
            # Same cluster, use standard A*
            return self._astar_local(start, goal)

        # Step 2: Find high-level path (cluster to cluster)
        cluster_path = self._astar_high_level(start_cluster, goal_cluster)

        if not cluster_path:
            return None  # No path exists

        # Step 3: Refine to low-level path (actual nodes)
        full_path = []

        # Path from start to first cluster exit
        entry_point = self._get_entry_point(start_cluster, cluster_path[1])
        full_path.extend(self._astar_local(start, entry_point))

        # Path through intermediate clusters
        for i in range(1, len(cluster_path) - 1):
            current_cluster = cluster_path[i]
            next_cluster = cluster_path[i + 1]

            exit_point = self._get_entry_point(current_cluster, next_cluster)
            full_path.extend(self._astar_local(entry_point, exit_point))
            entry_point = exit_point

        # Path from last cluster entry to goal
        full_path.extend(self._astar_local(entry_point, goal))

        return full_path

    def _build_clusters(self):
        """Divide map into grid of clusters"""
        clusters = []
        for y in range(0, self.world_map.height, self.cluster_size):
            for x in range(0, self.world_map.width, self.cluster_size):
                cluster = Cluster(x, y, self.cluster_size)
                self._find_border_nodes(cluster)
                clusters.append(cluster)
        return clusters

    def _build_high_level_graph(self):
        """Build graph connecting cluster border nodes"""
        graph = {}

        for cluster in self.clusters:
            for border_node in cluster.border_nodes:
                # Pre-compute paths to all other border nodes in same cluster
                for other_node in cluster.border_nodes:
                    if border_node != other_node:
                        path = self._astar_local(border_node, other_node)
                        if path:
                            graph[(border_node, other_node)] = len(path)

        return graph

    def _astar_high_level(self, start_cluster, goal_cluster):
        """A* search on cluster graph"""
        # Standard A* but on clusters, not individual nodes
        # Returns list of clusters forming high-level path
        pass

    def _astar_local(self, start, goal):
        """Standard A* within a cluster or between nearby points"""
        pass

Performance Improvement:

  • Without hierarchy: Search 10,000 nodes for 100×100 map
  • With hierarchy (16×16 clusters): Search ~40 clusters + local refinement = ~500 nodes
  • Speedup: 20× faster for long-distance paths

When to use:

  • Maps larger than 5000 nodes
  • Agents frequently traveling long distances
  • Multiple levels of road hierarchy (highways vs local)

Real-world example: Cities Skylines uses 3-level hierarchy:

  1. Highway network (high-level)
  2. Arterial roads (mid-level)
  3. Local streets (low-level)

Pattern 3: Flow Fields for Crowds

Problem: 100+ units moving to same goal. Each running A* = 100× redundant computation.

Solution: Compute direction field once, all units follow arrows.

import numpy as np
from collections import deque

class FlowField:
    def __init__(self, world_map):
        self.world_map = world_map
        self.width = world_map.width
        self.height = world_map.height

        # Pre-allocate arrays
        self.cost_field = np.full((self.height, self.width), np.inf)
        self.integration_field = np.full((self.height, self.width), np.inf)
        self.flow_field = np.zeros((self.height, self.width, 2))  # Direction vectors

    def generate(self, goal_position):
        """Generate flow field from goal"""
        # Step 1: Create cost field (cost to traverse each cell)
        self._generate_cost_field()

        # Step 2: Integration field (distance from goal via Dijkstra)
        self._generate_integration_field(goal_position)

        # Step 3: Flow field (direction of steepest descent)
        self._generate_flow_field()

    def _generate_cost_field(self):
        """Assign traversal cost to each cell"""
        for y in range(self.height):
            for x in range(self.width):
                if self.world_map.is_walkable(x, y):
                    # Base cost + terrain cost
                    self.cost_field[y, x] = self.world_map.get_terrain_cost(x, y)
                else:
                    self.cost_field[y, x] = np.inf  # Unwalkable

    def _generate_integration_field(self, goal):
        """Dijkstra from goal, fills integration field with distances"""
        gx, gy = goal
        self.integration_field.fill(np.inf)
        self.integration_field[gy, gx] = 0

        # BFS/Dijkstra from goal
        queue = deque([goal])

        while queue:
            x, y = queue.popleft()
            current_cost = self.integration_field[y, x]

            # Check all neighbors
            for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0),
                          (1, 1), (1, -1), (-1, 1), (-1, -1)]:
                nx, ny = x + dx, y + dy

                if not (0 <= nx < self.width and 0 <= ny < self.height):
                    continue

                if self.cost_field[ny, nx] == np.inf:
                    continue  # Unwalkable

                # Calculate cost to reach neighbor
                new_cost = current_cost + self.cost_field[ny, nx]

                if new_cost < self.integration_field[ny, nx]:
                    self.integration_field[ny, nx] = new_cost
                    queue.append((nx, ny))

    def _generate_flow_field(self):
        """Generate direction vectors pointing toward goal"""
        for y in range(self.height):
            for x in range(self.width):
                if self.cost_field[y, x] == np.inf:
                    continue  # Unwalkable

                # Find neighbor with lowest integration value
                best_dir = None
                best_cost = self.integration_field[y, x]

                for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0),
                              (1, 1), (1, -1), (-1, 1), (-1, -1)]:
                    nx, ny = x + dx, y + dy

                    if not (0 <= nx < self.width and 0 <= ny < self.height):
                        continue

                    neighbor_cost = self.integration_field[ny, nx]
                    if neighbor_cost < best_cost:
                        best_cost = neighbor_cost
                        best_dir = (dx, dy)

                if best_dir:
                    # Normalize direction
                    length = np.sqrt(best_dir[0]**2 + best_dir[1]**2)
                    self.flow_field[y, x] = (best_dir[0] / length,
                                            best_dir[1] / length)

    def get_direction(self, position):
        """Get movement direction at position"""
        x, y = int(position[0]), int(position[1])
        if 0 <= x < self.width and 0 <= y < self.height:
            return self.flow_field[y, x]
        return (0, 0)


class Agent:
    def __init__(self, position, speed=1.0):
        self.position = np.array(position, dtype=float)
        self.speed = speed

    def update(self, flow_field, dt):
        """Move following flow field"""
        direction = flow_field.get_direction(self.position)
        self.position += direction * self.speed * dt

Performance Comparison:

  • Individual A*: 100 agents × 1ms each = 100ms per frame
  • Flow field: 5ms generation + 100 agents × 0.001ms = 5.1ms per frame
  • Speedup: ~20× faster

When to use:

  • RTS games (attack-move, rally points)
  • Crowd evacuation (everyone heading to exits)
  • Tower defense (enemies moving to goal)
  • Any scenario with 50+ agents sharing destination

Real-world example: Supreme Commander uses flow fields for 1000+ unit formations moving together.

Pattern 4: Dynamic Cost Adjustment for Traffic

Problem: All vehicles choose "optimal" path, creating traffic jam. No adaptation to congestion.

Solution: Track vehicle density per road segment, increase cost of congested roads, trigger re-routing.

class TrafficManager:
    def __init__(self, road_network):
        self.road_network = road_network
        self.traffic_density = {}  # edge_id -> vehicle count
        self.base_costs = {}       # edge_id -> base travel time
        self.update_interval = 1.0  # Update costs every N seconds
        self.last_update = 0

        # Initialize
        for edge in road_network.edges:
            self.traffic_density[edge.id] = 0
            self.base_costs[edge.id] = edge.length / edge.speed_limit

    def update(self, vehicles, current_time):
        """Update traffic densities and edge costs"""
        if current_time - self.last_update < self.update_interval:
            return

        # Reset densities
        for edge_id in self.traffic_density:
            self.traffic_density[edge_id] = 0

        # Count vehicles on each edge
        for vehicle in vehicles:
            if vehicle.current_edge:
                self.traffic_density[vehicle.current_edge] += 1

        # Update edge costs based on congestion
        for edge in self.road_network.edges:
            density = self.traffic_density[edge.id]
            capacity = edge.lane_count * 10  # Vehicles per lane

            # Congestion factor (BPR function - standard in traffic engineering)
            congestion_ratio = density / capacity
            congestion_factor = 1.0 + 0.15 * (congestion_ratio ** 4)

            # Update edge cost
            new_cost = self.base_costs[edge.id] * congestion_factor
            edge.current_cost = new_cost

            # Mark for re-routing if severely congested
            if congestion_ratio > 0.8:
                self._trigger_reroute(edge)

        self.last_update = current_time

    def _trigger_reroute(self, edge):
        """Notify vehicles on congested edge to consider re-routing"""
        for vehicle in self.road_network.get_vehicles_on_edge(edge.id):
            # Don't reroute everyone at once (causes oscillation)
            if random.random() < 0.2:  # 20% chance
                vehicle.request_reroute(reason='congestion')

    def get_edge_cost(self, edge_id, include_congestion=True):
        """Get current cost of edge"""
        if include_congestion:
            return self.road_network.get_edge(edge_id).current_cost
        else:
            return self.base_costs[edge_id]


class Vehicle:
    def __init__(self, vehicle_id, start, destination):
        self.id = vehicle_id
        self.position = start
        self.destination = destination
        self.path = []
        self.current_edge = None
        self.last_reroute_time = 0
        self.reroute_cooldown = 10.0  # Don't reroute more than every 10 sec

    def request_reroute(self, reason='congestion'):
        """Request path recalculation"""
        current_time = time.time()

        # Cooldown to prevent thrashing
        if current_time - self.last_reroute_time < self.reroute_cooldown:
            return

        # Calculate new path with current costs
        new_path = self.pathfinder.find_path(
            self.position,
            self.destination,
            use_current_costs=True
        )

        if new_path and len(new_path) < len(self.path) * 0.9:
            # New path is significantly better (10% shorter)
            self.path = new_path
            self.last_reroute_time = current_time

Bureau of Public Roads (BPR) Function:

travel_time = free_flow_time × (1 + α × (density/capacity)^β)

Standard values:
- α = 0.15
- β = 4
- Results in realistic congestion curves

When to use:

  • City builders (Cities Skylines, SimCity)
  • Traffic simulators
  • Delivery route optimization
  • Any network flow problem with capacity constraints

Real-world example: Cities Skylines updates road costs every 30 game ticks based on vehicle counts. Vehicles reroute probabilistically to avoid oscillation.

Pattern 5: Asynchronous Pathfinding with Request Queue

Problem: Can't calculate all 1000 paths in single frame. Causes frame drops.

Solution: Queue path requests, process limited number per frame, deliver asynchronously.

from collections import deque
from dataclasses import dataclass
from typing import Callable, Optional
import time

@dataclass
class PathRequest:
    id: int
    start: tuple
    goal: tuple
    priority: int  # Lower = higher priority
    callback: Callable
    timestamp: float
    max_search_nodes: int = 10000

class AsyncPathfinder:
    def __init__(self, pathfinder, max_ms_per_frame=5.0):
        self.pathfinder = pathfinder
        self.max_ms_per_frame = max_ms_per_frame
        self.request_queue = []  # Priority queue
        self.next_request_id = 0
        self.active_searches = {}  # id -> generator

    def request_path(self, start, goal, callback, priority=5):
        """Queue a path request"""
        request = PathRequest(
            id=self.next_request_id,
            start=start,
            goal=goal,
            priority=priority,
            callback=callback,
            timestamp=time.time()
        )
        self.next_request_id += 1

        # Insert into priority queue (min-heap)
        import heapq
        heapq.heappush(self.request_queue, (priority, request))

        return request.id

    def update(self):
        """Process path requests within time budget"""
        start_time = time.perf_counter()

        # Continue active searches first
        completed_ids = []
        for request_id, search_gen in self.active_searches.items():
            try:
                next(search_gen)  # Continue search

                # Check time budget
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                if elapsed_ms > self.max_ms_per_frame:
                    return  # Out of time, continue next frame

            except StopIteration as result:
                # Search completed
                path = result.value
                request = self._get_request(request_id)
                request.callback(path)
                completed_ids.append(request_id)

        # Remove completed searches
        for req_id in completed_ids:
            del self.active_searches[req_id]

        # Start new searches if time permits
        while self.request_queue:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            if elapsed_ms > self.max_ms_per_frame:
                break

            # Get highest priority request
            _, request = heapq.heappop(self.request_queue)

            # Start incremental search
            search_gen = self.pathfinder.find_path_incremental(
                request.start,
                request.goal,
                max_nodes_per_step=100
            )
            self.active_searches[request.id] = search_gen

    def cancel_request(self, request_id):
        """Cancel pending request"""
        if request_id in self.active_searches:
            del self.active_searches[request_id]


class IncrementalPathfinder:
    def find_path_incremental(self, start, goal, max_nodes_per_step=100):
        """A* that yields control periodically"""
        open_set = []
        heapq.heappush(open_set, (0, start))
        came_from = {}
        g_score = {start: 0}
        nodes_processed = 0

        while open_set:
            current = heapq.heappop(open_set)[1]

            if current == goal:
                # Found path
                return self.reconstruct_path(came_from, current)

            for neighbor in self.get_neighbors(current):
                tentative_g = g_score[current] + self.distance(current, neighbor)

                if neighbor not in g_score or tentative_g < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g
                    f_score = tentative_g + self.heuristic(neighbor, goal)
                    heapq.heappush(open_set, (f_score, neighbor))

            nodes_processed += 1

            # Yield control periodically
            if nodes_processed >= max_nodes_per_step:
                nodes_processed = 0
                yield  # Return control to caller

        return None  # No path found


# Usage
pathfinder = AsyncPathfinder(IncrementalPathfinder(world_map))

# In vehicle AI
def on_path_received(path):
    vehicle.path = path
    vehicle.state = 'following_path'

pathfinder.request_path(
    vehicle.position,
    vehicle.destination,
    on_path_received,
    priority=vehicle.get_priority()
)

# In game loop
def update():
    pathfinder.update()  # Processes requests within time budget
    # ... rest of game logic

Performance Budget Example:

  • 60 FPS = 16.67ms per frame
  • Pathfinding budget: 5ms (30% of frame)
  • Average path: 0.5ms
  • Paths per frame: ~10
  • 1000 vehicles = 100 frames to recalc all (1.67 seconds)

Priority System:

Priority 1: Player-visible vehicles
Priority 2: Vehicles near camera
Priority 3: Vehicles with invalid paths
Priority 4: Periodic refresh
Priority 5: Background traffic

When to use:

  • Any game with 100+ agents needing paths
  • When maintaining 60 FPS is critical
  • Open-world games with dynamic environments

Real-world example: Unity's NavMesh system uses time-sliced pathfinding, spreading calculations across multiple frames.


Common Pitfalls

Pitfall 1: Recalculating Paths Every Frame

Symptom: Frame rate drops to < 5 FPS with 1000 agents.

Why it happens: Developer doesn't realize cost of pathfinding. Puts find_path() in update() loop.

Cost analysis:

A* complexity: O(E log V)
Typical map: 10,000 nodes, 40,000 edges
Cost per path: ~1ms
1000 agents × 1ms × 60 FPS = 60,000ms = 60 seconds per frame!
Result: 0.016 FPS (one frame per minute)

Solution: Path caching, lazy evaluation, request queuing.

# WRONG: Recalculates every frame
class Vehicle:
    def update(self):
        self.path = self.pathfinder.find_path(self.pos, self.dest)  # TERRIBLE!
        self.move_along_path()

# RIGHT: Cache path, only recalculate when needed
class Vehicle:
    def update(self):
        if self._path_needs_recalc():
            self.request_path_recalc()  # Queued, async
        self.move_along_path()

    def _path_needs_recalc(self):
        # Only recalc if path invalid or significantly suboptimal
        if not self.path:
            return True
        if self._path_blocked():
            return True
        if time.time() - self.last_path_time > 60.0:
            return True  # Periodic refresh
        return False

Red flags to watch for:

  • find_path() called in update() or game loop
  • No caching or memoization
  • No check for "does path need recalculation?"

Pitfall 2: No Fallback for Unreachable Destinations

Symptom: Vehicles freeze when destination unreachable. Game state corrupted.

Why it happens: Pathfinding returns None or empty list, vehicle code doesn't handle it.

# WRONG: No error handling
def update(self):
    self.path = find_path(self.pos, self.dest)
    self.move_to(self.path[0])  # IndexError if path is empty!

# RIGHT: Fallback behavior
def update(self):
    if not self.path:
        self.path = find_path(self.pos, self.dest)

    if not self.path:
        # No path exists, fallback behavior
        self._handle_unreachable_destination()
        return

    self.move_to(self.path[0])

def _handle_unreachable_destination(self):
    # Option 1: Find nearest reachable point
    self.dest = find_nearest_reachable(self.dest)

    # Option 2: Change behavior
    self.state = 'idle'

    # Option 3: Remove vehicle
    self.mark_for_removal()

Fallback strategies:

  1. Find nearest reachable point: Pathfind to closest valid destination
  2. Wait and retry: Obstacle might move, try again in N seconds
  3. Change goal: Pick alternate destination
  4. Enter idle state: Stop trying, wait for player input
  5. Despawn: Remove vehicle from simulation (background traffic)

Pitfall 3: Traffic Deadlocks (No Re-routing)

Symptom: Vehicles stuck in gridlock. All routes blocked by waiting vehicles.

Why it happens: Vehicles commit to paths without checking future congestion. No dynamic re-routing.

Example scenario:

Intersection:
   ↓
← + →
   ↑

All four roads filled with vehicles waiting to cross.
Each blocked by vehicle on adjacent road.
Classic deadlock: A waits for B, B waits for C, C waits for D, D waits for A.

Solutions:

  1. Time-space reservations (prevents deadlock before it happens):
class IntersectionManager:
    def __init__(self):
        self.reservations = {}  # (x, y, time) -> vehicle_id

    def request_crossing(self, vehicle, path_through_intersection, current_time):
        # Check if time-space slots are available
        for i, (x, y) in enumerate(path_through_intersection):
            arrival_time = current_time + i * vehicle.time_per_cell

            if (x, y, arrival_time) in self.reservations:
                return False  # Conflict, deny request

        # Reserve slots
        for i, (x, y) in enumerate(path_through_intersection):
            arrival_time = current_time + i * vehicle.time_per_cell
            self.reservations[(x, y, arrival_time)] = vehicle.id

        return True  # Approved
  1. Deadlock detection and breaking:
def detect_deadlock(self, vehicles):
    # Build wait-for graph
    wait_graph = {}
    for v in vehicles:
        if v.state == 'waiting':
            blocking_vehicle = self.get_blocking_vehicle(v)
            if blocking_vehicle:
                wait_graph[v.id] = blocking_vehicle.id

    # Find cycles (deadlocks)
    cycles = self.find_cycles(wait_graph)

    # Break deadlocks: back up lowest-priority vehicle
    for cycle in cycles:
        lowest_priority = min(cycle, key=lambda v: vehicles[v].priority)
        vehicles[lowest_priority].back_up()
        vehicles[lowest_priority].request_reroute()
  1. Traffic light coordination (prevents conflicts):
class TrafficLightController:
    def __init__(self, intersections):
        self.lights = {}
        self.phase_duration = 30.0  # seconds
        self.yellow_duration = 3.0

        # Coordinate lights to create "green waves"
        for i, intersection in enumerate(intersections):
            offset = i * 5.0  # Stagger by 5 seconds
            self.lights[intersection.id] = TrafficLight(offset)

Red flags:

  • No intersection management system
  • Vehicles can occupy same cell
  • No wait timeout (reroute after stuck for N seconds)
  • No deadlock detection

Pitfall 4: Ignoring Congestion (All Choose Same "Optimal" Path)

Symptom: One route heavily congested while alternate routes empty. Vehicles don't adapt.

Why it happens: Pathfinding uses static costs. All vehicles calculate same "shortest" path.

Example: Highway with traffic jam, but parallel road empty.

Start =====[CONGESTED HIGHWAY (1000 vehicles)]===== Dest
       \                                           /
        [EMPTY ALTERNATE ROUTE (0 vehicles)]

All vehicles still route to highway (shorter distance, but longer time)

Solution: Dynamic cost based on current traffic.

def get_edge_cost(self, edge):
    base_cost = edge.length / edge.speed_limit

    # Count vehicles on edge
    vehicle_count = len(edge.vehicles)
    capacity = edge.lanes * 10

    # BPR congestion function
    congestion_ratio = vehicle_count / capacity
    congestion_factor = 1.0 + 0.15 * (congestion_ratio ** 4)

    return base_cost * congestion_factor

# Also: Stochastic routing (add randomness to prevent everyone choosing same path)
def choose_path(self, paths):
    # Instead of always picking shortest, add some randomness
    costs = [self.get_path_cost(p) for p in paths]

    # Softmax selection (prefer shorter, but not exclusively)
    probs = self.softmax([-c for c in costs], temperature=2.0)
    return np.random.choice(paths, p=probs)

Feedback loop prevention:

  • Don't reroute all vehicles simultaneously (causes oscillation)
  • Add hysteresis: only reroute if new path 10-20% better
  • Staggered updates: different vehicles update at different times

Pitfall 5: No Hierarchical Pathfinding (Poor Scaling)

Symptom: Works fine with 100 vehicles on small map. Becomes slideshow with 1000 vehicles on large map.

Why it happens: A* search space grows quadratically with map size. No hierarchical structure.

Complexity comparison:

Small map: 50×50 = 2,500 nodes
- A* average search: ~500 nodes
- 100 vehicles: 50,000 node expansions per frame
- Performance: 60 FPS ✓

Large map: 200×200 = 40,000 nodes
- A* average search: ~10,000 nodes  (not 500!)
- 1000 vehicles: 10,000,000 node expansions per frame
- Performance: 0.5 FPS ✗

Why it's quadratic: Doubling map size quadruples nodes AND doubles average path length.

Solution: Hierarchical pathfinding.

# Cities Skylines approach: 3-level hierarchy
class HierarchicalRoadNetwork:
    def __init__(self):
        self.highways = Graph()      # Level 3: ~100 nodes
        self.arterials = Graph()     # Level 2: ~1,000 nodes
        self.local_roads = Graph()   # Level 1: ~40,000 nodes

    def find_path(self, start, goal):
        # 1. Find nearest highway on-ramps
        start_ramp = self.local_roads.find_nearest_highway_access(start)
        goal_ramp = self.local_roads.find_nearest_highway_access(goal)

        # 2. Route on highway network (fast, only ~100 nodes)
        highway_path = self.highways.find_path(start_ramp, goal_ramp)

        # 3. Local routing to/from ramps
        path = []
        path += self.local_roads.find_path(start, start_ramp)
        path += highway_path
        path += self.local_roads.find_path(goal_ramp, goal)

        return path

When to implement hierarchy:

  • Map > 5,000 nodes
  • Agents travel long distances (> 50% of map)
  • Natural hierarchy exists (highways, arterials, local streets)

Pitfall: Not all games need this! Small RTS maps (50×50) don't benefit from hierarchy overhead.


Real-World Examples

Example 1: Cities Skylines Traffic System

Scale: 100,000+ vehicles, 200×200 tile cities, real-time simulation.

Approach:

  1. Hierarchical road network:

    • Highways (high-level, ~200 nodes)
    • Arterial roads (mid-level, ~2,000 nodes)
    • Local streets (low-level, ~20,000 nodes)
  2. Path caching:

    • Common routes cached (residential → commercial, industrial → highway)
    • Cache invalidated when zoning changes or roads built/destroyed
  3. Dynamic congestion:

    • Updates road costs every 30 ticks based on vehicle density
    • BPR function for realistic congestion curves
    • Vehicles reroute probabilistically (20% check each update)
  4. LOD system:

    • Vehicles far from camera use simplified pathfinding
    • Close vehicles get full path with lane changes
    • Distant vehicles: straight-line movement between districts
  5. Time-sliced pathfinding:

    • Budget: 10ms per frame for pathfinding
    • ~50 path calculations per frame
    • Priority queue: player-visible > near camera > background

Performance: Maintains 30-60 FPS with 10,000+ active vehicles.

Key lesson: Hierarchy + caching + LOD is essential for large-scale traffic.

Example 2: Unity NavMesh (Recast/Detour)

Use case: 3D games with complex terrain (slopes, stairs, platforms).

Approach:

  1. Voxelization: Convert 3D geometry to voxel grid
  2. Heightfield: Identify walkable surfaces
  3. Contour extraction: Find boundaries of walkable areas
  4. Polygon mesh: Generate simplified navigation mesh
  5. A on mesh*: Pathfind on polygons (not voxels)

Benefits:

  • Handles 3D terrain naturally (no grid needed)
  • Efficient: Only walkable areas in graph
  • Industry standard (Unreal, Unity, CryEngine)

Example:

// Unity NavMesh API
NavMeshPath path = new NavMeshPath();
NavMesh.CalculatePath(startPos, endPos, NavMesh.AllAreas, path);

if (path.status == NavMeshPathStatus.PathComplete) {
    agent.SetPath(path);
} else {
    // Fallback: partial path or alternate destination
    agent.SetDestination(FindNearestValidPoint(endPos));
}

Key lesson: For 3D games, NavMesh is almost always better than grid-based pathfinding.

Example 3: Supreme Commander - Flow Fields for Massive Unit Counts

Scale: 1,000+ units moving together in RTS formations.

Problem: Individual A* per unit doesn't scale. 1000 units × 1ms = 1 second per frame.

Solution: Flow fields.

1. Player orders 1000 units to attack enemy base
2. Generate flow field from enemy base (5ms one-time cost)
3. Each unit follows flow field (0.001ms per unit)
4. Total: 5ms + 1ms = 6ms (instead of 1000ms)

Implementation details:

  • Flow field generated on high-level grid (16×16 cells)
  • Local steering for obstacle avoidance (RVO)
  • Update flow field when goal changes or obstacles appear
  • Works with formations: units maintain relative positions while following field

Performance: 1000 units at 60 FPS on 2007 hardware.

Key lesson: For crowds moving to same goal, flow fields are 100× faster than individual pathfinding.

Example 4: Google Maps / Waze - Real-Time Traffic Routing

Scale: Millions of vehicles, continent-scale road networks, real-time updates.

Approach:

  1. Contraction hierarchies: Preprocess road network into hierarchy

    • Fast queries: microseconds for cross-country routes
    • Update on traffic: recompute affected shortcuts
  2. Live traffic data: Crowdsourced vehicle speeds

    • Updates every 1-5 minutes
    • Edge costs = current measured travel time (not distance)
  3. Predictive routing: Machine learning predicts future congestion

    • Route calculated for expected conditions at arrival time
    • "Leave at 5pm" vs "leave now" gives different routes
  4. Alternate routes: Show multiple options with tradeoffs

    • Fastest vs shortest vs avoiding highways
    • Let user choose based on preferences

Key lesson: Real-world traffic routing is a solved problem. Use contraction hierarchies + live data + prediction.

Example 5: Crowd Evacuation Simulation (Real-World Safety)

Use case: Simulating emergency evacuation of stadiums, buildings, cities.

Requirements:

  • 10,000+ people
  • Real-time or faster-than-real-time
  • Accurate crowd dynamics (pushing, bottlenecks)

Approach:

  1. Multi-level pathfinding:

    • Global: Flow field to nearest exit
    • Local: RVO (Reciprocal Velocity Obstacles) for collision avoidance
  2. Bottleneck detection:

    • Monitor flow rate through doorways
    • Detect crushing hazards (density > threshold)
    • Suggest improvements (widen doors, add exits)
  3. Panic modeling:

    • Agents push harder when panicked (higher speed, lower personal space)
    • May ignore alternate routes (follow crowd)
  4. Validation:

    • Compare to real evacuation drills
    • Calibrate agent parameters to match human behavior

Key lesson: Life-safety simulations require validation against real-world data. Can't just implement A* and call it done.


Cross-References

Related Skills

[Performance Optimization] (same skillpack):

  • Profiling pathfinding bottlenecks
  • Memory pooling for path objects
  • Cache-friendly data structures

[Crowd Simulation] (same skillpack):

  • Local steering behaviors (RVO, boids)
  • Formation movement
  • Flocking and swarming

[State Machines] (game-ai skillpack):

  • Vehicle states: idle, pathfinding, following_path, stuck, rerouting
  • State transitions based on path validity

[Spatial Partitioning] (data-structures skillpack):

  • Quadtrees for neighbor queries
  • Spatial hashing for collision detection
  • Grid-based broad phase

External Resources

Academic:

Tools:

  • Recast/Detour: Open-source NavMesh library
  • Unity NavMesh: Built-in pathfinding (Unity)
  • Unreal Navigation System: Built-in (Unreal Engine)

Industry talks:

  • "Killzone 2 AI" (GDC) - hierarchical pathfinding
  • "Supreme Commander: Forged Alliance" (GDC) - flow fields for 1000+ units
  • "Cities Skylines" traffic system (various talks)

Testing Checklist

Use this checklist to verify your pathfinding implementation is production-ready:

Performance Tests

  • Frame budget: Pathfinding stays under 5ms per frame (60 FPS target)
  • Scaling: Test with 10×, 100×, 1000× agent counts. Should degrade gracefully.
  • Large maps: Test on maximum map size. Long paths shouldn't cause hitches.
  • Profiling: Measure time per path calculation. Identify bottlenecks.
  • Memory: No memory leaks. Path objects properly pooled/reused.

Correctness Tests

  • Valid paths: Paths avoid obstacles and stay on walkable terrain
  • Optimal paths: Paths are shortest or near-shortest (within 5% of optimal)
  • Unreachable destinations: Graceful fallback when no path exists
  • Dynamic obstacles: Paths update when obstacles appear/move
  • Multi-level terrain: Works with bridges, overpasses, slopes

Traffic Tests (if applicable)

  • Congestion handling: Vehicles reroute around traffic jams
  • No deadlocks: Vehicles don't get stuck in gridlock
  • Traffic lights: Vehicles respect signals at intersections
  • Lane usage: Multi-lane roads distribute traffic across lanes
  • Merging: Vehicles merge smoothly onto highways

Robustness Tests

  • Edge cases: Empty map, single tile, no valid path, destination = start
  • Stress test: 10,000 agents pathfinding simultaneously
  • Rapid changes: Add/remove obstacles rapidly, paths stay valid
  • Long running: No degradation after 1 hour of simulation
  • Pathological cases: Worst-case scenarios (maze, spiral, etc.)

Quality Tests

  • Visual smoothness: Agents move naturally, not robotic
  • Collision avoidance: Agents don't overlap (unless intended)
  • Formation movement: Groups stay together when moving
  • Responsive: Path recalculation feels immediate (< 100ms perceived latency)
  • Believable: Traffic/crowd behavior looks realistic

Integration Tests

  • Save/load: Paths serialize/deserialize correctly
  • Multiplayer: Deterministic pathfinding (same inputs = same paths)
  • Modding: Expose pathfinding API for modders
  • Debugging: Visualize paths, flow fields, congestion heat maps
  • Configuration: Exposed parameters (search limits, timeouts, etc.)

Summary

Traffic and pathfinding is about intelligent tradeoffs:

  • Exact vs approximate: Not every agent needs perfect paths
  • Computation now vs later: Cache expensive calculations
  • Individual vs group: Flow fields for crowds, A* for individuals
  • Static vs dynamic: Balance path quality with recalculation cost

The most critical insight: Never recalculate paths every frame. This single mistake causes 90% of pathfinding performance problems. Always cache, always queue, always time-slice.

When implementing:

  1. Start with simple A*, measure performance
  2. Add hierarchy if map > 5,000 nodes
  3. Add flow fields if > 50 agents share destination
  4. Add async pathfinding if frame rate drops
  5. Add LOD if camera distance varies

Architecture matters: Separate pathfinding from movement. Vehicle shouldn't know HOW path is calculated, only that it requests a path and receives waypoints. This allows swapping algorithms without changing vehicle code.

Test at scale early: 10 agents works very differently than 1000 agents. Don't optimize prematurely, but don't wait until beta to test scalability.