Claude Code Plugins

Community-maintained marketplace

Feedback
1
0

Background job processing patterns including job queues, scheduled jobs, worker pools, and retry strategies. Use when implementing async processing, Celery, Bull, Sidekiq, cron jobs, task queues, job monitoring, or worker management.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name background-jobs
description Background job processing patterns including job queues, scheduled jobs, worker pools, and retry strategies. Use when implementing async processing, Celery, Bull, Sidekiq, cron jobs, task queues, job monitoring, or worker management.

Background Jobs

Overview

Background jobs enable asynchronous processing of tasks outside the request-response cycle. This skill covers job queue patterns, scheduling, worker management, retry strategies, and monitoring for reliable task execution across different frameworks and languages.

Key Concepts

Job Queue Patterns

Bull Queue (Node.js/Redis):

import Queue, { Job, JobOptions } from "bull";
import { Redis } from "ioredis";

// Queue configuration
interface QueueConfig {
  name: string;
  redis: Redis;
  defaultJobOptions?: JobOptions;
}

// Job data interfaces
interface EmailJobData {
  to: string;
  subject: string;
  template: string;
  context: Record<string, unknown>;
}

interface ImageProcessingJobData {
  imageId: string;
  operations: Array<{
    type: "resize" | "crop" | "compress";
    params: Record<string, unknown>;
  }>;
}

// Queue factory
function createQueue<T>(config: QueueConfig): Queue.Queue<T> {
  const queue = new Queue<T>(config.name, {
    createClient: (type) => {
      switch (type) {
        case "client":
          return config.redis.duplicate();
        case "subscriber":
          return config.redis.duplicate();
        case "bclient":
          return config.redis.duplicate();
        default:
          return config.redis.duplicate();
      }
    },
    defaultJobOptions: {
      removeOnComplete: 100, // Keep last 100 completed jobs
      removeOnFail: 1000, // Keep last 1000 failed jobs
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 2000,
      },
      ...config.defaultJobOptions,
    },
  });

  // Global error handler
  queue.on("error", (error) => {
    console.error(`Queue ${config.name} error:`, error);
  });

  return queue;
}

// Email queue with typed processor
const emailQueue = createQueue<EmailJobData>({
  name: "email",
  redis: new Redis(process.env.REDIS_URL),
});

// Define processor
emailQueue.process(async (job: Job<EmailJobData>) => {
  const { to, subject, template, context } = job.data;

  // Update progress
  await job.progress(10);

  // Render template
  const html = await renderTemplate(template, context);
  await job.progress(50);

  // Send email
  await emailService.send({ to, subject, html });
  await job.progress(100);

  return { sent: true, messageId: `msg_${Date.now()}` };
});

// Add job with options
async function sendEmail(
  data: EmailJobData,
  options?: JobOptions,
): Promise<Job<EmailJobData>> {
  return emailQueue.add(data, {
    priority: options?.priority || 0,
    delay: options?.delay || 0,
    jobId: options?.jobId, // For deduplication
    ...options,
  });
}

// Bulk job addition
async function sendBulkEmails(
  emails: EmailJobData[],
): Promise<Job<EmailJobData>[]> {
  const jobs = emails.map((data, index) => ({
    data,
    opts: {
      jobId: `bulk_${Date.now()}_${index}`,
    },
  }));

  return emailQueue.addBulk(jobs);
}

Celery (Python):

from celery import Celery, Task
from celery.exceptions import MaxRetriesExceededError
from typing import Any, Dict, Optional
import logging

# Celery configuration
app = Celery('tasks')
app.config_from_object({
    'broker_url': 'redis://localhost:6379/0',
    'result_backend': 'redis://localhost:6379/1',
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
    'timezone': 'UTC',
    'task_track_started': True,
    'task_time_limit': 300,  # 5 minutes hard limit
    'task_soft_time_limit': 240,  # 4 minutes soft limit
    'worker_prefetch_multiplier': 4,
    'task_acks_late': True,  # Acknowledge after task completes
    'task_reject_on_worker_lost': True,
})

logger = logging.getLogger(__name__)

