Claude Code Plugins

Community-maintained marketplace

Feedback

Python asyncio - Modern concurrent programming with async/await, event loops, tasks, coroutines, primitives, aiohttp, and FastAPI async patterns

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name asyncio
description Python asyncio - Modern concurrent programming with async/await, event loops, tasks, coroutines, primitives, aiohttp, and FastAPI async patterns
version 1.0.0
category toolchain
author Claude MPM Team
license MIT
progressive_disclosure [object Object]
context_limit 700
tags asyncio, async, concurrency, coroutines, aiohttp, fastapi, async-database, event-loop, tasks
requires_tools

Python asyncio - Async/Await Concurrency

Overview

Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.

Key Features:

  • async/await syntax for readable concurrent code
  • Event loop for managing concurrent operations
  • Tasks for running multiple coroutines concurrently
  • Primitives: locks, semaphores, events, queues
  • HTTP client/server with aiohttp
  • Database async support (asyncpg, aiomysql, motor)
  • FastAPI async endpoints
  • WebSocket support
  • Background task management

Installation:

# asyncio is built-in (Python 3.7+)

# Async HTTP client
pip install aiohttp

# Async HTTP requests (alternative)
pip install httpx

# Async database drivers
pip install asyncpg aiomysql motor  # PostgreSQL, MySQL, MongoDB

# FastAPI with async support
pip install fastapi uvicorn[standard]

# Async testing
pip install pytest-asyncio

Basic Async/Await Patterns

1. Simple Async Function

import asyncio

async def hello():
    """Basic async function (coroutine)."""
    print("Hello")
    await asyncio.sleep(1)  # Async sleep (non-blocking)
    print("World")
    return "Done"

# Run async function
result = asyncio.run(hello())
print(result)  # "Done"

Key Points:

  • async def defines a coroutine function
  • await suspends execution until awaitable completes
  • asyncio.run() is the entry point for async programs
  • Coroutines must be awaited or scheduled

2. Multiple Concurrent Tasks

import asyncio
import time

async def task(name, duration):
    """Simulate async task."""
    print(f"{name}: Starting (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"{name}: Complete")
    return f"{name} result"

async def run_concurrent():
    """Run multiple tasks concurrently."""
    start = time.time()

    # Sequential (slow) - 6 seconds total
    # result1 = await task("Task 1", 3)
    # result2 = await task("Task 2", 2)
    # result3 = await task("Task 3", 1)

    # Concurrent (fast) - 3 seconds total
    results = await asyncio.gather(
        task("Task 1", 3),
        task("Task 2", 2),
        task("Task 3", 1)
    )

    elapsed = time.time() - start
    print(f"Total time: {elapsed:.2f}s")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
# Output: Total time: 3.00s (tasks ran concurrently)

3. Task Creation and Management

import asyncio

async def background_task(name):
    """Long-running background task."""
    for i in range(5):
        print(f"{name}: iteration {i}")
        await asyncio.sleep(1)
    return f"{name} complete"

async def main():
    # Create task (starts immediately)
    task1 = asyncio.create_task(background_task("Task-1"))
    task2 = asyncio.create_task(background_task("Task-2"))

    # Do other work while tasks run
    print("Main: doing other work")
    await asyncio.sleep(2)

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())

4. Error Handling in Async Code

import asyncio

