Claude Code Plugins

Community-maintained marketplace

Feedback
17
0

Work with message queues for async data processing. Use when implementing pub/sub patterns, processing data asynchronously, or building distributed data pipelines. NOT needed for simple batch file ETL.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name message-queues
description Work with message queues for async data processing. Use when implementing pub/sub patterns, processing data asynchronously, or building distributed data pipelines. NOT needed for simple batch file ETL.

Message Queues

Provides patterns for message queue integration in data pipelines.

In-Memory Queue

from queue import Queue
from threading import Thread
from typing import Callable, Any

class SimpleMessageQueue:
    def __init__(self, num_workers: int = 1):
        self.queue = Queue()
        self.workers = []
        self.num_workers = num_workers
        self.running = False

    def start(self, handler: Callable[[Any], None]):
        self.running = True
        for _ in range(self.num_workers):
            worker = Thread(target=self._worker, args=(handler,))
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _worker(self, handler: Callable):
        while self.running:
            try:
                message = self.queue.get(timeout=1)
                handler(message)
                self.queue.task_done()
            except:
                continue

    def publish(self, message: Any):
        self.queue.put(message)

    def stop(self):
        self.running = False
        self.queue.join()

Pub/Sub Pattern

from typing import Dict, List, Callable, Any

class PubSub:
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}

    def subscribe(self, topic: str, callback: Callable[[Any], None]):
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(callback)

    def publish(self, topic: str, message: Any):
        for callback in self.subscribers.get(topic, []):
            callback(message)

Note

Message queues are typically used for distributed systems and real-time processing. For batch ETL pipelines processing local files, direct file processing is usually more appropriate.