Claude Code Plugins

Community-maintained marketplace


batch-processing-jobs

@aj-geddes/useful-ai-prompts

Implement robust batch processing systems with job queues, schedulers, background tasks, and distributed workers. Use when processing large datasets, scheduled tasks, async operations, or resource-intensive computations.

Install Skill

1. Download skill

2. Enable skills in Claude

   Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

   Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reviewing its instructions before using it.

SKILL.md

name: batch-processing-jobs
description: Implement robust batch processing systems with job queues, schedulers, background tasks, and distributed workers. Use when processing large datasets, scheduled tasks, async operations, or resource-intensive computations.

Batch Processing Jobs

Overview

Build scalable batch processing systems that handle large datasets, scheduled tasks, and asynchronous operations efficiently.

When to Use

  • Processing large datasets
  • Scheduled report generation
  • Email/notification campaigns
  • Data imports and exports
  • Image/video processing
  • ETL pipelines
  • Cleanup and maintenance tasks
  • Long-running computations
  • Bulk data updates

Architecture Patterns

┌─────────────┐      ┌─────────────┐      ┌──────────┐
│  Producer   │─────▶│    Queue    │─────▶│  Worker  │
└─────────────┘      └─────────────┘      └──────────┘
                           │                     │
                           │                     ▼
                           │              ┌──────────┐
                           └─────────────▶│  Result  │
                                          │  Storage │
                                          └──────────┘
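
The producer enqueues work, workers pull jobs off the queue, and results land in durable storage rather than going back to the producer. A minimal sketch of that split using Bull (assuming a local Redis instance; the queue name and the renderThumbnail helper are illustrative):

import Queue from 'bull';

// Hypothetical processing helper, declared only to keep the sketch self-contained
declare function renderThumbnail(url: string, width: number): Promise<Buffer>;

const jobs = new Queue('thumbnails', 'redis://localhost:6379');

// Producer side: enqueue work and return immediately
await jobs.add({ imageUrl: 'https://example.com/cat.png', width: 128 });

// Worker side (typically a separate process): consume jobs and persist results
jobs.process(async (job) => {
  const thumbnail = await renderThumbnail(job.data.imageUrl, job.data.width);
  return { size: thumbnail.length }; // Bull stores the return value as the job result
});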

Implementation Examples

1. Bull Queue (Node.js)

import Queue from 'bull';
import { v4 as uuidv4 } from 'uuid';

interface JobData {
  id: string;
  type: string;
  payload: any;
  userId?: string;
  metadata?: Record<string, any>;
}

interface JobResult {
  success: boolean;
  data?: any;
  error?: string;
  jobId?: Queue.JobId; // set when results are written to the results queue
  processedAt: number;
  duration: number;
}

class BatchProcessor {
  private queue: Queue.Queue<JobData>;
  private resultQueue: Queue.Queue<JobResult>;

  constructor(redisUrl: string) {
    // Main processing queue
    this.queue = new Queue('batch-jobs', redisUrl, {
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000
        },
        removeOnComplete: 1000,
        removeOnFail: 5000,
        timeout: 300000 // 5 minutes
      },
      settings: {
        maxStalledCount: 2,
        stalledInterval: 30000
      }
    });

    // Results queue
    this.resultQueue = new Queue('batch-results', redisUrl);

    this.setupProcessors();
    this.setupEvents();
  }

  private setupProcessors(): void {
    // Data processing job
    this.queue.process('process-data', 10, async (job) => {
      const startTime = Date.now();
      const { payload } = job.data;

      job.log(`Processing ${payload.items?.length || 0} items`);

      try {
        // Update progress
        await job.progress(0);

        const results = await this.processDataBatch(
          payload.items,
          (progress) => job.progress(progress)
        );

        const duration = Date.now() - startTime;

        return {
          success: true,
          data: results,
          processedAt: Date.now(),
          duration
        };
      } catch (error: any) {
        throw new Error(`Processing failed: ${error.message}`);
      }
    });

    // Report generation job
    this.queue.process('generate-report', 2, async (job) => {
      const { payload } = job.data;

      const report = await this.generateReport(
        payload.type,
        payload.filters,
        payload.format
      );

      return {
        success: true,
        data: {
          reportId: uuidv4(),
          url: report.url,
          size: report.size
        },
        processedAt: Date.now(),
        duration: 0
      };
    });

    // Email batch job
    this.queue.process('send-emails', 5, async (job) => {
      const { payload } = job.data;
      const { recipients, template, data } = payload;

      const results = await this.sendEmailBatch(
        recipients,
        template,
        data
      );

      return {
        success: true,
        data: {
          sent: results.successful,
          failed: results.failed
        },
        processedAt: Date.now(),
        duration: 0
      };
    });
  }

  private setupEvents(): void {
    this.queue.on('completed', (job, result) => {
      console.log(`Job ${job.id} completed:`, result);

      // Store result
      this.resultQueue.add({
        jobId: job.id,
        ...result
      });
    });

    this.queue.on('failed', (job, error) => {
      console.error(`Job ${job?.id} failed:`, error.message);

      // Store failure
      this.resultQueue.add({
        jobId: job?.id,
        success: false,
        error: error.message,
        processedAt: Date.now(),
        duration: 0
      });
    });

    this.queue.on('progress', (job, progress) => {
      console.log(`Job ${job.id} progress: ${progress}%`);
    });

    this.queue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled`);
    });
  }

  async addJob(
    type: string,
    payload: any,
    options?: Queue.JobOptions
  ): Promise<Queue.Job<JobData>> {
    const jobData: JobData = {
      id: uuidv4(),
      type,
      payload,
      metadata: {
        createdAt: Date.now()
      }
    };

    return this.queue.add(type, jobData, options);
  }

  async addBulkJobs(
    jobs: Array<{ type: string; payload: any; options?: Queue.JobOptions }>
  ): Promise<Queue.Job<JobData>[]> {
    const bulkData = jobs.map(({ type, payload, options }) => ({
      name: type,
      data: {
        id: uuidv4(),
        type,
        payload,
        metadata: { createdAt: Date.now() }
      },
      opts: options || {}
    }));

    return this.queue.addBulk(bulkData);
  }

  async scheduleJob(
    type: string,
    payload: any,
    cronExpression: string
  ): Promise<Queue.Job<JobData>> {
    return this.addJob(type, payload, {
      repeat: {
        cron: cronExpression
      }
    });
  }

  private async processDataBatch(
    items: any[],
    onProgress: (progress: number) => Promise<void>
  ): Promise<any[]> {
    const results = [];
    const total = items.length;

    for (let i = 0; i < total; i++) {
      const result = await this.processItem(items[i]);
      results.push(result);

      // Update progress
      const progress = Math.round(((i + 1) / total) * 100);
      await onProgress(progress);
    }

    return results;
  }

  private async processItem(item: any): Promise<any> {
    // Simulate processing
    await new Promise(resolve => setTimeout(resolve, 100));
    return { ...item, processed: true };
  }

  private async generateReport(
    type: string,
    filters: any,
    format: string
  ): Promise<any> {
    // Simulate report generation
    return {
      url: `https://cdn.example.com/reports/${uuidv4()}.${format}`,
      size: 1024 * 1024
    };
  }

  private async sendEmailBatch(
    recipients: string[],
    template: string,
    data: any
  ): Promise<{ successful: number; failed: number }> {
    // Simulate email sending
    return {
      successful: recipients.length,
      failed: 0
    };
  }

  async getJobStatus(jobId: Queue.JobId): Promise<any> {
    const job = await this.queue.getJob(jobId);
    if (!job) return null;

    const state = await job.getState();
    const logs = await this.queue.getJobLogs(String(jobId));

    return {
      id: job.id,
      name: job.name,
      data: job.data,
      state,
      progress: job.progress(),
      attempts: job.attemptsMade,
      failedReason: job.failedReason,
      finishedOn: job.finishedOn,
      processedOn: job.processedOn,
      logs: logs.logs
    };
  }

  async getQueueStats(): Promise<any> {
    const [
      waiting,
      active,
      completed,
      failed,
      delayed,
      paused
    ] = await Promise.all([
      this.queue.getWaitingCount(),
      this.queue.getActiveCount(),
      this.queue.getCompletedCount(),
      this.queue.getFailedCount(),
      this.queue.getDelayedCount(),
      this.queue.getPausedCount()
    ]);

    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      paused
    };
  }

  async pause(): Promise<void> {
    await this.queue.pause();
  }

  async resume(): Promise<void> {
    await this.queue.resume();
  }

  async clean(grace: number = 0): Promise<void> {
    await this.queue.clean(grace, 'completed');
    await this.queue.clean(grace, 'failed');
  }

  async close(): Promise<void> {
    await this.queue.close();
    await this.resultQueue.close();
  }
}

// Usage
const processor = new BatchProcessor('redis://localhost:6379');

// Add single job
const job = await processor.addJob('process-data', {
  items: [{ id: 1 }, { id: 2 }, { id: 3 }]
});

// Add bulk jobs
await processor.addBulkJobs([
  {
    type: 'process-data',
    payload: { items: [/* ... */] }
  },
  {
    type: 'generate-report',
    payload: { type: 'sales', format: 'pdf' }
  }
]);

// Schedule recurring job
await processor.scheduleJob(
  'generate-report',
  { type: 'daily-summary' },
  '0 0 * * *' // Daily at midnight
);

// Check status
const status = await processor.getJobStatus(job.id!);
console.log('Job status:', status);

// Get queue stats
const stats = await processor.getQueueStats();
console.log('Queue stats:', stats);
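
On shutdown, drain rather than drop: pause intake, then close the queues with the class's own methods so in-flight jobs can finish (a sketch; Bull's close() waits for active jobs by default):

process.on('SIGTERM', async () => {
  await processor.pause();  // stop handing out new jobs
  await processor.close();  // waits for active jobs, then releases Redis connections
  process.exit(0);
});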

2. Celery Worker (Python)

from celery import Celery, Task
from celery.schedules import crontab
from typing import List, Any, Dict
import time
import logging

# Initialize Celery
app = Celery(
    'batch_processor',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configure Celery
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,  # 5 minutes
    task_soft_time_limit=270,  # 4.5 minutes
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
)

# Periodic tasks
app.conf.beat_schedule = {
    'daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=0, minute=0),
    },
    'cleanup-old-data': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),
    },
}

logger = logging.getLogger(__name__)


class CallbackTask(Task):
    """Base task with callback support."""

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f"Task {task_id} succeeded: {retval}")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f"Task {task_id} failed: {exc}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f"Task {task_id} retrying: {exc}")


@app.task(base=CallbackTask, bind=True, max_retries=3)
def process_batch_data(self, items: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Process batch of data items."""
    try:
        results = []
        total = len(items)

        for i, item in enumerate(items):
            # Calling the task function directly runs it synchronously in this
            # worker (use process_single_item.delay(item) to fan out instead)
            result = process_single_item(item)
            results.append(result)

            # Update progress
            progress = int((i + 1) / total * 100)
            self.update_state(
                state='PROGRESS',
                meta={'current': i + 1, 'total': total, 'percent': progress}
            )

        return {
            'processed': len(results),
            'success': True,
            'results': results
        }

    except Exception as exc:
        logger.error(f"Batch processing failed: {exc}")
        raise self.retry(exc=exc, countdown=60)  # Retry after 1 minute


@app.task
def process_single_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Process single item."""
    # Simulate processing
    time.sleep(0.1)
    return {
        'id': item.get('id'),
        'processed': True,
        'timestamp': time.time()
    }


@app.task(bind=True)
def generate_report(
    self,
    report_type: str,
    filters: Dict[str, Any],
    format: str = 'pdf'
) -> Dict[str, str]:
    """Generate report."""
    logger.info(f"Generating {report_type} report in {format} format")

    self.update_state(state='PROGRESS', meta={'step': 'gathering_data'})
    # Gather data
    time.sleep(2)

    self.update_state(state='PROGRESS', meta={'step': 'processing'})
    # Process data
    time.sleep(2)

    self.update_state(state='PROGRESS', meta={'step': 'generating'})
    # Generate report
    time.sleep(2)

    return {
        'report_id': f"report-{int(time.time())}",
        'url': f"https://cdn.example.com/reports/report.{format}",
        'format': format
    }


@app.task
def send_email_batch(
    recipients: List[str],
    template: str,
    context: Dict[str, Any]
) -> Dict[str, int]:
    """Send batch of emails."""
    successful = 0
    failed = 0

    for recipient in recipients:
        try:
            send_email(recipient, template, context)
            successful += 1
        except Exception as e:
            logger.error(f"Failed to send email to {recipient}: {e}")
            failed += 1

    return {
        'successful': successful,
        'failed': failed,
        'total': len(recipients)
    }


@app.task
def generate_daily_report():
    """Scheduled task: Generate daily report."""
    logger.info("Generating daily report")
    generate_report.delay('daily', {}, 'pdf')


@app.task
def cleanup_old_data():
    """Scheduled task: Clean up old data."""
    logger.info("Cleaning up old data")
    # Cleanup logic here


def send_email(recipient: str, template: str, context: Dict[str, Any]):
    """Send single email."""
    logger.info(f"Sending email to {recipient}")
    # Email sending logic


# Task grouping and chords
from celery import group, chord

def process_in_chunks(items: List[Any], chunk_size: int = 100):
    """Process items in parallel chunks."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    # Process chunks in parallel
    job = group(process_batch_data.s(chunk) for chunk in chunks)
    result = job.apply_async()

    return result


def process_with_callback(items: List[Any], chunk_size: int = 100):
    """Process items in chunks and run a callback once every chunk completes."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    header = group(process_batch_data.s(chunk) for chunk in chunks)
    callback = send_notification.s()

    # chord calls the callback with the collected group results when all finish
    job = chord(header)(callback)
    return job


@app.task
def send_notification(results):
    """Callback task after batch processing."""
    logger.info(f"All tasks completed: {len(results)} results")


# Usage examples
if __name__ == '__main__':
    # Enqueue task
    result = process_batch_data.delay([
        {'id': 1, 'value': 'a'},
        {'id': 2, 'value': 'b'}
    ])

    # Check task status
    print(f"Task ID: {result.id}")
    print(f"Status: {result.status}")

    # Wait for result (blocking)
    final_result = result.get(timeout=10)
    print(f"Result: {final_result}")

    # Process in chunks
    items = [{'id': i} for i in range(1000)]
    chunk_result = process_in_chunks(items, chunk_size=100)

    # Check group result
    print(f"Chunks: {len(chunk_result)}")

3. Cron Job Scheduler (Node.js)

import cron from 'node-cron';

interface ScheduledJob {
  name: string;
  schedule: string;
  handler: () => Promise<void>;
  enabled: boolean;
  lastRun?: Date;
  nextRun?: Date;
}

class JobScheduler {
  private jobs: Map<string, cron.ScheduledTask> = new Map();
  private jobConfigs: Map<string, ScheduledJob> = new Map();

  register(job: ScheduledJob): void {
    if (this.jobs.has(job.name)) {
      throw new Error(`Job ${job.name} already registered`);
    }

    // Validate cron expression
    if (!cron.validate(job.schedule)) {
      throw new Error(`Invalid cron expression: ${job.schedule}`);
    }

    const task = cron.schedule(job.schedule, async () => {
      if (!job.enabled) return;

      console.log(`Running job: ${job.name}`);
      const startTime = Date.now();

      try {
        await job.handler();

        const duration = Date.now() - startTime;
        console.log(`Job ${job.name} completed in ${duration}ms`);

        job.lastRun = new Date();
      } catch (error) {
        console.error(`Job ${job.name} failed:`, error);
      }
    }, { scheduled: false }); // created stopped; start() below honors job.enabled

    this.jobs.set(job.name, task);
    this.jobConfigs.set(job.name, job);

    if (job.enabled) {
      task.start();
    }
  }

  start(name: string): void {
    const task = this.jobs.get(name);
    if (!task) {
      throw new Error(`Job ${name} not found`);
    }

    task.start();

    const config = this.jobConfigs.get(name)!;
    config.enabled = true;
  }

  stop(name: string): void {
    const task = this.jobs.get(name);
    if (!task) {
      throw new Error(`Job ${name} not found`);
    }

    task.stop();

    const config = this.jobConfigs.get(name)!;
    config.enabled = false;
  }

  remove(name: string): void {
    const task = this.jobs.get(name);
    if (task) {
      task.stop(); // node-cron v3 has no destroy(); stop() halts the task
      this.jobs.delete(name);
      this.jobConfigs.delete(name);
    }
  }

  getJobs(): ScheduledJob[] {
    return Array.from(this.jobConfigs.values());
  }
}

// Usage
const scheduler = new JobScheduler();

// Register jobs
scheduler.register({
  name: 'daily-backup',
  schedule: '0 2 * * *', // 2 AM daily
  enabled: true,
  handler: async () => {
    console.log('Running daily backup...');
    // Backup logic
  }
});

scheduler.register({
  name: 'hourly-cleanup',
  schedule: '0 * * * *', // Every hour
  enabled: true,
  handler: async () => {
    console.log('Running cleanup...');
    // Cleanup logic
  }
});

scheduler.register({
  name: 'weekly-report',
  schedule: '0 9 * * 1', // Monday 9 AM
  enabled: true,
  handler: async () => {
    console.log('Generating weekly report...');
    // Report generation
  }
});
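
The scheduler's own API covers inspection and control at runtime, for example from an admin endpoint or a shutdown hook:

// List registered jobs and pause one by name without unregistering it
for (const job of scheduler.getJobs()) {
  console.log(`${job.name}: ${job.schedule} (enabled=${job.enabled})`);
}
scheduler.stop('hourly-cleanup');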

Best Practices

✅ DO

  • Implement idempotency for all jobs (see the sketch after this list)
  • Use job queues for distributed processing
  • Monitor job success/failure rates
  • Implement retry logic with exponential backoff
  • Set appropriate timeouts
  • Log job execution details
  • Use dead letter queues for failed jobs
  • Implement job priority levels
  • Batch similar operations together
  • Use connection pooling
  • Implement graceful shutdown
  • Monitor queue depth and processing time
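
Idempotency deserves a concrete shape, since retries and stalled-job recovery can execute the same job twice. A sketch with Bull: key each job by a deterministic ID so duplicate enqueues collapse into one job, and guard side effects so a rerun is harmless (the isProcessed/markProcessed store is a hypothetical deduplication layer you would back with Redis or your database):

import Queue from 'bull';

const queue = new Queue('billing', 'redis://localhost:6379');

// Hypothetical dedup helpers; back them with SETNX or a unique DB constraint
declare function isProcessed(id: string): Promise<boolean>;
declare function markProcessed(id: string): Promise<void>;
declare function sendInvoiceEmail(id: string): Promise<void>;

// Bull ignores add() when the jobId already exists, so enqueueing the same
// logical work twice yields a single job
await queue.add(
  'send-invoice',
  { invoiceId: 'inv_123' },
  { jobId: 'send-invoice:inv_123', attempts: 3 }
);

queue.process('send-invoice', async (job) => {
  const { invoiceId } = job.data;
  if (await isProcessed(invoiceId)) return { skipped: true }; // retry: nothing to redo
  await sendInvoiceEmail(invoiceId);
  await markProcessed(invoiceId); // record completion so later retries skip
  return { sent: true };
});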

❌ DON'T

  • Process jobs synchronously in request handlers
  • Ignore failed jobs
  • Set unlimited retries
  • Skip monitoring and alerting
  • Process jobs without timeouts
  • Store large payloads in the queue (enqueue a reference instead; see the sketch after this list)
  • Forget to clean up completed jobs
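
On the payload point: the queue lives in Redis memory, so enqueue a reference and let the worker fetch the bytes. A sketch (loadFromStorage, transcode, and saveToStorage are hypothetical blob-store helpers):

import Queue from 'bull';

// Hypothetical blob-store helpers, declared to keep the sketch self-contained
declare function loadFromStorage(key: string): Promise<Buffer>;
declare function transcode(input: Buffer, format: string): Promise<Buffer>;
declare function saveToStorage(data: Buffer): Promise<string>;

const queue = new Queue('media', 'redis://localhost:6379');

// Enqueue a pointer, not the data: keeps Redis memory flat
await queue.add({ objectKey: 'uploads/2024/video-42.mp4', outputFormat: 'h264' });

queue.process(async (job) => {
  const input = await loadFromStorage(job.data.objectKey);
  const output = await transcode(input, job.data.outputFormat);
  const resultKey = await saveToStorage(output);
  return { resultKey }; // small result, safe to store in the queue backend
});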
