| name | python-async-patterns |
| description | Comprehensive guide to Python async/await patterns, best practices, and anti-patterns. Covers asyncio fundamentals, coroutines, async context managers, task management, common libraries (aiohttp, aiofiles, asyncpg), framework integration (FastAPI, Django), performance considerations, and proper exception handling. Use when reviewing or writing asynchronous Python code. |
| allowed-tools | Read, Grep, Glob |
Python Async/Await Patterns and Best Practices
Purpose
This skill provides comprehensive guidance on Python asynchronous programming patterns for reviewing and writing async code. It covers async/await syntax, best practices, common patterns, anti-patterns, and framework-specific considerations.
Use this skill when:
- Reviewing async Python code
- Writing new async functions or services
- Refactoring sync code to async
- Troubleshooting async performance issues
- Evaluating async library usage
- Implementing async endpoints in web frameworks
Context
Asynchronous programming in Python enables efficient I/O-bound operations by allowing the event loop to switch between tasks while waiting for I/O operations to complete. However, async code introduces complexity and has specific patterns and pitfalls that must be understood for correct implementation.
Key Principles:
- Async is beneficial for I/O-bound operations, not CPU-bound tasks
- The event loop must never be blocked with synchronous operations
- Coroutines must be properly awaited
- Resources must be cleaned up correctly in async contexts
- Mixing sync and async code requires careful consideration
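The first two principles can be seen in a small timing experiment. The sketch below is illustrative only: `blocking_worker`, `cooperative_worker`, and `run_workers` are hypothetical names, and the sleeps stand in for real I/O.

```python
import asyncio
import time


async def blocking_worker(delay: float) -> None:
    # time.sleep() holds the event loop thread, so no other task can run
    time.sleep(delay)


async def cooperative_worker(delay: float) -> None:
    # asyncio.sleep() suspends this coroutine and lets other tasks run
    await asyncio.sleep(delay)


async def run_workers(worker, count: int = 3, delay: float = 0.05) -> float:
    """Run `count` workers with gather() and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(count)))
    return time.perf_counter() - start


if __name__ == "__main__":
    blocked = asyncio.run(run_workers(blocking_worker))
    cooperative = asyncio.run(run_workers(cooperative_worker))
    print(f"blocking: {blocked:.2f}s, cooperative: {cooperative:.2f}s")
```

With the blocking worker the three delays add up, while the cooperative version should finish in roughly the time of a single delay.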
Python Version Requirements:
- async/await: Python 3.5+
- asyncio.run(): Python 3.7+
- asyncio.create_task(): Python 3.7+
- Task Groups (asyncio.TaskGroup): Python 3.11+
- Exception Groups: Python 3.11+
Async Fundamentals
1. Async/Await Syntax
Defining Async Functions:
# Async function (coroutine function)
async def fetch_data(url: str) -> dict:
"""Coroutine that fetches data asynchronously."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# Regular function (synchronous)
def process_data(data: dict) -> list:
"""Regular synchronous function."""
return [item['value'] for item in data['items']]
Key Differences:
- async def creates a coroutine function
- Calling an async function returns a coroutine object (not the result)
- Must use await to execute the coroutine and get the result
- Can only use await inside async def functions
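A minimal, standard-library-only sketch of these differences follows. The names `add` and `demo` are hypothetical and exist only for illustration.

```python
import asyncio
import inspect


async def add(a: int, b: int, log: list) -> int:
    log.append("body ran")
    return a + b


def demo() -> tuple:
    log: list = []
    coro = add(1, 2, log)             # creates a coroutine object only
    ran_before_await = bool(log)      # the function body has not started
    is_coroutine = inspect.iscoroutine(coro)
    result = asyncio.run(coro)        # the event loop drives it to completion
    return is_coroutine, ran_before_await, result


if __name__ == "__main__":
    print(demo())
```

Calling `add(...)` does not run the body; the result only exists once the coroutine has been awaited or run by an event loop.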
2. Coroutines vs Regular Functions
# Coroutine function
async def async_operation():
await asyncio.sleep(1)
return "done"
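# Calling the function only creates a coroutine object; nothing runs yet
coro = async_operation() # If it is never awaited: RuntimeWarning: coroutine 'async_operation' was never awaited
# Must await to execute (only valid inside another async function)
result = await async_operation() # Correct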
# Or run from synchronous code
result = asyncio.run(async_operation()) # Correct (Python 3.7+)
Coroutine Characteristics:
- Lazy evaluation: not executed until awaited
- Can be suspended and resumed
- Return values through await
- Must be awaited or scheduled for execution
3. Event Loop Fundamentals
The event loop is the core of async execution:
import asyncio
# High-level API (Python 3.7+) - PREFERRED
async def main():
result = await some_async_operation()
return result
# Run the event loop
if __name__ == "__main__":
result = asyncio.run(main()) # Creates, runs, and closes loop
Event Loop Responsibilities:
- Schedules and executes coroutines
- Manages I/O operations
- Handles callbacks and tasks
- Coordinates concurrent execution
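How the loop coordinates concurrent execution can be observed by logging when each coroutine runs. This is a sketch with hypothetical names (`worker`, `main`); `asyncio.sleep(0)` is used purely as a yield point.

```python
import asyncio


async def worker(name: str, steps: int, log: list) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        # Each await is a point where the loop may switch to another task
        await asyncio.sleep(0)


async def main() -> list:
    log: list = []
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

On CPython's default event loop the log interleaves the two workers (`a0`, `b0`, `a1`, `b1`): the loop only switches tasks at `await` points, never in the middle of ordinary synchronous code.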
4. Using asyncio.run()
# GOOD: Modern approach (Python 3.7+)
async def main():
result = await fetch_data()
processed = await process_data(result)
return processed
if __name__ == "__main__":
result = asyncio.run(main())
# AVOID: Manual loop management (legacy, pre-3.7)
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(main())
finally:
loop.close()
Why prefer asyncio.run():
- Automatically creates and closes the loop
- Handles cleanup properly
- Simpler and less error-prone
- Cancels remaining tasks on shutdown
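The last point can be checked with a short experiment. In this sketch (the names `background`, `main`, and `demo` are hypothetical), `main()` returns while a background task is still pending, and `asyncio.run()` cancels that task before closing the loop.

```python
import asyncio


async def background(events: list) -> None:
    events.append("started")
    try:
        await asyncio.sleep(3600)  # stands in for a long-lived task
    except asyncio.CancelledError:
        events.append("cancelled")
        raise


async def main(events: list) -> str:
    # Keep a reference so the task is not garbage-collected early
    task = asyncio.create_task(background(events))
    await asyncio.sleep(0)   # give the task a chance to start
    return "main finished"   # returns while `task` is still pending


def demo() -> tuple:
    events: list = []
    result = asyncio.run(main(events))
    return result, events


if __name__ == "__main__":
    print(demo())
```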
Common Async Patterns
1. Async Context Managers
Async context managers use async with for resource management:
class AsyncDatabaseConnection:
"""Example async context manager."""
async def __aenter__(self):
"""Called when entering async with block."""
self.connection = await self._connect()
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Called when exiting async with block."""
await self.connection.close()
return False # Don't suppress exceptions
async def _connect(self):
"""Establish connection."""
await asyncio.sleep(0.1) # Simulated async connection
return {"connected": True}
# Usage
async def query_database():
async with AsyncDatabaseConnection() as conn:
# Connection automatically closed after block
result = await execute_query(conn)
return result
Common Async Context Managers:
- aiohttp.ClientSession(): HTTP client sessions
- aiofiles.open(): Async file operations
- Database connection pools (asyncpg, motor)
- Lock primitives (asyncio.Lock())
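For simple cases, `contextlib.asynccontextmanager` can replace a hand-written `__aenter__`/`__aexit__` pair. The sketch below uses only the standard library; `managed_resource` and its log are hypothetical stand-ins for a real connection.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_resource(name: str, log: list):
    log.append(f"open {name}")
    try:
        yield name
    finally:
        # Runs on normal exit and when the block raises
        log.append(f"close {name}")


async def main() -> list:
    log: list = []
    lock = asyncio.Lock()
    async with lock:  # asyncio.Lock is itself an async context manager
        async with managed_resource("db", log) as res:
            log.append(f"use {res}")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```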
2. Async Iterators and Generators
class AsyncDataStream:
"""Async iterator for streaming data."""
def __init__(self, count: int):
self.count = count
self.index = 0
def __aiter__(self):
"""Return the async iterator."""
return self
async def __anext__(self):
"""Get next item asynchronously."""
if self.index >= self.count:
raise StopAsyncIteration
await asyncio.sleep(0.1) # Simulated async operation
value = f"item_{self.index}"
self.index += 1
return value
# Usage
async def process_stream():
async for item in AsyncDataStream(5):
print(item)
# Async generator (simpler approach)
async def async_range(count: int):
"""Async generator function."""
for i in range(count):
await asyncio.sleep(0.1)
yield i
# Usage
async def use_generator():
async for value in async_range(5):
print(value)
3. Async Comprehensions
# Async list comprehension
async def fetch_all_urls(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        # Each await completes before the next one starts, so these requests
        # run sequentially; use asyncio.gather() when they should overlap
        results = [
            await fetch_one(session, url)
            for url in urls
        ]
        return results
# Async generator expression
async def process_stream_data(stream):
    # Builds an async generator; nothing runs until the caller iterates
    # over it with "async for"
    processed = (
        transform(item)
        async for item in stream
        if await validate(item)
    )
    return processed
# Async dict comprehension
async def fetch_user_data(user_ids: list[int]) -> dict[int, dict]:
    # Users are fetched one at a time, in order
    return {
        user_id: await fetch_user(user_id)
        for user_id in user_ids
    }
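An `await` inside a comprehension runs its operations one after another. The following sketch contrasts that with `asyncio.gather()`; `fetch_value` is a hypothetical stand-in that simulates I/O latency with `asyncio.sleep`.

```python
import asyncio
import time


async def fetch_value(n: int, delay: float = 0.05) -> int:
    await asyncio.sleep(delay)  # simulated I/O latency
    return n * 2


async def sequential(ns: list) -> list:
    # Awaits inside a comprehension run one at a time
    return [await fetch_value(n) for n in ns]


async def concurrent(ns: list) -> list:
    # gather() schedules all coroutines and preserves input order
    return list(await asyncio.gather(*(fetch_value(n) for n in ns)))


async def timed(coro_func, ns: list) -> tuple:
    start = time.perf_counter()
    result = await coro_func(ns)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    print(asyncio.run(timed(sequential, [1, 2, 3, 4])))
    print(asyncio.run(timed(concurrent, [1, 2, 3, 4])))
```

Both variants return the same values in the same order; only the elapsed time differs.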
4. Concurrent Task Execution
Using asyncio.gather()
# Run multiple coroutines concurrently
async def fetch_multiple_resources():
# All tasks run concurrently
results = await asyncio.gather(
fetch_user(1),
fetch_posts(1),
fetch_comments(1)
)
user, posts, comments = results
return user, posts, comments
# With error handling
async def fetch_with_error_handling():
results = await asyncio.gather(
fetch_user(1),
fetch_posts(1),
fetch_comments(1),
return_exceptions=True # Return exceptions instead of raising
)
# Check for exceptions
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
Using asyncio.create_task()
# Create tasks explicitly for more control
async def fetch_with_tasks():
# Create tasks (they start executing immediately)
user_task = asyncio.create_task(fetch_user(1))
posts_task = asyncio.create_task(fetch_posts(1))
comments_task = asyncio.create_task(fetch_comments(1))
# Wait for all tasks
user = await user_task
posts = await posts_task
comments = await comments_task
return user, posts, comments
# Cancel tasks if needed
async def fetch_with_cancellation():
    task = asyncio.create_task(long_running_operation())
    try:
        result = await asyncio.wait_for(task, timeout=5.0)
        return result
    except asyncio.TimeoutError:
        # wait_for() has already cancelled the task on timeout, so this
        # explicit cancel() is a harmless no-op kept for clarity
        task.cancel()
        raise
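`asyncio.wait_for()` cancels the awaited task itself when the timeout expires. A small check, using a hypothetical `slow` coroutine:

```python
import asyncio


async def slow() -> str:
    await asyncio.sleep(10)
    return "finished"


async def main() -> tuple:
    task = asyncio.create_task(slow())
    timed_out = False
    try:
        await asyncio.wait_for(task, timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    # By the time TimeoutError is raised, wait_for() has cancelled the task
    return timed_out, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```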
Using Task Groups (Python 3.11+)
# Task groups provide better error handling and cancellation
async def fetch_with_task_group():
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user(1))
posts_task = tg.create_task(fetch_posts(1))
comments_task = tg.create_task(fetch_comments(1))
# All tasks complete here (or exception raised)
return user_task.result(), posts_task.result(), comments_task.result()
# If any task fails, the remaining tasks are cancelled and the errors are raised together as an ExceptionGroup
async def task_group_error_handling():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task_that_fails())
tg.create_task(task_that_succeeds())
except ExceptionGroup as eg:
# All exceptions collected in ExceptionGroup
for exc in eg.exceptions:
print(f"Task failed: {exc}")
5. Task Coordination Patterns
Wait for first completion
# Wait for first task to complete
async def fetch_from_multiple_sources(urls: list[str]):
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(fetch_one(session, url))
for url in urls
]
# Wait for first completion
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Cancel remaining tasks
for task in pending:
task.cancel()
# Return first result
return done.pop().result()
Semaphore for rate limiting
# Limit concurrent operations
async def fetch_with_rate_limit(urls: list[str], max_concurrent: int = 5):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_sem(url: str):
async with semaphore:
return await fetch_one(url)
results = await asyncio.gather(*[
fetch_with_sem(url) for url in urls
])
return results
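To confirm that a semaphore really caps concurrency, the sketch below tracks how many jobs are inside the guarded section at once. `limited_gather` and `job` are hypothetical names, and the sleep simulates I/O.

```python
import asyncio


async def limited_gather(count: int, max_concurrent: int) -> int:
    """Run `count` simulated jobs and return the peak number running at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated I/O
            running -= 1

    await asyncio.gather(*(job() for _ in range(count)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(limited_gather(20, 5)))
```

All 20 coroutines are scheduled immediately, but no more than `max_concurrent` of them are past the `async with semaphore` line at any moment.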
Best Practices
1. Avoid Blocking the Event Loop
# BAD: Blocking operations in async function
async def bad_async_function():
# These block the event loop!
time.sleep(1) # WRONG
requests.get(url) # WRONG
open('file.txt').read() # WRONG
some_cpu_intensive_task() # WRONG
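# GOOD: Use async alternatives
async def good_async_function():
    await asyncio.sleep(1) # Non-blocking sleep
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response: # Async HTTP
            await response.read()
    async with aiofiles.open('file.txt') as f:
        await f.read() # Async file I/O
    # Blocking calls with no async alternative: asyncio.to_thread() (Python 3.9+).
    # A thread keeps the loop responsive, but pure-Python CPU work still
    # contends for the GIL; use a process pool when it needs real parallelism
    result = await asyncio.to_thread(cpu_intensive_task)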
2. Using asyncio.to_thread for Blocking I/O
import asyncio
from functools import partial
# For blocking sync operations (Python 3.9+)
async def async_wrapper_for_blocking():
# Run blocking operation in thread pool
result = await asyncio.to_thread(
blocking_operation,
arg1,
arg2
)
return result
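# For CPU-bound tasks
async def process_data_async(data: list):
    # A worker thread keeps the event loop free, but the GIL prevents
    # pure-Python code from running in parallel; prefer a
    # ProcessPoolExecutor for heavy computation
    processed = await asyncio.to_thread(
        cpu_intensive_processing,
        data
    )
    return processed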
# With partial for complex arguments
async def complex_blocking_call():
func = partial(
blocking_function,
timeout=30,
retry=3
)
result = await asyncio.to_thread(func)
return result
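The effect of `asyncio.to_thread()` on loop responsiveness can be seen by running a ticker alongside a blocking call. This is a sketch, assuming Python 3.9+; `blocking_io` and `ticker` are hypothetical, and `time.sleep` stands in for a blocking library call.

```python
import asyncio
import time


def blocking_io(delay: float) -> str:
    time.sleep(delay)  # stands in for a blocking library call
    return "done"


async def main() -> tuple:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    ticker_task = asyncio.create_task(ticker())
    # The blocking call runs in a worker thread; the loop keeps ticking
    result = await asyncio.to_thread(blocking_io, 0.1)
    ticker_task.cancel()
    try:
        await ticker_task
    except asyncio.CancelledError:
        pass
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If `blocking_io` were called directly inside the coroutine, the ticker would not advance until it returned.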
3. Proper Exception Handling
# Handle exceptions in individual coroutines
async def fetch_with_exception_handling(url: str):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
except aiohttp.ClientError as e:
# Handle specific async HTTP errors
print(f"HTTP error: {e}")
raise
except asyncio.TimeoutError:
# Handle timeout
print(f"Timeout fetching {url}")
raise
except Exception as e:
# Handle unexpected errors
print(f"Unexpected error: {e}")
raise
# Handle exceptions in gather()
async def fetch_multiple_with_errors(urls: list[str]):
results = await asyncio.gather(
*[fetch_with_exception_handling(url) for url in urls],
return_exceptions=True
)
# Process results and exceptions
successful = []
failed = []
for url, result in zip(urls, results):
if isinstance(result, Exception):
failed.append((url, result))
else:
successful.append(result)
return successful, failed
# Exception groups (Python 3.11+)
async def handle_exception_groups():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task1())
tg.create_task(task2())
except* ValueError as eg:
# Handle all ValueErrors
print(f"Value errors: {eg.exceptions}")
except* TypeError as eg:
# Handle all TypeErrors
print(f"Type errors: {eg.exceptions}")
4. Cancellation Handling
# Proper cancellation handling
async def cancellable_operation():
try:
await long_running_task()
except asyncio.CancelledError:
# Clean up resources
await cleanup()
# Re-raise to propagate cancellation
raise
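# Shielding from cancellation
async def critical_operation():
    # shield() keeps important_task() running if this coroutine is cancelled;
    # the await itself still raises CancelledError in this coroutine
    await asyncio.shield(important_task())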
# Timeout with proper cleanup
async def operation_with_timeout():
try:
result = await asyncio.wait_for(
some_operation(),
timeout=5.0
)
return result
except asyncio.TimeoutError:
# Cleanup happens here
await cleanup()
raise
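The behaviour of `asyncio.shield()` is easy to misread: it protects the inner work, not the coroutine that awaits it. A small sketch with hypothetical `inner` and `outer` coroutines:

```python
import asyncio


async def inner(log: list) -> None:
    await asyncio.sleep(0.05)
    log.append("inner done")


async def outer(inner_task: asyncio.Task) -> None:
    # Cancelling outer() does not cancel inner_task
    await asyncio.shield(inner_task)


async def main() -> list:
    log: list = []
    inner_task = asyncio.create_task(inner(log))
    outer_task = asyncio.create_task(outer(inner_task))
    await asyncio.sleep(0)  # let both tasks start
    outer_task.cancel()
    try:
        await outer_task
    except asyncio.CancelledError:
        log.append("outer cancelled")
    await inner_task        # the shielded work still completes
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```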
5. Timeout Patterns
# Single operation timeout
async def fetch_with_timeout(url: str, timeout: float = 10.0):
try:
async with aiohttp.ClientSession() as session:
async with asyncio.timeout(timeout): # Python 3.11+
async with session.get(url) as response:
return await response.json()
except asyncio.TimeoutError:
print(f"Timeout after {timeout}s")
raise
# Legacy timeout (Python < 3.11)
async def fetch_with_timeout_legacy(url: str, timeout: float = 10.0):
try:
result = await asyncio.wait_for(
fetch_one(url),
timeout=timeout
)
return result
except asyncio.TimeoutError:
print(f"Timeout after {timeout}s")
raise
# Multiple timeouts
async def complex_operation_with_timeouts():
# Per-operation timeouts
async with asyncio.timeout(30): # Overall timeout
result1 = await asyncio.wait_for(op1(), timeout=10)
result2 = await asyncio.wait_for(op2(), timeout=10)
result3 = await asyncio.wait_for(op3(), timeout=10)
return result1, result2, result3
6. Resource Cleanup
# Always use async context managers
async def proper_resource_cleanup():
# GOOD: Automatic cleanup
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
# Session and response closed automatically
# For resources without context managers
async def manual_cleanup():
resource = await acquire_resource()
try:
result = await use_resource(resource)
return result
finally:
# Always cleanup
await resource.close()
# Async cleanup for multiple resources
async def cleanup_on_error():
    resources = []
    try:
        for i in range(10):
            resource = await acquire_resource(i)
            resources.append(resource)
            await use_resource(resource)
    finally:
        # Runs on success and on error, so each resource is closed exactly once
        await asyncio.gather(
            *[res.close() for res in resources],
            return_exceptions=True
        )
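When the number of resources is dynamic, `contextlib.AsyncExitStack` is one way to guarantee that everything acquired so far is closed exactly once. The sketch below uses a hypothetical `FakeResource` class in place of real connections.

```python
import asyncio
from contextlib import AsyncExitStack
from typing import Optional


class FakeResource:
    """Hypothetical stand-in for a connection or file handle."""

    def __init__(self, name: str, log: list) -> None:
        self.name = name
        self.log = log

    async def close(self) -> None:
        self.log.append(f"closed {self.name}")


async def use_many(count: int, fail_at: Optional[int] = None) -> list:
    log: list = []
    try:
        async with AsyncExitStack() as stack:
            for i in range(count):
                if i == fail_at:
                    raise RuntimeError(f"could not acquire resource {i}")
                resource = FakeResource(f"r{i}", log)
                # Register close() right after acquisition
                stack.push_async_callback(resource.close)
    except RuntimeError as exc:
        log.append(f"error: {exc}")
    return log


if __name__ == "__main__":
    print(asyncio.run(use_many(3)))
    print(asyncio.run(use_many(3, fail_at=2)))
```

Callbacks run in reverse order of registration, both on normal exit and when acquisition fails partway through.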
Common Libraries
1. aiohttp - Async HTTP Client/Server
import aiohttp
import asyncio
# Client usage
async def fetch_data(url: str) -> dict:
"""Fetch data using aiohttp."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
# With custom headers and timeout
async def fetch_with_options(url: str):
headers = {"Authorization": "Bearer token"}
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.get(url) as response:
return await response.text()
# POST request
async def post_data(url: str, data: dict):
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data) as response:
return await response.json()
# Concurrent requests with rate limiting
async def fetch_multiple(urls: list[str], max_concurrent: int = 5):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_one(session, url):
async with semaphore:
async with session.get(url) as response:
return await response.json()
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks)
2. aiofiles - Async File I/O
import aiofiles
# Read file asynchronously
async def read_file(filepath: str) -> str:
"""Read file contents asynchronously."""
async with aiofiles.open(filepath, mode='r') as f:
contents = await f.read()
return contents
# Write file asynchronously
async def write_file(filepath: str, content: str):
"""Write content to file asynchronously."""
async with aiofiles.open(filepath, mode='w') as f:
await f.write(content)
# Read lines
async def read_lines(filepath: str) -> list[str]:
"""Read file line by line."""
lines = []
async with aiofiles.open(filepath, mode='r') as f:
async for line in f:
lines.append(line.strip())
return lines
# Process large file
async def process_large_file(filepath: str):
"""Process file line by line without loading all into memory."""
async with aiofiles.open(filepath, mode='r') as f:
async for line in f:
processed = await process_line(line)
await save_result(processed)
3. asyncpg - Async PostgreSQL Driver
import asyncpg
# Create connection pool
async def create_pool():
"""Create database connection pool."""
pool = await asyncpg.create_pool(
host='localhost',
port=5432,
user='user',
password='password',
database='mydb',
min_size=10,
max_size=20
)
return pool
# Query with pool
async def fetch_users(pool: asyncpg.Pool):
"""Fetch users from database."""
async with pool.acquire() as connection:
rows = await connection.fetch(
'SELECT id, name, email FROM users WHERE active = $1',
True
)
return [dict(row) for row in rows]
# Transaction
async def create_user_with_profile(pool: asyncpg.Pool, user_data: dict):
"""Create user and profile in transaction."""
async with pool.acquire() as connection:
async with connection.transaction():
user_id = await connection.fetchval(
'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
user_data['name'],
user_data['email']
)
await connection.execute(
'INSERT INTO profiles (user_id, bio) VALUES ($1, $2)',
user_id,
user_data['bio']
)
return user_id
# Prepared statements for better performance
async def fetch_user_by_id(pool: asyncpg.Pool, user_id: int):
"""Fetch user using prepared statement."""
async with pool.acquire() as connection:
stmt = await connection.prepare('SELECT * FROM users WHERE id = $1')
row = await stmt.fetchrow(user_id)
return dict(row) if row else None
4. motor - Async MongoDB Driver
from motor.motor_asyncio import AsyncIOMotorClient
# Create client
async def create_mongo_client():
"""Create MongoDB client."""
client = AsyncIOMotorClient('mongodb://localhost:27017')
return client
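# Query documents
async def fetch_documents(collection_name: str):
    """Fetch documents from MongoDB."""
    # A client is created inline for brevity; real code should create one
    # AsyncIOMotorClient at startup and share it, since each client owns
    # its own connection pool
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]
    cursor = collection.find({'active': True})
    documents = await cursor.to_list(length=100)
    return documents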
# Insert document
async def insert_document(collection_name: str, document: dict):
"""Insert document into MongoDB."""
client = AsyncIOMotorClient('mongodb://localhost:27017')
db = client.mydb
collection = db[collection_name]
result = await collection.insert_one(document)
return result.inserted_id
# Update document
async def update_document(collection_name: str, filter_dict: dict, update_dict: dict):
"""Update document in MongoDB."""
client = AsyncIOMotorClient('mongodb://localhost:27017')
db = client.mydb
collection = db[collection_name]
result = await collection.update_one(
filter_dict,
{'$set': update_dict}
)
return result.modified_count
# Async iteration over cursor
async def process_large_collection(collection_name: str):
"""Process large collection with cursor."""
client = AsyncIOMotorClient('mongodb://localhost:27017')
db = client.mydb
collection = db[collection_name]
cursor = collection.find({})
async for document in cursor:
await process_document(document)
Anti-Patterns to Avoid
1. Not Awaiting Coroutines
# BAD: Coroutine never executed
async def bad_example():
result = fetch_data() # RuntimeWarning: coroutine 'fetch_data' was never awaited
return result
# GOOD: Properly awaited
async def good_example():
result = await fetch_data()
return result
2. Mixing Sync and Async Incorrectly
# BAD: Calling async function from sync code without event loop
def sync_function():
result = await async_function() # SyntaxError: 'await' outside async function
return result
# GOOD: Use asyncio.run() to bridge sync/async
def sync_function():
result = asyncio.run(async_function())
return result
# BAD: Creating new event loop in async context
async def bad_nested():
result = asyncio.run(another_async()) # RuntimeError: asyncio.run() cannot be called from a running event loop
return result
# GOOD: Just await in async context
async def good_nested():
result = await another_async()
return result
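The nested `asyncio.run()` failure can be reproduced with the standard library alone. In this sketch, `inner` and `outer` are hypothetical names.

```python
import asyncio


async def inner() -> int:
    return 42


async def outer() -> str:
    coro = inner()
    try:
        asyncio.run(coro)  # a loop is already running in this thread
    except RuntimeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(outer()))
```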
3. Creating Event Loops Manually
# BAD: Manual loop management (unless necessary)
def bad_approach():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(main())
finally:
loop.close()
return result
# GOOD: Use asyncio.run()
def good_approach():
result = asyncio.run(main())
return result
# EXCEPTION: Multiple event loops needed (advanced use case)
# Only create manual loops when you specifically need multiple loops
4. Blocking Operations in Async Functions
# BAD: Blocking the event loop
async def bad_blocking():
time.sleep(5) # Blocks entire event loop!
data = requests.get(url).json() # Blocks event loop!
with open('file.txt') as f:
content = f.read() # Blocks event loop!
result = compute_fibonacci(1000000) # Blocks event loop!
return result
# GOOD: Use async alternatives
async def good_async():
await asyncio.sleep(5) # Non-blocking
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json() # Non-blocking
async with aiofiles.open('file.txt') as f:
content = await f.read() # Non-blocking
result = await asyncio.to_thread(compute_fibonacci, 1000000) # Non-blocking
return result
5. Improper Exception Handling
# BAD: Bare except swallows CancelledError
async def bad_exception_handling():
try:
await some_operation()
except: # Catches CancelledError, preventing proper cancellation!
print("Error occurred")
# GOOD: Specific exception handling
async def good_exception_handling():
try:
await some_operation()
except asyncio.CancelledError:
# Handle cancellation specifically
await cleanup()
raise # Re-raise to propagate
except Exception as e:
# Handle other exceptions
print(f"Error: {e}")
raise
# BAD: Silencing errors in gather()
async def bad_gather():
results = await asyncio.gather(
task1(),
task2(),
task3(),
return_exceptions=True
)
return results # Might contain exceptions that are ignored
# GOOD: Check for exceptions
async def good_gather():
results = await asyncio.gather(
task1(),
task2(),
task3(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
# Handle or raise as appropriate
return [r for r in results if not isinstance(r, Exception)]
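Since Python 3.8, `asyncio.CancelledError` derives from `BaseException` rather than `Exception`, which is why `except Exception` is safe for cancellation while a bare `except:` is not. A short check, with a hypothetical `worker` coroutine:

```python
import asyncio


async def worker(log: list) -> None:
    try:
        await asyncio.sleep(10)
    except Exception:
        # Not reached on cancellation: CancelledError is not an Exception
        log.append("swallowed")


async def main() -> tuple:
    log: list = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return issubclass(asyncio.CancelledError, Exception), log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

One consequence for `gather(..., return_exceptions=True)`: a cancelled child shows up as a `CancelledError` instance, which an `isinstance(result, Exception)` check will not match; checking against `BaseException` covers both cases.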
6. Resource Leaks
# BAD: Not closing resources
async def bad_resource_management():
session = aiohttp.ClientSession()
response = await session.get(url)
data = await response.json()
# Session and response never closed!
return data
# GOOD: Use context managers
async def good_resource_management():
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
# Automatically closed
return data
# BAD: Creating session per request
async def bad_session_usage():
    for url in urls:
        async with aiohttp.ClientSession() as session:
            # A new session per iteration throws away the connection pool each time
            async with session.get(url) as response:
                await response.read()
# GOOD: Reuse session
async def good_session_usage():
    async with aiohttp.ClientSession() as session:
        for url in urls:
            # Reuse the same session; the inner block releases each response
            async with session.get(url) as response:
                await response.read()
7. Not Using Task Groups for Error Propagation (Python 3.11+)
# BAD: A failure does not stop the sibling tasks
async def bad_error_propagation():
    tasks = [
        asyncio.create_task(task1()),
        asyncio.create_task(task2()),
        asyncio.create_task(task3())
    ]
    # If task2 fails, gather() raises its exception here, but task1 and
    # task3 keep running in the background
    await asyncio.gather(*tasks)
# GOOD: Use TaskGroup for automatic cancellation
async def good_error_propagation():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task1())
        tg.create_task(task2())
        tg.create_task(task3())
    # If any task fails, the remaining tasks are cancelled
Framework-Specific Patterns
1. FastAPI Async Endpoints
from fastapi import FastAPI, HTTPException, Depends
from typing import List
import asyncpg
app = FastAPI()
# Async endpoint
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: asyncpg.Pool = Depends(get_db_pool)):
"""Async endpoint that queries database."""
async with db.acquire() as connection:
user = await connection.fetchrow(
'SELECT * FROM users WHERE id = $1',
user_id
)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return dict(user)
# Async endpoint with external API call
@app.get("/external-data")
async def fetch_external_data():
"""Fetch data from external API."""
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as response:
return await response.json()
# Background tasks (async)
from fastapi import BackgroundTasks
@app.post("/send-notification")
async def send_notification(
email: str,
background_tasks: BackgroundTasks
):
"""Send notification in background."""
background_tasks.add_task(send_email_async, email)
return {"message": "Notification queued"}
async def send_email_async(email: str):
"""Send email asynchronously."""
async with aiohttp.ClientSession() as session:
await session.post(
'https://email-service.com/send',
json={'to': email}
)
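# Dependency injection with async
# Define this before any endpoint that names it in Depends()
from fastapi import Request
async def get_db_pool(request: Request) -> asyncpg.Pool:
    """Dependency that returns the shared pool created at startup."""
    # Creating a pool per request would open and close connections on every
    # call; reuse the pool stored on app.state by the lifespan handler
    return request.app.state.db_pool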
# Lifespan events
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown events."""
# Startup
app.state.db_pool = await asyncpg.create_pool(...)
app.state.http_session = aiohttp.ClientSession()
yield
# Shutdown
await app.state.db_pool.close()
await app.state.http_session.close()
app = FastAPI(lifespan=lifespan)
2. Django Async Views (Django 3.1+)
from django.http import JsonResponse
from django.views import View
from asgiref.sync import sync_to_async
import aiohttp
# Async function-based view
async def async_user_list(request):
"""Async view for listing users."""
# Use sync_to_async for Django ORM
users = await sync_to_async(list)(
User.objects.filter(active=True)
)
return JsonResponse({
'users': [
{'id': u.id, 'name': u.name}
for u in users
]
})
# Async class-based view
class AsyncUserDetailView(View):
"""Async class-based view."""
async def get(self, request, user_id):
"""Get user details asynchronously."""
# Wrap ORM calls with sync_to_async
user = await sync_to_async(User.objects.get)(id=user_id)
return JsonResponse({
'id': user.id,
'name': user.name,
'email': user.email
})
# Async view with external API
async def fetch_external_data_view(request):
"""Fetch data from external API."""
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as response:
data = await response.json()
return JsonResponse(data)
# Django ORM with async (Django 4.1+)
async def async_orm_view(request):
"""Use async ORM operations."""
# Django 4.1+ supports async ORM
users = [user async for user in User.objects.filter(active=True)]
count = await User.objects.filter(active=True).acount()
return JsonResponse({
'count': count,
'users': [{'id': u.id, 'name': u.name} for u in users]
})
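# Middleware (async)
from asgiref.sync import iscoroutinefunction, markcoroutinefunction
class AsyncMiddleware:
    """Async middleware example."""
    async_capable = True
    sync_capable = False
    def __init__(self, get_response):
        self.get_response = get_response
        # Tell Django this instance is awaitable (asgiref 3.6+)
        if iscoroutinefunction(self.get_response):
            markcoroutinefunction(self)
    async def __call__(self, request):
        # Pre-processing
        await log_request_async(request)
        # Call next middleware/view
        response = await self.get_response(request)
        # Post-processing
        await log_response_async(response)
        return response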
3. Async Database Operations
# asyncpg with connection pool
class AsyncPostgresService:
"""Service for async PostgreSQL operations."""
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def get_user(self, user_id: int) -> dict | None:
"""Get user by ID."""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1',
user_id
)
return dict(row) if row else None
async def create_user(self, name: str, email: str) -> int:
"""Create new user."""
async with self.pool.acquire() as conn:
user_id = await conn.fetchval(
'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
name,
email
)
return user_id
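    async def update_user(self, user_id: int, **kwargs) -> bool:
        """Update user fields."""
        if not kwargs:
            return False
        # Column names are interpolated into the SQL text, so accept only
        # known columns; values still go through $n placeholders
        allowed = {'name', 'email'}  # assumed columns, adjust to the schema
        if not kwargs.keys() <= allowed:
            raise ValueError(f"Unsupported fields: {sorted(kwargs.keys() - allowed)}")
        set_clause = ', '.join(f"{k} = ${i+2}" for i, k in enumerate(kwargs.keys()))
        query = f'UPDATE users SET {set_clause} WHERE id = $1'
        async with self.pool.acquire() as conn:
            result = await conn.execute(query, user_id, *kwargs.values())
            return result != 'UPDATE 0'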
async def batch_insert_users(self, users: list[tuple[str, str]]) -> int:
"""Batch insert users."""
async with self.pool.acquire() as conn:
result = await conn.executemany(
'INSERT INTO users (name, email) VALUES ($1, $2)',
users
)
return len(users)
# Motor (MongoDB) service
class AsyncMongoService:
"""Service for async MongoDB operations."""
def __init__(self, client: AsyncIOMotorClient):
self.db = client.mydb
async def get_document(self, collection: str, doc_id: str) -> dict | None:
"""Get document by ID."""
coll = self.db[collection]
return await coll.find_one({'_id': doc_id})
async def insert_document(self, collection: str, document: dict) -> str:
"""Insert document."""
coll = self.db[collection]
result = await coll.insert_one(document)
return str(result.inserted_id)
async def find_documents(
self,
collection: str,
filter_dict: dict,
limit: int = 100
) -> list[dict]:
"""Find documents matching filter."""
coll = self.db[collection]
cursor = coll.find(filter_dict).limit(limit)
return await cursor.to_list(length=limit)
async def aggregate(self, collection: str, pipeline: list[dict]) -> list[dict]:
"""Run aggregation pipeline."""
coll = self.db[collection]
cursor = coll.aggregate(pipeline)
return await cursor.to_list(length=None)
Performance Considerations
1. When Async Provides Benefits (I/O-Bound)
Async is beneficial for I/O-bound operations:
# GOOD: I/O-bound operations benefit from async
async def io_bound_operations():
    """These operations benefit from async."""
    # Network requests
    async with aiohttp.ClientSession() as session:
        responses = await asyncio.gather(
            session.get('https://api1.example.com'),
            session.get('https://api2.example.com'),
            session.get('https://api3.example.com')
        )
    # Database queries: a single asyncpg connection runs one query at a
    # time, so concurrent queries go through the pool, which takes a
    # separate connection for each call
    users, posts, comments = await asyncio.gather(
        pool.fetch('SELECT * FROM users'),
        pool.fetch('SELECT * FROM posts'),
        pool.fetch('SELECT * FROM comments')
    )
    # File I/O
    async with aiofiles.open('large_file.txt') as f:
        content = await f.read()
    return responses, users, posts, comments
# Example: 100 HTTP requests
async def fetch_many_urls(urls: list[str]):
    """Async is much faster for concurrent I/O."""
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
    # Illustrative figures, assuming ~2 s per request and a server that
    # handles the concurrent load:
    # With async: roughly the time of the slowest request (~2 seconds)
    # With sync requests: ~200 seconds (2 s per request × 100)
2. When Async Doesn't Help (CPU-Bound)
Async provides no benefit for CPU-bound operations:
# BAD: CPU-bound operations don't benefit from async
async def cpu_bound_async():
"""This gains nothing from being async."""
result = compute_fibonacci(1000000) # Blocks event loop!
return result
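# ACCEPTABLE: asyncio.to_thread keeps the event loop responsive, but the
# GIL means pure-Python CPU work gains no parallelism from threads
async def cpu_bound_with_threads():
    """Offload CPU-bound work to a thread so the loop is not blocked."""
    result = await asyncio.to_thread(compute_fibonacci, 1000000)
    return result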
# BETTER: Use ProcessPoolExecutor for true parallelism
from concurrent.futures import ProcessPoolExecutor
async def cpu_bound_with_processes():
    """Use processes for CPU-bound parallelism."""
    loop = asyncio.get_running_loop()
    # compute_fibonacci must be a picklable, module-level function.
    # In a long-running service, create the pool once and reuse it.
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            compute_fibonacci,
            1000000
        )
    return result
# Example: Multiple CPU-bound tasks
async def parallel_cpu_tasks(numbers: list[int]):
    """Run CPU-bound tasks in parallel using processes."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        results = await asyncio.gather(*[
            loop.run_in_executor(pool, compute_fibonacci, n)
            for n in numbers
        ])
    return results
3. Measuring Async Performance
import time
import asyncio
from functools import wraps
import aiohttp
# Decorator to measure async function performance
def async_timer(func):
    """Measure execution time of async function."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            # Report the elapsed time even if func raises
            elapsed = time.perf_counter() - start
            print(f"{func.__name__} took {elapsed:.2f} seconds")
    return wrapper
# Usage
@async_timer
async def fetch_data(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
# Context manager for timing blocks
from contextlib import asynccontextmanager
@asynccontextmanager
async def timer(name: str):
    """Time a block of async code."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report the elapsed time even if the block raises
        elapsed = time.perf_counter() - start
        print(f"{name} took {elapsed:.2f} seconds")
# Usage
async def timed_operations():
    async with timer("Database queries"):
        users = await fetch_users()
        posts = await fetch_posts()
    async with timer("External API calls"):
        data = await fetch_external_data()
# Compare sync vs async performance
import requests  # Sync HTTP client, used here only as the baseline
async def compare_performance():
    """Compare sync and async performance."""
    urls = ['https://example.com'] * 100
    # Sync version (for comparison). This blocks the event loop, so run it
    # only in a standalone benchmark, never inside a live service.
    start = time.perf_counter()
    sync_results = [requests.get(url).text for url in urls]
    sync_time = time.perf_counter() - start
    print(f"Sync: {sync_time:.2f}s")
    # Async version: read each body too, so both versions do the same work
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.text()
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        async_results = await asyncio.gather(*(fetch(session, url) for url in urls))
    async_time = time.perf_counter() - start
    print(f"Async: {async_time:.2f}s")
    print(f"Speedup: {sync_time / async_time:.1f}x")
Code Review Checklist
Use this checklist when reviewing async Python code:
Syntax and Structure
- All async functions defined with `async def`
- All coroutines properly awaited
- `asyncio.run()` used for top-level entry point (Python 3.7+)
- No manual event loop creation (unless necessary)
- Async context managers used with `async with`
- Async iterators used with `async for`
Concurrency Patterns
- `asyncio.gather()` used for concurrent operations
- `asyncio.create_task()` used for fire-and-forget tasks
- Task groups used for error propagation (Python 3.11+)
- Semaphores used for rate limiting
- Proper task cancellation handling
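As a rough illustration of the semaphore item, the sketch below caps how many coroutines run at once. Note that a semaphore limits operations in flight, not requests per second. `limited_gather`, `job`, and the limit of 3 are hypothetical names and values chosen for the example.

```python
import asyncio


async def limited_gather(coros, limit: int):
    """Run coroutines concurrently, with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(coro):
        # A coroutine only starts executing once it holds the semaphore
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run_one(c) for c in coros))


async def demo() -> tuple[list[int], int]:
    active = 0
    peak = 0

    async def job(i: int) -> int:
        """Hypothetical unit of I/O work that tracks peak concurrency."""
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return i * 2

    results = await limited_gather([job(i) for i in range(10)], limit=3)
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(demo())
    print(results, "peak concurrency:", peak)
```

`asyncio.gather()` preserves input order, so the results line up with the submitted coroutines even though they finish in batches.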
Event Loop
- No blocking operations in async functions
- `asyncio.to_thread()` used for blocking I/O (Python 3.9+)
- No `time.sleep()` (use `asyncio.sleep()`)
- No sync HTTP libraries (use `aiohttp`)
- No sync file operations (use `aiofiles`)
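One way to check the "no blocking operations" items is to run a heartbeat task alongside the suspect call. The sketch below assumes a hypothetical `blocking_io` function standing in for a sync library call, and shows that the loop keeps ticking when the call goes through `asyncio.to_thread()`.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    """Hypothetical blocking call, standing in for a sync library."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    """Record a timestamp roughly every 10 ms while the loop is free to run."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def main() -> tuple[str, int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # The blocking call runs in a worker thread, so heartbeat keeps ticking
    result = await asyncio.to_thread(blocking_io, 0.2)
    stop.set()
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    result, tick_count = asyncio.run(main())
    print(result, "heartbeat ticks:", tick_count)
```

Calling `blocking_io(0.2)` directly inside `main()` would starve the heartbeat for the full 0.2 seconds, which is the symptom to look for in review.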
Exception Handling
- Specific exception types caught
- `asyncio.CancelledError` handled and re-raised
- `return_exceptions=True` used appropriately in `gather()`
- Exceptions in tasks not silently ignored
- Exception groups used for multiple task errors (Python 3.11+)
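Two of the items above can be sketched with the standard library alone: re-raising `asyncio.CancelledError` after cleanup, and collecting failures with `return_exceptions=True`. The `worker` and `might_fail` coroutines are hypothetical examples, not part of any library.

```python
import asyncio


async def worker(log: list[str]) -> None:
    """Hypothetical long-running task that cleans up when cancelled."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")  # Release resources here
        raise  # Re-raise so the task is marked as cancelled


async def might_fail(n: int) -> int:
    """Hypothetical operation that fails for odd inputs."""
    await asyncio.sleep(0)
    if n % 2:
        raise ValueError(f"bad input: {n}")
    return n


async def main():
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # Let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")

    # return_exceptions=True returns failures in place instead of raising
    outcomes = await asyncio.gather(
        *(might_fail(n) for n in range(4)), return_exceptions=True
    )
    values = [r for r in outcomes if not isinstance(r, BaseException)]
    errors = [r for r in outcomes if isinstance(r, BaseException)]
    return log, task.cancelled(), values, errors


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With `return_exceptions=True` the caller is responsible for inspecting every result; skipping that check is how task exceptions end up silently ignored.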
Resource Management
- Async context managers used for resources
- Resources properly closed in finally blocks
- No session/connection leaks
- Sessions reused across multiple requests
- Connection pools used for databases
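The cleanup items above come down to one shape: acquire, yield, and release in a `finally` block. Here is a minimal sketch, assuming a hypothetical `FakeConnection` class in place of a real session or database connection.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical resource standing in for a session or DB connection."""

    def __init__(self) -> None:
        self.closed = False

    async def query(self, fail: bool = False) -> str:
        await asyncio.sleep(0)
        if fail:
            raise RuntimeError("query failed")
        return "row"

    async def close(self) -> None:
        await asyncio.sleep(0)
        self.closed = True


@asynccontextmanager
async def connect():
    """Yield a connection and always close it, even if the body raises."""
    conn = FakeConnection()
    try:
        yield conn
    finally:
        await conn.close()


async def main() -> tuple[str, bool, bool]:
    async with connect() as conn:
        row = await conn.query()
    closed_after_success = conn.closed

    try:
        async with connect() as failing:
            await failing.query(fail=True)
    except RuntimeError:
        pass
    return row, closed_after_success, failing.closed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The same structure applies when the resource is borrowed from a pool: the `finally` block returns it to the pool instead of closing it.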
Timeouts and Cancellation
- Timeouts set for external operations
- `asyncio.wait_for()` or `asyncio.timeout()` used
- Tasks can be cancelled gracefully
- Critical sections shielded from cancellation if needed
- Cleanup performed on timeout/cancellation
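The timeout and shielding items can be illustrated as follows. This sketch uses `asyncio.wait_for()` so that it runs on versions before Python 3.11; `slow_operation` and `commit` are hypothetical stand-ins for an external call and a critical section.

```python
import asyncio


async def slow_operation(seconds: float) -> str:
    """Hypothetical external call."""
    await asyncio.sleep(seconds)
    return "ok"


async def fetch_with_timeout(seconds: float, timeout: float) -> str:
    """Return the result, or a fallback if the call exceeds the timeout."""
    try:
        return await asyncio.wait_for(slow_operation(seconds), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


async def commit(state: dict) -> None:
    """Hypothetical critical section that must finish once started."""
    await asyncio.sleep(0.05)
    state["committed"] = True


async def main() -> tuple[str, str, bool]:
    fast = await fetch_with_timeout(0.01, timeout=0.5)
    slow = await fetch_with_timeout(0.5, timeout=0.01)

    state = {"committed": False}
    inner = asyncio.create_task(commit(state))
    try:
        # shield() lets the timeout cancel the wait without cancelling commit()
        await asyncio.wait_for(asyncio.shield(inner), timeout=0.01)
    except asyncio.TimeoutError:
        await inner  # The shielded task is still running; let it finish
    return fast, slow, state["committed"]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Keeping a reference to the shielded task matters: without `inner`, the caller has no way to wait for the critical section or observe its exception.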
Library Usage
- `aiohttp` used instead of `requests`
- `aiofiles` used for async file I/O
- Async database drivers used (`asyncpg`, `motor`)
- Library-specific best practices followed
Framework Integration
- FastAPI async endpoints defined correctly
- Django ORM calls wrapped with `sync_to_async` (Django < 4.1)
- Async ORM operations used (Django 4.1+)
- Dependency injection works with async
Performance
- Async used for I/O-bound operations
- CPU-bound work offloaded to threads/processes
- Unnecessary async overhead avoided
- Appropriate concurrency limits set
Type Hints
- Async functions have return type hints
- Coroutine types annotated
- `Awaitable`, `Coroutine` types used where appropriate
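A short sketch of how these hints are typically applied: annotate an `async def` with the type of the awaited result, accept `Awaitable` when any awaitable will do, and use `Coroutine` when the value goes to `asyncio.create_task()`. The helper names `retry` and `start_in_background` are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Coroutine, TypeVar

T = TypeVar("T")


async def fetch_number() -> int:
    """Annotate the awaited result; calling this yields Coroutine[Any, Any, int]."""
    await asyncio.sleep(0)
    return 42


async def retry(operation: Callable[[], Awaitable[T]], attempts: int = 3) -> T:
    """Accept any zero-argument callable that returns an awaitable."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == attempts:
                raise
    raise RuntimeError("attempts must be at least 1")


def start_in_background(coro: Coroutine[Any, Any, T]) -> "asyncio.Task[T]":
    """create_task() needs a real coroutine, so Coroutine is the right hint."""
    return asyncio.create_task(coro)


async def main() -> tuple[int, int]:
    direct = await retry(fetch_number)
    task = start_in_background(fetch_number())
    return direct, await task


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Preferring `Awaitable` in parameters keeps helpers usable with tasks and futures as well as coroutines.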
Related Skills
- python-testing: For testing async code with pytest-asyncio
- python-type-hints: For properly typing async functions and coroutines
- python-error-handling: For exception handling patterns
- fastapi-best-practices: For FastAPI-specific async patterns
- django-best-practices: For Django async views and ORM
References
- PEP 492 - Coroutines with async and await syntax
- PEP 3156 - Asynchronous IO Support Rebooted: the "asyncio" Module
- Python asyncio Documentation
- aiohttp Documentation
- Real Python - Async IO in Python
- FastAPI Async Documentation
- Django Async Documentation
Version: 1.0 | Last Updated: 2025-12-24 | Maintainer: Claude Code Skills