# Base task with retry logic
class BaseTask(Task):
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3}
    retry_backoff = True
    retry_backoff_max = 600  # 10 minutes max
    retry_jitter = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f'Task {self.name}[{task_id}] failed: {exc}')

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f'Task {self.name}[{task_id}] retrying: {exc}')

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'Task {self.name}[{task_id}] succeeded')

# Email task
@app.task(base=BaseTask, bind=True, name='send_email')
def send_email(
    self,
    to: str,
    subject: str,
    template: str,
    context: Dict[str, Any]
) -> Dict[str, Any]:
    try:
        # Update state
        self.update_state(state='PROGRESS', meta={'progress': 10})

        # Render template
        html = render_template(template, context)
        self.update_state(state='PROGRESS', meta={'progress': 50})

        # Send email
        message_id = email_service.send(to=to, subject=subject, html=html)
        self.update_state(state='PROGRESS', meta={'progress': 100})

        return {'sent': True, 'message_id': message_id}

    except ConnectionError as exc:
        raise self.retry(exc=exc, countdown=60)

# Image processing with chaining
@app.task(base=BaseTask, bind=True, name='process_image')
def process_image(self, image_id: str, operations: list) -> Dict[str, Any]:
    image = load_image(image_id)

    for i, op in enumerate(operations):
        progress = int((i + 1) / len(operations) * 100)
        self.update_state(state='PROGRESS', meta={'progress': progress, 'operation': op['type']})

        if op['type'] == 'resize':
            image = resize_image(image, **op['params'])
        elif op['type'] == 'crop':
            image = crop_image(image, **op['params'])
        elif op['type'] == 'compress':
            image = compress_image(image, **op['params'])

    url = save_image(image, image_id)
    return {'url': url, 'operations_count': len(operations)}

# Task chaining example
from celery import chain, group, chord

def process_order(order_id: str):
    """Process order with chained tasks."""
    workflow = chain(
        validate_order.s(order_id),
        reserve_inventory.s(),
        process_payment.s(),
        send_confirmation.s(),
    )
    return workflow.apply_async()

def process_bulk_images(image_ids: list):
    """Process multiple images in parallel, then aggregate results."""
    workflow = chord(
        group(process_image.s(img_id, [{'type': 'resize', 'params': {'width': 800}}])
              for img_id in image_ids),
        aggregate_results.s()
    )
    return workflow.apply_async()

Sidekiq (Ruby):

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.redis = { url: ENV['REDIS_URL'], network_timeout: 5 }
  config.death_handlers << ->(job, ex) do
    # Handle job failure
    ErrorReporter.report(ex, job: job)
  end
end

Sidekiq.configure_client do |config|
  config.redis = { url: ENV['REDIS_URL'], network_timeout: 5 }
end

# app/workers/email_worker.rb
class EmailWorker
  include Sidekiq::Worker

  sidekiq_options queue: :default,
                  retry: 5,
                  backtrace: true,
                  dead: true

  sidekiq_retry_in do |count, exception|
    # Exponential backoff: 1, 8, 27, 64, 125 seconds
    (count + 1) ** 3
  end

  sidekiq_retries_exhausted do |msg, exception|
    Rails.logger.error "Job #{msg['jid']} exhausted retries: #{exception.message}"
    DeadJobNotifier.notify(msg, exception)
  end

  def perform(to, subject, template, context)
    html = ApplicationController.render(
      template: template,
      locals: context.symbolize_keys
    )

    EmailService.send(to: to, subject: subject, html: html)
  end
end

# app/workers/batch_worker.rb
class BatchWorker
  include Sidekiq::Worker

  def perform(batch_id)
    batch = Batch.find(batch_id)

    batch.items.find_each do |item|
      ItemProcessor.perform_async(item.id)
    end
  end
end

# Using Sidekiq Batches (Pro feature)
class ImportWorker
  include Sidekiq::Worker

  def perform(import_id)
    import = Import.find(import_id)

    batch = Sidekiq::Batch.new
    batch.description = "Import #{import_id}"
    batch.on(:complete, ImportCallbacks, import_id: import_id)

    batch.jobs do
      import.rows.each_with_index do |row, index|
        ImportRowWorker.perform_async(import_id, index, row)
      end
    end
  end
