Claude Code Plugins

Community-maintained marketplace

Master asynchronous programming with asyncio, async/await, concurrent operations, and async frameworks

Install Skill

1. Download skill
2. Enable skills in Claude: open claude.ai/settings/capabilities and find the "Skills" section
3. Upload to Claude: click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: Asyncio Programming
description: Master asynchronous programming with asyncio, async/await, concurrent operations, and async frameworks
version: 2.1.0
sasmp_version: 1.3.0
bonded_agent: 05-async-concurrency
bond_type: PRIMARY_BOND
retry_strategy: exponential_backoff
observability:

Asyncio Programming

Overview

Master asynchronous programming in Python with asyncio. Learn to write concurrent code that efficiently handles I/O-bound operations, build async web applications, and understand the async/await paradigm.

Learning Objectives

  • Understand asynchronous programming concepts
  • Write async functions with async/await syntax
  • Manage concurrent operations with asyncio
  • Build async web applications
  • Handle async I/O operations efficiently
  • Debug and test async code

Core Topics

1. Async/Await Basics

  • Understanding coroutines
  • async/await syntax
  • Event loop fundamentals
  • Running async functions
  • Async vs sync execution
  • Common pitfalls (see the sketch after the example below)

Code Example:

import asyncio
import time

# Synchronous version (slow)
def fetch_data_sync(url):
    print(f"Fetching {url}...")
    time.sleep(2)  # Simulating network delay
    return f"Data from {url}"

def main_sync():
    urls = ['url1', 'url2', 'url3']
    results = []
    for url in urls:
        data = fetch_data_sync(url)
        results.append(data)
    return results

# Takes 6 seconds (2 * 3)
start = time.time()
main_sync()
print(f"Sync took: {time.time() - start:.2f}s")

# Asynchronous version (fast)
async def fetch_data_async(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # Non-blocking sleep
    return f"Data from {url}"

async def main_async():
    urls = ['url1', 'url2', 'url3']
    # Create tasks and run concurrently
    tasks = [fetch_data_async(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

# Takes 2 seconds (concurrent execution)
start = time.time()
asyncio.run(main_async())
print(f"Async took: {time.time() - start:.2f}s")

2. Asyncio Tasks & Coroutines

  • Creating and managing tasks
  • asyncio.gather() vs asyncio.wait()
  • Task cancellation
  • Task groups (Python 3.11+)
  • Exception handling in tasks
  • Timeouts

Code Example:

import asyncio

async def process_item(item_id, delay):
    print(f"Processing item {item_id}")
    await asyncio.sleep(delay)
    if item_id == 3:
        raise ValueError(f"Item {item_id} failed!")
    return f"Result {item_id}"

async def main():
    # Method 1: gather (returns results in order)
    tasks = [
        process_item(1, 1),
        process_item(2, 2),
        process_item(3, 1),
    ]
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
            else:
                print(f"Task {i} result: {result}")
    except Exception as e:
        print(f"Error: {e}")

    # Method 2: wait (returns done/pending sets)
    tasks = [
        asyncio.create_task(process_item(i, i))
        for i in range(1, 4)
    ]
    done, pending = await asyncio.wait(tasks, timeout=2.5)

    print(f"Completed: {len(done)}, Pending: {len(pending)}")

    # Cancel pending tasks
    for task in pending:
        task.cancel()

    # Method 3: Task groups (Python 3.11+)
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(1, 4):
                tg.create_task(process_item(i, 1))
        # Reached only if every task in the group succeeded
    except* ValueError as eg:
        # TaskGroup cancels the remaining tasks and re-raises failures as an ExceptionGroup
        print(f"Task group failed: {[str(e) for e in eg.exceptions]}")

asyncio.run(main())

3. Async I/O Operations

  • Async file operations (aiofiles)
  • Async HTTP requests (aiohttp)
  • Async database operations (asyncpg, motor)
  • Async messaging (aio-pika)
  • Streams and protocols (see the sketch after the example below)

Code Example:

import asyncio
import aiohttp
import aiofiles
from typing import List

# Async HTTP requests
async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls(urls: List[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Async file operations
async def read_file_async(filepath):
    async with aiofiles.open(filepath, 'r') as f:
        content = await f.read()
        return content

async def write_file_async(filepath, content):
    async with aiofiles.open(filepath, 'w') as f:
        await f.write(content)

# Async database operations (example with asyncpg)
import asyncpg

async def fetch_users():
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost'
    )
    try:
        rows = await conn.fetch('SELECT * FROM users')
        return rows
    finally:
        await conn.close()

# Usage
async def main():
    # Fetch URLs concurrently
    urls = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3',
    ]
    results = await fetch_multiple_urls(urls)

    # Read/write files
    content = await read_file_async('input.txt')
    await write_file_async('output.txt', content.upper())

    # Database operations
    users = await fetch_users()
    print(f"Found {len(users)} users")

asyncio.run(main())
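
The topic list for this section also mentions streams and protocols, which the example above does not cover. Below is a separate, minimal sketch of the standard-library streams API (asyncio.start_server() / asyncio.open_connection()) as a tiny echo server and client; the host, port, and messages are illustrative only.

import asyncio

# Minimal TCP echo server built on asyncio streams
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    data = await reader.readline()       # read one line from the client
    writer.write(b"echo: " + data)       # send it back
    await writer.drain()                 # wait until the write buffer is flushed
    writer.close()
    await writer.wait_closed()

async def echo_client(message: str) -> str:
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    writer.write(message.encode() + b"\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply.decode()

async def main():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    async with server:                   # the server object is an async context manager
        print(await echo_client("hello streams"))

asyncio.run(main())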

4. Async Web Frameworks

  • FastAPI async routes
  • aiohttp web server
  • WebSocket handling
  • Background tasks
  • Middleware and dependencies (see the sketch after the example below)

Code Example:

# FastAPI async example
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

# Placeholder async "database" helpers so the example is self-contained
async def fetch_user_from_db(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # simulate an async database query
    return {"id": user_id, "name": f"User {user_id}"}

def save_order(order_data: dict) -> int:
    return 42  # simulate persisting the order and returning its id

# Async route
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Async database call
    user = await fetch_user_from_db(user_id)
    return user

# Background task
async def send_notification(email: str, message: str):
    await asyncio.sleep(2)  # Simulate email sending
    print(f"Sent email to {email}: {message}")

@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
    # Process order synchronously
    order_id = save_order(order_data)

    # Send notification in background
    background_tasks.add_task(
        send_notification,
        order_data['customer_email'],
        f"Order #{order_id} created"
    )

    return {"order_id": order_id}

# aiohttp web server (separate variable so it does not clobber the FastAPI app above)
from aiohttp import web

async def handle_request(request):
    name = request.match_info.get('name', 'Anonymous')
    await asyncio.sleep(1)  # Async operation
    return web.json_response({'message': f'Hello {name}'})

# WebSocket example
async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == web.WSMsgType.ERROR:
            print(f'Error: {ws.exception()}')

    return ws

aio_app = web.Application()
# Register the specific /ws route before the catch-all /{name} route
aio_app.add_routes([
    web.get('/ws', websocket_handler),
    web.get('/{name}', handle_request),
])
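
The topic list for this section also mentions middleware and dependencies, which the example above does not demonstrate. Here is a minimal FastAPI sketch of both: an HTTP middleware that times every request and a Depends()-injected pagination helper. The header name and pagination parameters are illustrative choices, not part of the original example.

import time
from fastapi import Depends, FastAPI, Request

app = FastAPI()

# Middleware: wraps every HTTP request/response
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)  # hand off to the matched route handler
    response.headers["X-Process-Time"] = f"{time.perf_counter() - start:.4f}"
    return response

# Dependency: shared, reusable parameter handling
async def pagination(skip: int = 0, limit: int = 20) -> dict:
    return {"skip": skip, "limit": min(limit, 100)}

@app.get("/items/")
async def list_items(page: dict = Depends(pagination)):
    # page is whatever the dependency above returned
    return {"items": [], **page}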

Hands-On Practice

Project 1: Async Web Scraper

Build a concurrent web scraper with rate limiting.

Requirements:

  • Scrape multiple websites concurrently
  • Implement rate limiting
  • Handle errors gracefully
  • Save results to async database
  • Progress tracking
  • Retry failed requests

Key Skills: aiohttp, async I/O, error handling
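
A minimal starting point for this project, assuming aiohttp, a semaphore for rate limiting, and exponential backoff for retries; the concurrency limit, timeout, and retry count are placeholder values.

import asyncio
import aiohttp

MAX_CONCURRENT = 5   # illustrative rate limit
MAX_RETRIES = 3

async def fetch(session: aiohttp.ClientSession, sem: asyncio.Semaphore, url: str) -> str | None:
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            async with sem:  # at most MAX_CONCURRENT requests in flight
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    resp.raise_for_status()
                    return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            print(f"{url}: attempt {attempt} failed ({exc})")
            await asyncio.sleep(2 ** attempt)  # exponential backoff before retrying
    return None  # give up after MAX_RETRIES

async def scrape(urls: list[str]) -> list[str | None]:
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, sem, u) for u in urls))

# asyncio.run(scrape(["https://example.com"]))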

Project 2: Real-time Chat Server

Create a WebSocket-based chat application.

Requirements:

  • WebSocket server with aiohttp
  • Multiple chat rooms
  • User authentication
  • Message broadcasting
  • Connection management
  • Message history persistence

Key Skills: WebSockets, async server, state management
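
A minimal broadcasting core for this project, assuming aiohttp WebSockets and a single in-memory room; authentication, multiple rooms, and message persistence are the actual exercise.

import aiohttp
from aiohttp import web

connections: set[web.WebSocketResponse] = set()  # every client in one in-memory "room"

async def chat_handler(request: web.Request) -> web.WebSocketResponse:
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    connections.add(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                # Broadcast to every other client (copy the set: it can change while we await)
                for peer in list(connections):
                    if peer is not ws and not peer.closed:
                        await peer.send_str(msg.data)
    finally:
        connections.discard(ws)  # always drop the connection on disconnect or error

    return ws

app = web.Application()
app.add_routes([web.get('/chat', chat_handler)])
# web.run_app(app)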

Project 3: Async Task Queue

Build a distributed task processing system.

Requirements:

  • Task queue with Redis/RabbitMQ
  • Worker pool management
  • Task prioritization
  • Result caching
  • Progress monitoring
  • Graceful shutdown

Key Skills: Message queues, concurrent workers, cleanup
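
A minimal in-process skeleton for this project using asyncio.Queue and a small worker pool with graceful shutdown; swapping in Redis or RabbitMQ, task prioritization, and result caching are the actual exercise.

import asyncio

async def worker(name: str, queue: asyncio.Queue) -> None:
    while True:
        job = await queue.get()        # wait for the next job
        try:
            await asyncio.sleep(0.1)   # stand-in for real task processing
            print(f"{name} finished job {job}")
        finally:
            queue.task_done()          # mark the job complete even if it failed

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(f"worker-{i}", queue)) for i in range(3)]

    for job in range(10):
        queue.put_nowait(job)

    await queue.join()                 # block until every queued job is processed

    # Graceful shutdown: cancel idle workers and wait for them to finish cancelling
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())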

Assessment Criteria

  • Understand async/await semantics
  • Write efficient concurrent code
  • Handle async exceptions properly
  • Use asyncio tasks effectively
  • Build async web applications
  • Debug async code
  • Manage async resources (cleanup; a minimal sketch follows this list)
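
For the resource-management criterion, a minimal sketch of an async context manager built with contextlib.asynccontextmanager; the "connection" here is a plain dict standing in for a real async driver.

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_connection(dsn: str):
    conn = {"dsn": dsn, "open": True}  # stand-in for a real async connection object
    try:
        yield conn                     # hand the resource to the caller
    finally:
        conn["open"] = False           # cleanup runs even if the body raises
        await asyncio.sleep(0)         # e.g. await conn.close() with a real driver

async def main():
    async with open_connection("postgresql://localhost/mydb") as conn:
        print(f"using {conn['dsn']}")

asyncio.run(main())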

Next Steps

After mastering asyncio, explore:

  • Multiprocessing - CPU-bound parallelism
  • Celery - Distributed task queue
  • gRPC - Async RPC framework
  • Kafka - Async event streaming