async def risky_operation(fail=False):
    """Operation that might fail."""
    await asyncio.sleep(1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def handle_errors():
    # Individual try/except
    try:
        result = await risky_operation(fail=True)
    except ValueError as e:
        print(f"Caught error: {e}")
        result = "Fallback value"

    # Gather with error handling
    results = await asyncio.gather(
        risky_operation(fail=False),
        risky_operation(fail=True),
        risky_operation(fail=False),
        return_exceptions=True  # Return exceptions instead of raising
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(handle_errors())

Event Loop Fundamentals

1. Event Loop Lifecycle

import asyncio

# Modern approach (Python 3.7+)
async def main():
    print("Main coroutine")
    await asyncio.sleep(1)

asyncio.run(main())  # Creates loop, runs main, closes loop

# Manual loop management (advanced use cases)
async def manual_example():
    loop = asyncio.get_event_loop()

    # Schedule coroutine
    task = loop.create_task(some_coroutine())

    # Schedule callback
    loop.call_later(5, callback_function)

    # Run until complete
    result = await task

    return result

# Get current event loop
async def get_current_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")

    # Schedule callback in event loop
    loop.call_soon(lambda: print("Callback executed"))

    await asyncio.sleep(0)  # Let callback execute

2. Loop Scheduling and Callbacks

import asyncio
from datetime import datetime

def callback(name, loop):
    """Callback function (not async)."""
    print(f"{datetime.now()}: {name} callback executed")

    # Stop loop after callback
    # loop.stop()

async def schedule_callbacks():
    loop = asyncio.get_running_loop()

    # Schedule immediate callback
    loop.call_soon(callback, "Immediate", loop)

    # Schedule callback after delay
    loop.call_later(2, callback, "Delayed 2s", loop)

    # Schedule callback at specific time
    loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)

    # Wait for callbacks to execute
    await asyncio.sleep(5)

asyncio.run(schedule_callbacks())

3. Running Blocking Code

import asyncio
import time

def blocking_io():
    """CPU-intensive or blocking I/O operation."""
    print("Blocking operation started")
    time.sleep(2)  # Blocks thread
    print("Blocking operation complete")
    return "Blocking result"

async def run_in_executor():
    """Run blocking code in thread pool."""
    loop = asyncio.get_running_loop()

    # Run in default executor (thread pool)
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io
    )

    print(f"Result: {result}")

# Run blocking operations concurrently
async def concurrent_blocking():
    loop = asyncio.get_running_loop()

    # These run in thread pool, don't block event loop
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io)
    )

    print(f"All results: {results}")

asyncio.run(concurrent_blocking())

Asyncio Primitives

1. Locks for Mutual Exclusion

import asyncio

# Shared resource
counter = 0
lock = asyncio.Lock()

async def increment_with_lock(name):
    """Increment counter with lock protection."""
    global counter

    async with lock:
        # Critical section - only one task at a time
        print(f"{name}: acquired lock")
        current = counter
        await asyncio.sleep(0.1)  # Simulate processing
        counter = current + 1
        print(f"{name}: released lock, counter={counter}")

async def increment_without_lock(name):
    """Increment without lock - race condition!"""
    global counter

    current = counter
    await asyncio.sleep(0.1)  # Race condition window
    counter = current + 1
    print(f"{name}: counter={counter}")

async def test_locks():
    global counter

    # Without lock (race condition)
    counter = 0
    await asyncio.gather(
        increment_without_lock("Task-1"),
        increment_without_lock("Task-2"),
        increment_without_lock("Task-3")
    )
    print(f"Without lock: {counter}")  # Often wrong (< 3)

    # With lock (correct)
    counter = 0
    await asyncio.gather(
        increment_with_lock("Task-1"),
        increment_with_lock("Task-2"),
        increment_with_lock("Task-3")
    )
    print(f"With lock: {counter}")  # Always 3

asyncio.run(test_locks())

2. Semaphores for Resource Limiting

import asyncio

# Limit concurrent operations
semaphore = asyncio.Semaphore(2)  # Max 2 concurrent

async def limited_operation(name):
    """Operation limited by semaphore."""
    print(f"{name}: waiting for semaphore")

    async with semaphore:
        print(f"{name}: acquired semaphore")
        await asyncio.sleep(2)  # Simulate work
        print(f"{name}: releasing semaphore")

async def test_semaphore():
    # Create 5 tasks, but only 2 run concurrently
    await asyncio.gather(
        limited_operation("Task-1"),
        limited_operation("Task-2"),
        limited_operation("Task-3"),
        limited_operation("Task-4"),
        limited_operation("Task-5")
    )

asyncio.run(test_semaphore())
# Only 2 tasks hold semaphore at any time