end

class ImportCallbacks
  def on_complete(status, options)
    import = Import.find(options['import_id'])

    if status.failures.zero?
      import.update!(status: 'completed')
    else
      import.update!(status: 'completed_with_errors', error_count: status.failures)
    end
  end
end

Scheduled Jobs and Cron Patterns

// Bull scheduler
import Queue from "bull";

const scheduledQueue = new Queue("scheduled-tasks", process.env.REDIS_URL);

// Repeatable jobs
async function setupScheduledJobs(): Promise<void> {
  // Clean up every hour
  await scheduledQueue.add(
    "cleanup",
    {},
    {
      repeat: { cron: "0 * * * *" }, // Every hour
      jobId: "cleanup-hourly",
    },
  );

  // Daily report at 9 AM
  await scheduledQueue.add(
    "daily-report",
    {},
    {
      repeat: { cron: "0 9 * * *" },
      jobId: "daily-report",
    },
  );

  // Every 5 minutes
  await scheduledQueue.add(
    "health-check",
    {},
    {
      repeat: { every: 5 * 60 * 1000 }, // 5 minutes in ms
      jobId: "health-check",
    },
  );

  // Weekly on Sunday at midnight
  await scheduledQueue.add(
    "weekly-cleanup",
    {},
    {
      repeat: { cron: "0 0 * * 0" },
      jobId: "weekly-cleanup",
    },
  );
}

// Process scheduled jobs
scheduledQueue.process("cleanup", async (job) => {
  await cleanupOldRecords();
  return { cleaned: true };
});

scheduledQueue.process("daily-report", async (job) => {
  const report = await generateDailyReport();
  await sendReportEmail(report);
  return { reportId: report.id };
});

// List scheduled jobs
async function getScheduledJobs(): Promise<
  Array<{ name: string; next: Date; cron: string }>
> {
  const repeatableJobs = await scheduledQueue.getRepeatableJobs();

  return repeatableJobs.map((job) => ({
    name: job.name,
    next: new Date(job.next),
    cron: job.cron || `Every ${job.every}ms`,
  }));
}

// Remove scheduled job
async function removeScheduledJob(jobId: string): Promise<void> {
  const jobs = await scheduledQueue.getRepeatableJobs();
  const job = jobs.find((j) => j.id === jobId);

  if (job) {
    await scheduledQueue.removeRepeatableByKey(job.key);
  }
}
# Celery Beat scheduler
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')

app.conf.beat_schedule = {
    # Every hour
    'cleanup-hourly': {
        'task': 'tasks.cleanup',
        'schedule': crontab(minute=0),  # Every hour at minute 0
    },

    # Daily at 9 AM
    'daily-report': {
        'task': 'tasks.daily_report',
        'schedule': crontab(hour=9, minute=0),
    },

    # Every 5 minutes
    'health-check': {
        'task': 'tasks.health_check',
        'schedule': 300.0,  # 5 minutes in seconds
    },

    # Weekly on Sunday at midnight
    'weekly-cleanup': {
        'task': 'tasks.weekly_cleanup',
        'schedule': crontab(hour=0, minute=0, day_of_week=0),
    },

    # First day of month at 6 AM
    'monthly-report': {
        'task': 'tasks.monthly_report',
        'schedule': crontab(hour=6, minute=0, day_of_month=1),
    },

    # With arguments
    'check-expiring-subscriptions': {
        'task': 'tasks.check_subscriptions',
        'schedule': crontab(hour=8, minute=0),
        'args': ('expiring',),
        'kwargs': {'days_ahead': 7},
    },
}

# Dynamic schedule with database
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json

