| name | Asyncio Programming |
| description | Master asynchronous programming with asyncio, async/await, concurrent operations, and async frameworks |
| version | 2.1.0 |
| sasmp_version | 1.3.0 |
| bonded_agent | 05-async-concurrency |
| bond_type | PRIMARY_BOND |
| retry_strategy | exponential_backoff |
| observability | [object Object] |
Asyncio Programming
Overview
Master asynchronous programming in Python with asyncio. Learn to write concurrent code that efficiently handles I/O-bound operations, build async web applications, and understand the async/await paradigm.
Learning Objectives
- Understand asynchronous programming concepts
- Write async functions with async/await syntax
- Manage concurrent operations with asyncio
- Build async web applications
- Handle async I/O operations efficiently
- Debug and test async code
Core Topics
1. Async/Await Basics
- Understanding coroutines
- async/await syntax
- Event loop fundamentals
- Running async functions
- Async vs sync execution
- Common pitfalls
Code Example:
import asyncio
import time
# Synchronous version (slow)
def fetch_data_sync(url):
print(f"Fetching {url}...")
time.sleep(2) # Simulating network delay
return f"Data from {url}"
def main_sync():
urls = ['url1', 'url2', 'url3']
results = []
for url in urls:
data = fetch_data_sync(url)
results.append(data)
return results
# Takes 6 seconds (2 * 3)
start = time.time()
main_sync()
print(f"Sync took: {time.time() - start:.2f}s")
# Asynchronous version (fast)
async def fetch_data_async(url):
print(f"Fetching {url}...")
await asyncio.sleep(2) # Non-blocking sleep
return f"Data from {url}"
async def main_async():
urls = ['url1', 'url2', 'url3']
# Create tasks and run concurrently
tasks = [fetch_data_async(url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Takes 2 seconds (concurrent execution)
start = time.time()
asyncio.run(main_async())
print(f"Async took: {time.time() - start:.2f}s")
2. Asyncio Tasks & Coroutines
- Creating and managing tasks
- asyncio.gather() vs asyncio.wait()
- Task cancellation
- Task groups (Python 3.11+)
- Exception handling in tasks
- Timeouts
Code Example:
import asyncio
async def process_item(item_id, delay):
print(f"Processing item {item_id}")
await asyncio.sleep(delay)
if item_id == 3:
raise ValueError(f"Item {item_id} failed!")
return f"Result {item_id}"
async def main():
# Method 1: gather (returns results in order)
tasks = [
process_item(1, 1),
process_item(2, 2),
process_item(3, 1),
]
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} result: {result}")
except Exception as e:
print(f"Error: {e}")
# Method 2: wait (returns done/pending sets)
tasks = [
asyncio.create_task(process_item(i, i))
for i in range(1, 4)
]
done, pending = await asyncio.wait(tasks, timeout=2.5)
print(f"Completed: {len(done)}, Pending: {len(pending)}")
# Cancel pending tasks
for task in pending:
task.cancel()
# Method 3: Task groups (Python 3.11+)
async with asyncio.TaskGroup() as tg:
for i in range(1, 4):
tg.create_task(process_item(i, 1))
# All tasks completed or exception raised
asyncio.run(main())
3. Async I/O Operations
- Async file operations (aiofiles)
- Async HTTP requests (aiohttp)
- Async database operations (asyncpg, motor)
- Async messaging (aio-pika)
- Streams and protocols
Code Example:
import asyncio
import aiohttp
import aiofiles
from typing import List
# Async HTTP requests
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_multiple_urls(urls: List[str]):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Async file operations
async def read_file_async(filepath):
async with aiofiles.open(filepath, 'r') as f:
content = await f.read()
return content
async def write_file_async(filepath, content):
async with aiofiles.open(filepath, 'w') as f:
await f.write(content)
# Async database operations (example with asyncpg)
import asyncpg
async def fetch_users():
conn = await asyncpg.connect(
user='user',
password='password',
database='mydb',
host='localhost'
)
try:
rows = await conn.fetch('SELECT * FROM users')
return rows
finally:
await conn.close()
# Usage
async def main():
# Fetch URLs concurrently
urls = [
'https://api.example.com/data1',
'https://api.example.com/data2',
'https://api.example.com/data3',
]
results = await fetch_multiple_urls(urls)
# Read/write files
content = await read_file_async('input.txt')
await write_file_async('output.txt', content.upper())
# Database operations
users = await fetch_users()
print(f"Found {len(users)} users")
asyncio.run(main())
4. Async Web Frameworks
- FastAPI async routes
- aiohttp web server
- WebSocket handling
- Background tasks
- Middleware and dependencies
Code Example:
# FastAPI async example
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
# Async route
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Async database call
user = await fetch_user_from_db(user_id)
return user
# Background task
async def send_notification(email: str, message: str):
await asyncio.sleep(2) # Simulate email sending
print(f"Sent email to {email}: {message}")
@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
# Process order synchronously
order_id = save_order(order_data)
# Send notification in background
background_tasks.add_task(
send_notification,
order_data['customer_email'],
f"Order #{order_id} created"
)
return {"order_id": order_id}
# aiohttp web server
from aiohttp import web
async def handle_request(request):
name = request.match_info.get('name', 'Anonymous')
await asyncio.sleep(1) # Async operation
return web.json_response({'message': f'Hello {name}'})
app = web.Application()
app.add_routes([web.get('/{name}', handle_request)])
# WebSocket example
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
await ws.send_str(f"Echo: {msg.data}")
elif msg.type == web.WSMsgType.ERROR:
print(f'Error: {ws.exception()}')
return ws
app.add_routes([web.get('/ws', websocket_handler)])
Hands-On Practice
Project 1: Async Web Scraper
Build a concurrent web scraper with rate limiting.
Requirements:
- Scrape multiple websites concurrently
- Implement rate limiting
- Handle errors gracefully
- Save results to async database
- Progress tracking
- Retry failed requests
Key Skills: aiohttp, async I/O, error handling
Project 2: Real-time Chat Server
Create a WebSocket-based chat application.
Requirements:
- WebSocket server with aiohttp
- Multiple chat rooms
- User authentication
- Message broadcasting
- Connection management
- Message history persistence
Key Skills: WebSockets, async server, state management
Project 3: Async Task Queue
Build a distributed task processing system.
Requirements:
- Task queue with Redis/RabbitMQ
- Worker pool management
- Task prioritization
- Result caching
- Progress monitoring
- Graceful shutdown
Key Skills: Message queues, concurrent workers, cleanup
Assessment Criteria
- Understand async/await semantics
- Write efficient concurrent code
- Handle async exceptions properly
- Use asyncio tasks effectively
- Build async web applications
- Debug async code
- Manage async resources (cleanup)
Resources
Official Documentation
- asyncio Docs - Official documentation
- aiohttp - Async HTTP client/server
- FastAPI - Modern async web framework
Learning Platforms
- Using Asyncio in Python - Real Python guide
- Python Concurrency - O'Reilly book
- Asyncio Recipes - Practical examples
Tools
- aiohttp - Async HTTP
- aiofiles - Async file I/O
- asyncpg - Async PostgreSQL
- aiodebug - Async debugging
Next Steps
After mastering asyncio, explore:
- Multiprocessing - CPU-bound parallelism
- Celery - Distributed task queue
- gRPC - Async RPC framework
- Kafka - Async event streaming