3. Events for Signaling

import asyncio

event = asyncio.Event()

async def waiter(name):
    """Wait for event to be set."""
    print(f"{name}: waiting for event")
    await event.wait()  # Block until event is set
    print(f"{name}: event received!")

async def setter():
    """Set event after delay."""
    await asyncio.sleep(2)
    print("Setter: setting event")
    event.set()  # Wake up all waiters

async def test_event():
    # Create waiters
    await asyncio.gather(
        waiter("Waiter-1"),
        waiter("Waiter-2"),
        waiter("Waiter-3"),
        setter()
    )

asyncio.run(test_event())

4. Queues for Task Distribution

import asyncio
import random

async def producer(queue, name):
    """Produce items and add to queue."""
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name}: produced {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

    # Signal completion
    await queue.put(None)

async def consumer(queue, name):
    """Consume items from queue."""
    while True:
        item = await queue.get()  # Block until item available

        if item is None:  # Shutdown signal
            queue.task_done()
            break

        print(f"{name}: consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def test_queue():
    queue = asyncio.Queue(maxsize=10)

    # Create producers and consumers
    await asyncio.gather(
        producer(queue, "Producer-1"),
        producer(queue, "Producer-2"),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
        consumer(queue, "Consumer-3")
    )

    # Wait for all items to be processed
    await queue.join()
    print("All tasks complete")

asyncio.run(test_queue())

5. Condition Variables

import asyncio

condition = asyncio.Condition()
items = []

async def consumer(name):
    """Wait for items to be available."""
    async with condition:
        # Wait until items are available
        await condition.wait_for(lambda: len(items) > 0)

        item = items.pop(0)
        print(f"{name}: consumed {item}")

async def producer(name):
    """Add items and notify consumers."""
    async with condition:
        item = f"{name}-item"
        items.append(item)
        print(f"{name}: produced {item}")

        # Notify one waiting consumer
        condition.notify(n=1)
        # Or notify all: condition.notify_all()

async def test_condition():
    await asyncio.gather(
        consumer("Consumer-1"),
        consumer("Consumer-2"),
        producer("Producer-1"),
        producer("Producer-2")
    )

asyncio.run(test_condition())

Async HTTP with aiohttp

1. Basic HTTP Client

import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch single URL."""
    async with session.get(url) as response:
        status = response.status
        text = await response.text()
        return {"url": url, "status": status, "length": len(text)}

async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
    ]

    async with aiohttp.ClientSession() as session:
        # Concurrent requests
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")

asyncio.run(fetch_multiple_urls())

2. HTTP Client with Error Handling

import asyncio
import aiohttp
from typing import Dict, Any

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Dict[str, Any]:
    """Fetch URL with retry logic."""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()  # Raise for 4xx/5xx
                data = await response.json()
                return {"success": True, "data": data}

        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}

            # Exponential backoff
            await asyncio.sleep(2 ** attempt)

        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                return {"success": False, "error": "Timeout"}

            await asyncio.sleep(2 ** attempt)

async def parallel_api_calls():
    """Make parallel API calls with error handling."""
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # Will fail
        "https://httpbin.org/delay/1",
    ]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_retry(session, url) for url in urls],
            return_exceptions=True  # Don't stop on errors
        )

        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"{url}: Exception - {result}")
            elif result["success"]:
                print(f"{url}: Success")
            else:
                print(f"{url}: Failed - {result['error']}")

asyncio.run(parallel_api_calls())

3. HTTP Server with aiohttp

from aiohttp import web
import asyncio

async def handle_hello(request):
    """Simple GET handler."""
    name = request.query.get("name", "World")
    return web.json_response({"message": f"Hello, {name}!"})

async def handle_post(request):
    """POST handler with JSON body."""
    data = await request.json()

    # Simulate async processing
    await asyncio.sleep(1)

    return web.json_response({
        "received": data,
        "status": "processed"
    })

async def handle_stream(request):
    """Streaming response."""
    response = web.StreamResponse()
    await response.prepare(request)

    for i in range(10):
        await response.write(f"Chunk {i}\n".encode())
        await asyncio.sleep(0.5)

    await response.write_eof()
    return response

# Create application
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)

# Run server
if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)

4. WebSocket Client

import asyncio
import aiohttp

async def websocket_client():
    """Connect to WebSocket server."""
    url = "wss://echo.websocket.org"

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # Send messages
            await ws.send_str("Hello WebSocket")
            await ws.send_json({"type": "greeting", "data": "test"})

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")

                    if msg.data == "close":
                        await ws.close()
                        break

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"Error: {ws.exception()}")
                    break

asyncio.run(websocket_client())

Async Database Operations

1. PostgreSQL with asyncpg

import asyncio
import asyncpg

async def database_operations():
    """Async PostgreSQL operations."""
    # Create connection pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )

    try:
        # Acquire connection from pool
        async with pool.acquire() as conn:
            # Execute query
            rows = await conn.fetch(
                "SELECT id, name, email FROM users WHERE active = $1",
                True
            )

            for row in rows:
                print(f"User: {row['name']} ({row['email']})")

            # Insert data
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Alice", "alice@example.com"
            )

            # Transaction
            async with conn.transaction():
                await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
                await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")

    finally:
        await pool.close()

asyncio.run(database_operations())

2. MongoDB with motor

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def mongodb_operations():
    """Async MongoDB operations."""
    # Create client
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users

    try:
        # Insert document
        result = await collection.insert_one({
            "name": "Alice",
            "email": "alice@example.com",
            "age": 30
        })
        print(f"Inserted ID: {result.inserted_id}")

        # Find documents
        cursor = collection.find({"age": {"$gte": 25}})
        async for document in cursor:
            print(f"User: {document['name']}")

        # Update document
        await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )

        # Aggregation pipeline
        pipeline = [
            {"$match": {"age": {"$gte": 25}}},
            {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
        ]
        async for result in collection.aggregate(pipeline):
            print(f"Average age: {result['avg_age']}")

    finally:
        client.close()

asyncio.run(mongodb_operations())

3. Connection Pool Pattern

import asyncio
import asyncpg
from typing import Optional

class DatabasePool:
    """Async database connection pool manager."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create connection pool."""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """Close connection pool."""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """Execute query."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Fetch multiple rows."""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """Fetch single row."""
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

# Usage
async def use_pool():
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()

    try:
        # Execute operations
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        await db.close()

asyncio.run(use_pool())

FastAPI Async Patterns

1. Async Endpoints

from fastapi import FastAPI, HTTPException
import asyncio
import httpx

app = FastAPI()

@app.get("/")
async def root():
    """Simple async endpoint."""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """Endpoint with async delay."""
    await asyncio.sleep(seconds)
    return {"message": f"Waited {seconds} seconds"}

@app.get("/fetch")
async def fetch_external():
    """Fetch data from external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """Make parallel API calls."""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )

        return {
            "results": [r.json() for r in responses]
        }

2. Background Tasks

from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(email: str, message: str):
    """Simulate sending email."""
    print(f"Sending email to {email}")
    await asyncio.sleep(5)  # Simulate slow email service
    print(f"Email sent to {email}: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Send notification in background."""
    # Add task to background
    background_tasks.add_task(send_email, email, message)

    # Return immediately
    return {"status": "notification queued"}

# Alternative: manual task creation
@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """Create background task manually."""
    asyncio.create_task(send_email(email, message))
    return {"status": "notification queued"}

3. Async Dependencies

from fastapi import FastAPI, Depends
import asyncpg

app = FastAPI()

# Database pool (global)
db_pool = None

async def get_db():
    """Dependency: database connection."""
    async with db_pool.acquire() as conn:
        yield conn

@app.on_event("startup")
async def startup():
    """Initialize database pool on startup."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb"
    )

@app.on_event("shutdown")
async def shutdown():
    """Close database pool on shutdown."""
    await db_pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
    """Get user with async database dependency."""
    user = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1",
        user_id
    )

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return dict(user)

4. WebSocket Endpoints

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()

# Active connections
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint."""
    await websocket.accept()
    active_connections.append(websocket)

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()

            # Broadcast to all connections
            for connection in active_connections:
                await connection.send_text(f"Broadcast: {data}")

    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("Client disconnected")

# Background task to send periodic updates
async def broadcast_updates():
    """Send periodic updates to all clients."""
    while True:
        await asyncio.sleep(5)

        for connection in active_connections:
            try:
                await connection.send_text("Periodic update")
            except:
                active_connections.remove(connection)

@app.on_event("startup")
async def startup():
    """Start background update task."""
    asyncio.create_task(broadcast_updates())

Common Patterns and Best Practices

1. Timeout Handling

import asyncio

async def slow_operation():
    """Slow operation."""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """Run operation with timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"Result: {result}")

    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())

2. Cancellation Handling

import asyncio

async def cancellable_task():
    """Task that can be cancelled."""
    try:
        for i in range(10):
            print(f"Working: {i}")
            await asyncio.sleep(1)

        return "Complete"

    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup
        raise  # Re-raise to propagate cancellation

async def cancel_example():
    """Example of task cancellation."""
    task = asyncio.create_task(cancellable_task())

    # Let it run for a bit
    await asyncio.sleep(3)

    # Cancel the task
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")

asyncio.run(cancel_example())

3. Resource Cleanup with Context Managers

import asyncio

class AsyncResource:
    """Async context manager for resource management."""

    async def __aenter__(self):
        """Async setup."""
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async setup
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async cleanup."""
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async cleanup
        self.connection = None

async def use_resource():
    """Use async resource."""
    async with AsyncResource() as resource:
        print(f"Using resource: {resource.connection}")
        await asyncio.sleep(1)
    # Resource automatically cleaned up

asyncio.run(use_resource())

4. Debouncing and Throttling

import asyncio
from datetime import datetime

class Debouncer:
    """Debounce async function calls."""

    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def call(self, func, *args, **kwargs):
        """Debounced function call."""
        # Cancel previous task
        if self.task:
            self.task.cancel()

        # Create new task
        async def delayed_call():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(delayed_call())

async def api_call(query: str):
    """Simulated API call."""
    print(f"{datetime.now()}: API call with query: {query}")

async def debounce_example():
    """Example of debouncing."""
    debouncer = Debouncer(delay=1.0)

    # Rapid calls - only last one executes
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")

    # Wait for debounced call
    await asyncio.sleep(2)

asyncio.run(debounce_example())
# Output: Only "query3" API call executes

5. Rate Limiting

import asyncio
from datetime import datetime

class RateLimiter:
    """Limit rate of async operations."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []

    async def __aenter__(self):
        """Acquire rate limit slot."""
        await self.semaphore.acquire()

        now = asyncio.get_event_loop().time()

        # Remove old calls outside period
        self.calls = [t for t in self.calls if now - t < self.period]

        if len(self.calls) >= self.max_calls:
            # Wait until oldest call expires
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        self.calls.append(now)
        return self

    async def __aexit__(self, *args):
        """Release semaphore."""
        self.semaphore.release()

async def rate_limited_operation(limiter, name):
    """Operation with rate limiting."""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """Example of rate limiting."""
    # Max 3 calls per 2 seconds
    limiter = RateLimiter(max_calls=3, period=2.0)

    # Try to make 6 calls
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())

Debugging Async Code

1. Enable Debug Mode

import asyncio
import logging

# Enable asyncio debug mode
asyncio.run(main(), debug=True)

# Or set environment variable:
# PYTHONASYNCIODEBUG=1 python script.py

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    logger.debug("Starting operation")
    await asyncio.sleep(1)
    logger.debug("Operation complete")

2. Detect Blocking Code

import asyncio
import time

async def problematic_code():
    """Code with blocking operation."""
    print("Starting")

    # BAD: Blocking sleep
    time.sleep(2)  # This blocks the event loop!

    print("Complete")

# Run with debug mode to detect blocking
asyncio.run(problematic_code(), debug=True)
# Warning: Executing <Task> took 2.001 seconds

3. Track Pending Tasks

import asyncio

async def track_tasks():
    """Track all pending tasks."""
    # Get all tasks
    tasks = asyncio.all_tasks()

    print(f"Total tasks: {len(tasks)}")
    for task in tasks:
        print(f"  - {task.get_name()}: {task}")

        # Check if task is done
        if task.done():
            try:
                result = task.result()
                print(f"    Result: {result}")
            except Exception as e:
                print(f"    Exception: {e}")

# Create some tasks
async def main():
    task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    task2 = asyncio.create_task(track_tasks(), name="tracking")

    await task2
    task1.cancel()

asyncio.run(main())

Testing Async Code

1. pytest-asyncio Setup

# test_async.py
import pytest
import asyncio

# Mark test as async
@pytest.mark.asyncio
async def test_async_function():
    """Test async function."""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """Test async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data

# Async fixture
@pytest.fixture
async def async_client():
    """Async test fixture."""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """Test using async fixture."""
    result = await async_client.fetch_data()
    assert result is not None

2. Mocking Async Functions

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """Test with async mock."""
    # Create async mock
    mock_func = AsyncMock(return_value="mocked result")

    result = await mock_func()
    assert result == "mocked result"
    mock_func.assert_called_once()

@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
    """Test with patched async function."""
    mock_async.return_value = {"status": "success"}

    result = await some_function_that_calls_async()

    assert result["status"] == "success"
    mock_async.assert_called_once()

Performance Optimization

1. Use asyncio.gather() for Parallelism

import asyncio
import time

async def slow_task(n):
    await asyncio.sleep(1)
    return n * 2

async def optimized():
    """Parallel execution."""
    start = time.time()

    # Sequential (slow) - 5 seconds
    # results = []
    # for i in range(5):
    #     result = await slow_task(i)
    #     results.append(result)

    # Parallel (fast) - 1 second
    results = await asyncio.gather(*[slow_task(i) for i in range(5)])

    elapsed = time.time() - start
    print(f"Time: {elapsed:.2f}s, Results: {results}")

asyncio.run(optimized())

2. Connection Pooling

import asyncio
import aiohttp

# BAD: Create new session for each request
async def bad_pattern():
    for i in range(10):
        async with aiohttp.ClientSession() as session:
            async with session.get("https://httpbin.org/json") as response:
                await response.json()

# GOOD: Reuse session with connection pool
async def good_pattern():
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get("https://httpbin.org/json")
            for i in range(10)
        ]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            await response.json()

3. Avoid Blocking Operations

import asyncio

# BAD: Blocking I/O in async function
async def bad_file_read():
    with open("large_file.txt") as f:  # Blocks event loop!
        data = f.read()
    return data

# GOOD: Use async file I/O or run in executor
async def good_file_read():
    loop = asyncio.get_running_loop()

    # Run blocking operation in thread pool
    data = await loop.run_in_executor(
        None,
        lambda: open("large_file.txt").read()
    )
    return data

# BETTER: Use aiofiles for async file I/O
import aiofiles

async def better_file_read():
    async with aiofiles.open("large_file.txt") as f:
        data = await f.read()
    return data

Common Pitfalls

❌ Anti-Pattern 1: Not Awaiting Coroutines

# WRONG
async def bad():
    result = async_function()  # Returns coroutine, doesn't execute!
    print(result)  # Prints: <coroutine object>

# CORRECT
async def good():
    result = await async_function()  # Actually executes
    print(result)

❌ Anti-Pattern 2: Blocking the Event Loop

# WRONG
import time

async def bad():
    time.sleep(5)  # Blocks entire event loop!

# CORRECT
async def good():
    await asyncio.sleep(5)  # Non-blocking

❌ Anti-Pattern 3: Not Handling Cancellation

# WRONG
async def bad():
    await asyncio.sleep(10)
    # No cleanup if cancelled

# CORRECT
async def good():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup resources
        await cleanup()
        raise  # Re-raise to propagate

❌ Anti-Pattern 4: Creating Event Loop Incorrectly

# WRONG (Python 3.7+)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# CORRECT (Python 3.7+)
asyncio.run(main())

❌ Anti-Pattern 5: Not Closing Resources

# WRONG
async def bad():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    # Session never closed - resource leak!

# CORRECT
async def good():
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        # Session automatically closed

Best Practices

  1. Use asyncio.run() for entry point (Python 3.7+)
  2. Always await coroutines - don't forget await
  3. Use async context managers for resource cleanup
  4. Connection pooling for HTTP and database clients
  5. Handle CancelledError for graceful shutdown
  6. Use asyncio.gather() for parallel operations
  7. Avoid blocking operations in async functions
  8. Use timeouts to prevent hanging operations
  9. Debug mode during development to catch issues
  10. Test async code with pytest-asyncio

Quick Reference

Common Commands

# Run async script
python script.py

# Run with debug mode
PYTHONASYNCIODEBUG=1 python script.py

# Run tests
pytest -v --asyncio-mode=auto

# Install async dependencies
pip install aiohttp asyncpg motor pytest-asyncio

Essential Imports

import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any

Resources

Related Skills

When using asyncio, consider these complementary skills:

  • fastapi-local-dev: FastAPI async server patterns and production deployment
  • pytest: Testing async code with pytest-asyncio and fixtures
  • systematic-debugging: Debugging async race conditions and deadlocks

Quick FastAPI Async Patterns (Inlined for Standalone Use)

# FastAPI async endpoint pattern
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

# Async database setup
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async database query
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# Background tasks with asyncio
@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
    # Non-blocking background task
    asyncio.create_task(send_email_async(email))
    return {"status": "email queued"}

Quick pytest-asyncio Patterns (Inlined for Standalone Use)

# Testing async functions with pytest
import pytest
import pytest_asyncio
from httpx import AsyncClient

# Async test fixture
@pytest_asyncio.fixture
async def async_client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

# Async test function
@pytest.mark.asyncio
async def test_get_user(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200
    assert response.json()["id"] == 1

# Testing concurrent operations
@pytest.mark.asyncio
async def test_concurrent_requests():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Run 10 requests concurrently
        responses = await asyncio.gather(
            *[client.get(f"/users/{i}") for i in range(1, 11)]
        )
        assert all(r.status_code == 200 for r in responses)

# Mock async dependencies
@pytest_asyncio.fixture
async def mock_db():
    # Setup mock database
    db = AsyncMock()
    yield db
    # Cleanup

Quick Async Debugging Reference (Inlined for Standalone Use)

Common Async Pitfalls:

  1. Blocking the Event Loop

    # ❌ WRONG: Blocking call in async function
    async def bad_function():
        time.sleep(5)  # Blocks entire event loop!
        return "done"
    
    # ✅ CORRECT: Use asyncio.sleep
    async def good_function():
        await asyncio.sleep(5)  # Releases event loop
        return "done"
    
  2. Debugging Race Conditions

    # Add logging to track execution order
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    async def debug_task(name):
        logging.debug(f"{name}: Starting")
        await asyncio.sleep(1)
        logging.debug(f"{name}: Finished")
        return name
    
    # Run with detailed tracing
    asyncio.run(asyncio.gather(debug_task("A"), debug_task("B")), debug=True)
    
  3. Deadlock Detection

    # Use timeout to detect deadlocks
    try:
        result = await asyncio.wait_for(some_async_function(), timeout=5.0)
    except asyncio.TimeoutError:
        logging.error("Deadlock detected: operation timed out")
        # Investigate what's blocking
    
  4. Inspecting Running Tasks

    # Check all pending tasks
    tasks = asyncio.all_tasks()
    for task in tasks:
        print(f"Task: {task.get_name()}, Done: {task.done()}")
        if not task.done():
            print(f"  Current coro: {task.get_coro()}")
    

[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]


Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.