def create_scheduled_task(name: str, task: str, cron: str, args: list = None, kwargs: dict = None):
    """Create a scheduled task dynamically."""
    # Parse cron expression
    minute, hour, day_of_month, month, day_of_week = cron.split()

    schedule, _ = CrontabSchedule.objects.get_or_create(
        minute=minute,
        hour=hour,
        day_of_month=day_of_month,
        month_of_year=month,
        day_of_week=day_of_week,
    )

    PeriodicTask.objects.update_or_create(
        name=name,
        defaults={
            'task': task,
            'crontab': schedule,
            'args': json.dumps(args or []),
            'kwargs': json.dumps(kwargs or {}),
            'enabled': True,
        },
    )

Worker Pool Management

import Queue, { Job } from "bull";
import os from "os";

interface WorkerPoolConfig {
  concurrency: number;
  limiter?: {
    max: number;
    duration: number;
  };
}

class WorkerPool {
  private queues: Map<string, Queue.Queue> = new Map();
  private isShuttingDown = false;

  constructor(private config: WorkerPoolConfig) {
    // Graceful shutdown
    process.on("SIGTERM", () => this.shutdown());
    process.on("SIGINT", () => this.shutdown());
  }

  registerQueue<T>(
    name: string,
    processor: (job: Job<T>) => Promise<unknown>,
  ): Queue.Queue<T> {
    const queue = new Queue<T>(name, process.env.REDIS_URL!, {
      limiter: this.config.limiter,
    });

    // Process with concurrency
    queue.process(this.config.concurrency, async (job: Job<T>) => {
      if (this.isShuttingDown) {
        throw new Error("Worker shutting down");
      }
      return processor(job);
    });

    // Event handlers
    queue.on("completed", (job, result) => {
      console.log(`Job ${job.id} completed:`, result);
    });

    queue.on("failed", (job, err) => {
      console.error(`Job ${job?.id} failed:`, err);
    });

    queue.on("stalled", (job) => {
      console.warn(`Job ${job} stalled`);
    });

    this.queues.set(name, queue);
    return queue;
  }

  async shutdown(): Promise<void> {
    console.log("Initiating graceful shutdown...");
    this.isShuttingDown = true;

    // Stop accepting new jobs
    const closePromises = Array.from(this.queues.values()).map(
      async (queue) => {
        await queue.pause(true); // Pause and wait for active jobs
        await queue.close();
      },
    );

    await Promise.all(closePromises);
    console.log("All queues closed");
    process.exit(0);
  }

  async getStats(): Promise<Record<string, QueueStats>> {
    const stats: Record<string, QueueStats> = {};

    for (const [name, queue] of this.queues) {
      const [waiting, active, completed, failed, delayed] = await Promise.all([
        queue.getWaitingCount(),
        queue.getActiveCount(),
        queue.getCompletedCount(),
        queue.getFailedCount(),
        queue.getDelayedCount(),
      ]);

      stats[name] = { waiting, active, completed, failed, delayed };
    }

    return stats;
  }
}

