---
name: distributed-workers
description: Use when working on worker implementation, ServiceOrchestrator patterns, WorkerAPIBase, operation dispatch, progress tracking, cancellation, backend-to-worker communication, or adding new worker types.
---
# Distributed Workers Architecture
Load this skill when working on:
- Worker implementation or debugging
- ServiceOrchestrator or WorkerAPIBase patterns
- Operation dispatch, progress tracking, or cancellation
- Backend-to-worker communication
- Adding new worker types
## Architecture Overview
KTRDR uses a distributed workers architecture where the backend orchestrates operations across worker nodes:
```
┌────────────────────────────────────────────────────────┐
│ Backend (Docker Container, Port 8000)                  │
│ ├─ API Layer (FastAPI)                                 │
│ ├─ Service Orchestrators (NEVER execute operations)    │
│ ├─ WorkerRegistry (tracks all workers)                 │
│ └─ OperationsService (tracks all operations)           │
└────────────────────────────────────────────────────────┘
                               │
                               ├─ HTTP (Worker Registration & Operation Dispatch)
                               │
     ┌────────────┬────────────┼────────────┬───────────────┐
     │            │            │            │               │
     ▼            ▼            ▼            ▼               ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────────┐
│ Backtest │ │ Backtest │ │ Training │ │ Training │ │ IB Host Service│
│ Worker 1 │ │ Worker 2 │ │ Worker 1 │ │ Worker 2 │ │ (Port 5001)    │
│ :5003    │ │ :5003    │ │ :5004    │ │ :5004    │ │ Direct IB TCP  │
└──────────┘ └──────────┘ └──────────┘ └──────────┘ └────────────────┘
  CPU-only     CPU-only     CPU-only     CPU-only   Direct IB Gateway

┌──────────────────────┐
│ Training Host Service│
│ (Port 5002)          │
│ GPU Access (CUDA/MPS)│
└──────────────────────┘
 10x-100x faster training
```
## Key Principles
- Backend as Orchestrator Only: Backend does not execute operations locally — it only selects workers and dispatches operations
- Distributed-Only Execution: All backtesting and training operations execute on workers (no local fallback)
- Self-Registering Workers: Workers push-register with backend on startup (infrastructure-agnostic)
- GPU-First Routing: Training operations prefer GPU workers (10x-100x faster) with CPU worker fallback (selection sketched below)
- Horizontal Scalability: Add more workers = more concurrent operations
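
To make GPU-first routing concrete, here is a minimal selection sketch. The `WorkerInfo` shape and `select_worker` helper are assumptions for illustration, not the actual WorkerRegistry API:

```python
from dataclasses import dataclass, field


@dataclass
class WorkerInfo:
    """Hypothetical registry entry; field names are illustrative only."""
    url: str
    worker_type: str                 # e.g. "training" or "backtesting"
    available: bool = True
    capabilities: dict = field(default_factory=dict)


def select_worker(workers: list[WorkerInfo], worker_type: str) -> WorkerInfo | None:
    """Prefer available GPU workers; fall back to CPU workers; else None."""
    candidates = [w for w in workers if w.worker_type == worker_type and w.available]
    gpu = [w for w in candidates if w.capabilities.get("gpu")]
    return (gpu or candidates or [None])[0]
```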
## Worker Types
| Worker | Location | Purpose | Scalability |
|---|---|---|---|
| Backtest Workers | Containerized, CPU | Execute backtesting | Horizontal |
| Training Workers | Containerized, CPU | Training fallback | Horizontal |
| Training Host Service | Native, GPU | GPU training (priority) | Limited by hardware |
| IB Host Service | Native | IB Gateway access | Single instance |
## ServiceOrchestrator Pattern

Location: `ktrdr/async_infrastructure/service_orchestrator.py`

All service managers inherit from `ServiceOrchestrator`:
```python
class DataAcquisitionService(ServiceOrchestrator):
    def __init__(self):
        # Reads USE_IB_HOST_SERVICE env var
        self.provider = self._initialize_provider()

    async def download_data(self, ...):
        # Unified async pattern with progress tracking
        return await self._execute_with_progress(...)
```
Features provided:

- Environment-based configuration
- Adapter initialization (local vs. host service routing; see the sketch below)
- Unified async operations with progress tracking
- Cancellation token support
- Operations service integration
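
A minimal sketch of the environment-based routing idea, using placeholder adapter classes (the real `_initialize_provider` implementation may differ):

```python
import os
from dataclasses import dataclass


@dataclass
class IBHostServiceAdapter:
    """Placeholder for the host-service adapter in this sketch."""
    base_url: str


class LocalIBAdapter:
    """Placeholder for the in-container adapter in this sketch."""


def initialize_provider():
    # Route through the native IB Host Service when the env flag is set,
    # otherwise fall back to a local (in-container) adapter.
    if os.getenv("USE_IB_HOST_SERVICE", "").lower() == "true":
        url = os.getenv("IB_HOST_SERVICE_URL", "http://localhost:5001")
        return IBHostServiceAdapter(base_url=url)
    return LocalIBAdapter()
```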
## WorkerAPIBase Pattern

Location: `ktrdr/workers/base.py`

All workers inherit from `WorkerAPIBase` and get these features for free:
- OperationsService singleton — Worker-local operation tracking
- Operations proxy endpoints:
  - `GET /api/v1/operations/{id}` — Get operation status
  - `GET /api/v1/operations/{id}/metrics` — Get operation metrics
  - `GET /api/v1/operations` — List operations
  - `DELETE /api/v1/operations/{id}/cancel` — Cancel operation
- Health endpoint (`GET /health`) — Reports busy/idle status
- FastAPI app with CORS — Ready for Docker communication
- Self-registration — Automatic registration with backend on startup
### Key Pattern Elements

- Operation ID Synchronization: Accepts an optional `task_id` from the backend, returns the same `operation_id`
- Progress Tracking: Workers register progress bridges in their OperationsService
- Remote Queryability: Backend can query a worker's operations endpoints directly (1s cache TTL; see the sketch after this list)
- Push-Based Registration: Workers call `POST /workers/register` on startup
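
A hedged sketch of a backend-side status query with a ~1s cache, assuming an `httpx` client; the real backend's caching details may differ:

```python
import time

import httpx

CACHE_TTL_SECONDS = 1.0
_cache: dict[str, tuple[float, dict]] = {}  # operation_id -> (fetched_at, payload)


async def get_worker_operation(worker_url: str, operation_id: str) -> dict:
    """Proxy a status query to the worker, caching responses for ~1 second."""
    cached = _cache.get(operation_id)
    if cached and time.monotonic() - cached[0] < CACHE_TTL_SECONDS:
        return cached[1]
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{worker_url}/api/v1/operations/{operation_id}")
        resp.raise_for_status()
        payload = resp.json()
    _cache[operation_id] = (time.monotonic(), payload)
    return payload
```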
### Example Implementation
```python
import uuid


class BacktestWorker(WorkerAPIBase):
    def __init__(self, worker_port=5003, backend_url="http://backend:8000"):
        super().__init__(
            worker_type=WorkerType.BACKTESTING,
            operation_type=OperationType.BACKTESTING,
            worker_port=worker_port,
            backend_url=backend_url,
        )

        # Register domain-specific endpoint
        @self.app.post("/backtests/start")
        async def start_backtest(request: BacktestStartRequest):
            # Reuse the backend's task_id so both sides share one operation_id
            operation_id = request.task_id or f"worker_backtest_{uuid.uuid4().hex[:12]}"
            result = await self._execute_backtest_work(operation_id, request)
            return {"success": True, "operation_id": operation_id, **result}
```
## Worker Implementations
BacktestWorker (`ktrdr/backtesting/backtest_worker.py`):

- Adds `/backtests/start` endpoint
- Calls BacktestingEngine directly via `asyncio.to_thread` (sketched below)
- Registers BacktestProgressBridge

TrainingWorker (`ktrdr/training/training_worker.py`):

- Adds `/training/start` endpoint
- Calls TrainingManager directly (async)
- Simplified progress tracking
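
The `asyncio.to_thread` hand-off is what keeps the worker's event loop responsive while a CPU-bound backtest runs. A self-contained sketch (the engine call is a stand-in, not the real BacktestingEngine signature):

```python
import asyncio


def run_engine(request: dict) -> dict:
    """Stand-in for the synchronous, CPU-bound BacktestingEngine call."""
    return {"status": "completed"}


async def execute_backtest_work(operation_id: str, request: dict) -> dict:
    # to_thread moves the blocking call off the event loop, so /health and
    # /api/v1/operations endpoints stay responsive during the backtest.
    result = await asyncio.to_thread(run_engine, request)
    return {"operation_id": operation_id, **result}
```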
## Host Service Integration
### IB Host Service (uses environment variables)
```bash
USE_IB_HOST_SERVICE=true
IB_HOST_SERVICE_URL=http://localhost:5001  # default
```
Why separate: IB Gateway requires a direct TCP connection (a Docker networking limitation), so this service runs natively on the host.
Training & Backtesting (uses WorkerRegistry)
Environment flags REMOVED in Phase 5.3:

- ❌ `USE_TRAINING_HOST_SERVICE`
- ❌ `REMOTE_BACKTEST_SERVICE_URL`
Now uses WorkerRegistry:

- Workers self-register with the backend on startup (see the sketch below)
- Backend selects available workers automatically
- GPU workers register with the `gpu: true` capability (prioritized)
- CPU workers register as fallback
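
A hedged sketch of the startup registration call, assuming `httpx`; the payload fields other than the `gpu` capability flag are illustrative guesses, not the actual schema:

```python
import httpx


async def register_with_backend(
    backend_url: str, worker_type: str, worker_url: str, has_gpu: bool
) -> None:
    """Push-register with the backend on startup (POST /workers/register)."""
    payload = {
        "worker_type": worker_type,        # e.g. "training"
        "url": worker_url,                 # where the backend can reach this worker
        "capabilities": {"gpu": has_gpu},  # GPU workers are prioritized
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{backend_url}/workers/register", json=payload)
        resp.raise_for_status()
```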
## Starting Workers
```bash
# Docker Compose (development)
docker-compose up -d --scale backtest-worker=5 --scale training-worker=3

# Training Host Service (GPU, runs natively)
cd training-host-service && ./start.sh

# Workers self-register at:
# - Backtest: http://localhost:5003
# - Training (CPU): http://localhost:5004
# - Training (GPU): http://localhost:5002
```
## Verification
```bash
# Check registered workers
curl http://localhost:8000/api/v1/workers | jq

# Expected: all workers show as AVAILABLE with proper capabilities
```
## Cancellation Tokens

Location: `ktrdr/async_infrastructure/cancellation.py`

All long-running operations support cancellation:
```python
import asyncio

from ktrdr.async_infrastructure.cancellation import create_cancellation_token

token = create_cancellation_token()

# In the operation loop
if token.is_cancelled():
    raise asyncio.CancelledError()
```
- Create tokens with `create_cancellation_token()`
- Check with `token.is_cancelled()`
- Operations service manages tokens globally (illustrated below)
- CLI displays cancellation status
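
To illustrate global token management, a self-contained toy sketch; the real token and OperationsService APIs in `cancellation.py` may differ (only `create_cancellation_token()` and `is_cancelled()` appear above):

```python
class ToyToken:
    """Toy stand-in for the real cancellation token."""

    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def is_cancelled(self) -> bool:
        return self._cancelled


class TokenRegistry:
    """Illustrative global token management, keyed by operation_id."""

    def __init__(self) -> None:
        self._tokens: dict[str, ToyToken] = {}

    def create(self, operation_id: str) -> ToyToken:
        token = self._tokens[operation_id] = ToyToken()
        return token

    def cancel(self, operation_id: str) -> None:
        token = self._tokens.get(operation_id)
        if token is not None:
            token.cancel()  # the running operation sees is_cancelled() flip
```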
## Async Operations Pattern (CLI)

All CLI commands use `AsyncCLIClient` for API communication:
```python
from ktrdr.cli.helpers.async_cli_client import AsyncCLIClient


async def some_command(symbol: str):
    async with AsyncCLIClient() as client:
        result = await client.post("/endpoint", json={"symbol": symbol})
        return result
```
Progress display: use `GenericProgressManager` with a `ProgressRenderer` for live updates.
## Documentation

- Architecture: `docs/architecture-overviews/distributed-workers.md`
- Developer Guide: `docs/developer/distributed-workers-guide.md`
- Deployment: `docs/user-guides/deployment.md`