Claude Code Plugins

Community-maintained marketplace

Feedback

executor-architecture

@chaingraphlabs/chaingraph
11
0

Executor package architecture for ChainGraph flow execution engine. Use when working on packages/chaingraph-executor, execution services, DBOS workflows, event bus, task queues, tRPC routes, or execution-related database operations. Triggers: executor, execution, service, worker, queue, event bus, dbos, workflow, tRPC execution, execution-api, execution-worker.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md


name: executor-architecture description: Executor package architecture for ChainGraph flow execution engine. Use when working on packages/chaingraph-executor, execution services, DBOS workflows, event bus, task queues, tRPC routes, or execution-related database operations. Triggers: executor, execution, service, worker, queue, event bus, dbos, workflow, tRPC execution, execution-api, execution-worker.

ChainGraph Executor Architecture

This skill provides architectural guidance for the @badaitech/chaingraph-executor package - the execution engine that runs ChainGraph flows with durable execution via DBOS.

Package Overview

Location: packages/chaingraph-executor/ Purpose: Flow execution engine with DBOS durable execution Key Feature: Exactly-once execution semantics with automatic recovery

Directory Structure

packages/chaingraph-executor/
├── server/
│   ├── index.ts                    # Main exports
│   │
│   ├── dbos/                       # DBOS durable execution ⭐
│   │   ├── config.ts               # DBOS initialization
│   │   ├── DBOSExecutionWorker.ts  # Worker lifecycle
│   │   ├── queue.ts                # Queue management
│   │   ├── workflows/
│   │   │   └── ExecutionWorkflows.ts  # Main orchestration
│   │   └── steps/
│   │       ├── ExecuteFlowAtomicStep.ts  # Core execution
│   │       └── UpdateStatusStep.ts  # Status updates
│   │
│   ├── services/                   # Business logic layer
│   │   ├── ExecutionService.ts     # Execution instance management
│   │   ├── RecoveryService.ts      # Failure recovery
│   │   └── ServiceFactory.ts       # Service initialization
│   │
│   ├── implementations/            # Interface implementations
│   │   ├── dbos/
│   │   │   ├── DBOSEventBus.ts     # DBOS event streaming
│   │   │   └── DBOSTaskQueue.ts    # DBOS task queue
│   │   └── local/
│   │       ├── InMemoryEventBus.ts # Dev/test event bus
│   │       └── InMemoryTaskQueue.ts
│   │
│   ├── interfaces/                 # Abstract interfaces
│   │   ├── IEventBus.ts            # Event streaming contract
│   │   └── ITaskQueue.ts           # Task queue contract
│   │
│   ├── stores/                     # Data access layer
│   │   ├── execution-store.ts      # Execution CRUD
│   │   ├── flow-store.ts           # Flow loading
│   │   └── postgres/
│   │       ├── schema.ts           # Drizzle schema
│   │       └── postgres-execution-store.ts
│   │
│   ├── trpc/                       # API layer
│   │   ├── router.ts               # tRPC procedures
│   │   └── context.ts              # Request context
│   │
│   └── utils/                      # Utilities
│       ├── config.ts               # Environment config
│       ├── db.ts                   # Database connection
│       └── logger.ts               # Logging
│
├── client/                         # tRPC client exports
└── types/                          # TypeScript types

Architecture Layers