interface QueueStats {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

// Usage
const pool = new WorkerPool({
  concurrency: os.cpus().length,
  limiter: {
    max: 100, // Max 100 jobs
    duration: 1000, // Per second
  },
});

pool.registerQueue<EmailJobData>("email", async (job) => {
  await sendEmail(job.data);
});

pool.registerQueue<ImageProcessingJobData>("images", async (job) => {
  await processImage(job.data);
});
# Celery worker management
from celery import Celery
from celery.signals import worker_process_init, worker_shutdown
import multiprocessing

app = Celery('tasks')

# Worker configuration
app.conf.update(
    worker_concurrency=multiprocessing.cpu_count(),
    worker_prefetch_multiplier=2,  # Prefetch 2 tasks per worker
    worker_max_tasks_per_child=1000,  # Restart worker after 1000 tasks
    worker_max_memory_per_child=200000,  # 200MB limit
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

# Per-worker initialization
@worker_process_init.connect
def init_worker(**kwargs):
    """Initialize resources for each worker process."""
    # Initialize database connection pool
    db.connect()
    # Warm up caches
    cache.warm_up()

@worker_shutdown.connect
def cleanup_worker(**kwargs):
    """Clean up resources on worker shutdown."""
    db.close()
    cache.flush()

# Task routing for specialized workers
app.conf.task_routes = {
    'tasks.send_email': {'queue': 'email'},
    'tasks.process_image': {'queue': 'images'},
    'tasks.heavy_computation': {'queue': 'compute'},
    'tasks.*': {'queue': 'default'},
}

# Queue-specific worker command:
# celery -A tasks worker -Q email --concurrency=4
# celery -A tasks worker -Q images --concurrency=2
# celery -A tasks worker -Q compute --concurrency=1

# Auto-scaling with Celery
app.conf.worker_autoscaler = 'celery.worker.autoscale:Autoscaler'
app.conf.worker_autoscale_max = 10
app.conf.worker_autoscale_min = 2

Job Priorities and Fairness

// Priority queues with Bull
interface PriorityJobData {
  type: string;
  payload: unknown;
  priority: "critical" | "high" | "normal" | "low";
}

const priorityMap = {
  critical: 1, // Highest priority (processed first)
  high: 5,
  normal: 10,
  low: 20,
};

async function addPriorityJob(
  data: PriorityJobData,
): Promise<Job<PriorityJobData>> {
  return queue.add(data, {
    priority: priorityMap[data.priority],
    // Critical jobs don't wait
    delay: data.priority === "critical" ? 0 : undefined,
  });
}

// Fair scheduling with multiple queues
class FairScheduler {
  private queues: Map<string, Queue.Queue> = new Map();
  private weights: Map<string, number> = new Map();

  constructor(queueConfigs: Array<{ name: string; weight: number }>) {
    for (const config of queueConfigs) {
      const queue = new Queue(config.name, process.env.REDIS_URL!);
      this.queues.set(config.name, queue);
      this.weights.set(config.name, config.weight);
    }
  }

  // Weighted round-robin processing
  async process(
    handler: (queueName: string, job: Job) => Promise<void>,
  ): Promise<void> {
    const totalWeight = Array.from(this.weights.values()).reduce(
      (a, b) => a + b,
      0,
    );

    for (const [name, queue] of this.queues) {
      const weight = this.weights.get(name)!;
      const concurrency = Math.max(1, Math.floor((weight / totalWeight) * 10));

      queue.process(concurrency, async (job) => {
        await handler(name, job);
      });
    }
  }
}

// Usage: Process premium customers first
const scheduler = new FairScheduler([
  { name: "premium", weight: 5 }, // 50% of capacity
  { name: "standard", weight: 3 }, // 30% of capacity
  { name: "free", weight: 2 }, // 20% of capacity
]);

await scheduler.process(async (queueName, job) => {
  console.log(`Processing ${queueName} job:`, job.id);
  await processJob(job);
});

Idempotency and Retry Strategies

import Queue, { Job, JobOptions } from "bull";
import { createHash } from "crypto";

// Idempotency key generation
function generateIdempotencyKey(data: unknown): string {
  const hash = createHash("sha256");
  hash.update(JSON.stringify(data));
  return hash.digest("hex");
}

// Idempotent job processor
class IdempotentProcessor<T> {
  private processedKeys: Set<string> = new Set();
  private redis: Redis;

  constructor(
    private queue: Queue.Queue<T>,
    redis: Redis,
  ) {
    this.redis = redis;
  }

  async process(handler: (job: Job<T>) => Promise<unknown>): Promise<void> {
    this.queue.process(async (job: Job<T>) => {
      const idempotencyKey = job.opts.jobId || generateIdempotencyKey(job.data);

      // Check if already processed
      const existing = await this.redis.get(`processed:${idempotencyKey}`);
      if (existing) {
        console.log(`Job ${job.id} already processed, skipping`);
        return JSON.parse(existing);
      }

      // Process job
      const result = await handler(job);

      // Mark as processed with TTL
      await this.redis.setex(
        `processed:${idempotencyKey}`,
        86400, // 24 hours
        JSON.stringify(result),
      );

      return result;
    });
  }
}

// Custom retry strategies
interface RetryStrategy {
  type: "exponential" | "linear" | "fixed" | "custom";
  baseDelay: number;
  maxDelay?: number;
  maxRetries: number;
  jitter?: boolean;
  retryOn?: (error: Error) => boolean;
}

function calculateDelay(strategy: RetryStrategy, attempt: number): number {
  let delay: number;

  switch (strategy.type) {
    case "exponential":
      delay = strategy.baseDelay * Math.pow(2, attempt - 1);
      break;
    case "linear":
      delay = strategy.baseDelay * attempt;
      break;
    case "fixed":
      delay = strategy.baseDelay;
      break;
    default:
      delay = strategy.baseDelay;
  }

  // Apply max delay cap
  if (strategy.maxDelay) {
    delay = Math.min(delay, strategy.maxDelay);
  }

  // Add jitter (up to 20% variation)
  if (strategy.jitter) {
    const jitterFactor = 0.8 + Math.random() * 0.4; // 0.8 to 1.2
    delay = Math.floor(delay * jitterFactor);
  }

  return delay;
}

// Retry with dead letter queue
class RetryableQueue<T> {
  private mainQueue: Queue.Queue<T>;
  private dlq: Queue.Queue<T>;
  private strategy: RetryStrategy;

  constructor(name: string, strategy: RetryStrategy) {
    this.mainQueue = new Queue<T>(name, process.env.REDIS_URL!);
    this.dlq = new Queue<T>(`${name}-dlq`, process.env.REDIS_URL!);
    this.strategy = strategy;
  }

  async process(handler: (job: Job<T>) => Promise<unknown>): Promise<void> {
    this.mainQueue.process(async (job: Job<T>) => {
      const attempts = job.attemptsMade;

      try {
        return await handler(job);
      } catch (error) {
        const err = error as Error;

        // Check if error is retryable
        if (this.strategy.retryOn && !this.strategy.retryOn(err)) {
          await this.moveToDLQ(job, err);
          throw err;
        }

        // Check max retries
        if (attempts >= this.strategy.maxRetries) {
          await this.moveToDLQ(job, err);
          throw err;
        }

        // Retry with calculated delay
        const delay = calculateDelay(this.strategy, attempts + 1);
        throw new Error(`Retry in ${delay}ms: ${err.message}`);
      }
    });
  }

  private async moveToDLQ(job: Job<T>, error: Error): Promise<void> {
    await this.dlq.add({
      originalJob: job.data,
      error: error.message,
      failedAt: new Date().toISOString(),
      attempts: job.attemptsMade,
    } as unknown as T);
  }

  async retryFromDLQ(jobId: string): Promise<void> {
    const job = await this.dlq.getJob(jobId);
    if (!job) return;

    const dlqData = job.data as unknown as { originalJob: T };
    await this.mainQueue.add(dlqData.originalJob);
    await job.remove();
  }
}

Job Monitoring and Dead Jobs

import Queue, { Job, JobCounts, JobStatus } from "bull";
import { EventEmitter } from "events";

interface JobMetrics {
  queue: string;
  counts: JobCounts;
  latency: {
    avg: number;
    p50: number;
    p95: number;
    p99: number;
  };
  throughput: number; // jobs per minute
  errorRate: number;
}

class JobMonitor extends EventEmitter {
  private queues: Queue.Queue[] = [];
  private metricsHistory: Map<string, number[]> = new Map();

  addQueue(queue: Queue.Queue): void {
    this.queues.push(queue);

    queue.on("completed", (job, result) => {
      this.recordMetric(queue.name, "completed", job);
      this.emit("job:completed", { queue: queue.name, job, result });
    });

    queue.on("failed", (job, err) => {
      this.recordMetric(queue.name, "failed", job!);
      this.emit("job:failed", { queue: queue.name, job, error: err });

      // Alert on high failure rate
      this.checkErrorRate(queue.name);
    });

    queue.on("stalled", (job) => {
      this.emit("job:stalled", { queue: queue.name, jobId: job });
    });
  }

  private recordMetric(queueName: string, type: string, job: Job): void {
    const duration = Date.now() - job.timestamp;
    const key = `${queueName}:${type}:duration`;

    const history = this.metricsHistory.get(key) || [];
    history.push(duration);

    // Keep last 1000 samples
    if (history.length > 1000) {
      history.shift();
    }

    this.metricsHistory.set(key, history);
  }

  private checkErrorRate(queueName: string): void {
    const completed =
      this.metricsHistory.get(`${queueName}:completed:duration`)?.length || 0;
    const failed =
      this.metricsHistory.get(`${queueName}:failed:duration`)?.length || 0;

    if (completed + failed > 10) {
      const errorRate = failed / (completed + failed);
      if (errorRate > 0.1) {
        // > 10% error rate
        this.emit("alert:high_error_rate", { queue: queueName, errorRate });
      }
    }
  }

  async getMetrics(queueName: string): Promise<JobMetrics> {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`Queue ${queueName} not found`);

    const counts = await queue.getJobCounts();
    const durations =
      this.metricsHistory.get(`${queueName}:completed:duration`) || [];

    return {
      queue: queueName,
      counts,
      latency: this.calculateLatencyPercentiles(durations),
      throughput: this.calculateThroughput(durations),
      errorRate: this.calculateErrorRate(queueName),
    };
  }

  private calculateLatencyPercentiles(
    durations: number[],
  ): JobMetrics["latency"] {
    if (durations.length === 0) {
      return { avg: 0, p50: 0, p95: 0, p99: 0 };
    }

    const sorted = [...durations].sort((a, b) => a - b);
    const avg = sorted.reduce((a, b) => a + b, 0) / sorted.length;

    return {
      avg: Math.round(avg),
      p50: sorted[Math.floor(sorted.length * 0.5)],
      p95: sorted[Math.floor(sorted.length * 0.95)],
      p99: sorted[Math.floor(sorted.length * 0.99)],
    };
  }

  private calculateThroughput(durations: number[]): number {
    // Jobs completed in last minute
    const oneMinuteAgo = Date.now() - 60000;
    const recentJobs = durations.filter((_, i) => i > durations.length - 100);
    return recentJobs.length;
  }

  private calculateErrorRate(queueName: string): number {
    const completed =
      this.metricsHistory.get(`${queueName}:completed:duration`)?.length || 0;
    const failed =
      this.metricsHistory.get(`${queueName}:failed:duration`)?.length || 0;
    const total = completed + failed;
    return total > 0 ? failed / total : 0;
  }

  // Dead job management
  async getDeadJobs(queueName: string, limit: number = 100): Promise<Job[]> {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`Queue ${queueName} not found`);

    return queue.getFailed(0, limit);
  }

