tzurot-async-flow

@lbds137/tzurot

SKILL.md

name: tzurot-async-flow
description: BullMQ and async patterns for Tzurot v3 - Job queue architecture, Discord interaction deferral, idempotency, retry strategies, and error handling. Use when working with jobs or async operations.
lastUpdated: 2025-12-20

Tzurot v3 Async Flow & Job Queue

Use this skill when: Creating jobs, processing queue tasks, handling Discord interactions, implementing retry logic, or managing async operations.

Quick Reference

// Queue setup (api-gateway)
import { Queue } from 'bullmq';
const aiQueue = new Queue('ai-jobs', { connection });

// Worker setup (ai-worker)
import { Worker } from 'bullmq';
const worker = new Worker('ai-jobs', processJob, { connection, concurrency: 5 });

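// Discord deferral (bot-client)
await interaction.deferReply();
const response = await fetch(`${GATEWAY_URL}/ai/generate`, { method: 'POST', body: JSON.stringify(requestData) });
const result = await response.json(); // fetch resolves to a Response; parse it first
await interaction.editReply({ content: result.content });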

Architecture Flow

Discord Interaction → bot-client defers (3s window)
  → api-gateway creates BullMQ job
  → ai-worker picks up job
  → ai-worker returns result
  → bot-client sends webhook reply

🚨 Discord Deferral (CRITICAL)

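Discord requires an initial response to every interaction within 3 seconds, and AI calls routinely take longer. Deferring acknowledges the interaction in time; the interaction token then stays valid for 15 minutes, which is the window for editing in the real reply.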

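// MUST defer IMMEDIATELY, then process
await interaction.deferReply();

const response = await fetch(`${GATEWAY_URL}/ai/generate`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(requestData),
});
if (!response.ok) throw new Error(`Gateway responded with ${response.status}`);

// Parse the body before use: `response` is a Response, not the payload
const result = await response.json();
await interaction.editReply({ content: result.content });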
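
The defer-then-edit sequence can be wrapped so that a failed gateway call still resolves the deferred reply instead of leaving the user on a permanent "thinking" state. The following is a minimal sketch, not Tzurot's actual handler: `DeferrableInteraction`, `replyAfterDeferral`, and the injected `generate` callback are hypothetical names, and the interface only mirrors the two discord.js methods used above.

```typescript
// Hypothetical minimal shape: only the two methods the pattern needs.
interface DeferrableInteraction {
  deferReply(): Promise<unknown>;
  editReply(options: { content: string }): Promise<unknown>;
}

// Discord caps message content at 2000 characters.
const MAX_CONTENT_LENGTH = 2000;

async function replyAfterDeferral(
  interaction: DeferrableInteraction,
  generate: () => Promise<{ content: string }>
): Promise<void> {
  // Acknowledge first so the 3-second window is never at risk.
  await interaction.deferReply();
  try {
    const result = await generate();
    await interaction.editReply({ content: result.content.slice(0, MAX_CONTENT_LENGTH) });
  } catch {
    // Always resolve the deferred reply, even when generation fails.
    await interaction.editReply({ content: 'Sorry, something went wrong generating a reply.' });
  }
}
```

Injecting `generate` keeps the helper independent of how the gateway is called, so the same wrapper works whether the result comes from an HTTP request or from waiting on a job.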

Job Patterns

Job Naming

// Use prefixes from common-types
import { JOB_PREFIXES } from '@tzurot/common-types';
const jobId = `${JOB_PREFIXES.LLM_GENERATION}${requestId}`;

Job Processor Structure

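import type { Job } from 'bullmq';

// Progress updates let callers observe how far the job has advanced
export async function processLLMGeneration(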
  job: Job<LLMGenerationJobData>
): Promise<AIGenerationResponse> {
  await job.updateProgress(10);
  const personality = await personalityService.getPersonality(job.data.personalityId);

  await job.updateProgress(50);
  const response = await aiProvider.generateResponse({
    /* ... */
  });

  await job.updateProgress(100);
  return { content: response.content };
}

Queue Configuration

const aiQueue = new Queue('ai-jobs', {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: { count: 10, age: 24 * 3600 },
    removeOnFail: { count: 50, age: 7 * 24 * 3600 },
  },
});
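
To make the retry settings concrete: BullMQ's documentation describes the built-in exponential backoff as waiting `2^(attemptsMade - 1) * delay` milliseconds before each retry, so `attempts: 3` with `delay: 2000` means two retries, after roughly 2 s and then 4 s. The helper below, `backoffSchedule`, is a hypothetical illustration of that arithmetic and is not part of BullMQ; it ignores jitter and queue latency.

```typescript
function backoffSchedule(attempts: number, delayMs: number): number[] {
  const delays: number[] = [];
  // The first attempt runs immediately; each later attempt waits after a failure.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.pow(2, attemptsMade - 1) * delayMs);
  }
  return delays;
}

const schedule = backoffSchedule(3, 2000);
console.log(schedule); // [ 2000, 4000 ]
```

With these settings a job that keeps failing is given up on about 6 seconds after its first failure, plus processing time.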

Idempotency

Prevent duplicate requests with an atomic Redis SET that uses NX (set only if the key is absent) and a 5-second expiry:

async isDuplicate(requestId: string): Promise<boolean> {
  const result = await redis.set(`dedup:${requestId}`, '1', 'EX', 5, 'NX');
  return result === null; // null = key already existed
}
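
The check above is hard to unit-test against a live Redis. One option, sketched here under assumptions, is to depend on a narrow interface and substitute an in-memory stand-in that mimics `SET ... EX ... NX`. `DedupStore` and `InMemoryDedupStore` are hypothetical names introduced for illustration, and this standalone `isDuplicate` takes the store as a parameter rather than reading it from a class field.

```typescript
// Hypothetical minimal interface covering the one Redis call used above.
interface DedupStore {
  set(key: string, value: string, mode: 'EX', ttlSeconds: number, flag: 'NX'): Promise<'OK' | null>;
}

// In-memory stand-in with SET NX EX semantics, intended for tests only.
class InMemoryDedupStore implements DedupStore {
  private expiries = new Map<string, number>();
  private now: () => number;

  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  async set(key: string, _value: string, _mode: 'EX', ttlSeconds: number, _flag: 'NX'): Promise<'OK' | null> {
    const expiry = this.expiries.get(key);
    if (expiry !== undefined && expiry > this.now()) return null; // key is still live
    this.expiries.set(key, this.now() + ttlSeconds * 1000);
    return 'OK';
  }
}

async function isDuplicate(store: DedupStore, requestId: string): Promise<boolean> {
  const result = await store.set(`dedup:${requestId}`, '1', 'EX', 5, 'NX');
  return result === null; // null = key already existed
}
```

Passing a clock function into the stand-in lets a test advance time past the 5-second window without sleeping.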

Retry Strategy

Retryable errors: network timeouts, rate limits (429), server errors (5xx)

Non-retryable errors: validation errors (400), not found (404), auth (401)

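async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      // Give up on the last attempt or on errors that retrying cannot fix
      if (attempt === maxAttempts || !isRetryableError(error)) throw error;
      // Exponential backoff: 2s, 4s, 8s, ...
      await new Promise(r => setTimeout(r, 2000 * Math.pow(2, attempt - 1)));
    }
  }
  // Only reachable when maxAttempts < 1; without it the function could resolve to undefined
  throw new Error('withRetry: maxAttempts must be at least 1');
}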
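
`withRetry` depends on an `isRetryableError` helper that is not shown here. One possible shape, following the retryable and non-retryable categories listed above, is sketched below. The `MaybeHttpError` shape and the list of network error codes are assumptions for illustration; the project's real helper may inspect errors differently.

```typescript
// Hypothetical error shape: an HTTP status and/or a Node-style error code.
interface MaybeHttpError {
  status?: number;
  code?: string;
}

// Node.js network error codes treated as transient (an assumed, non-exhaustive list).
const TRANSIENT_CODES = new Set(['ETIMEDOUT', 'ECONNRESET', 'ECONNREFUSED']);

function isRetryableError(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) return false;
  const { status, code } = error as MaybeHttpError;

  if (typeof status === 'number') {
    // 429 (rate limit) and 5xx are retryable; other 4xx are not.
    return status === 429 || (status >= 500 && status <= 599);
  }
  // No HTTP status: retry only recognised network failures.
  return typeof code === 'string' && TRANSIENT_CODES.has(code);
}
```

Defaulting to "not retryable" for unrecognised errors is a deliberate choice in this sketch: retrying an unknown failure risks repeating a request that can never succeed.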

Timer Patterns

✅ OK Patterns

// Request timeouts (call clearTimeout(timeout) once the request settles)
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 30000);

// One-time delays
await new Promise(resolve => setTimeout(resolve, delayMs));
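
The request-timeout pattern is easy to get subtly wrong if the timer is never cleared. A small wrapper, sketched here with the hypothetical name `withTimeout`, keeps the `AbortController` and its cleanup in one place. It assumes the wrapped operation honours the `AbortSignal` it is given, as `fetch` does.

```typescript
async function withTimeout<T>(
  run: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number
): Promise<T> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await run(controller.signal);
  } finally {
    // Clear the timer on every path so it cannot keep the process alive.
    clearTimeout(timer);
  }
}

// Usage (Node 18+ global fetch); the URL is a placeholder:
// const res = await withTimeout(signal => fetch('https://example.com', { signal }), 30_000);
```

Because the timer is one-shot and scoped to a single call, this stays on the "OK" side of the timer rules: nothing persists between requests.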

❌ Scaling Blockers (avoid)

// Persistent intervals run in every replica, so the work is duplicated when scaling horizontally
this.cleanupInterval = setInterval(() => this.cleanup(), 60000);

✅ Alternatives

Use BullMQ repeatable jobs instead, so the schedule lives in Redis and each run is picked up by a single worker:

await queue.add('cleanup-cache', {}, { repeat: { every: 60000 } });

Waiting for Job Completion

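async function waitForJobCompletion(jobId: string, timeoutMs: number): Promise<any> {
  return new Promise((resolve, reject) => {
    // Detach listeners and timer on every exit path to avoid leaks
    const cleanup = () => {
      clearTimeout(timeout);
      queueEvents.off('completed', onCompleted);
      queueEvents.off('failed', onFailed);
    };
    const timeout = setTimeout(() => {
      cleanup();
      reject(new Error('Timeout'));
    }, timeoutMs);

    const onCompleted = (args: { jobId: string; returnvalue: any }) => {
      if (args.jobId !== jobId) return;
      cleanup();
      resolve(args.returnvalue);
    };
    // Reject on failure instead of waiting out the timeout
    const onFailed = (args: { jobId: string; failedReason: string }) => {
      if (args.jobId !== jobId) return;
      cleanup();
      reject(new Error(args.failedReason));
    };

    queueEvents.on('completed', onCompleted);
    queueEvents.on('failed', onFailed);
  });
}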

Monitoring

app.get('/metrics', async (req, res) => {
  const [waiting, active, failed] = await Promise.all([
    aiQueue.getWaitingCount(),
    aiQueue.getActiveCount(),
    aiQueue.getFailedCount(),
  ]);
  res.json({ queue: { waiting, active, failed } });
});

Related Skills

  • tzurot-architecture - Async workflow design
  • tzurot-observability - Job logging and correlation IDs
  • tzurot-types - Job data type definitions
  • tzurot-security - Signed payloads for job verification

References

  • BullMQ docs: https://docs.bullmq.io/
  • Queue constants: packages/common-types/src/constants/queue.ts
  • Job processors: services/ai-worker/src/jobs/