Claude Code Plugins

Community-maintained marketplace

Feedback

concurrency-patterns

@aj-geddes/useful-ai-prompts
4
0

Implement thread-safe code, mutexes, semaphores, async/await patterns, and concurrent data structures. Use when handling parallel operations, race conditions, or building high-performance concurrent systems.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name concurrency-patterns
description Implement thread-safe code, mutexes, semaphores, async/await patterns, and concurrent data structures. Use when handling parallel operations, race conditions, or building high-performance concurrent systems.

Concurrency Patterns

Overview

Implement safe concurrent code using proper synchronization primitives and patterns for parallel execution.

When to Use

  • Multi-threaded applications
  • Parallel data processing
  • Race condition prevention
  • Resource pooling
  • Task coordination
  • High-performance systems
  • Async operations
  • Worker pools

Implementation Examples

1. Promise Pool (TypeScript)

class PromisePool {
  private queue: Array<() => Promise<any>> = [];
  private active = 0;

  constructor(private concurrency: number) {}

  async add<T>(fn: () => Promise<T>): Promise<T> {
    while (this.active >= this.concurrency) {
      await this.waitForSlot();
    }

    this.active++;

    try {
      return await fn();
    } finally {
      this.active--;
    }
  }

  private async waitForSlot(): Promise<void> {
    return new Promise(resolve => {
      const checkSlot = () => {
        if (this.active < this.concurrency) {
          resolve();
        } else {
          setTimeout(checkSlot, 10);
        }
      };
      checkSlot();
    });
  }

  async map<T, R>(
    items: T[],
    fn: (item: T) => Promise<R>
  ): Promise<R[]> {
    return Promise.all(
      items.map(item => this.add(() => fn(item)))
    );
  }
}

// Usage
const pool = new PromisePool(5);

const urls = Array.from({ length: 100 }, (_, i) =>
  `https://api.example.com/item/${i}`
);

const results = await pool.map(urls, async (url) => {
  const response = await fetch(url);
  return response.json();
});

2. Mutex and Semaphore (TypeScript)

class Mutex {
  private locked = false;
  private queue: Array<() => void> = [];

  async acquire(): Promise<void> {
    if (!this.locked) {
      this.locked = true;
      return;
    }

    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.locked = false;
    }
  }

  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

class Semaphore {
  private available: number;
  private queue: Array<() => void> = [];

  constructor(private max: number) {
    this.available = max;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }

    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.available++;
    }
  }

  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

// Usage
const mutex = new Mutex();
let counter = 0;

async function incrementCounter() {
  await mutex.runExclusive(async () => {
    const current = counter;
    await new Promise(resolve => setTimeout(resolve, 10));
    counter = current + 1;
  });
}

// Database connection pool with semaphore
const dbSemaphore = new Semaphore(10); // Max 10 concurrent connections

async function queryDatabase(query: string) {
  return dbSemaphore.runExclusive(async () => {
    // Execute query
    return executeQuery(query);
  });
}

async function executeQuery(query: string) {
  // Query logic
}

3. Worker Pool (Node.js)

import { Worker } from 'worker_threads';

interface Task<T> {
  id: string;
  data: any;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

class WorkerPool {
  private workers: Worker[] = [];
  private availableWorkers: Worker[] = [];
  private taskQueue: Task<any>[] = [];

  constructor(
    private workerScript: string,
    private poolSize: number
  ) {
    this.initializeWorkers();
  }

  private initializeWorkers(): void {
    for (let i = 0; i < this.poolSize; i++) {
      const worker = new Worker(this.workerScript);

      worker.on('message', (result) => {
        this.handleWorkerMessage(worker, result);
      });

      worker.on('error', (error) => {
        console.error('Worker error:', error);
      });

      this.workers.push(worker);
      this.availableWorkers.push(worker);
    }
  }

  async execute<T>(data: any): Promise<T> {
    return new Promise((resolve, reject) => {
      const task: Task<T> = {
        id: Math.random().toString(36),
        data,
        resolve,
        reject
      };

      this.taskQueue.push(task);
      this.processQueue();
    });
  }

  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const task = this.taskQueue.shift()!;
      const worker = this.availableWorkers.shift()!;

      worker.postMessage({
        taskId: task.id,
        data: task.data
      });

      (worker as any).currentTask = task;
    }
  }

  private handleWorkerMessage(worker: Worker, result: any): void {
    const task = (worker as any).currentTask as Task<any>;

    if (!task) return;

    if (result.error) {
      task.reject(new Error(result.error));
    } else {
      task.resolve(result.data);
    }

    delete (worker as any).currentTask;
    this.availableWorkers.push(worker);
    this.processQueue();
  }

  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}

// worker.js
// const { parentPort } = require('worker_threads');
//
// parentPort.on('message', async ({ taskId, data }) => {
//   try {
//     const result = await processData(data);
//     parentPort.postMessage({ taskId, data: result });
//   } catch (error) {
//     parentPort.postMessage({ taskId, error: error.message });
//   }
// });

4. Python Threading Patterns

import threading
from queue import Queue
from typing import Callable, List, TypeVar, Generic
import time

T = TypeVar('T')
R = TypeVar('R')

