| name | message-queues |
| description | Work with message queues for async data processing. Use when implementing pub/sub patterns, processing data asynchronously, or building distributed data pipelines. NOT needed for simple batch file ETL. |
Message Queues
Provides patterns for message queue integration in data pipelines.
In-Memory Queue
from queue import Queue, Empty
from threading import Thread
from typing import Callable, Any


class SimpleMessageQueue:
    """In-memory queue consumed by a pool of daemon worker threads."""

    def __init__(self, num_workers: int = 1):
        self.queue = Queue()
        self.workers = []
        self.num_workers = num_workers
        self.running = False

    def start(self, handler: Callable[[Any], None]) -> None:
        """Spawn workers that pass each published message to `handler`."""
        self.running = True
        for _ in range(self.num_workers):
            worker = Thread(target=self._worker, args=(handler,), daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker(self, handler: Callable[[Any], None]) -> None:
        while self.running:
            try:
                message = self.queue.get(timeout=1)
            except Empty:
                continue  # nothing queued yet; re-check the running flag
            try:
                handler(message)
            except Exception:
                pass  # a failing handler should not kill the worker thread
            finally:
                self.queue.task_done()  # required for queue.join() in stop()

    def publish(self, message: Any) -> None:
        self.queue.put(message)

    def stop(self) -> None:
        """Wait for all queued messages to be handled, then stop the workers."""
        self.queue.join()
        self.running = False
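A minimal usage sketch of the class above; `record_handler` and the sample messages are hypothetical:

def record_handler(record):
    # Hypothetical handler: a real pipeline might validate, transform,
    # or load the record here.
    print("processed:", record)

mq = SimpleMessageQueue(num_workers=2)
mq.start(record_handler)
for i in range(5):
    mq.publish({"record_id": i})
mq.stop()  # blocks until every published message has been handled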
Pub/Sub Pattern
from typing import Dict, List, Callable, Any


class PubSub:
    """Minimal synchronous publish/subscribe dispatcher keyed by topic."""

    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}

    def subscribe(self, topic: str, callback: Callable[[Any], None]) -> None:
        # Register a callback for a topic, creating the topic on first use.
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(callback)

    def publish(self, topic: str, message: Any) -> None:
        # Deliver the message to every subscriber of the topic, in order.
        for callback in self.subscribers.get(topic, []):
            callback(message)
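A short sketch of wiring `PubSub` into a pipeline; the topic names and callbacks are illustrative only:

bus = PubSub()

# Two independent consumers of the same topic.
bus.subscribe("orders", lambda msg: print("audit:", msg))
bus.subscribe("orders", lambda msg: print("enrich:", msg))

# Both callbacks run synchronously, in subscription order.
bus.publish("orders", {"order_id": 1, "total": 9.99})

# Publishing to a topic with no subscribers is a no-op.
bus.publish("payments", {"order_id": 1})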
Note
Message queues are typically used for distributed systems and real-time processing. For batch ETL pipelines processing local files, direct file processing is usually more appropriate.
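For the distributed or real-time case, the same pub/sub idea is usually backed by an external broker. A minimal sketch, assuming the `redis` Python package and a Redis server on localhost (in practice the producer and consumer would run as separate processes):

import redis  # assumption: redis-py installed, broker reachable on localhost

r = redis.Redis(host="localhost", port=6379)

# Consumer side: subscribe to a topic (a Redis "channel").
pubsub = r.pubsub()
pubsub.subscribe("events")

# Producer side: publish an event to the same channel.
r.publish("events", '{"record_id": 42}')

# listen() blocks and yields both control and data messages, so filter
# on type; a real pipeline would run this loop indefinitely.
for message in pubsub.listen():
    if message["type"] == "message":
        print("received:", message["data"])
        break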