| name | asyncio |
| description | Python asyncio - Modern concurrent programming with async/await, event loops, tasks, coroutines, primitives, aiohttp, and FastAPI async patterns |
| version | 1.0.0 |
| category | toolchain |
| author | Claude MPM Team |
| license | MIT |
| progressive_disclosure | [object Object] |
| context_limit | 700 |
| tags | asyncio, async, concurrency, coroutines, aiohttp, fastapi, async-database, event-loop, tasks |
| requires_tools |
Python asyncio - Async/Await Concurrency
Overview
Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.
Key Features:
- async/await syntax for readable concurrent code
- Event loop for managing concurrent operations
- Tasks for running multiple coroutines concurrently
- Primitives: locks, semaphores, events, queues
- HTTP client/server with aiohttp
- Database async support (asyncpg, aiomysql, motor)
- FastAPI async endpoints
- WebSocket support
- Background task management
Installation:
# asyncio is built-in (Python 3.7+)
# Async HTTP client
pip install aiohttp
# Async HTTP requests (alternative)
pip install httpx
# Async database drivers
pip install asyncpg aiomysql motor # PostgreSQL, MySQL, MongoDB
# FastAPI with async support
pip install fastapi uvicorn[standard]
# Async testing
pip install pytest-asyncio
Basic Async/Await Patterns
1. Simple Async Function
import asyncio
async def hello():
"""Basic async function (coroutine)."""
print("Hello")
await asyncio.sleep(1) # Async sleep (non-blocking)
print("World")
return "Done"
# Run async function
result = asyncio.run(hello())
print(result) # "Done"
Key Points:
async defdefines a coroutine functionawaitsuspends execution until awaitable completesasyncio.run()is the entry point for async programs- Coroutines must be awaited or scheduled
2. Multiple Concurrent Tasks
import asyncio
import time
async def task(name, duration):
"""Simulate async task."""
print(f"{name}: Starting (duration: {duration}s)")
await asyncio.sleep(duration)
print(f"{name}: Complete")
return f"{name} result"
async def run_concurrent():
"""Run multiple tasks concurrently."""
start = time.time()
# Sequential (slow) - 6 seconds total
# result1 = await task("Task 1", 3)
# result2 = await task("Task 2", 2)
# result3 = await task("Task 3", 1)
# Concurrent (fast) - 3 seconds total
results = await asyncio.gather(
task("Task 1", 3),
task("Task 2", 2),
task("Task 3", 1)
)
elapsed = time.time() - start
print(f"Total time: {elapsed:.2f}s")
print(f"Results: {results}")
asyncio.run(run_concurrent())
# Output: Total time: 3.00s (tasks ran concurrently)
3. Task Creation and Management
import asyncio
async def background_task(name):
"""Long-running background task."""
for i in range(5):
print(f"{name}: iteration {i}")
await asyncio.sleep(1)
return f"{name} complete"
async def main():
# Create task (starts immediately)
task1 = asyncio.create_task(background_task("Task-1"))
task2 = asyncio.create_task(background_task("Task-2"))
# Do other work while tasks run
print("Main: doing other work")
await asyncio.sleep(2)
# Wait for tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
4. Error Handling in Async Code
import asyncio
async def risky_operation(fail=False):
"""Operation that might fail."""
await asyncio.sleep(1)
if fail:
raise ValueError("Operation failed")
return "Success"
async def handle_errors():
# Individual try/except
try:
result = await risky_operation(fail=True)
except ValueError as e:
print(f"Caught error: {e}")
result = "Fallback value"
# Gather with error handling
results = await asyncio.gather(
risky_operation(fail=False),
risky_operation(fail=True),
risky_operation(fail=False),
return_exceptions=True # Return exceptions instead of raising
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(handle_errors())
Event Loop Fundamentals
1. Event Loop Lifecycle
import asyncio
# Modern approach (Python 3.7+)
async def main():
print("Main coroutine")
await asyncio.sleep(1)
asyncio.run(main()) # Creates loop, runs main, closes loop
# Manual loop management (advanced use cases)
async def manual_example():
loop = asyncio.get_event_loop()
# Schedule coroutine
task = loop.create_task(some_coroutine())
# Schedule callback
loop.call_later(5, callback_function)
# Run until complete
result = await task
return result
# Get current event loop
async def get_current_loop():
loop = asyncio.get_running_loop()
print(f"Loop: {loop}")
# Schedule callback in event loop
loop.call_soon(lambda: print("Callback executed"))
await asyncio.sleep(0) # Let callback execute
2. Loop Scheduling and Callbacks
import asyncio
from datetime import datetime
def callback(name, loop):
"""Callback function (not async)."""
print(f"{datetime.now()}: {name} callback executed")
# Stop loop after callback
# loop.stop()
async def schedule_callbacks():
loop = asyncio.get_running_loop()
# Schedule immediate callback
loop.call_soon(callback, "Immediate", loop)
# Schedule callback after delay
loop.call_later(2, callback, "Delayed 2s", loop)
# Schedule callback at specific time
loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)
# Wait for callbacks to execute
await asyncio.sleep(5)
asyncio.run(schedule_callbacks())
3. Running Blocking Code
import asyncio
import time
def blocking_io():
"""CPU-intensive or blocking I/O operation."""
print("Blocking operation started")
time.sleep(2) # Blocks thread
print("Blocking operation complete")
return "Blocking result"
async def run_in_executor():
"""Run blocking code in thread pool."""
loop = asyncio.get_running_loop()
# Run in default executor (thread pool)
result = await loop.run_in_executor(
None, # Use default executor
blocking_io
)
print(f"Result: {result}")
# Run blocking operations concurrently
async def concurrent_blocking():
loop = asyncio.get_running_loop()
# These run in thread pool, don't block event loop
results = await asyncio.gather(
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io)
)
print(f"All results: {results}")
asyncio.run(concurrent_blocking())
Asyncio Primitives
1. Locks for Mutual Exclusion
import asyncio
# Shared resource
counter = 0
lock = asyncio.Lock()
async def increment_with_lock(name):
"""Increment counter with lock protection."""
global counter
async with lock:
# Critical section - only one task at a time
print(f"{name}: acquired lock")
current = counter
await asyncio.sleep(0.1) # Simulate processing
counter = current + 1
print(f"{name}: released lock, counter={counter}")
async def increment_without_lock(name):
"""Increment without lock - race condition!"""
global counter
current = counter
await asyncio.sleep(0.1) # Race condition window
counter = current + 1
print(f"{name}: counter={counter}")
async def test_locks():
global counter
# Without lock (race condition)
counter = 0
await asyncio.gather(
increment_without_lock("Task-1"),
increment_without_lock("Task-2"),
increment_without_lock("Task-3")
)
print(f"Without lock: {counter}") # Often wrong (< 3)
# With lock (correct)
counter = 0
await asyncio.gather(
increment_with_lock("Task-1"),
increment_with_lock("Task-2"),
increment_with_lock("Task-3")
)
print(f"With lock: {counter}") # Always 3
asyncio.run(test_locks())
2. Semaphores for Resource Limiting
import asyncio
# Limit concurrent operations
semaphore = asyncio.Semaphore(2) # Max 2 concurrent
async def limited_operation(name):
"""Operation limited by semaphore."""
print(f"{name}: waiting for semaphore")
async with semaphore:
print(f"{name}: acquired semaphore")
await asyncio.sleep(2) # Simulate work
print(f"{name}: releasing semaphore")
async def test_semaphore():
# Create 5 tasks, but only 2 run concurrently
await asyncio.gather(
limited_operation("Task-1"),
limited_operation("Task-2"),
limited_operation("Task-3"),
limited_operation("Task-4"),
limited_operation("Task-5")
)
asyncio.run(test_semaphore())
# Only 2 tasks hold semaphore at any time
3. Events for Signaling
import asyncio
event = asyncio.Event()
async def waiter(name):
"""Wait for event to be set."""
print(f"{name}: waiting for event")
await event.wait() # Block until event is set
print(f"{name}: event received!")
async def setter():
"""Set event after delay."""
await asyncio.sleep(2)
print("Setter: setting event")
event.set() # Wake up all waiters
async def test_event():
# Create waiters
await asyncio.gather(
waiter("Waiter-1"),
waiter("Waiter-2"),
waiter("Waiter-3"),
setter()
)
asyncio.run(test_event())
4. Queues for Task Distribution
import asyncio
import random
async def producer(queue, name):
"""Produce items and add to queue."""
for i in range(5):
item = f"{name}-item-{i}"
await queue.put(item)
print(f"{name}: produced {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# Signal completion
await queue.put(None)
async def consumer(queue, name):
"""Consume items from queue."""
while True:
item = await queue.get() # Block until item available
if item is None: # Shutdown signal
queue.task_done()
break
print(f"{name}: consumed {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def test_queue():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
await asyncio.gather(
producer(queue, "Producer-1"),
producer(queue, "Producer-2"),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
consumer(queue, "Consumer-3")
)
# Wait for all items to be processed
await queue.join()
print("All tasks complete")
asyncio.run(test_queue())
5. Condition Variables
import asyncio
condition = asyncio.Condition()
items = []
async def consumer(name):
"""Wait for items to be available."""
async with condition:
# Wait until items are available
await condition.wait_for(lambda: len(items) > 0)
item = items.pop(0)
print(f"{name}: consumed {item}")
async def producer(name):
"""Add items and notify consumers."""
async with condition:
item = f"{name}-item"
items.append(item)
print(f"{name}: produced {item}")
# Notify one waiting consumer
condition.notify(n=1)
# Or notify all: condition.notify_all()
async def test_condition():
await asyncio.gather(
consumer("Consumer-1"),
consumer("Consumer-2"),
producer("Producer-1"),
producer("Producer-2")
)
asyncio.run(test_condition())
Async HTTP with aiohttp
1. Basic HTTP Client
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch single URL."""
async with session.get(url) as response:
status = response.status
text = await response.text()
return {"url": url, "status": status, "length": len(text)}
async def fetch_multiple_urls():
"""Fetch multiple URLs concurrently."""
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/json",
]
async with aiohttp.ClientSession() as session:
# Concurrent requests
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
asyncio.run(fetch_multiple_urls())
2. HTTP Client with Error Handling
import asyncio
import aiohttp
from typing import Dict, Any
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Dict[str, Any]:
"""Fetch URL with retry logic."""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
response.raise_for_status() # Raise for 4xx/5xx
data = await response.json()
return {"success": True, "data": data}
except aiohttp.ClientError as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
return {"success": False, "error": str(e)}
# Exponential backoff
await asyncio.sleep(2 ** attempt)
except asyncio.TimeoutError:
print(f"Attempt {attempt + 1} timed out")
if attempt == max_retries - 1:
return {"success": False, "error": "Timeout"}
await asyncio.sleep(2 ** attempt)
async def parallel_api_calls():
"""Make parallel API calls with error handling."""
urls = [
"https://httpbin.org/json",
"https://httpbin.org/status/500", # Will fail
"https://httpbin.org/delay/1",
]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*[fetch_with_retry(session, url) for url in urls],
return_exceptions=True # Don't stop on errors
)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"{url}: Exception - {result}")
elif result["success"]:
print(f"{url}: Success")
else:
print(f"{url}: Failed - {result['error']}")
asyncio.run(parallel_api_calls())
3. HTTP Server with aiohttp
from aiohttp import web
import asyncio
async def handle_hello(request):
"""Simple GET handler."""
name = request.query.get("name", "World")
return web.json_response({"message": f"Hello, {name}!"})
async def handle_post(request):
"""POST handler with JSON body."""
data = await request.json()
# Simulate async processing
await asyncio.sleep(1)
return web.json_response({
"received": data,
"status": "processed"
})
async def handle_stream(request):
"""Streaming response."""
response = web.StreamResponse()
await response.prepare(request)
for i in range(10):
await response.write(f"Chunk {i}\n".encode())
await asyncio.sleep(0.5)
await response.write_eof()
return response
# Create application
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)
# Run server
if __name__ == "__main__":
web.run_app(app, host="0.0.0.0", port=8080)
4. WebSocket Client
import asyncio
import aiohttp
async def websocket_client():
"""Connect to WebSocket server."""
url = "wss://echo.websocket.org"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url) as ws:
# Send messages
await ws.send_str("Hello WebSocket")
await ws.send_json({"type": "greeting", "data": "test"})
# Receive messages
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(f"Received: {msg.data}")
if msg.data == "close":
await ws.close()
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"Error: {ws.exception()}")
break
asyncio.run(websocket_client())
Async Database Operations
1. PostgreSQL with asyncpg
import asyncio
import asyncpg
async def database_operations():
"""Async PostgreSQL operations."""
# Create connection pool
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="user",
password="password",
min_size=5,
max_size=20
)
try:
# Acquire connection from pool
async with pool.acquire() as conn:
# Execute query
rows = await conn.fetch(
"SELECT id, name, email FROM users WHERE active = $1",
True
)
for row in rows:
print(f"User: {row['name']} ({row['email']})")
# Insert data
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
"Alice", "alice@example.com"
)
# Transaction
async with conn.transaction():
await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")
finally:
await pool.close()
asyncio.run(database_operations())
2. MongoDB with motor
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
async def mongodb_operations():
"""Async MongoDB operations."""
# Create client
client = AsyncIOMotorClient("mongodb://localhost:27017")
db = client.mydb
collection = db.users
try:
# Insert document
result = await collection.insert_one({
"name": "Alice",
"email": "alice@example.com",
"age": 30
})
print(f"Inserted ID: {result.inserted_id}")
# Find documents
cursor = collection.find({"age": {"$gte": 25}})
async for document in cursor:
print(f"User: {document['name']}")
# Update document
await collection.update_one(
{"name": "Alice"},
{"$set": {"age": 31}}
)
# Aggregation pipeline
pipeline = [
{"$match": {"age": {"$gte": 25}}},
{"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
]
async for result in collection.aggregate(pipeline):
print(f"Average age: {result['avg_age']}")
finally:
client.close()
asyncio.run(mongodb_operations())
3. Connection Pool Pattern
import asyncio
import asyncpg
from typing import Optional
class DatabasePool:
"""Async database connection pool manager."""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool: Optional[asyncpg.Pool] = None
async def connect(self):
"""Create connection pool."""
self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)
async def close(self):
"""Close connection pool."""
if self.pool:
await self.pool.close()
async def execute(self, query: str, *args):
"""Execute query."""
async with self.pool.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args):
"""Fetch multiple rows."""
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
async def fetchrow(self, query: str, *args):
"""Fetch single row."""
async with self.pool.acquire() as conn:
return await conn.fetchrow(query, *args)
# Usage
async def use_pool():
db = DatabasePool("postgresql://user:pass@localhost/mydb")
await db.connect()
try:
# Execute operations
rows = await db.fetch("SELECT * FROM users")
for row in rows:
print(row)
finally:
await db.close()
asyncio.run(use_pool())
FastAPI Async Patterns
1. Async Endpoints
from fastapi import FastAPI, HTTPException
import asyncio
import httpx
app = FastAPI()
@app.get("/")
async def root():
"""Simple async endpoint."""
return {"message": "Hello World"}
@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
"""Endpoint with async delay."""
await asyncio.sleep(seconds)
return {"message": f"Waited {seconds} seconds"}
@app.get("/fetch")
async def fetch_external():
"""Fetch data from external API."""
async with httpx.AsyncClient() as client:
response = await client.get("https://httpbin.org/json")
return response.json()
@app.get("/parallel")
async def parallel_requests():
"""Make parallel API calls."""
async with httpx.AsyncClient() as client:
responses = await asyncio.gather(
client.get("https://httpbin.org/delay/1"),
client.get("https://httpbin.org/delay/2"),
client.get("https://httpbin.org/json")
)
return {
"results": [r.json() for r in responses]
}
2. Background Tasks
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
async def send_email(email: str, message: str):
"""Simulate sending email."""
print(f"Sending email to {email}")
await asyncio.sleep(5) # Simulate slow email service
print(f"Email sent to {email}: {message}")
@app.post("/send-notification")
async def send_notification(
email: str,
message: str,
background_tasks: BackgroundTasks
):
"""Send notification in background."""
# Add task to background
background_tasks.add_task(send_email, email, message)
# Return immediately
return {"status": "notification queued"}
# Alternative: manual task creation
@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
"""Create background task manually."""
asyncio.create_task(send_email(email, message))
return {"status": "notification queued"}
3. Async Dependencies
from fastapi import FastAPI, Depends
import asyncpg
app = FastAPI()
# Database pool (global)
db_pool = None
async def get_db():
"""Dependency: database connection."""
async with db_pool.acquire() as conn:
yield conn
@app.on_event("startup")
async def startup():
"""Initialize database pool on startup."""
global db_pool
db_pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mydb"
)
@app.on_event("shutdown")
async def shutdown():
"""Close database pool on shutdown."""
await db_pool.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
"""Get user with async database dependency."""
user = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1",
user_id
)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return dict(user)
4. WebSocket Endpoints
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
app = FastAPI()
# Active connections
active_connections: List[WebSocket] = []
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint."""
await websocket.accept()
active_connections.append(websocket)
try:
while True:
# Receive message
data = await websocket.receive_text()
# Broadcast to all connections
for connection in active_connections:
await connection.send_text(f"Broadcast: {data}")
except WebSocketDisconnect:
active_connections.remove(websocket)
print("Client disconnected")
# Background task to send periodic updates
async def broadcast_updates():
"""Send periodic updates to all clients."""
while True:
await asyncio.sleep(5)
for connection in active_connections:
try:
await connection.send_text("Periodic update")
except:
active_connections.remove(connection)
@app.on_event("startup")
async def startup():
"""Start background update task."""
asyncio.create_task(broadcast_updates())
Common Patterns and Best Practices
1. Timeout Handling
import asyncio
async def slow_operation():
"""Slow operation."""
await asyncio.sleep(10)
return "Result"
async def with_timeout():
"""Run operation with timeout."""
try:
result = await asyncio.wait_for(slow_operation(), timeout=5.0)
print(f"Result: {result}")
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())
2. Cancellation Handling
import asyncio
async def cancellable_task():
"""Task that can be cancelled."""
try:
for i in range(10):
print(f"Working: {i}")
await asyncio.sleep(1)
return "Complete"
except asyncio.CancelledError:
print("Task was cancelled")
# Cleanup
raise # Re-raise to propagate cancellation
async def cancel_example():
"""Example of task cancellation."""
task = asyncio.create_task(cancellable_task())
# Let it run for a bit
await asyncio.sleep(3)
# Cancel the task
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Confirmed: task was cancelled")
asyncio.run(cancel_example())
3. Resource Cleanup with Context Managers
import asyncio
class AsyncResource:
"""Async context manager for resource management."""
async def __aenter__(self):
"""Async setup."""
print("Acquiring resource")
await asyncio.sleep(1) # Simulate async setup
self.connection = "connected"
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async cleanup."""
print("Releasing resource")
await asyncio.sleep(1) # Simulate async cleanup
self.connection = None
async def use_resource():
"""Use async resource."""
async with AsyncResource() as resource:
print(f"Using resource: {resource.connection}")
await asyncio.sleep(1)
# Resource automatically cleaned up
asyncio.run(use_resource())
4. Debouncing and Throttling
import asyncio
from datetime import datetime
class Debouncer:
"""Debounce async function calls."""
def __init__(self, delay: float):
self.delay = delay
self.task = None
async def call(self, func, *args, **kwargs):
"""Debounced function call."""
# Cancel previous task
if self.task:
self.task.cancel()
# Create new task
async def delayed_call():
await asyncio.sleep(self.delay)
await func(*args, **kwargs)
self.task = asyncio.create_task(delayed_call())
async def api_call(query: str):
"""Simulated API call."""
print(f"{datetime.now()}: API call with query: {query}")
async def debounce_example():
"""Example of debouncing."""
debouncer = Debouncer(delay=1.0)
# Rapid calls - only last one executes
await debouncer.call(api_call, "query1")
await asyncio.sleep(0.1)
await debouncer.call(api_call, "query2")
await asyncio.sleep(0.1)
await debouncer.call(api_call, "query3")
# Wait for debounced call
await asyncio.sleep(2)
asyncio.run(debounce_example())
# Output: Only "query3" API call executes
5. Rate Limiting
import asyncio
from datetime import datetime
class RateLimiter:
"""Limit rate of async operations."""
def __init__(self, max_calls: int, period: float):
self.max_calls = max_calls
self.period = period
self.semaphore = asyncio.Semaphore(max_calls)
self.calls = []
async def __aenter__(self):
"""Acquire rate limit slot."""
await self.semaphore.acquire()
now = asyncio.get_event_loop().time()
# Remove old calls outside period
self.calls = [t for t in self.calls if now - t < self.period]
if len(self.calls) >= self.max_calls:
# Wait until oldest call expires
sleep_time = self.period - (now - self.calls[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.calls.append(now)
return self
async def __aexit__(self, *args):
"""Release semaphore."""
self.semaphore.release()
async def rate_limited_operation(limiter, name):
"""Operation with rate limiting."""
async with limiter:
print(f"{datetime.now()}: {name}")
await asyncio.sleep(0.1)
async def rate_limit_example():
"""Example of rate limiting."""
# Max 3 calls per 2 seconds
limiter = RateLimiter(max_calls=3, period=2.0)
# Try to make 6 calls
await asyncio.gather(*[
rate_limited_operation(limiter, f"Call-{i}")
for i in range(6)
])
asyncio.run(rate_limit_example())
Debugging Async Code
1. Enable Debug Mode
import asyncio
import logging
# Enable asyncio debug mode
asyncio.run(main(), debug=True)
# Or set environment variable:
# PYTHONASYNCIODEBUG=1 python script.py
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
async def debug_example():
logger.debug("Starting operation")
await asyncio.sleep(1)
logger.debug("Operation complete")
2. Detect Blocking Code
import asyncio
import time
async def problematic_code():
"""Code with blocking operation."""
print("Starting")
# BAD: Blocking sleep
time.sleep(2) # This blocks the event loop!
print("Complete")
# Run with debug mode to detect blocking
asyncio.run(problematic_code(), debug=True)
# Warning: Executing <Task> took 2.001 seconds
3. Track Pending Tasks
import asyncio
async def track_tasks():
"""Track all pending tasks."""
# Get all tasks
tasks = asyncio.all_tasks()
print(f"Total tasks: {len(tasks)}")
for task in tasks:
print(f" - {task.get_name()}: {task}")
# Check if task is done
if task.done():
try:
result = task.result()
print(f" Result: {result}")
except Exception as e:
print(f" Exception: {e}")
# Create some tasks
async def main():
task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
task2 = asyncio.create_task(track_tasks(), name="tracking")
await task2
task1.cancel()
asyncio.run(main())
Testing Async Code
1. pytest-asyncio Setup
# test_async.py
import pytest
import asyncio
# Mark test as async
@pytest.mark.asyncio
async def test_async_function():
"""Test async function."""
result = await some_async_function()
assert result == "expected"
@pytest.mark.asyncio
async def test_async_http():
"""Test async HTTP client."""
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/json") as response:
assert response.status == 200
data = await response.json()
assert "slideshow" in data
# Async fixture
@pytest.fixture
async def async_client():
"""Async test fixture."""
client = await create_async_client()
yield client
await client.close()
@pytest.mark.asyncio
async def test_with_fixture(async_client):
"""Test using async fixture."""
result = await async_client.fetch_data()
assert result is not None
2. Mocking Async Functions
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_with_mock():
"""Test with async mock."""
# Create async mock
mock_func = AsyncMock(return_value="mocked result")
result = await mock_func()
assert result == "mocked result"
mock_func.assert_called_once()
@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
"""Test with patched async function."""
mock_async.return_value = {"status": "success"}
result = await some_function_that_calls_async()
assert result["status"] == "success"
mock_async.assert_called_once()
Performance Optimization
1. Use asyncio.gather() for Parallelism
import asyncio
import time
async def slow_task(n):
await asyncio.sleep(1)
return n * 2
async def optimized():
"""Parallel execution."""
start = time.time()
# Sequential (slow) - 5 seconds
# results = []
# for i in range(5):
# result = await slow_task(i)
# results.append(result)
# Parallel (fast) - 1 second
results = await asyncio.gather(*[slow_task(i) for i in range(5)])
elapsed = time.time() - start
print(f"Time: {elapsed:.2f}s, Results: {results}")
asyncio.run(optimized())
2. Connection Pooling
import asyncio
import aiohttp
# BAD: Create new session for each request
async def bad_pattern():
for i in range(10):
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/json") as response:
await response.json()
# GOOD: Reuse session with connection pool
async def good_pattern():
async with aiohttp.ClientSession() as session:
tasks = [
session.get("https://httpbin.org/json")
for i in range(10)
]
responses = await asyncio.gather(*tasks)
for response in responses:
await response.json()
3. Avoid Blocking Operations
import asyncio
# BAD: Blocking I/O in async function
async def bad_file_read():
with open("large_file.txt") as f: # Blocks event loop!
data = f.read()
return data
# GOOD: Use async file I/O or run in executor
async def good_file_read():
loop = asyncio.get_running_loop()
# Run blocking operation in thread pool
data = await loop.run_in_executor(
None,
lambda: open("large_file.txt").read()
)
return data
# BETTER: Use aiofiles for async file I/O
import aiofiles
async def better_file_read():
async with aiofiles.open("large_file.txt") as f:
data = await f.read()
return data
Common Pitfalls
❌ Anti-Pattern 1: Not Awaiting Coroutines
# WRONG
async def bad():
result = async_function() # Returns coroutine, doesn't execute!
print(result) # Prints: <coroutine object>
# CORRECT
async def good():
result = await async_function() # Actually executes
print(result)
❌ Anti-Pattern 2: Blocking the Event Loop
# WRONG
import time
async def bad():
time.sleep(5) # Blocks entire event loop!
# CORRECT
async def good():
await asyncio.sleep(5) # Non-blocking
❌ Anti-Pattern 3: Not Handling Cancellation
# WRONG
async def bad():
await asyncio.sleep(10)
# No cleanup if cancelled
# CORRECT
async def good():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
# Cleanup resources
await cleanup()
raise # Re-raise to propagate
❌ Anti-Pattern 4: Creating Event Loop Incorrectly
# WRONG (Python 3.7+)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# CORRECT (Python 3.7+)
asyncio.run(main())
❌ Anti-Pattern 5: Not Closing Resources
# WRONG
async def bad():
session = aiohttp.ClientSession()
response = await session.get(url)
# Session never closed - resource leak!
# CORRECT
async def good():
async with aiohttp.ClientSession() as session:
response = await session.get(url)
# Session automatically closed
Best Practices
- Use asyncio.run() for entry point (Python 3.7+)
- Always await coroutines - don't forget await
- Use async context managers for resource cleanup
- Connection pooling for HTTP and database clients
- Handle CancelledError for graceful shutdown
- Use asyncio.gather() for parallel operations
- Avoid blocking operations in async functions
- Use timeouts to prevent hanging operations
- Debug mode during development to catch issues
- Test async code with pytest-asyncio
Quick Reference
Common Commands
# Run async script
python script.py
# Run with debug mode
PYTHONASYNCIODEBUG=1 python script.py
# Run tests
pytest -v --asyncio-mode=auto
# Install async dependencies
pip install aiohttp asyncpg motor pytest-asyncio
Essential Imports
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any
Resources
- Official Documentation: https://docs.python.org/3/library/asyncio.html
- aiohttp: https://docs.aiohttp.org/
- asyncpg: https://magicstack.github.io/asyncpg/
- FastAPI Async: https://fastapi.tiangolo.com/async/
- pytest-asyncio: https://pytest-asyncio.readthedocs.io/
Related Skills
When using asyncio, consider these complementary skills:
- fastapi-local-dev: FastAPI async server patterns and production deployment
- pytest: Testing async code with pytest-asyncio and fixtures
- systematic-debugging: Debugging async race conditions and deadlocks
Quick FastAPI Async Patterns (Inlined for Standalone Use)
# FastAPI async endpoint pattern
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
app = FastAPI()
# Async database setup
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
# Async database query
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# Background tasks with asyncio
@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
# Non-blocking background task
asyncio.create_task(send_email_async(email))
return {"status": "email queued"}
Quick pytest-asyncio Patterns (Inlined for Standalone Use)
# Testing async functions with pytest
import pytest
import pytest_asyncio
from httpx import AsyncClient
# Async test fixture
@pytest_asyncio.fixture
async def async_client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
# Async test function
@pytest.mark.asyncio
async def test_get_user(async_client):
response = await async_client.get("/users/1")
assert response.status_code == 200
assert response.json()["id"] == 1
# Testing concurrent operations
@pytest.mark.asyncio
async def test_concurrent_requests():
async with AsyncClient(app=app, base_url="http://test") as client:
# Run 10 requests concurrently
responses = await asyncio.gather(
*[client.get(f"/users/{i}") for i in range(1, 11)]
)
assert all(r.status_code == 200 for r in responses)
# Mock async dependencies
@pytest_asyncio.fixture
async def mock_db():
# Setup mock database
db = AsyncMock()
yield db
# Cleanup
Quick Async Debugging Reference (Inlined for Standalone Use)
Common Async Pitfalls:
Blocking the Event Loop
# ❌ WRONG: Blocking call in async function async def bad_function(): time.sleep(5) # Blocks entire event loop! return "done" # ✅ CORRECT: Use asyncio.sleep async def good_function(): await asyncio.sleep(5) # Releases event loop return "done"Debugging Race Conditions
# Add logging to track execution order import logging logging.basicConfig(level=logging.DEBUG) async def debug_task(name): logging.debug(f"{name}: Starting") await asyncio.sleep(1) logging.debug(f"{name}: Finished") return name # Run with detailed tracing asyncio.run(asyncio.gather(debug_task("A"), debug_task("B")), debug=True)Deadlock Detection
# Use timeout to detect deadlocks try: result = await asyncio.wait_for(some_async_function(), timeout=5.0) except asyncio.TimeoutError: logging.error("Deadlock detected: operation timed out") # Investigate what's blockingInspecting Running Tasks
# Check all pending tasks tasks = asyncio.all_tasks() for task in tasks: print(f"Task: {task.get_name()}, Done: {task.done()}") if not task.done(): print(f" Current coro: {task.get_coro()}")
[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]
Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.