  async retryDeadJob(queueName: string, jobId: string): Promise<void> {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`Queue ${queueName} not found`);

    const job = await queue.getJob(jobId);
    if (!job) throw new Error(`Job ${jobId} not found`);

    await job.retry();
  }

  async retryAllDeadJobs(queueName: string): Promise<number> {
    const deadJobs = await this.getDeadJobs(queueName);
    let retried = 0;

    for (const job of deadJobs) {
      try {
        await job.retry();
        retried++;
      } catch (error) {
        console.error(`Failed to retry job ${job.id}:`, error);
      }
    }

    return retried;
  }

  async cleanDeadJobs(
    queueName: string,
    olderThan: number = 86400000,
  ): Promise<number> {
    const queue = this.queues.find((q) => q.name === queueName);
    if (!queue) throw new Error(`Queue ${queueName} not found`);

    const cleaned = await queue.clean(olderThan, "failed");
    return cleaned.length;
  }
}

// Dashboard API endpoints
import express from "express";

function createMonitoringRouter(monitor: JobMonitor): express.Router {
  const router = express.Router();

  router.get("/queues/:name/metrics", async (req, res) => {
    try {
      const metrics = await monitor.getMetrics(req.params.name);
      res.json(metrics);
    } catch (error) {
      res.status(404).json({ error: (error as Error).message });
    }
  });

  router.get("/queues/:name/dead", async (req, res) => {
    const limit = parseInt(req.query.limit as string) || 100;
    const jobs = await monitor.getDeadJobs(req.params.name, limit);
    res.json(
      jobs.map((j) => ({
        id: j.id,
        data: j.data,
        failedReason: j.failedReason,
        attemptsMade: j.attemptsMade,
        timestamp: j.timestamp,
      })),
    );
  });

  router.post("/queues/:name/dead/:jobId/retry", async (req, res) => {
    try {
      await monitor.retryDeadJob(req.params.name, req.params.jobId);
      res.json({ success: true });
    } catch (error) {
      res.status(400).json({ error: (error as Error).message });
    }
  });

  router.post("/queues/:name/dead/retry-all", async (req, res) => {
    const retried = await monitor.retryAllDeadJobs(req.params.name);
    res.json({ retried });
  });

  router.delete("/queues/:name/dead", async (req, res) => {
    const olderThan = parseInt(req.query.olderThan as string) || 86400000;
    const cleaned = await monitor.cleanDeadJobs(req.params.name, olderThan);
    res.json({ cleaned });
  });

  return router;
}

