SKILL.md

---
name: python-async-patterns
description: Comprehensive guide to Python async/await patterns, best practices, and anti-patterns. Covers asyncio fundamentals, coroutines, async context managers, task management, common libraries (aiohttp, aiofiles, asyncpg), framework integration (FastAPI, Django), performance considerations, and proper exception handling. Use when reviewing or writing asynchronous Python code.
allowed-tools: Read, Grep, Glob
---

Python Async/Await Patterns and Best Practices

Purpose

This skill provides comprehensive guidance on Python asynchronous programming patterns for reviewing and writing async code. It covers async/await syntax, best practices, common patterns, anti-patterns, and framework-specific considerations.

Use this skill when:

  • Reviewing async Python code
  • Writing new async functions or services
  • Refactoring sync code to async
  • Troubleshooting async performance issues
  • Evaluating async library usage
  • Implementing async endpoints in web frameworks

Context

Asynchronous programming in Python enables efficient I/O-bound operations by allowing the event loop to switch between tasks while waiting for I/O operations to complete. However, async code introduces complexity and has specific patterns and pitfalls that must be understood for correct implementation.

Key Principles:

  • Async is beneficial for I/O-bound operations, not CPU-bound tasks
  • The event loop must never be blocked with synchronous operations
  • Coroutines must be properly awaited
  • Resources must be cleaned up correctly in async contexts
  • Mixing sync and async code requires careful consideration
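The first principle can be seen in a minimal timing sketch: three simulated I/O waits run concurrently finish in roughly the time of the longest one, not the sum.

```python
import asyncio
import time

async def io_task(delay: float) -> float:
    # Simulated I/O wait; the event loop is free to run other tasks meanwhile
    await asyncio.sleep(delay)
    return delay

async def main() -> float:
    start = time.perf_counter()
    # Three 0.1s "I/O" waits overlap instead of running back-to-back
    await asyncio.gather(io_task(0.1), io_task(0.1), io_task(0.1))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")  # roughly 0.1s, not 0.3s
```

The same structure with CPU-bound work would see no speedup, since only one coroutine runs at a time between awaits.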

Python Version Requirements:

  • async/await: Python 3.5+
  • asyncio.run(): Python 3.7+
  • asyncio.create_task(): Python 3.7+
  • Task Groups (asyncio.TaskGroup): Python 3.11+
  • Exception Groups: Python 3.11+
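When code must run across several of these versions, newer features can be guarded at runtime rather than failing at call time. A sketch (the fallback simply uses `gather()`):

```python
import sys
import asyncio

# True on Python 3.11+ where asyncio.TaskGroup is available
HAS_TASKGROUP = sys.version_info >= (3, 11) and hasattr(asyncio, "TaskGroup")

async def run_both(coro_a, coro_b) -> tuple:
    if HAS_TASKGROUP:
        # Python 3.11+: structured concurrency with automatic cancellation
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(coro_a)
            t2 = tg.create_task(coro_b)
        return t1.result(), t2.result()
    # Python 3.7-3.10 fallback
    return tuple(await asyncio.gather(coro_a, coro_b))

results = asyncio.run(run_both(
    asyncio.sleep(0.01, result=1),
    asyncio.sleep(0.01, result=2),
))
```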

Async Fundamentals

1. Async/Await Syntax

Defining Async Functions:

import aiohttp

# Async function (coroutine function)
async def fetch_data(url: str) -> dict:
    """Coroutine that fetches data asynchronously."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# Regular function (synchronous)
def process_data(data: dict) -> list:
    """Regular synchronous function."""
    return [item['value'] for item in data['items']]

Key Differences:

  • async def creates a coroutine function
  • Calling an async function returns a coroutine object (not the result)
  • Must use await to execute the coroutine and get the result
  • Can only use await inside async def functions

2. Coroutines vs Regular Functions

# Coroutine function
async def async_operation():
    await asyncio.sleep(1)
    return "done"

# This creates a coroutine object but doesn't execute it
coro = async_operation()  # RuntimeWarning: coroutine was never awaited

# Must await to execute (only valid inside another async def)
result = await async_operation()  # Correct

# Or run from synchronous code
result = asyncio.run(async_operation())  # Correct (Python 3.7+)

Coroutine Characteristics:

  • Lazy evaluation: not executed until awaited
  • Can be suspended and resumed
  • Return values through await
  • Must be awaited or scheduled for execution
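Lazy evaluation can be verified directly: calling the coroutine function produces an inert coroutine object, and nothing executes until it is handed to `await` or `asyncio.run()`.

```python
import asyncio

async def compute() -> int:
    await asyncio.sleep(0)
    return 42

coro = compute()                  # nothing has run yet
assert asyncio.iscoroutine(coro)  # just a coroutine object, not a result
result = asyncio.run(coro)        # execution happens here
print(result)  # 42
```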

3. Event Loop Fundamentals

The event loop is the core of async execution:

import asyncio

# High-level API (Python 3.7+) - PREFERRED
async def main():
    result = await some_async_operation()
    return result

# Run the event loop
if __name__ == "__main__":
    result = asyncio.run(main())  # Creates, runs, and closes loop

Event Loop Responsibilities:

  • Schedules and executes coroutines
  • Manages I/O operations
  • Handles callbacks and tasks
  • Coordinates concurrent execution

4. Using asyncio.run()

# GOOD: Modern approach (Python 3.7+)
async def main():
    result = await fetch_data()
    processed = await process_data(result)
    return processed

if __name__ == "__main__":
    result = asyncio.run(main())

# AVOID: Manual loop management (legacy, pre-3.7)
loop = asyncio.get_event_loop()
try:
    result = loop.run_until_complete(main())
finally:
    loop.close()

Why prefer asyncio.run():

  • Automatically creates and closes the loop
  • Handles cleanup properly
  • Simpler and less error-prone
  • Cancels remaining tasks on shutdown

Common Async Patterns

1. Async Context Managers

Async context managers use async with for resource management:

class AsyncDatabaseConnection:
    """Example async context manager."""

    async def __aenter__(self):
        """Called when entering async with block."""
        self.connection = await self._connect()
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Called when exiting async with block."""
        await self.connection.close()
        return False  # Don't suppress exceptions

    async def _connect(self):
        """Establish connection."""
        await asyncio.sleep(0.1)  # Simulated async connection
        return {"connected": True}

# Usage
async def query_database():
    async with AsyncDatabaseConnection() as conn:
        # Connection automatically closed after block
        result = await execute_query(conn)
    return result

Common Async Context Managers:

  • aiohttp.ClientSession(): HTTP client sessions
  • aiofiles.open(): Async file operations
  • Database connection pools (asyncpg, motor)
  • Lock primitives (asyncio.Lock())
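For simple cases, `contextlib.asynccontextmanager` avoids writing `__aenter__`/`__aexit__` by hand. A minimal sketch using a simulated connection:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_connection():
    # Setup runs when the "async with" block is entered
    conn = {"connected": True}
    await asyncio.sleep(0)  # simulated async connect
    try:
        yield conn
    finally:
        # Teardown runs on exit, even if the body raised
        conn["connected"] = False

async def main() -> bool:
    async with database_connection() as conn:
        assert conn["connected"]
    return conn["connected"]

still_open = asyncio.run(main())
print(still_open)  # False: torn down on exit
```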

2. Async Iterators and Generators

class AsyncDataStream:
    """Async iterator for streaming data."""

    def __init__(self, count: int):
        self.count = count
        self.index = 0

    def __aiter__(self):
        """Return the async iterator."""
        return self

    async def __anext__(self):
        """Get next item asynchronously."""
        if self.index >= self.count:
            raise StopAsyncIteration

        await asyncio.sleep(0.1)  # Simulated async operation
        value = f"item_{self.index}"
        self.index += 1
        return value

# Usage
async def process_stream():
    async for item in AsyncDataStream(5):
        print(item)

# Async generator (simpler approach)
async def async_range(count: int):
    """Async generator function."""
    for i in range(count):
        await asyncio.sleep(0.1)
        yield i

# Usage
async def use_generator():
    async for value in async_range(5):
        print(value)

3. Async Comprehensions

# Async list comprehension
async def fetch_all_urls(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        # Note: awaiting inside a comprehension runs the requests
        # one at a time, in order (no concurrency)
        results = [
            await fetch_one(session, url)
            for url in urls
        ]
        return results

# Async generator expression
async def process_stream_data(stream):
    processed = (
        transform(item)
        async for item in stream
        if await validate(item)
    )
    return processed

# Async dict comprehension
async def fetch_user_data(user_ids: list[int]) -> dict[int, dict]:
    return {
        user_id: await fetch_user(user_id)
        for user_id in user_ids
    }
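A caveat on the list and dict comprehensions above: each `await` suspends until that one call finishes, so the iterations run sequentially. To run them concurrently, build the coroutines without awaiting and pass them to `asyncio.gather()`. A sketch with a stand-in fetch function:

```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    # Stand-in for a real async fetch (hypothetical)
    await asyncio.sleep(0.05)
    return {"id": user_id}

async def fetch_sequential(ids: list[int]) -> list[dict]:
    # One await per iteration: total time is roughly the sum of all waits
    return [await fetch_user(i) for i in ids]

async def fetch_concurrent(ids: list[int]) -> list[dict]:
    # gather() runs all fetches at once: total time is roughly the longest wait
    return await asyncio.gather(*(fetch_user(i) for i in ids))

users = asyncio.run(fetch_concurrent([1, 2, 3]))
print(users)  # [{'id': 1}, {'id': 2}, {'id': 3}]
```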

4. Concurrent Task Execution

Using asyncio.gather()

# Run multiple coroutines concurrently
async def fetch_multiple_resources():
    # All tasks run concurrently
    results = await asyncio.gather(
        fetch_user(1),
        fetch_posts(1),
        fetch_comments(1)
    )
    user, posts, comments = results
    return user, posts, comments

# With error handling
async def fetch_with_error_handling():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_posts(1),
        fetch_comments(1),
        return_exceptions=True  # Return exceptions instead of raising
    )

    # Check for exceptions
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")

Using asyncio.create_task()

# Create tasks explicitly for more control
async def fetch_with_tasks():
    # Create tasks (scheduled to run as soon as the event loop regains control)
    user_task = asyncio.create_task(fetch_user(1))
    posts_task = asyncio.create_task(fetch_posts(1))
    comments_task = asyncio.create_task(fetch_comments(1))

    # Wait for all tasks
    user = await user_task
    posts = await posts_task
    comments = await comments_task

    return user, posts, comments

# Cancel tasks if needed
async def fetch_with_cancellation():
    task = asyncio.create_task(long_running_operation())

    try:
        result = await asyncio.wait_for(task, timeout=5.0)
        return result
    except asyncio.TimeoutError:
        # Note: wait_for() has already cancelled the task on timeout;
        # this explicit cancel() is a harmless no-op kept for clarity
        task.cancel()
        raise

Using Task Groups (Python 3.11+)

# Task groups provide better error handling and cancellation
async def fetch_with_task_group():
    async with asyncio.TaskGroup() as tg:
        user_task = tg.create_task(fetch_user(1))
        posts_task = tg.create_task(fetch_posts(1))
        comments_task = tg.create_task(fetch_comments(1))

    # All tasks complete here (or exception raised)
    return user_task.result(), posts_task.result(), comments_task.result()

# If any task fails, all tasks are cancelled
async def task_group_error_handling():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_that_fails())
            tg.create_task(task_that_succeeds())
    except ExceptionGroup as eg:
        # All exceptions collected in ExceptionGroup
        for exc in eg.exceptions:
            print(f"Task failed: {exc}")

5. Task Coordination Patterns

Wait for first completion

# Wait for first task to complete
async def fetch_from_multiple_sources(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_one(session, url))
            for url in urls
        ]

        # Wait for first completion
        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel remaining tasks
        for task in pending:
            task.cancel()

        # Return first result
        return done.pop().result()

Semaphore for rate limiting

# Limit concurrent operations
async def fetch_with_rate_limit(urls: list[str], max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_sem(url: str):
        async with semaphore:
            return await fetch_one(url)

    results = await asyncio.gather(*[
        fetch_with_sem(url) for url in urls
    ])
    return results

Best Practices

1. Avoid Blocking the Event Loop

# BAD: Blocking operations in async function
async def bad_async_function():
    # These block the event loop!
    time.sleep(1)  # WRONG
    requests.get(url)  # WRONG
    open('file.txt').read()  # WRONG
    some_cpu_intensive_task()  # WRONG

# GOOD: Use async alternatives
async def good_async_function():
    await asyncio.sleep(1)  # Non-blocking sleep
    async with aiohttp.ClientSession() as session:
        await session.get(url)  # Async HTTP
    async with aiofiles.open('file.txt') as f:
        await f.read()  # Async file I/O
    # For CPU-bound work: to_thread() keeps the event loop responsive,
    # though the GIL limits true parallelism (a ProcessPoolExecutor does not)
    result = await asyncio.to_thread(cpu_intensive_task)

2. Using asyncio.to_thread for Blocking I/O

import asyncio
from functools import partial

# For blocking sync operations (Python 3.9+)
async def async_wrapper_for_blocking():
    # Run blocking operation in thread pool
    result = await asyncio.to_thread(
        blocking_operation,
        arg1,
        arg2
    )
    return result

# For CPU-bound tasks (GIL-limited in threads; heavy CPU work is better
# served by loop.run_in_executor() with a ProcessPoolExecutor)
async def process_data_async(data: list):
    # Offload CPU-intensive work to a thread so the loop stays responsive
    processed = await asyncio.to_thread(
        cpu_intensive_processing,
        data
    )
    return processed

# With partial for complex arguments
async def complex_blocking_call():
    func = partial(
        blocking_function,
        timeout=30,
        retry=3
    )
    result = await asyncio.to_thread(func)
    return result

3. Proper Exception Handling

# Handle exceptions in individual coroutines
async def fetch_with_exception_handling(url: str):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
    except aiohttp.ClientError as e:
        # Handle specific async HTTP errors
        print(f"HTTP error: {e}")
        raise
    except asyncio.TimeoutError:
        # Handle timeout
        print(f"Timeout fetching {url}")
        raise
    except Exception as e:
        # Handle unexpected errors
        print(f"Unexpected error: {e}")
        raise

# Handle exceptions in gather()
async def fetch_multiple_with_errors(urls: list[str]):
    results = await asyncio.gather(
        *[fetch_with_exception_handling(url) for url in urls],
        return_exceptions=True
    )

    # Process results and exceptions
    successful = []
    failed = []

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            failed.append((url, result))
        else:
            successful.append(result)

    return successful, failed

# Exception groups (Python 3.11+)
async def handle_exception_groups():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task1())
            tg.create_task(task2())
    except* ValueError as eg:
        # Handle all ValueErrors
        print(f"Value errors: {eg.exceptions}")
    except* TypeError as eg:
        # Handle all TypeErrors
        print(f"Type errors: {eg.exceptions}")

4. Cancellation Handling

# Proper cancellation handling
async def cancellable_operation():
    try:
        await long_running_task()
    except asyncio.CancelledError:
        # Clean up resources
        await cleanup()
        # Re-raise to propagate cancellation
        raise

# Shielding from cancellation
async def critical_operation():
    # shield() protects important_task from being cancelled;
    # the awaiting coroutine itself still receives CancelledError
    await asyncio.shield(important_task())

# Timeout with proper cleanup
async def operation_with_timeout():
    try:
        result = await asyncio.wait_for(
            some_operation(),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        # Cleanup happens here
        await cleanup()
        raise

5. Timeout Patterns

# Single operation timeout
async def fetch_with_timeout(url: str, timeout: float = 10.0):
    try:
        async with aiohttp.ClientSession() as session:
            async with asyncio.timeout(timeout):  # Python 3.11+
                async with session.get(url) as response:
                    return await response.json()
    except asyncio.TimeoutError:
        print(f"Timeout after {timeout}s")
        raise

# Legacy timeout (Python < 3.11)
async def fetch_with_timeout_legacy(url: str, timeout: float = 10.0):
    try:
        result = await asyncio.wait_for(
            fetch_one(url),
            timeout=timeout
        )
        return result
    except asyncio.TimeoutError:
        print(f"Timeout after {timeout}s")
        raise

# Multiple timeouts
async def complex_operation_with_timeouts():
    # Per-operation timeouts
    async with asyncio.timeout(30):  # Overall timeout
        result1 = await asyncio.wait_for(op1(), timeout=10)
        result2 = await asyncio.wait_for(op2(), timeout=10)
        result3 = await asyncio.wait_for(op3(), timeout=10)
    return result1, result2, result3

6. Resource Cleanup

# Always use async context managers
async def proper_resource_cleanup():
    # GOOD: Automatic cleanup
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
    # Session and response closed automatically

# For resources without context managers
async def manual_cleanup():
    resource = await acquire_resource()
    try:
        result = await use_resource(resource)
        return result
    finally:
        # Always cleanup
        await resource.close()

# Async cleanup when acquiring multiple resources
async def cleanup_on_error():
    resources = []
    try:
        for i in range(10):
            resource = await acquire_resource(i)
            resources.append(resource)
            await use_resource(resource)
    finally:
        # Close every acquired resource exactly once,
        # whether or not an exception occurred
        await asyncio.gather(
            *[res.close() for res in resources],
            return_exceptions=True
        )
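When the number of resources is dynamic, `contextlib.AsyncExitStack` handles the bookkeeping: everything entered on the stack is closed in reverse order on exit, even if acquiring a later resource fails. A sketch with a logging dummy resource:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def resource(name: str, log: list):
    # Dummy async resource that records its lifecycle
    log.append(f"open {name}")
    try:
        yield name
    finally:
        log.append(f"close {name}")

async def main() -> list:
    log: list[str] = []
    # The stack closes every entered resource, last-in first-out
    async with AsyncExitStack() as stack:
        for name in ("a", "b", "c"):
            await stack.enter_async_context(resource(name, log))
        log.append("work")
    return log

log = asyncio.run(main())
print(log)
```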

Common Libraries

1. aiohttp - Async HTTP Client/Server

import aiohttp
import asyncio

# Client usage
async def fetch_data(url: str) -> dict:
    """Fetch data using aiohttp."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()

# With custom headers and timeout
async def fetch_with_options(url: str):
    headers = {"Authorization": "Bearer token"}
    timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
        async with session.get(url) as response:
            return await response.text()

# POST request
async def post_data(url: str, data: dict):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=data) as response:
            return await response.json()

# Concurrent requests with rate limiting
async def fetch_multiple(urls: list[str], max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        async with semaphore:
            async with session.get(url) as response:
                return await response.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

2. aiofiles - Async File I/O

import aiofiles

# Read file asynchronously
async def read_file(filepath: str) -> str:
    """Read file contents asynchronously."""
    async with aiofiles.open(filepath, mode='r') as f:
        contents = await f.read()
    return contents

# Write file asynchronously
async def write_file(filepath: str, content: str):
    """Write content to file asynchronously."""
    async with aiofiles.open(filepath, mode='w') as f:
        await f.write(content)

# Read lines
async def read_lines(filepath: str) -> list[str]:
    """Read file line by line."""
    lines = []
    async with aiofiles.open(filepath, mode='r') as f:
        async for line in f:
            lines.append(line.strip())
    return lines

# Process large file
async def process_large_file(filepath: str):
    """Process file line by line without loading all into memory."""
    async with aiofiles.open(filepath, mode='r') as f:
        async for line in f:
            processed = await process_line(line)
            await save_result(processed)

3. asyncpg - Async PostgreSQL Driver

import asyncpg

# Create connection pool
async def create_pool():
    """Create database connection pool."""
    pool = await asyncpg.create_pool(
        host='localhost',
        port=5432,
        user='user',
        password='password',
        database='mydb',
        min_size=10,
        max_size=20
    )
    return pool

# Query with pool
async def fetch_users(pool: asyncpg.Pool):
    """Fetch users from database."""
    async with pool.acquire() as connection:
        rows = await connection.fetch(
            'SELECT id, name, email FROM users WHERE active = $1',
            True
        )
        return [dict(row) for row in rows]

# Transaction
async def create_user_with_profile(pool: asyncpg.Pool, user_data: dict):
    """Create user and profile in transaction."""
    async with pool.acquire() as connection:
        async with connection.transaction():
            user_id = await connection.fetchval(
                'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
                user_data['name'],
                user_data['email']
            )
            await connection.execute(
                'INSERT INTO profiles (user_id, bio) VALUES ($1, $2)',
                user_id,
                user_data['bio']
            )
            return user_id

# Prepared statements for better performance
async def fetch_user_by_id(pool: asyncpg.Pool, user_id: int):
    """Fetch user using prepared statement."""
    async with pool.acquire() as connection:
        stmt = await connection.prepare('SELECT * FROM users WHERE id = $1')
        row = await stmt.fetchrow(user_id)
        return dict(row) if row else None

4. motor - Async MongoDB Driver

from motor.motor_asyncio import AsyncIOMotorClient

# Create client (in real code, create one client at startup and reuse it)
async def create_mongo_client():
    """Create MongoDB client."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    return client

# Query documents
async def fetch_documents(collection_name: str):
    """Fetch documents from MongoDB."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]

    cursor = collection.find({'active': True})
    documents = await cursor.to_list(length=100)
    return documents

# Insert document
async def insert_document(collection_name: str, document: dict):
    """Insert document into MongoDB."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]

    result = await collection.insert_one(document)
    return result.inserted_id

# Update document
async def update_document(collection_name: str, filter_dict: dict, update_dict: dict):
    """Update document in MongoDB."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]

    result = await collection.update_one(
        filter_dict,
        {'$set': update_dict}
    )
    return result.modified_count

# Async iteration over cursor
async def process_large_collection(collection_name: str):
    """Process large collection with cursor."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]

    cursor = collection.find({})
    async for document in cursor:
        await process_document(document)

Anti-Patterns to Avoid

1. Not Awaiting Coroutines

# BAD: Coroutine never executed
async def bad_example():
    result = fetch_data()  # RuntimeWarning: coroutine 'fetch_data' was never awaited
    return result

# GOOD: Properly awaited
async def good_example():
    result = await fetch_data()
    return result

2. Mixing Sync and Async Incorrectly

# BAD: Calling async function from sync code without event loop
def sync_function():
    result = await async_function()  # SyntaxError: 'await' outside async function
    return result

# GOOD: Use asyncio.run() to bridge sync/async
def sync_function():
    result = asyncio.run(async_function())
    return result

# BAD: Creating new event loop in async context
async def bad_nested():
    result = asyncio.run(another_async())  # RuntimeError: asyncio.run() cannot be called from a running event loop
    return result

# GOOD: Just await in async context
async def good_nested():
    result = await another_async()
    return result

3. Creating Event Loops Manually

# BAD: Manual loop management (unless necessary)
def bad_approach():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        result = loop.run_until_complete(main())
    finally:
        loop.close()
    return result

# GOOD: Use asyncio.run()
def good_approach():
    result = asyncio.run(main())
    return result

# EXCEPTION: Multiple event loops needed (advanced use case)
# Only create manual loops when you specifically need multiple loops

4. Blocking Operations in Async Functions

# BAD: Blocking the event loop
async def bad_blocking():
    time.sleep(5)  # Blocks entire event loop!
    data = requests.get(url).json()  # Blocks event loop!
    with open('file.txt') as f:
        content = f.read()  # Blocks event loop!
    result = compute_fibonacci(1000000)  # Blocks event loop!
    return result

# GOOD: Use async alternatives
async def good_async():
    await asyncio.sleep(5)  # Non-blocking
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()  # Non-blocking
    async with aiofiles.open('file.txt') as f:
        content = await f.read()  # Non-blocking
    result = await asyncio.to_thread(compute_fibonacci, 1000000)  # Non-blocking
    return result

5. Improper Exception Handling

# BAD: Bare except swallows CancelledError
async def bad_exception_handling():
    try:
        await some_operation()
    except:  # Catches CancelledError, preventing proper cancellation!
        print("Error occurred")

# GOOD: Specific exception handling
async def good_exception_handling():
    try:
        await some_operation()
    except asyncio.CancelledError:
        # Handle cancellation specifically
        await cleanup()
        raise  # Re-raise to propagate
    except Exception as e:
        # Handle other exceptions
        print(f"Error: {e}")
        raise

# BAD: Silencing errors in gather()
async def bad_gather():
    results = await asyncio.gather(
        task1(),
        task2(),
        task3(),
        return_exceptions=True
    )
    return results  # Might contain exceptions that are ignored

# GOOD: Check for exceptions
async def good_gather():
    results = await asyncio.gather(
        task1(),
        task2(),
        task3(),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
            # Handle or raise as appropriate

    return [r for r in results if not isinstance(r, Exception)]

6. Resource Leaks

# BAD: Not closing resources
async def bad_resource_management():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    data = await response.json()
    # Session and response never closed!
    return data

# GOOD: Use context managers
async def good_resource_management():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
    # Automatically closed
    return data

# BAD: Creating session per request
async def bad_session_usage():
    for url in urls:
        async with aiohttp.ClientSession() as session:
            # Creating new session each iteration is wasteful!
            await session.get(url)

# GOOD: Reuse session
async def good_session_usage():
    async with aiohttp.ClientSession() as session:
        for url in urls:
            # Reuse same session
            await session.get(url)

7. Not Using Task Groups for Error Propagation (Python 3.11+)

# BAD: Errors might be swallowed
async def bad_error_propagation():
    tasks = [
        asyncio.create_task(task1()),
        asyncio.create_task(task2()),
        asyncio.create_task(task3())
    ]
    # If task2 fails, task1 and task3 keep running
    await asyncio.gather(*tasks)

# GOOD: Use TaskGroup for automatic cancellation
async def good_error_propagation():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task1())
        tg.create_task(task2())
        tg.create_task(task3())
    # If any task fails, all tasks are cancelled

Framework-Specific Patterns

1. FastAPI Async Endpoints

from fastapi import FastAPI, HTTPException, Depends
from typing import List
import asyncpg

app = FastAPI()

# Async endpoint
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: asyncpg.Pool = Depends(get_db_pool)):
    """Async endpoint that queries database."""
    async with db.acquire() as connection:
        user = await connection.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return dict(user)

# Async endpoint with external API call
@app.get("/external-data")
async def fetch_external_data():
    """Fetch data from external API."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            return await response.json()

# Background tasks (async)
from fastapi import BackgroundTasks

@app.post("/send-notification")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks
):
    """Send notification in background."""
    background_tasks.add_task(send_email_async, email)
    return {"message": "Notification queued"}

async def send_email_async(email: str):
    """Send email asynchronously."""
    async with aiohttp.ClientSession() as session:
        await session.post(
            'https://email-service.com/send',
            json={'to': email}
        )

# Dependency injection with async
async def get_db_pool() -> asyncpg.Pool:
    """Dependency that provides database pool."""
    pool = await asyncpg.create_pool(...)
    try:
        yield pool
    finally:
        await pool.close()

# Lifespan events
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown events."""
    # Startup
    app.state.db_pool = await asyncpg.create_pool(...)
    app.state.http_session = aiohttp.ClientSession()
    yield
    # Shutdown
    await app.state.db_pool.close()
    await app.state.http_session.close()

app = FastAPI(lifespan=lifespan)

2. Django Async Views (Django 3.1+)

from django.http import JsonResponse
from django.views import View
from asgiref.sync import sync_to_async
import aiohttp

# Async function-based view
async def async_user_list(request):
    """Async view for listing users."""
    # Use sync_to_async for Django ORM
    users = await sync_to_async(list)(
        User.objects.filter(active=True)
    )
    return JsonResponse({
        'users': [
            {'id': u.id, 'name': u.name}
            for u in users
        ]
    })

# Async class-based view
class AsyncUserDetailView(View):
    """Async class-based view."""

    async def get(self, request, user_id):
        """Get user details asynchronously."""
        # Wrap ORM calls with sync_to_async
        user = await sync_to_async(User.objects.get)(id=user_id)
        return JsonResponse({
            'id': user.id,
            'name': user.name,
            'email': user.email
        })

# Async view with external API
async def fetch_external_data_view(request):
    """Fetch data from external API."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            data = await response.json()
    return JsonResponse(data)

# Django ORM with async (Django 4.1+)
async def async_orm_view(request):
    """Use async ORM operations."""
    # Django 4.1+ supports async ORM
    users = [user async for user in User.objects.filter(active=True)]
    count = await User.objects.filter(active=True).acount()

    return JsonResponse({
        'count': count,
        'users': [{'id': u.id, 'name': u.name} for u in users]
    })

# Middleware (async)
class AsyncMiddleware:
    """Async middleware example."""

    async def __call__(self, request):
        # Pre-processing
        await log_request_async(request)

        # Call next middleware/view
        response = await self.get_response(request)

        # Post-processing
        await log_response_async(response)

        return response

3. Async Database Operations

# asyncpg with connection pool
class AsyncPostgresService:
    """Service for async PostgreSQL operations."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def get_user(self, user_id: int) -> dict | None:
        """Get user by ID."""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                'SELECT * FROM users WHERE id = $1',
                user_id
            )
            return dict(row) if row else None

    async def create_user(self, name: str, email: str) -> int:
        """Create new user."""
        async with self.pool.acquire() as conn:
            user_id = await conn.fetchval(
                'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
                name,
                email
            )
            return user_id

    async def update_user(self, user_id: int, **kwargs) -> bool:
        """Update user fields."""
        if not kwargs:
            return False

        # Column names are interpolated into the SQL string, so kwargs keys
        # must come from trusted code, never from user input
        set_clause = ', '.join(f"{k} = ${i+2}" for i, k in enumerate(kwargs.keys()))
        query = f'UPDATE users SET {set_clause} WHERE id = $1'

        async with self.pool.acquire() as conn:
            result = await conn.execute(query, user_id, *kwargs.values())
            return result != 'UPDATE 0'

    async def batch_insert_users(self, users: list[tuple[str, str]]) -> int:
        """Batch insert users."""
        async with self.pool.acquire() as conn:
            # asyncpg's executemany() returns None, so report the input count
            await conn.executemany(
                'INSERT INTO users (name, email) VALUES ($1, $2)',
                users
            )
            return len(users)

# Motor (MongoDB) service
class AsyncMongoService:
    """Service for async MongoDB operations."""

    def __init__(self, client: AsyncIOMotorClient):
        self.db = client.mydb

    async def get_document(self, collection: str, doc_id: str) -> dict | None:
        """Get document by ID."""
        coll = self.db[collection]
        return await coll.find_one({'_id': doc_id})

    async def insert_document(self, collection: str, document: dict) -> str:
        """Insert document."""
        coll = self.db[collection]
        result = await coll.insert_one(document)
        return str(result.inserted_id)

    async def find_documents(
        self,
        collection: str,
        filter_dict: dict,
        limit: int = 100
    ) -> list[dict]:
        """Find documents matching filter."""
        coll = self.db[collection]
        cursor = coll.find(filter_dict).limit(limit)
        return await cursor.to_list(length=limit)

    async def aggregate(self, collection: str, pipeline: list[dict]) -> list[dict]:
        """Run aggregation pipeline."""
        coll = self.db[collection]
        cursor = coll.aggregate(pipeline)
        return await cursor.to_list(length=None)

Performance Considerations

1. When Async Provides Benefits (I/O-Bound)

Async is beneficial for I/O-bound operations:

# GOOD: I/O-bound operations benefit from async
async def io_bound_operations():
    """These operations benefit from async."""

    # Network requests
    async with aiohttp.ClientSession() as session:
        responses = await asyncio.gather(
            session.get('https://api1.example.com'),
            session.get('https://api2.example.com'),
            session.get('https://api3.example.com')
        )

    # Database queries
    async with pool.acquire() as conn:
        users, posts, comments = await asyncio.gather(
            conn.fetch('SELECT * FROM users'),
            conn.fetch('SELECT * FROM posts'),
            conn.fetch('SELECT * FROM comments')
        )

    # File I/O
    async with aiofiles.open('large_file.txt') as f:
        content = await f.read()

    return responses, users, posts, comments

# Example: 100 HTTP requests
async def fetch_many_urls(urls: list[str]):
    """Async is much faster for concurrent I/O."""
    async def fetch(session, url):
        # Read the body inside the context so the connection is released
        async with session.get(url) as response:
            return await response.text()

    async with aiohttp.ClientSession() as session:
        responses = await asyncio.gather(*[fetch(session, url) for url in urls])
    # With async: ~2 seconds (requests run concurrently)
    # With sync requests: ~200 seconds (2s per request × 100)
    return responses

2. When Async Doesn't Help (CPU-Bound)

Async provides no benefit for CPU-bound operations:

# BAD: CPU-bound operations don't benefit from async
async def cpu_bound_async():
    """This gains nothing from being async."""
    result = compute_fibonacci(1000000)  # Blocks event loop!
    return result

# GOOD: Use asyncio.to_thread for CPU-bound work
async def cpu_bound_with_threads():
    """Offload CPU-bound work to threads."""
    result = await asyncio.to_thread(compute_fibonacci, 1000000)
    return result

# BETTER: Use ProcessPoolExecutor for true parallelism
from concurrent.futures import ProcessPoolExecutor

async def cpu_bound_with_processes():
    """Use processes for CPU-bound parallelism."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            compute_fibonacci,
            1000000
        )
    return result

# Example: Multiple CPU-bound tasks
async def parallel_cpu_tasks(numbers: list[int]):
    """Run CPU-bound tasks in parallel using processes."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        results = await asyncio.gather(*[
            loop.run_in_executor(pool, compute_fibonacci, n)
            for n in numbers
        ])
    return results

3. Measuring Async Performance

import time
import asyncio
from functools import wraps

# Decorator to measure async function performance
def async_timer(func):
    """Measure execution time of async function."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = await func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} took {end - start:.2f} seconds")
        return result
    return wrapper

# Usage
@async_timer
async def fetch_data(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# Context manager for timing blocks
from contextlib import asynccontextmanager

@asynccontextmanager
async def timer(name: str):
    """Time a block of async code."""
    start = time.perf_counter()
    yield
    end = time.perf_counter()
    print(f"{name} took {end - start:.2f} seconds")

# Usage
async def timed_operations():
    async with timer("Database queries"):
        users = await fetch_users()
        posts = await fetch_posts()

    async with timer("External API calls"):
        data = await fetch_external_data()

# Compare sync vs async performance
async def compare_performance():
    """Compare sync and async performance."""
    urls = ['https://example.com'] * 100

    # Sync version (for comparison)
    start = time.perf_counter()
    sync_results = [requests.get(url).json() for url in urls]
    sync_time = time.perf_counter() - start
    print(f"Sync: {sync_time:.2f}s")

    # Async version
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        async_results = await asyncio.gather(*tasks)
    async_time = time.perf_counter() - start
    print(f"Async: {async_time:.2f}s")
    print(f"Speedup: {sync_time / async_time:.1f}x")

Code Review Checklist

Use this checklist when reviewing async Python code:

Syntax and Structure

  • All async functions defined with async def
  • All coroutines properly awaited
  • asyncio.run() used for top-level entry point (Python 3.7+)
  • No manual event loop creation (unless necessary)
  • Async context managers used with async with
  • Async iterators used with async for
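
The syntax items above can be sketched with stdlib-only code; `managed_resource` and `numbers` are hypothetical stand-ins for a real resource and a real async data stream:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    # Acquire on entry, release on exit -- even if the body raises
    resource = {"open": True}
    try:
        yield resource
    finally:
        resource["open"] = False

async def numbers(n):
    # Async generator, consumed with `async for`
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def main():
    async with managed_resource() as res:
        total = sum([i async for i in numbers(5)])
        return total, res["open"]

# asyncio.run() is the single top-level entry point; no manual loop creation
total, was_open = asyncio.run(main())
print(total, was_open)  # 10 True
```

Note that `asyncio.run()` creates and closes the loop itself, so it should appear exactly once, at the program's entry point.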

Concurrency Patterns

  • asyncio.gather() used for concurrent operations
  • asyncio.create_task() used for fire-and-forget tasks
  • Task groups used for error propagation (Python 3.11+)
  • Semaphores used for rate limiting
  • Proper task cancellation handling
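
A minimal, dependency-free sketch of combining `gather()` with a semaphore for rate limiting; `limited_fetch` is a hypothetical stand-in for an I/O call:

```python
import asyncio

async def limited_fetch(sem: asyncio.Semaphore, i: int) -> int:
    # At most `Semaphore(n)` holders execute this body concurrently
    async with sem:
        await asyncio.sleep(0.01)  # stand-in for an I/O call
        return i * 2

async def main() -> list[int]:
    sem = asyncio.Semaphore(3)  # cap concurrency at 3
    return await asyncio.gather(*[limited_fetch(sem, i) for i in range(10)])

results = asyncio.run(main())
print(results)  # results arrive in submission order: [0, 2, 4, ..., 18]
```

On Python 3.11+ the same fan-out can be written with `asyncio.TaskGroup`, which additionally cancels siblings when one task fails.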

Event Loop

  • No blocking operations in async functions
  • asyncio.to_thread() used for blocking I/O (Python 3.9+)
  • No time.sleep() (use asyncio.sleep())
  • No sync HTTP libraries (use aiohttp)
  • No sync file operations (use aiofiles)
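
To see why blocking calls matter, this sketch offloads a synchronous `time.sleep` via `asyncio.to_thread()` so a concurrent heartbeat task keeps running; both functions are illustrative stand-ins:

```python
import asyncio
import time

def blocking_io() -> str:
    # A synchronous call that would otherwise stall the event loop
    time.sleep(0.05)
    return "done"

async def main() -> tuple[str, int]:
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        for _ in range(5):
            await asyncio.sleep(0.01)
            ticks += 1

    # The blocking call runs in a worker thread, so the heartbeat keeps ticking
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_io), heartbeat())
    return result, ticks

result, ticks = asyncio.run(main())
print(result, ticks)  # done 5
```

Calling `blocking_io()` directly inside `main()` would freeze the heartbeat for the full 50 ms.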

Exception Handling

  • Specific exception types caught
  • asyncio.CancelledError handled and re-raised
  • return_exceptions=True used appropriately in gather()
  • Exceptions in tasks not silently ignored
  • Exception groups used for multiple task errors (Python 3.11+)
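
Two of the items above in one runnable sketch: re-raising `CancelledError` after cleanup, and using `return_exceptions=True` so one failure does not discard the other results (`worker` and `flaky` are hypothetical):

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Clean up here, then re-raise so cancellation propagates
        raise

async def flaky(i: int) -> int:
    if i == 2:
        raise ValueError("boom")
    return i

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)   # let the worker start
    task.cancel()
    cancelled = False
    try:
        await task
    except asyncio.CancelledError:
        cancelled = True

    # return_exceptions=True keeps one failure from discarding the rest
    results = await asyncio.gather(*[flaky(i) for i in range(4)],
                                   return_exceptions=True)
    return cancelled, results

cancelled, results = asyncio.run(main())
print(cancelled)                             # True
print([type(r).__name__ for r in results])  # ['int', 'int', 'ValueError', 'int']
```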

Resource Management

  • Async context managers used for resources
  • Resources properly closed in finally blocks
  • No session/connection leaks
  • Sessions reused across multiple requests
  • Connection pools used for databases
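
The reuse rule can be demonstrated without a network: `FakeSession` below is a hypothetical stand-in for an expensive-to-create client like `aiohttp.ClientSession`, created once and shared across requests:

```python
import asyncio

class FakeSession:
    """Stand-in for aiohttp.ClientSession: expensive to create, cheap to use."""
    instances = 0

    def __init__(self):
        FakeSession.instances += 1
        self.closed = False

    async def get(self, url: str) -> str:
        await asyncio.sleep(0)
        return f"body of {url}"

    async def close(self):
        self.closed = True

class ApiClient:
    """Create the session once and reuse it for every request."""

    def __init__(self):
        self._session = None

    async def __aenter__(self):
        self._session = FakeSession()
        return self

    async def __aexit__(self, *exc):
        # Guaranteed cleanup: no session leak even on errors
        await self._session.close()

    async def fetch(self, url: str) -> str:
        return await self._session.get(url)

async def main():
    async with ApiClient() as client:
        return [await client.fetch(f"https://example.com/{i}") for i in range(5)]

bodies = asyncio.run(main())
print(FakeSession.instances)  # 1  (one session serves all five requests)
```

Creating the session inside `fetch()` instead would bump `instances` to 5, which is the per-request-session anti-pattern this checklist item flags.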

Timeouts and Cancellation

  • Timeouts set for external operations
  • asyncio.wait_for() or asyncio.timeout() used
  • Tasks can be cancelled gracefully
  • Critical sections shielded from cancellation if needed
  • Cleanup performed on timeout/cancellation
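
A stdlib-only sketch of timeout handling with shielded cleanup; `slow_op` and `critical_cleanup` are hypothetical placeholders:

```python
import asyncio

async def slow_op():
    await asyncio.sleep(1)
    return "finished"

async def critical_cleanup(log: list):
    # Shielded work keeps running even if the caller is cancelled
    await asyncio.sleep(0)
    log.append("cleaned up")

async def main():
    log = []
    try:
        await asyncio.wait_for(slow_op(), timeout=0.05)
    except asyncio.TimeoutError:
        # On timeout, still perform cleanup, protected by shield()
        await asyncio.shield(critical_cleanup(log))
        return "timed out", log
    return "ok", log

status, log = asyncio.run(main())
print(status, log)  # timed out ['cleaned up']
```

On Python 3.11+, `async with asyncio.timeout(0.05):` is the preferred spelling of the same bound.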

Library Usage

  • aiohttp used instead of requests
  • aiofiles used for async file I/O
  • Async database drivers used (asyncpg, motor)
  • Library-specific best practices followed

Framework Integration

  • FastAPI async endpoints defined correctly
  • Django ORM calls wrapped with sync_to_async (Django < 4.1)
  • Async ORM operations used (Django 4.1+)
  • Dependency injection works with async

Performance

  • Async used for I/O-bound operations
  • CPU-bound work offloaded to threads/processes
  • Unnecessary async overhead avoided
  • Appropriate concurrency limits set

Type Hints

  • Async functions return type hints
  • Coroutine types annotated
  • Awaitable, Coroutine types used where appropriate
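
The type-hint items can be illustrated in a few lines; the function names here are hypothetical examples:

```python
import asyncio
from collections.abc import Awaitable, Coroutine
from typing import Any

async def fetch_value(key: str) -> int:
    # The declared return type is what `await` produces, not the coroutine
    await asyncio.sleep(0)
    return len(key)

# Calling an async function yields a Coroutine object before it is awaited
def make_job(key: str) -> Coroutine[Any, Any, int]:
    return fetch_value(key)

# Accept anything awaitable: coroutines, Tasks, or Futures
async def double(aw: Awaitable[int]) -> int:
    return 2 * await aw

async def main() -> int:
    return await double(make_job("hello"))

result = asyncio.run(main())
print(result)  # 10
```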

Related Skills

  • python-testing: For testing async code with pytest-asyncio
  • python-type-hints: For properly typing async functions and coroutines
  • python-error-handling: For exception handling patterns
  • fastapi-best-practices: For FastAPI-specific async patterns
  • django-best-practices: For Django async views and ORM

Version: 1.0
Last Updated: 2025-12-24
Maintainer: Claude Code Skills