| name | websocket |
| description | Implement real-time bidirectional communication using WebSocket connections. Use when building live chat, real-time notifications, streaming data feeds, or collaborative applications requiring persistent connections. |
WebSocket Integration
Provides patterns for WebSocket-based real-time communication.
Basic WebSocket Client
import asyncio
import websockets
import json
async def connect_websocket(url: str, on_message: callable):
"""
Connect to WebSocket and handle messages.
Usage:
async def handle_message(data):
print(f"Received: {data}")
await connect_websocket("wss://example.com/ws", handle_message)
"""
async with websockets.connect(url) as ws:
async for message in ws:
data = json.loads(message)
await on_message(data)
WebSocket with Reconnection
import asyncio
import websockets
import json
from typing import Callable, Optional
class WebSocketClient:
"""WebSocket client with automatic reconnection."""
def __init__(
self,
url: str,
on_message: Callable,
reconnect_interval: float = 5.0
):
self.url = url
self.on_message = on_message
self.reconnect_interval = reconnect_interval
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.running = False
async def connect(self):
"""Connect with automatic reconnection."""
self.running = True
while self.running:
try:
async with websockets.connect(self.url) as ws:
self.ws = ws
async for message in ws:
data = json.loads(message)
await self.on_message(data)
except websockets.ConnectionClosed:
if self.running:
await asyncio.sleep(self.reconnect_interval)
except Exception as e:
if self.running:
await asyncio.sleep(self.reconnect_interval)
async def send(self, data: dict):
"""Send message to WebSocket."""
if self.ws:
await self.ws.send(json.dumps(data))
def stop(self):
"""Stop the client."""
self.running = False
Subscription Pattern
async def subscribe_to_channel(url: str, channel: str, handler: callable):
"""Subscribe to a specific channel."""
async with websockets.connect(url) as ws:
# Send subscription message
await ws.send(json.dumps({
"action": "subscribe",
"channel": channel
}))
# Handle incoming messages
async for message in ws:
data = json.loads(message)
if data.get("channel") == channel:
await handler(data.get("payload"))
Heartbeat/Ping-Pong
async def websocket_with_heartbeat(url: str, heartbeat_interval: float = 30.0):
"""Maintain connection with periodic heartbeats."""
async with websockets.connect(url) as ws:
async def send_heartbeat():
while True:
await asyncio.sleep(heartbeat_interval)
await ws.ping()
heartbeat_task = asyncio.create_task(send_heartbeat())
try:
async for message in ws:
yield json.loads(message)
finally:
heartbeat_task.cancel()
Message Queue Pattern
from asyncio import Queue
class MessageBroker:
"""Broker for WebSocket message distribution."""
def __init__(self):
self.subscribers: dict[str, list[Queue]] = {}
async def subscribe(self, channel: str) -> Queue:
"""Subscribe to a channel."""
queue = Queue()
if channel not in self.subscribers:
self.subscribers[channel] = []
self.subscribers[channel].append(queue)
return queue
async def publish(self, channel: str, message: dict):
"""Publish message to all subscribers."""
if channel in self.subscribers:
for queue in self.subscribers[channel]:
await queue.put(message)