┌────────────────────────────────────────────────────────────┐
│ Layer 1: API (tRPC)                                         │
│ ├─ create()    → Start execution workflow                   │
│ ├─ start()     → Send START_SIGNAL                          │
│ ├─ stop()      → Cancel workflow                            │
│ ├─ pause()     → Send PAUSE command                         │
│ └─ subscribeToExecutionEvents() → Stream events             │
├────────────────────────────────────────────────────────────┤
│ Layer 2: Services                                           │
│ ├─ ExecutionService   → Instance management                 │
│ ├─ RecoveryService    → Failure recovery                    │
│ └─ ServiceFactory     → Dependency injection                │
├────────────────────────────────────────────────────────────┤
│ Layer 3: DBOS (Durable Execution)                           │
│ ├─ ExecutionWorkflow  → Orchestration + child spawning      │
│ └─ ExecuteFlowAtomicStep → Core flow execution              │
├────────────────────────────────────────────────────────────┤
│ Layer 4: Implementations                                    │
│ ├─ DBOSEventBus       → PostgreSQL event streaming          │
│ └─ DBOSTaskQueue      → PostgreSQL task queue               │
├────────────────────────────────────────────────────────────┤
│ Layer 5: Stores                                             │
│ ├─ ExecutionStore     → Execution row CRUD                  │
│ └─ FlowStore          → Flow definition loading             │
└────────────────────────────────────────────────────────────┘

Two Execution Modes

The executor supports two modes controlled by ENABLE_DBOS_EXECUTION:

DBOS Mode (Production)

ENABLE_DBOS_EXECUTION=true

Features:
├─ Exactly-once execution via workflow IDs
├─ Automatic recovery from failures
├─ Real-time event streaming via PostgreSQL
├─ Durable task queue (no Kafka needed)
└─ DBOS Admin UI at localhost:3022

Legacy/Local Mode (Development)

ENABLE_DBOS_EXECUTION=false

Features:
├─ In-memory event bus
├─ In-memory task queue
├─ Simpler debugging
└─ No durability guarantees

Key Files

File Purpose Critical?
server/dbos/workflows/ExecutionWorkflows.ts Main orchestration ⭐⭐⭐
server/dbos/steps/ExecuteFlowAtomicStep.ts Core execution step ⭐⭐⭐
server/services/ExecutionService.ts Instance management ⭐⭐
server/services/ServiceFactory.ts Service initialization ⭐⭐
server/implementations/dbos/DBOSEventBus.ts Event streaming ⭐⭐
server/trpc/router.ts API procedures ⭐⭐
server/stores/postgres/schema.ts Database schema
server/utils/config.ts Environment config

Execution Lifecycle

1. CREATE (tRPC)
   └─ ExecutionRow inserted → Workflow started
   └─ Workflow writes EXECUTION_CREATED event
   └─ Workflow waits for START_SIGNAL

2. SUBSCRIBE (tRPC)
   └─ Client subscribes to DBOS stream
   └─ Immediately receives EXECUTION_CREATED

3. START (tRPC)
   └─ Sends START_SIGNAL via DBOS.send()
   └─ Workflow continues

4. EXECUTE (Workflow)
   └─ Step 1: updateToRunning()
   └─ Step 2: executeFlowAtomic()
   │   └─ Load flow from DB
   │   └─ Create execution instance
   │   └─ Execute flow (up to 30min)
   │   └─ Stream events in real-time
   │   └─ Collect child tasks
   └─ Step 3: Spawn children
   └─ Step 4: updateToCompleted()

5. COMPLETE
   └─ DBOS auto-closes event stream
   └─ Client receives all events

Service Layer

ExecutionService

Manages execution instances with event streaming setup:

// server/services/ExecutionService.ts
class ExecutionService {
  // Create execution instance with event handling
  async createExecutionInstance(params: {
    task: ExecutionTask
    flow: Flow
    executionRow: ExecutionRow
    abortController: AbortController
  }): Promise<ExecutionInstance>

  // Get event bus (DBOS or InMemory based on config)
  getEventBus(): IEventBus

  // Setup event handling (connects engine events → event bus)
  setupEventHandling(instance: ExecutionInstance): () => Promise<void>
}

ServiceFactory

Initializes all services with proper dependency injection:

// server/services/ServiceFactory.ts
async function initializeServices(): Promise<Services> {
  // 1. Create event bus (DBOS or InMemory)
  const eventBus = config.dbos.enabled
    ? new DBOSEventBus()
    : new InMemoryEventBus()

  // 2. Create task queue
  const taskQueue = config.dbos.enabled
    ? new DBOSTaskQueue()
    : new InMemoryTaskQueue()

  // 3. Create execution service
  const executionService = new ExecutionService(eventBus, taskQueue)

  // 4. Initialize DBOS steps (dependency injection)
  initializeExecuteFlowStep(executionService, executionStore)

  return { eventBus, taskQueue, executionService }
}

tRPC Router

File: server/trpc/router.ts

export const executionRouter = router({
  // Create execution (starts workflow immediately)
  create: procedure
    .input(CreateExecutionInput)
    .mutation(async ({ input }) => {
      // 1. Create execution row in DB
      // 2. Start DBOS workflow (writes EXECUTION_CREATED)
      // 3. Return executionId
    }),

  // Start execution (sends START_SIGNAL)
  start: procedure
    .input(z.object({ executionId: z.string() }))
    .mutation(async ({ input }) => {
      await DBOS.send(input.executionId, 'API', 'START_SIGNAL')
    }),

  // Subscribe to execution events (real-time streaming)
  subscribeToExecutionEvents: procedure
    .input(z.object({ executionId: z.string(), fromIndex: z.number() }))
    .subscription(async function* ({ input }) {
      // Yields events from DBOS stream
      for await (const event of DBOS.readStream(input.executionId, 'events')) {
        yield event
      }
    }),

  // Control commands
  pause: procedure.mutation(...),
  resume: procedure.mutation(...),
  stop: procedure.mutation(...),
})

Database Schema

File: server/stores/postgres/schema.ts

export const executions = pgTable('executions', {
  id: text('id').primaryKey(),              // EX123...
  flowId: text('flow_id').notNull(),
  ownerId: text('owner_id').notNull(),
  status: executionStatusEnum('status').notNull(),

  // Hierarchy
  rootExecutionId: text('root_execution_id'),
  parentExecutionId: text('parent_execution_id'),
  executionDepth: integer('execution_depth').default(0),

  // Timestamps
  createdAt: timestamp('created_at').notNull(),
  startedAt: timestamp('started_at'),
  completedAt: timestamp('completed_at'),

  // Error tracking
  errorMessage: text('error_message'),
  errorNodeId: text('error_node_id'),

  // Recovery
  failureCount: integer('failure_count').default(0),
  lastFailureAt: timestamp('last_failure_at'),

  // Context
  options: jsonb('options'),
  integration: jsonb('integration'),        // archai context
  externalEvents: jsonb('external_events'), // events for children
})

Environment Variables

# DBOS Mode
ENABLE_DBOS_EXECUTION=true

# Database
DATABASE_URL_EXECUTIONS=postgres://...

# DBOS Configuration
DBOS_ADMIN_ENABLED=true
DBOS_ADMIN_PORT=3022
DBOS_QUEUE_CONCURRENCY=100
DBOS_WORKER_CONCURRENCY=5

# Execution Limits
EXECUTION_MAX_DEPTH=100
EXECUTION_DEFAULT_TIMEOUT_MS=3600000  # 1 hour

Quick Reference

Need Where File
Add new tRPC procedure API layer server/trpc/router.ts
Modify execution logic DBOS step server/dbos/steps/ExecuteFlowAtomicStep.ts
Add orchestration logic DBOS workflow server/dbos/workflows/ExecutionWorkflows.ts
Change event streaming Implementation server/implementations/dbos/DBOSEventBus.ts
Modify schema Store server/stores/postgres/schema.ts
Add service Service layer server/services/

Related Skills

  • dbos-patterns - CRITICAL DBOS constraints and patterns
  • chaingraph-concepts - Core domain concepts (Flow, Node, Port)
  • subscription-sync - Event streaming architecture
  • types-architecture - Execution types and events
  • trpc-execution - Execution tRPC procedures (API layer)
  • trpc-patterns - General tRPC framework patterns