async-patterns-and-concurrency

@tachyon-beep/skillpacks

async/await mastery, asyncio patterns, TaskGroup (3.11+), structured concurrency, event loop management, common pitfalls, concurrent.futures

SKILL.md

name async-patterns-and-concurrency
description async/await mastery, asyncio patterns, TaskGroup (3.11+), structured concurrency, event loop management, common pitfalls, concurrent.futures

Async Patterns and Concurrency

Overview

Core Principle: Async code is about I/O concurrency, not CPU parallelism. Use async when waiting for network, files, or databases. Don't use async to speed up CPU-bound work.

Python's async/await (asyncio) enables single-threaded concurrency through cooperative multitasking. Structured concurrency (TaskGroup in 3.11+) makes async code safer and easier to reason about. The most common mistake: blocking the event loop with synchronous operations.
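
A minimal sketch of what cooperative multitasking buys you, assuming `asyncio.sleep` as a stand-in for real network waits. The helper names (`fake_io`, `run_sequential`, `run_concurrent`) are hypothetical and exist only for this illustration.

```python
import asyncio
import time


async def fake_io(delay: float) -> float:
    """Stand-in for a network call: awaiting hands control back to the event loop."""
    await asyncio.sleep(delay)
    return delay


async def run_sequential(delays: list[float]) -> float:
    start = time.perf_counter()
    for d in delays:
        await fake_io(d)  # each wait finishes before the next one starts
    return time.perf_counter() - start


async def run_concurrent(delays: list[float]) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(d) for d in delays))  # the waits overlap
    return time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.2, 0.2, 0.2]
    print(f"sequential: {asyncio.run(run_sequential(delays)):.2f}s")
    print(f"concurrent: {asyncio.run(run_concurrent(delays)):.2f}s")
```

The sequential version takes roughly the sum of the delays, the concurrent version roughly the longest one. Replacing `asyncio.sleep` with `time.sleep` would remove the benefit, because a blocking call never yields to the loop.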

When to Use

Use this skill when:

  • "asyncio not working"
  • "async/await errors"
  • "Event loop issues"
  • "Coroutine never awaited"
  • "How to use TaskGroup?"
  • "When to use async?"
  • "Async code is slow"
  • "Blocking the event loop"

Don't use when:

  • CPU-bound work (use multiprocessing; threads rarely help in CPython because of the GIL)
  • Setting up project (use project-structure-and-tooling)
  • Profiling needed (use debugging-and-profiling first)

Symptoms triggering this skill:

  • RuntimeWarning: coroutine was never awaited
  • Event loop errors
  • Async functions not running concurrently
  • Need to parallelize I/O operations

Async Fundamentals

When to Use Async vs Sync

# ❌ WRONG: Using async for CPU-bound work
async def calculate_fibonacci(n: int) -> int:
    if n < 2:
        return n
    return await calculate_fibonacci(n-1) + await calculate_fibonacci(n-2)
# Problem: No I/O, just CPU work. Async adds overhead without benefit.

# ✅ CORRECT: Use regular function for CPU work
def calculate_fibonacci(n: int) -> int:
    if n < 2:
        return n
    return calculate_fibonacci(n-1) + calculate_fibonacci(n-2)

# ✅ CORRECT: Use async for I/O-bound work
import asyncio
import aiohttp

async def fetch_user(session: aiohttp.ClientSession, user_id: int) -> dict:
    async with session.get(f"https://api.example.com/users/{user_id}") as resp:
        return await resp.json()
# Async shines: while one request waits on the network, other work can run

# ✅ CORRECT: Use async when orchestrating multiple I/O operations
async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    # Share one session so connections are pooled across requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user(session, uid) for uid in user_ids]
        return await asyncio.gather(*tasks)
# Multiple network calls run concurrently

Why this matters: Async adds complexity. Only use it when you benefit from I/O concurrency. For CPU work, use multiprocessing; threads rarely speed up CPU-bound Python code because of the GIL.

Basic async/await Syntax

# ❌ WRONG: Forgetting await
async def get_data():
    return fetch_from_api()  # Returns coroutine, doesn't execute!

result = get_data()  # RuntimeWarning: coroutine never awaited
print(result)  # Prints <coroutine object>, not data

# ✅ CORRECT: Always await async functions
async def get_data():
    return await fetch_from_api()

# ✅ CORRECT: Running from sync code
import asyncio

def main():
    result = asyncio.run(get_data())
    print(result)

# ✅ CORRECT: Running from async code
async def main():
    result = await get_data()
    print(result)

asyncio.run(main())

Why this matters: Calling an async function only creates a coroutine object; nothing runs until it is awaited or scheduled as a task. asyncio.run() is the bridge from sync code into async code.

Running the Event Loop

# ❌ WRONG: Running event loop multiple times
import asyncio

asyncio.run(task1())
asyncio.run(task2())  # Creates new event loop, inefficient

# ✅ CORRECT: Single event loop for all async work
async def main():
    await task1()
    await task2()

asyncio.run(main())

# ❌ WRONG: Mixing asyncio.run and manual loop management
loop = asyncio.get_event_loop()  # Deprecated for this use in recent Python versions
loop.run_until_complete(task1())
asyncio.run(task2())  # Runs on a second, separate loop; objects bound to `loop` cannot be reused

# ✅ CORRECT: Use asyncio.run() (Python 3.7+)
asyncio.run(main())

# ✅ CORRECT: For advanced cases, manual loop management
async def main():
    await task1()
    await task2()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()

Why this matters: asyncio.run() handles loop creation and cleanup automatically. Prefer it unless you need fine-grained control.
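
A short sketch of why repeated `asyncio.run()` calls do not share state: each call builds its own loop and closes it on exit. `current_loop` is a hypothetical helper used only to capture the loop object.

```python
import asyncio


async def current_loop() -> asyncio.AbstractEventLoop:
    # Return the loop that is executing this coroutine
    return asyncio.get_running_loop()


first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

print(first is second)    # False: every asyncio.run() call creates a new loop
print(first.is_closed())  # True: the loop is closed when asyncio.run() returns
```

Anything created against the first loop (futures, open connections) is unusable in the second call, which is why a single `asyncio.run(main())` entry point is the usual choice.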


Structured Concurrency with TaskGroup (Python 3.11+)

TaskGroup Basics

# ❌ WRONG: Creating tasks without proper cleanup (old style)
async def fetch_all_old(urls: list[str]) -> list[str]:
    tasks = []
    for url in urls:
        task = asyncio.create_task(fetch(url))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    return results
# Problem: If one task fails, others continue. No automatic cleanup.

# ✅ CORRECT: TaskGroup (Python 3.11+)
async def fetch_all(urls: list[str]) -> list[str]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(url)) for url in urls]

    # When exiting context, all tasks guaranteed complete or cancelled
    return [task.result() for task in tasks]

# Why this matters: TaskGroup ensures:
# 1. All tasks complete before proceeding
# 2. If any task fails, all others cancelled
# 3. Automatic cleanup, no leaked tasks

Handling Errors with TaskGroup

# ❌ WRONG: Silent failures with gather
async def process_all_gather(items: list[str]) -> list[str]:
    tasks = [asyncio.create_task(process(item)) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]
# Problem: Errors silently ignored, hard to debug

# ✅ CORRECT: TaskGroup raises ExceptionGroup
async def process_all(items: list[str]) -> list[str]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process(item)) for item in items]
    return [task.result() for task in tasks]

# Usage with error handling
try:
    results = await process_all(items)
except* ValueError as eg:
    # Handle all ValueErrors
    for exc in eg.exceptions:
        log.error(f"Validation error: {exc}")
except* ConnectionError as eg:
    # Handle all ConnectionErrors
    for exc in eg.exceptions:
        log.error(f"Network error: {exc}")

# ✅ CORRECT: Selective error handling with gather
async def process_with_fallback(items: list[str]) -> list[str]:
    tasks = [asyncio.create_task(process(item)) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    processed = []
    for item, result in zip(items, results):
        if isinstance(result, Exception):
            log.warning(f"Failed to process {item}: {result}")
            processed.append(None)  # Or default value
        else:
            processed.append(result)
    return processed

Why this matters: TaskGroup provides structured concurrency with automatic cleanup. Use gather when you need partial results despite failures.

Timeout Handling

# ❌ WRONG: No timeout on I/O operations
async def fetch_data(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()
# Problem: Can hang forever if server doesn't respond

# ✅ CORRECT: Timeout with asyncio.timeout (Python 3.11+)
async def fetch_data(url: str) -> str:
    async with asyncio.timeout(10.0):  # 10 second timeout
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()
# Raises TimeoutError after 10 seconds

# ✅ CORRECT: Timeout on TaskGroup
async def fetch_all_with_timeout(urls: list[str]) -> list[str]:
    async with asyncio.timeout(30.0):  # Total timeout
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_data(url)) for url in urls]
        return [task.result() for task in tasks]

# ✅ CORRECT: Individual timeouts (Python <3.11)
async def fetch_with_timeout_old(url: str) -> str:
    try:
        return await asyncio.wait_for(fetch_data(url), timeout=10.0)
    except asyncio.TimeoutError:
        log.error(f"Timeout fetching {url}")
        raise

Why this matters: Always timeout I/O operations. Network calls can hang indefinitely. asyncio.timeout() (3.11+) is cleaner than wait_for().
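
A timeout does more than raise in the caller: it cancels the awaited coroutine, so cleanup code inside it still runs. The sketch below uses `asyncio.wait_for` so it also works before 3.11; `slow_operation` and `call_with_timeout` are hypothetical names.

```python
import asyncio

events: list[str] = []


async def slow_operation() -> str:
    try:
        await asyncio.sleep(5.0)  # stands in for a hung network call
        return "finished"
    except asyncio.CancelledError:
        events.append("cancelled")  # the timeout cancels this coroutine
        raise
    finally:
        events.append("cleanup")  # runs on success, failure, and cancellation


async def call_with_timeout(timeout: float) -> str:
    try:
        return await asyncio.wait_for(slow_operation(), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(call_with_timeout(0.05)), events)
```

Always re-raise `CancelledError` after handling it; swallowing it would stop the timeout from taking effect.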


Async Context Managers

Basic Async Context Manager

# ❌ WRONG: Using sync context manager in async code
class DatabaseConnection:
    def __enter__(self):
        self.conn = connect_to_db()  # Blocking I/O!
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()  # Blocking I/O!

async def query():
    with DatabaseConnection() as conn:  # Blocks event loop
        return await conn.query("SELECT * FROM users")

# ✅ CORRECT: Async context manager
class AsyncDatabaseConnection:
    async def __aenter__(self):
        self.conn = await async_connect_to_db()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()

async def query():
    async with AsyncDatabaseConnection() as conn:
        return await conn.query("SELECT * FROM users")

Using contextlib for Async Context Managers

from contextlib import asynccontextmanager

# ✅ CORRECT: Simple async context manager with decorator
@asynccontextmanager
async def database_connection(host: str):
    conn = await connect_to_database(host)
    try:
        yield conn
    finally:
        await conn.close()

# Usage
async def fetch_users():
    async with database_connection("localhost") as conn:
        return await conn.query("SELECT * FROM users")

# ✅ CORRECT: Resource pool management
@asynccontextmanager
async def http_session():
    session = aiohttp.ClientSession()
    try:
        yield session
    finally:
        await session.close()

async def fetch_multiple(urls: list[str]):
    async with http_session() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

Why this matters: Async context managers ensure resources are cleaned up properly, even when the body raises. Use @asynccontextmanager for simple cases and __aenter__/__aexit__ for complex ones.
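
To make the cleanup guarantee concrete, here is a small self-contained sketch. `fake_connection` is a hypothetical stand-in for a real connection, and `log_lines` records the order of events.

```python
import asyncio
from contextlib import asynccontextmanager

log_lines: list[str] = []


@asynccontextmanager
async def fake_connection(name: str):
    log_lines.append(f"open {name}")
    await asyncio.sleep(0)  # stands in for an async connect
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # stands in for an async close
        log_lines.append(f"close {name}")


async def use_connection() -> None:
    try:
        async with fake_connection("db") as conn:
            log_lines.append(f"query on {conn}")
            raise RuntimeError("query failed")
    except RuntimeError:
        log_lines.append("error handled")


if __name__ == "__main__":
    asyncio.run(use_connection())
    print(log_lines)
```

The close step is logged before the error handler runs, which shows that the `finally` block executes as the exception leaves the `async with` body.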


Async Iterators and Generators

Async Iterators

# ❌ WRONG: Sync iterator doing async work
class DataFetcher:
    def __init__(self, ids: list[int]):
        self.ids = ids
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= len(self.ids):
            raise StopIteration
        data = asyncio.run(fetch_data(self.ids[self.index]))  # Don't do this!
        self.index += 1
        return data

# ✅ CORRECT: Async iterator
class AsyncDataFetcher:
    def __init__(self, ids: list[int]):
        self.ids = ids
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.ids):
            raise StopAsyncIteration
        data = await fetch_data(self.ids[self.index])
        self.index += 1
        return data

# Usage
async def process_all():
    async for data in AsyncDataFetcher([1, 2, 3, 4]):
        print(data)

Async Generators

# ✅ CORRECT: Async generator (simpler than iterator)
async def fetch_users_paginated(page_size: int = 100):
    page = 0
    while True:
        users = await fetch_page(page, page_size)
        if not users:
            break
        for user in users:
            yield user
        page += 1

# Usage
async def process_all_users():
    async for user in fetch_users_paginated():
        await process_user(user)

# ✅ CORRECT: Async generator with cleanup
async def stream_file_lines(path: str):
    async with aiofiles.open(path) as f:
        async for line in f:
            yield line.strip()

# Usage with async comprehension
async def load_data(path: str) -> list[str]:
    return [line async for line in stream_file_lines(path)]

Why this matters: Async iterators/generators enable streaming I/O-bound data without loading everything into memory. Essential for large datasets.
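
The streaming claim can be checked with a sketch that counts page fetches. `FAKE_PAGES`, `fetch_page`, `iter_items`, and `first_n` are hypothetical; the data is invented for illustration.

```python
import asyncio
from typing import AsyncIterator

FAKE_PAGES = [[1, 2, 3], [4, 5], []]  # hypothetical data; an empty page ends the stream
pages_fetched: list[int] = []


async def fetch_page(page: int) -> list[int]:
    pages_fetched.append(page)
    await asyncio.sleep(0)  # stands in for a network round trip
    return FAKE_PAGES[page] if page < len(FAKE_PAGES) else []


async def iter_items() -> AsyncIterator[int]:
    page = 0
    while True:
        items = await fetch_page(page)
        if not items:
            return
        for item in items:
            yield item
        page += 1


async def first_n(n: int) -> list[int]:
    out: list[int] = []
    async for item in iter_items():
        out.append(item)
        if len(out) == n:
            break  # later pages are never requested
    return out


if __name__ == "__main__":
    print(asyncio.run(first_n(2)), pages_fetched)
```

Because the consumer stops after two items, only page 0 is fetched; a version that built the full list first would have requested every page.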


Common Async Pitfalls

Blocking the Event Loop

# ❌ WRONG: Blocking operation in async function
import time
import requests

async def fetch_data(url: str) -> str:
    # Blocks entire event loop for 2 seconds!
    time.sleep(2)

    # Also blocks event loop (requests is synchronous)
    response = requests.get(url)
    return response.text

# ✅ CORRECT: Use async sleep and async HTTP
import asyncio
import aiohttp

async def fetch_data(url: str) -> str:
    await asyncio.sleep(2)  # Non-blocking sleep

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

# ✅ CORRECT: If must use blocking code, run in executor
import asyncio
import requests

async def fetch_data_sync(url: str) -> str:
    loop = asyncio.get_running_loop()

    # Run blocking code in thread pool
    response = await loop.run_in_executor(
        None,  # Use default executor
        requests.get,
        url
    )
    return response.text

# ✅ CORRECT: CPU-bound work in process pool
import concurrent.futures

async def heavy_computation(data: bytes) -> bytes:
    loop = asyncio.get_running_loop()

    # Run in a process pool for CPU work. process_data must be a picklable,
    # module-level function. In real code, create the pool once and reuse it.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, process_data, data)
    return result

Why this matters: Blocking the event loop stops ALL async code. Use async libraries (aiohttp not requests), async sleep, or run_in_executor for blocking code.
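
On Python 3.9+, `asyncio.to_thread` is a shorter way to push a blocking call onto the default thread pool. The sketch below, with hypothetical names `blocking_call`, `heartbeat`, and `run_demo`, shows that other coroutines keep running while the blocking call is in progress.

```python
import asyncio
import time


def blocking_call(seconds: float) -> str:
    time.sleep(seconds)  # stands in for a synchronous library call
    return "done"


async def heartbeat(ticks: list[float], interval: float) -> None:
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def run_demo() -> tuple[str, int]:
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks, 0.05))
    try:
        # The blocking call runs in a worker thread, so the loop stays responsive
        result = await asyncio.to_thread(blocking_call, 0.3)
    finally:
        beat.cancel()
        await asyncio.gather(beat, return_exceptions=True)
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

If `blocking_call` were invoked directly inside the coroutine, the heartbeat would record a single tick and then stall until the call returned.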

Forgetting to Await

# ❌ WRONG: Not awaiting async functions
async def main():
    fetch_data()  # Returns coroutine, doesn't run!
    print("Done")

# ✅ CORRECT: Always await
async def main():
    await fetch_data()
    print("Done")

# ❌ WRONG: Collecting coroutines without running them
async def process_all(items: list[str]):
    results = [process_item(item) for item in items]  # List of coroutines!
    return results

# ✅ CORRECT: Await or gather
async def process_all(items: list[str]):
    tasks = [asyncio.create_task(process_item(item)) for item in items]
    return await asyncio.gather(*tasks)

# ✅ BETTER: TaskGroup (Python 3.11+)
async def process_all(items: list[str]):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process_item(item)) for item in items]
    return [task.result() for task in tasks]

Shared Mutable State

# ❌ WRONG: Shared mutable state without locks
counter = 0

async def increment():
    global counter
    temp = counter
    await asyncio.sleep(0)  # Yield control
    counter = temp + 1  # Race condition!

async def main():
    await asyncio.gather(*[increment() for _ in range(100)])
    print(counter)  # Prints 1, not 100: every task read 0 before any task wrote

# ✅ CORRECT: Use asyncio.Lock
counter = 0
lock = asyncio.Lock()

async def increment():
    global counter
    async with lock:
        temp = counter
        await asyncio.sleep(0)
        counter = temp + 1

async def main():
    await asyncio.gather(*[increment() for _ in range(100)])
    print(counter)  # 100, as expected

# ✅ BETTER: Avoid shared state
async def do_work() -> int:
    await asyncio.sleep(0)
    return 1  # Each task returns its own contribution

async def main():
    results = await asyncio.gather(*[do_work() for _ in range(100)])
    print(sum(results))  # 100, with no lock needed

Why this matters: Async code is concurrent. Race conditions exist. Use locks or avoid shared mutable state.


Async Patterns

Fire and Forget

# ❌ WRONG: Creating task without tracking it
async def main():
    asyncio.create_task(background_job())  # Task may not complete!
    return "Done"

# ✅ CORRECT: Track background tasks
background_tasks = set()

async def main():
    task = asyncio.create_task(background_job())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "Done"

# ✅ CORRECT: Wait for background tasks before exit
async def main():
    task = asyncio.create_task(background_job())
    try:
        return "Done"
    finally:
        await task
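
The two approaches above can be combined: keep strong references while tasks run, then drain whatever is still pending before shutdown. This is a sketch with hypothetical helpers `spawn` and `background_job`.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()
completed: list[int] = []


async def background_job(n: int) -> None:
    await asyncio.sleep(0.01)
    completed.append(n)


def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference keeps the task alive
    task.add_done_callback(background_tasks.discard)  # drop the reference once it finishes
    return task


async def main() -> None:
    for n in range(3):
        spawn(background_job(n))
    # Drain pending background work before the loop shuts down
    await asyncio.gather(*background_tasks)


if __name__ == "__main__":
    asyncio.run(main())
    print(sorted(completed))
```

Without the final `gather`, `asyncio.run()` would cancel the jobs that had not finished when `main` returned.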

Retry with Exponential Backoff

# ❌ WRONG: Retry without delay
async def fetch_with_retry(url: str, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            return await fetch_data(url)
        except Exception:
            if attempt == max_retries - 1:
                raise
    # Hammers server, no backoff

# ✅ CORRECT: Exponential backoff with jitter
import random
async def fetch_with_retry(
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> str:
    for attempt in range(max_retries):
        try:
            return await fetch_data(url)
        except Exception as e:
            if attempt == max_retries - 1:
                raise

            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            log.warning(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            await asyncio.sleep(delay)

    raise RuntimeError("Unreachable")

Rate Limiting

# ❌ WRONG: No rate limiting
async def fetch_all(urls: list[str]) -> list[str]:
    tasks = [asyncio.create_task(fetch(url)) for url in urls]
    return await asyncio.gather(*tasks)
# Can overwhelm server with 1000s of concurrent requests

# ✅ CORRECT: Semaphore for concurrent request limit
async def fetch_all(urls: list[str], max_concurrent: int = 10) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_sem(url: str) -> str:
        async with semaphore:
            return await fetch(url)

    tasks = [asyncio.create_task(fetch_with_sem(url)) for url in urls]
    return await asyncio.gather(*tasks)

# ✅ CORRECT: Token bucket rate limiting
import time

class RateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # Tokens per second
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_update = time.monotonic()  # No event loop needed at construction
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now

            if self.tokens < 1:
                # Sleep until one full token has accrued, then spend it
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
                self.last_update = time.monotonic()  # Don't count the sleep twice
            else:
                self.tokens -= 1

# Usage
rate_limiter = RateLimiter(rate=10.0, capacity=10)  # 10 req/sec

async def fetch_with_limit(url: str) -> str:
    await rate_limiter.acquire()
    return await fetch(url)

Why this matters: Rate limiting prevents overwhelming servers and respects API limits. A semaphore caps how many requests are in flight at once; a token bucket caps the sustained request rate while still allowing bursts up to its capacity.
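
To confirm that a semaphore really bounds concurrency, the sketch below tracks the peak number of jobs running at once. `run_limited` is a hypothetical helper and `asyncio.sleep` stands in for the request.

```python
import asyncio


async def run_limited(n_jobs: int, max_concurrent: int) -> int:
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)  # highest concurrency seen so far
            await asyncio.sleep(0.01)    # stands in for the actual request
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_limited(n_jobs=20, max_concurrent=5)))
```

With 20 jobs and a limit of 5, the peak is 5: the remaining jobs wait at `async with semaphore` until a slot frees up.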

Async Queue for Producer/Consumer

# ✅ CORRECT: Producer/consumer with asyncio.Queue
import asyncio

async def producer(queue: asyncio.Queue, items: list[str]):
    for item in items:
        await queue.put(item)
        await asyncio.sleep(0.1)  # Simulate work

async def consumer(queue: asyncio.Queue, consumer_id: int):
    while True:
        item = await queue.get()
        try:
            if item is None:
                break  # Sentinel: stop this consumer

            print(f"Consumer {consumer_id} processing {item}")
            await asyncio.sleep(0.2)  # Simulate work
        finally:
            queue.task_done()  # Every get() needs a matching task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    items = [f"item_{i}" for i in range(20)]
    num_consumers = 3

    async with asyncio.TaskGroup() as tg:
        for i in range(num_consumers):
            tg.create_task(consumer(queue, i))

        await producer(queue, items)

        # Wait for all items processed, then send one sentinel per consumer
        await queue.join()
        for _ in range(num_consumers):
            await queue.put(None)

# ✅ CORRECT: Multiple producers, multiple consumers
async def worker(name: str, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break

        await process_item(item)
        queue.task_done()

async def main(items: list[str]):
    queue = asyncio.Queue()

    # Create workers
    workers = [asyncio.create_task(worker(f"worker-{i}", queue)) for i in range(5)]

    # Add work
    for item in items:
        await queue.put(item)

    # Wait for all work done
    await queue.join()

    # Stop workers
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)

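Why this matters: asyncio.Queue is built for coroutines sharing one event loop, which makes it a good fit for producer/consumer patterns in async code. It is not thread-safe; to hand work between threads and the loop, use queue.Queue or loop.call_soon_threadsafe().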


Threading vs Async vs Multiprocessing

When to Use What

import asyncio
import concurrent.futures
import aiohttp

# CPU-bound work: Use multiprocessing
def cpu_bound(n: int) -> int:
    return sum(i * i for i in range(n))

async def process_cpu_tasks(data: list[int]) -> list[int]:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        results = await asyncio.gather(*[
            loop.run_in_executor(pool, cpu_bound, n) for n in data
        ])
    return results

# I/O-bound work: Use async
async def io_bound(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

async def process_io_tasks(urls: list[str]) -> list[str]:
    return await asyncio.gather(*[io_bound(url) for url in urls])

# Blocking I/O (no async library): Use threads
def blocking_io(path: str) -> str:
    with open(path) as f:  # Blocking file I/O
        return f.read()

async def process_files(paths: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        results = await asyncio.gather(*[
            loop.run_in_executor(pool, blocking_io, path) for path in paths
        ])
    return results

Decision tree:

Is work CPU-bound?
├─ Yes → multiprocessing (ProcessPoolExecutor)
└─ No → I/O-bound
    ├─ Async library available? → async/await
    └─ Only sync library? → threads (ThreadPoolExecutor)

Combining Async and Threads

# ✅ CORRECT: Running async code in a separate thread
import asyncio
import concurrent.futures
import queue

def run_async_in_thread(coro):
    # asyncio.run() creates and closes a fresh event loop inside the worker thread
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

def sync_function():
    result = run_async_in_thread(async_operation())
    return result

# ✅ CORRECT: Thread-safe async queue
class AsyncThreadSafeQueue:
    def __init__(self):
        self._queue = queue.Queue()

    async def get(self):
        loop = asyncio.get_running_loop()
        # Blocks a worker thread (not the event loop) until an item arrives
        return await loop.run_in_executor(None, self._queue.get)

    async def put(self, item):
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._queue.put, item)

Debugging Async Code

Common Errors and Solutions

# Error: RuntimeWarning: coroutine 'fetch_data' was never awaited
# ❌ WRONG:
async def main():
    fetch_data()  # Missing await

# ✅ CORRECT:
async def main():
    await fetch_data()

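# Error: RuntimeError: Event loop is closed
# ❌ WRONG: Reusing loop-bound objects across asyncio.run() calls
asyncio.run(coro1())  # The loop is closed when this returns
asyncio.run(coro2())  # New loop; fails if coro2 touches objects (sessions, connections) tied to the first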

# ✅ CORRECT:
async def main():
    await coro1()
    await coro2()
asyncio.run(main())

# Error: RuntimeError: Task got Future attached to a different loop
# ❌ WRONG:
loop1 = asyncio.new_event_loop()
fut = loop1.create_future()  # Future belongs to loop1

async def waiter():
    await fut

asyncio.run(waiter())  # Runs on a new loop but awaits loop1's future

# ✅ CORRECT: Create and await on the same loop
async def main():
    fut = asyncio.get_running_loop().create_future()
    fut.set_result("ok")
    return await fut

asyncio.run(main())

Enabling Debug Mode

# Enable asyncio debug mode for better errors
import asyncio
import logging

# Method 1: Environment variable
# PYTHONASYNCIODEBUG=1 python script.py

# Method 2: In code
asyncio.run(main(), debug=True)

# Method 3: For an existing loop (call from inside a running coroutine)
loop = asyncio.get_running_loop()
loop.set_debug(True)

# Configure logging
logging.basicConfig(level=logging.DEBUG)

# Debug mode enables:
# - Warnings for slow callbacks (>100ms)
# - Creation tracebacks for coroutines that were never awaited
# - Checks for non-thread-safe calls made from the wrong thread

Detecting Blocking Code

# ✅ CORRECT: Monitor event loop lag
import asyncio
import time

class LoopMonitor:
    def __init__(self, threshold: float = 0.1):
        self.threshold = threshold
        self.last_check = time.monotonic()

    async def monitor(self):
        while True:
            now = time.monotonic()
            lag = now - self.last_check - 1.0  # Expecting 1 second sleep

            if lag > self.threshold:
                log.warning(f"Event loop blocked for {lag:.3f}s")

            self.last_check = now
            await asyncio.sleep(1.0)

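async def main():
    monitor = LoopMonitor()
    monitor_task = asyncio.create_task(monitor.monitor())  # Keep a reference

    try:
        # Your async code here
        await run_application()
    finally:
        monitor_task.cancel()  # Stop monitoring on shutdown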

Async Libraries Ecosystem

Essential Async Libraries

# HTTP client
import aiohttp

async def fetch(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

# File I/O
import aiofiles

async def read_file(path: str) -> str:
    async with aiofiles.open(path) as f:
        return await f.read()

# Database (PostgreSQL)
import asyncpg

async def query_db():
    conn = await asyncpg.connect('postgresql://user@localhost/db')
    try:
        rows = await conn.fetch('SELECT * FROM users')
        return rows
    finally:
        await conn.close()

# Redis (aioredis has been merged into redis-py as redis.asyncio)
import redis.asyncio as redis

async def cache_get(key: str) -> str | None:
    client = redis.from_url('redis://localhost')
    try:
        value = await client.get(key)
        return value.decode() if value else None
    finally:
        await client.aclose()  # redis-py 5.0.1+; older versions use close()

Async Testing with pytest-asyncio

# Install: pip install pytest-asyncio

import pytest

# Mark async test
@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data("https://api.example.com")
    assert result is not None

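# Async fixture (pytest-asyncio's default strict mode requires its own decorator)
import aiohttp
import pytest_asyncio

@pytest_asyncio.fixture
async def http_session():
    async with aiohttp.ClientSession() as session:
        yield session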

@pytest.mark.asyncio
async def test_with_session(http_session):
    async with http_session.get("https://api.example.com") as resp:
        assert resp.status == 200

Anti-Patterns

Async Over Everything

# ❌ WRONG: Making everything async without reason
async def calculate_total(prices: list[float]) -> float:
    total = 0.0
    for price in prices:
        total += price  # No I/O, no benefit from async
    return total

# ✅ CORRECT: Keep sync when no I/O
def calculate_total(prices: list[float]) -> float:
    return sum(prices)

# ❌ WRONG: Async wrapper for sync function
async def async_sum(numbers: list[int]) -> int:
    return sum(numbers)  # Why?

# ✅ CORRECT: Only async when doing I/O
async def fetch_and_sum(urls: list[str]) -> int:
    results = await asyncio.gather(*[fetch_number(url) for url in urls])
    return sum(results)  # sum() is sync, that's fine

Creating Too Many Tasks

# ❌ WRONG: Creating millions of tasks
async def process_all(items: list[str]):  # 1M items
    tasks = [asyncio.create_task(process(item)) for item in items]
    return await asyncio.gather(*tasks)
# Problem: Creates 1M tasks, high memory usage

# ✅ CORRECT: Cap concurrency with a semaphore (gather still creates one task per item)
async def process_all(items: list[str], max_concurrent: int = 100):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_sem(item: str):
        async with semaphore:
            return await process(item)

    return await asyncio.gather(*[process_with_sem(item) for item in items])

# ✅ BETTER: Process in batches (bounds live tasks; each batch waits for its slowest item)
async def process_all(items: list[str], batch_size: int = 100):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(*[process(item) for item in batch])
        results.extend(batch_results)
    return results
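
A third option, sketched here under the assumption that results should keep input order, is a fixed pool of workers pulling from a shared iterator. Only `num_workers` tasks ever exist, and no worker sits idle waiting for a slow batch to finish. `process` is a hypothetical stand-in for the real per-item coroutine.

```python
import asyncio


async def process(item: int) -> int:
    await asyncio.sleep(0)  # stands in for real I/O
    return item * 2


async def process_all(items: list[int], num_workers: int = 4) -> list[int]:
    results: list = [None] * len(items)
    pending = iter(enumerate(items))  # shared by all workers

    async def worker() -> None:
        # next() on the shared iterator is synchronous, so each item
        # is handed to exactly one worker
        for index, item in pending:
            results[index] = await process(item)

    await asyncio.gather(*(worker() for _ in range(num_workers)))
    return results


if __name__ == "__main__":
    print(asyncio.run(process_all(list(range(10)), num_workers=3)))
```

This works because asyncio tasks only switch at `await` points; the same shared-iterator trick would not be safe across threads.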

Mixing Sync and Async Poorly

# ❌ WRONG: Calling asyncio.run inside async function
async def bad_function():
    result = asyncio.run(some_async_function())  # Error!
    return result

# ✅ CORRECT: Just await
async def good_function():
    result = await some_async_function()
    return result

# ❌ WRONG: Sync wrapper calling async repeatedly
def process_all_sync(items: list[str]) -> list[str]:
    return [asyncio.run(process(item)) for item in items]
# Creates new event loop for each item!

# ✅ CORRECT: Single event loop
def process_all_sync(items: list[str]) -> list[str]:
    async def process_all_async():
        return await asyncio.gather(*[process(item) for item in items])

    return asyncio.run(process_all_async())

Decision Trees

Should I Use Async?

Does my code do I/O? (network, files, database)
├─ No → Don't use async (CPU-bound work)
└─ Yes → Does an async library exist?
    ├─ Yes → Use async/await
    └─ No → Can I use sync library with threads?
        ├─ Yes → Use run_in_executor with ThreadPoolExecutor
        └─ No → Rethink approach or write async wrapper

Concurrent Execution Strategy

What am I waiting for?
├─ Network/database → async/await (asyncio)
├─ File I/O → async/await with aiofiles
├─ CPU computation → multiprocessing (ProcessPoolExecutor)
├─ Blocking library (no async version) → threads (ThreadPoolExecutor)
└─ Nothing (pure computation) → Regular sync code

Error Handling in Concurrent Tasks

Do I need all results?
├─ Yes → TaskGroup (3.11+) or gather without return_exceptions
│   └─ Fails fast on first error (TaskGroup also cancels the remaining tasks; gather leaves them running)
└─ No (partial results OK) → gather with return_exceptions=True
    └─ Filter exceptions from results

Integration with Other Skills

After using this skill:

  • If profiling async code → See @debugging-and-profiling for async profiling
  • If testing async code → See @testing-and-quality for pytest-asyncio
  • If setting up project → See @project-structure-and-tooling for async dependencies

Before using this skill:

  • If code is slow → Use @debugging-and-profiling to verify it's I/O-bound first
  • If starting project → Use @project-structure-and-tooling to set up dependencies

Quick Reference

Python 3.11+ Features

| Feature | Description | When to Use |
|---|---|---|
| TaskGroup | Structured concurrency | Multiple concurrent tasks, automatic cleanup |
| asyncio.timeout() | Context manager for timeouts | Cleaner than wait_for() |
| except* | Exception group handling | Handle multiple concurrent errors |

Common Async Patterns

# Concurrent execution
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(func(x)) for x in items]
results = [t.result() for t in tasks]

# Timeout
async with asyncio.timeout(10.0):
    result = await long_operation()

# Rate limiting
semaphore = asyncio.Semaphore(10)
async with semaphore:
    await rate_limited_operation()

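# Retry with backoff (inside an async function)
for attempt in range(max_retries):
    try:
        return await operation()
    except Exception:
        if attempt == max_retries - 1:
            raise  # Out of retries: surface the error
        await asyncio.sleep(2 ** attempt)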

When NOT to Use Async

  • Pure computation (no I/O)
  • Single I/O operation (overhead not worth it)
  • CPU-bound work (use multiprocessing)
  • When sync code is simpler and performance is acceptable