class ThreadPool(Generic[T, R]):
    def __init__(self, num_threads: int):
        self.num_threads = num_threads
        self.tasks: Queue = Queue()
        self.results: List[R] = []
        self.lock = threading.Lock()
        self.workers: List[threading.Thread] = []

    def map(self, func: Callable[[T], R], items: List[T]) -> List[R]:
        """Map function over items using thread pool."""
        # Add tasks to queue
        for item in items:
            self.tasks.put(item)

        # Start workers
        for _ in range(self.num_threads):
            worker = threading.Thread(
                target=self._worker,
                args=(func,)
            )
            worker.start()
            self.workers.append(worker)

        # Wait for completion
        self.tasks.join()

        # Stop workers
        for _ in range(self.num_threads):
            self.tasks.put(None)

        for worker in self.workers:
            worker.join()

        return self.results

    def _worker(self, func: Callable[[T], R]):
        """Worker thread."""
        while True:
            item = self.tasks.get()

            if item is None:
                self.tasks.task_done()
                break

            try:
                result = func(item)

                with self.lock:
                    self.results.append(result)
            finally:
                self.tasks.task_done()


class Mutex:
    def __init__(self):
        self._lock = threading.Lock()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()


class Semaphore:
    def __init__(self, max_count: int):
        self._semaphore = threading.Semaphore(max_count)

    def __enter__(self):
        self._semaphore.acquire()
        return self

    def __exit__(self, *args):
        self._semaphore.release()


# Usage
def process_item(item: int) -> int:
    time.sleep(0.1)
    return item * 2

pool = ThreadPool(num_threads=5)
items = list(range(100))
results = pool.map(process_item, items)
print(f"Processed {len(results)} items")

# Mutex example
counter = 0
mutex = Mutex()

def increment():
    global counter
    with mutex:
        current = counter
        time.sleep(0.001)
        counter = current + 1

threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter: {counter}")  # Should be 100

# Semaphore example
db_connections = Semaphore(max_count=10)

def query_database(query: str):
    with db_connections:
        # Execute query with limited connections
        time.sleep(0.1)
        print(f"Executing: {query}")

5. Async Patterns (Python asyncio)

import asyncio
from typing import Callable, List, TypeVar, Awaitable

T = TypeVar('T')
R = TypeVar('R')

class AsyncPool:
    def __init__(self, concurrency: int):
        self.semaphore = asyncio.Semaphore(concurrency)

    async def map(
        self,
        func: Callable[[T], Awaitable[R]],
        items: List[T]
    ) -> List[R]:
        """Map async function over items with concurrency limit."""
        async def bounded_func(item: T) -> R:
            async with self.semaphore:
                return await func(item)

        return await asyncio.gather(*[
            bounded_func(item) for item in items
        ])


class AsyncQueue:
    def __init__(self, max_size: int = 0):
        self.queue = asyncio.Queue(maxsize=max_size)

    async def put(self, item):
        await self.queue.put(item)

    async def get(self):
        return await self.queue.get()

    def task_done(self):
        self.queue.task_done()

    async def join(self):
        await self.queue.join()


# Producer-Consumer pattern
async def producer(queue: AsyncQueue, items: List[int]):
    """Produce items."""
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)

async def consumer(queue: AsyncQueue, name: str):
    """Consume items."""
    while True:
        item = await queue.get()

        if item is None:
            queue.task_done()
            break

        print(f"{name} consuming: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    queue = AsyncQueue(max_size=10)

    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(3)
    ]

    # Start producer
    await producer(queue, list(range(20)))

    # Wait for all items to be processed
    await queue.join()

    # Stop consumers
    for _ in range(3):
        await queue.put(None)

    await asyncio.gather(*consumers)

asyncio.run(main())

6. Go-Style Channels (Simulation)

class Channel<T> {
  private buffer: T[] = [];
  private senders: Array<{ value: T; resolve: () => void }> = [];
  private receivers: Array<(value: T) => void> = [];
  private closed = false;

  constructor(private bufferSize: number = 0) {}

  async send(value: T): Promise<void> {
    if (this.closed) {
      throw new Error('Channel is closed');
    }

    if (this.receivers.length > 0) {
      const receiver = this.receivers.shift()!;
      receiver(value);
      return;
    }

    if (this.buffer.length < this.bufferSize) {
      this.buffer.push(value);
      return;
    }

    return new Promise(resolve => {
      this.senders.push({ value, resolve });
    });
  }

  async receive(): Promise<T | undefined> {
    if (this.buffer.length > 0) {
      const value = this.buffer.shift()!;

      if (this.senders.length > 0) {
        const sender = this.senders.shift()!;
        this.buffer.push(sender.value);
        sender.resolve();
      }

      return value;
    }

    if (this.senders.length > 0) {
      const sender = this.senders.shift()!;
      sender.resolve();
      return sender.value;
    }

    if (this.closed) {
      return undefined;
    }

    return new Promise(resolve => {
      this.receivers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    this.receivers.forEach(receiver => receiver(undefined as any));
    this.receivers = [];
  }
}

// Usage
async function example() {
  const channel = new Channel<number>(5);

  // Producer
  async function producer() {
    for (let i = 0; i < 10; i++) {
      await channel.send(i);
      console.log(`Sent: ${i}`);
    }
    channel.close();
  }

  // Consumer
  async function consumer() {
    while (true) {
      const value = await channel.receive();
      if (value === undefined) break;
      console.log(`Received: ${value}`);
    }
  }

  await Promise.all([
    producer(),
    consumer()
  ]);
}

Best Practices

✅ DO

  • Use proper synchronization primitives
  • Limit concurrency to avoid resource exhaustion
  • Handle errors in concurrent operations
  • Use immutable data when possible
  • Test concurrent code thoroughly
  • Profile concurrent performance
  • Document thread-safety guarantees

❌ DON'T

  • Share mutable state without synchronization
  • Use sleep/polling for coordination
  • Create unlimited threads/workers
  • Ignore race conditions
  • Block event loops in async code
  • Forget to clean up resources

Resources