Best Practices

  1. Idempotency

    • Design jobs to be safely re-executed
    • Use unique job IDs for deduplication
    • Store processed state externally
  2. Retry Strategies

    • Use exponential backoff with jitter
    • Set maximum retry limits
    • Distinguish between retryable and non-retryable errors
  3. Monitoring

    • Track queue depths and processing latency
    • Alert on high error rates or growing queues
    • Monitor worker health and memory usage
  4. Graceful Shutdown

    • Complete in-progress jobs before shutdown
    • Use signals (SIGTERM, SIGINT) properly
    • Set reasonable timeouts for job completion
  5. Resource Management

    • Set appropriate concurrency limits
    • Use worker pools for CPU-bound tasks
    • Implement rate limiting for external APIs

Examples

Complete Worker Service

import Queue, { Job } from "bull";
import { Redis } from "ioredis";

interface WorkerConfig {
  queues: Array<{
    name: string;
    concurrency: number;
    processor: (job: Job) => Promise<unknown>;
  }>;
  redis: Redis;
  shutdownTimeout: number;
}

class WorkerService {
  private queues: Map<string, Queue.Queue> = new Map();
  private isShuttingDown = false;
  private activeJobs = 0;

  constructor(private config: WorkerConfig) {}

  async start(): Promise<void> {
    // Setup queues
    for (const queueConfig of this.config.queues) {
      const queue = new Queue(queueConfig.name, {
        createClient: () => this.config.redis.duplicate(),
      });

      queue.process(queueConfig.concurrency, async (job) => {
        if (this.isShuttingDown) {
          throw new Error("Worker shutting down");
        }

        this.activeJobs++;
        try {
          return await queueConfig.processor(job);
        } finally {
          this.activeJobs--;
        }
      });

      this.queues.set(queueConfig.name, queue);
    }

    // Setup graceful shutdown
    process.on("SIGTERM", () => this.shutdown());
    process.on("SIGINT", () => this.shutdown());

    console.log("Worker service started");
  }

  private async shutdown(): Promise<void> {
    if (this.isShuttingDown) return;
    this.isShuttingDown = true;

    console.log("Shutting down worker service...");

    // Pause all queues
    await Promise.all(
      Array.from(this.queues.values()).map((q) => q.pause(true)),
    );

    // Wait for active jobs to complete
    const startTime = Date.now();
    while (
      this.activeJobs > 0 &&
      Date.now() - startTime < this.config.shutdownTimeout
    ) {
      await new Promise((r) => setTimeout(r, 100));
    }

    if (this.activeJobs > 0) {
      console.warn(`Forcing shutdown with ${this.activeJobs} active jobs`);
    }

    // Close all queues
    await Promise.all(Array.from(this.queues.values()).map((q) => q.close()));

    console.log("Worker service stopped");
    process.exit(0);
  }
}

// Usage
const worker = new WorkerService({
  redis: new Redis(process.env.REDIS_URL),
  shutdownTimeout: 30000,
  queues: [
    {
      name: "email",
      concurrency: 5,
      processor: async (job) => {
        await sendEmail(job.data);
      },
    },
    {
      name: "images",
      concurrency: 2,
      processor: async (job) => {
        await processImage(job.data);
      },
    },
  ],
});

worker.start();