distributed-workers

@kpiteira/ktrdr

Use when working on worker implementation, ServiceOrchestrator patterns, WorkerAPIBase, operation dispatch, progress tracking, cancellation, backend-to-worker communication, or adding new worker types.

SKILL.md

name: distributed-workers
description: Use when working on worker implementation, ServiceOrchestrator patterns, WorkerAPIBase, operation dispatch, progress tracking, cancellation, backend-to-worker communication, or adding new worker types.

Distributed Workers Architecture

Load this skill when working on:

  • Worker implementation or debugging
  • ServiceOrchestrator or WorkerAPIBase patterns
  • Operation dispatch, progress tracking, or cancellation
  • Backend-to-worker communication
  • Adding new worker types

Architecture Overview

KTRDR uses a distributed workers architecture where the backend orchestrates operations across worker nodes:

┌─────────────────────────────────────────────────────────────────┐
│ Backend (Docker Container, Port 8000)                          │
│  ├─ API Layer (FastAPI)                                        │
│  ├─ Service Orchestrators (NEVER execute operations)           │
│  ├─ WorkerRegistry (tracks all workers)                        │
│  └─ OperationsService (tracks all operations)                  │
└─────────────────────────────────────────────────────────────────┘
         │
         ├─ HTTP (Worker Registration & Operation Dispatch)
         │
     ┌───┴──────┬──────────┬──────────┬───────────────┐
     │          │          │          │               │
     ▼          ▼          ▼          ▼               ▼
┌─────────┐┌─────────┐┌─────────┐┌─────────┐  ┌───────────────┐
│Backtest ││Backtest ││Training ││Training │  │IB Host Service│
│Worker 1 ││Worker 2 ││Worker 1 ││Worker 2 │  │(Port 5001)    │
│:5003    ││:5003    ││:5004    ││:5004    │  │Direct IB TCP  │
└─────────┘└─────────┘└─────────┘└─────────┘  └───────────────┘
 CPU-only   CPU-only   CPU-only   CPU-only    Direct IB Gateway

         ┌─────────────────────┐
         │Training Host Service│
         │(Port 5002)          │
         │GPU Access (CUDA/MPS)│
         └─────────────────────┘
         10x-100x faster training

Key Principles

  1. Backend as Orchestrator Only: The backend never executes operations locally; it only selects workers and dispatches operations to them (see the dispatch sketch after this list)
  2. Distributed-Only Execution: All backtesting and training operations execute on workers (no local fallback)
  3. Self-Registering Workers: Workers push-register with backend on startup (infrastructure-agnostic)
  4. GPU-First Routing: Training operations prefer GPU workers (10x-100x faster) with CPU worker fallback
  5. Horizontal Scalability: Add more workers = more concurrent operations
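
The dispatch path these principles imply looks roughly like the sketch below. It is illustrative only: the worker list, field names, and select-then-POST helper are hypothetical, not the actual KTRDR classes; the /backtests/start endpoint and the task_id field follow the worker patterns described later in this skill.

import httpx


# Hypothetical backend-side dispatch. Worker entries here are plain dicts; the
# real backend uses WorkerRegistry and OperationsService instead.
async def dispatch_operation(
    workers: list[dict], operation_id: str, payload: dict
) -> dict:
    # GPU-first routing: prefer workers that registered with gpu=True,
    # fall back to CPU workers (principle 4).
    candidates = sorted(workers, key=lambda w: not w.get("gpu", False))
    if not candidates:
        raise RuntimeError("No available workers registered")
    worker = candidates[0]

    # The backend never runs the operation itself (principles 1 and 2); it
    # forwards the request, passing its operation_id as task_id so the worker
    # reuses the same ID for tracking.
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{worker['url']}/backtests/start",
            json={"task_id": operation_id, **payload},
            timeout=30.0,
        )
        response.raise_for_status()
        return response.json()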

Worker Types

Worker                 Location            Purpose                  Scalability
Backtest Workers       Containerized, CPU  Execute backtesting      Horizontal
Training Workers       Containerized, CPU  Training fallback        Horizontal
Training Host Service  Native, GPU         GPU training (priority)  Limited by hardware
IB Host Service        Native              IB Gateway access        Single instance

ServiceOrchestrator Pattern

Location: ktrdr/async_infrastructure/service_orchestrator.py

All service managers inherit from ServiceOrchestrator:

class DataAcquisitionService(ServiceOrchestrator):
    def __init__(self):
        # Reads USE_IB_HOST_SERVICE env var
        self.provider = self._initialize_provider()

    async def download_data(self, ...):
        # Unified async pattern with progress tracking
        return await self._execute_with_progress(...)

Features provided:

  • Environment-based configuration
  • Adapter initialization (local vs. host service routing)
  • Unified async operations with progress tracking
  • Cancellation token support
  • Operations service integration

WorkerAPIBase Pattern

Location: ktrdr/workers/base.py

All workers inherit from WorkerAPIBase and get these features for free:

  1. OperationsService singleton — Worker-local operation tracking
  2. Operations proxy endpoints (see the polling sketch after this list):
    • GET /api/v1/operations/{id} — Get operation status
    • GET /api/v1/operations/{id}/metrics — Get operation metrics
    • GET /api/v1/operations — List operations
    • DELETE /api/v1/operations/{id}/cancel — Cancel operation
  3. Health endpoint — Reports busy/idle status (GET /health)
  4. FastAPI app with CORS — Ready for Docker communication
  5. Self-registration — Automatic registration with backend on startup
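
Because these are plain HTTP endpoints, the backend (or any client) can poll a worker directly. A minimal sketch using httpx against the endpoints listed above; the worker URL and return shape are illustrative, not the backend's actual client code:

import httpx


async def poll_worker_operation(worker_url: str, operation_id: str) -> dict:
    """Query a worker's local OperationsService through its proxy endpoints."""
    async with httpx.AsyncClient(base_url=worker_url) as client:
        # Health endpoint reports busy/idle status.
        health = (await client.get("/health")).json()

        # Operation status tracked by the worker-local OperationsService.
        status = (await client.get(f"/api/v1/operations/{operation_id}")).json()

    return {"health": health, "operation": status}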

Key Pattern Elements

  • Operation ID Synchronization: Accepts optional task_id from backend, returns same operation_id
  • Progress Tracking: Workers register progress bridges in their OperationsService
  • Remote Queryability: Backend can query worker's operations endpoints directly (1s cache TTL)
  • Push-Based Registration: Workers call POST /workers/register on startup

Example Implementation

import uuid

from ktrdr.workers.base import WorkerAPIBase
# WorkerType, OperationType, and BacktestStartRequest are KTRDR types (imports omitted)


class BacktestWorker(WorkerAPIBase):
    def __init__(self, worker_port=5003, backend_url="http://backend:8000"):
        super().__init__(
            worker_type=WorkerType.BACKTESTING,
            operation_type=OperationType.BACKTESTING,
            worker_port=worker_port,
            backend_url=backend_url,
        )

        # Register domain-specific endpoint
        @self.app.post("/backtests/start")
        async def start_backtest(request: BacktestStartRequest):
            operation_id = request.task_id or f"worker_backtest_{uuid.uuid4().hex[:12]}"
            result = await self._execute_backtest_work(operation_id, request)
            return {"success": True, "operation_id": operation_id, **result}

Worker Implementations

  • BacktestWorker (ktrdr/backtesting/backtest_worker.py):

    • Adds /backtests/start endpoint
    • Calls BacktestingEngine directly via asyncio.to_thread (see the sketch after this list)
    • Registers BacktestProgressBridge
  • TrainingWorker (ktrdr/training/training_worker.py):

    • Adds /training/start endpoint
    • Calls TrainingManager directly (async)
    • Simplified progress tracking
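
The asyncio.to_thread call is the key detail in BacktestWorker: the backtesting engine is synchronous and CPU-bound, so it must not run on the worker's event loop. A minimal sketch of the pattern, assuming the engine exposes a blocking run method (the actual BacktestingEngine API may differ):

import asyncio


async def run_blocking_engine(engine, config: dict) -> dict:
    """Run a synchronous engine call without blocking the worker's FastAPI loop."""
    # asyncio.to_thread pushes the blocking call onto a thread, so the health
    # and operations proxy endpoints stay responsive while the backtest runs.
    results = await asyncio.to_thread(engine.run, config)
    return {"results": results}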

Host Service Integration

IB Host Service (uses environment variables)

USE_IB_HOST_SERVICE=true
IB_HOST_SERVICE_URL=http://localhost:5001  # default

Why separate: IB Gateway requires direct TCP connection (Docker networking limitation)

Training & Backtesting (uses WorkerRegistry)

Environment flags REMOVED in Phase 5.3:

  • USE_TRAINING_HOST_SERVICE
  • REMOTE_BACKTEST_SERVICE_URL

Now uses WorkerRegistry instead (registration sketch below):

  • Workers self-register with backend on startup
  • Backend selects available workers automatically
  • GPU workers register with gpu: true capability (prioritized)
  • CPU workers register as fallback
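
A hedged sketch of the registration call a worker makes on startup. The POST /workers/register endpoint and the gpu capability come from this document; the payload field names are assumptions, not the exact schema:

import httpx


async def register_with_backend(
    backend_url: str, worker_url: str, worker_type: str, gpu: bool
) -> None:
    """Push-register this worker so the backend's WorkerRegistry can route to it."""
    payload = {
        "worker_type": worker_type,    # e.g. "training" or "backtesting"
        "url": worker_url,             # e.g. "http://localhost:5002"
        "capabilities": {"gpu": gpu},  # GPU workers are prioritized for training
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{backend_url}/workers/register", json=payload)
        response.raise_for_status()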

Starting Workers

# Docker Compose (development)
docker-compose up -d --scale backtest-worker=5 --scale training-worker=3

# Training Host Service (GPU, runs natively)
cd training-host-service && ./start.sh

# Workers self-register at:
# - Backtest: http://localhost:5003
# - Training (CPU): http://localhost:5004
# - Training (GPU): http://localhost:5002

Verification

# Check registered workers
curl http://localhost:8000/api/v1/workers | jq

# Expected: All workers show as AVAILABLE with proper capabilities

Cancellation Tokens

Location: ktrdr/async_infrastructure/cancellation.py

All long-running operations support cancellation:

from ktrdr.async_infrastructure.cancellation import create_cancellation_token

token = create_cancellation_token()

# In operation loop
if token.is_cancelled():
    raise asyncio.CancelledError()

  • Create tokens with create_cancellation_token()
  • Check with token.is_cancelled()
  • Operations service manages tokens globally
  • CLI displays cancellation status

Async Operations Pattern (CLI)

All CLI commands use AsyncCLIClient for API communication:

from ktrdr.cli.helpers.async_cli_client import AsyncCLIClient

async def some_command(symbol: str):
    async with AsyncCLIClient() as client:
        result = await client.post("/endpoint", json={"symbol": symbol})
        return result

Progress display: Use GenericProgressManager with ProgressRenderer for live updates

