Claude Code Plugins

Community-maintained marketplace

Feedback

Implement real-time bidirectional communication using WebSocket connections. Use when building live chat, real-time notifications, streaming data feeds, or collaborative applications requiring persistent connections.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: websocket
description: Implement real-time bidirectional communication using WebSocket connections. Use when building live chat, real-time notifications, streaming data feeds, or collaborative applications requiring persistent connections.

WebSocket Integration

Provides patterns for WebSocket-based real-time communication.

Basic WebSocket Client

import asyncio
import json
from typing import Any, Awaitable, Callable

import websockets

async def connect_websocket(
    url: str,
    on_message: Callable[[Any], Awaitable[Any]],
) -> None:
    """
    Connect to a WebSocket and dispatch each decoded message to a handler.

    Every incoming frame is parsed as JSON and the resulting object is
    passed to ``on_message``, which is awaited before the next frame is
    read. The coroutine returns when iteration over the connection ends.

    Args:
        url: WebSocket endpoint to connect to.
        on_message: Async callable (or any callable returning an
            awaitable) invoked with the decoded JSON value of each message.

    Raises:
        json.JSONDecodeError: If a received message is not valid JSON.
        Exception: Anything raised by ``on_message`` propagates to the
            caller; the connection is closed on the way out by the
            ``async with`` block.

    Usage:
        async def handle_message(data):
            print(f"Received: {data}")

        await connect_websocket("wss://example.com/ws", handle_message)
    """
    async with websockets.connect(url) as ws:
        async for message in ws:
            data = json.loads(message)
            await on_message(data)

WebSocket with Reconnection

import asyncio
import websockets
import json
from typing import Callable, Optional

class WebSocketClient:
    """WebSocket client with automatic reconnection.

    ``connect()`` runs until ``stop()`` is called, re-establishing the
    connection after ``reconnect_interval`` seconds whenever it is lost.
    Incoming messages are decoded as JSON and passed to ``on_message``.
    """

    def __init__(
        self,
        url: str,
        on_message: Callable,
        reconnect_interval: float = 5.0
    ):
        """
        Args:
            url: WebSocket endpoint to connect to.
            on_message: Callable returning an awaitable; invoked with the
                decoded JSON value of every received message.
            reconnect_interval: Seconds to wait before each reconnection
                attempt.
        """
        self.url = url
        self.on_message = on_message
        self.reconnect_interval = reconnect_interval
        # Currently open connection, or None while disconnected.
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.running = False

    async def connect(self):
        """Connect and process messages, reconnecting until stopped.

        Any error (closed connection, failed connection attempt, invalid
        JSON, exception raised by ``on_message``) is logged and followed by
        a reconnection attempt; it is never raised to the caller.
        """
        # Local import keeps this class self-contained.
        import logging

        logger = logging.getLogger(__name__)

        self.running = True
        while self.running:
            try:
                async with websockets.connect(self.url) as ws:
                    self.ws = ws
                    async for message in ws:
                        # Honour stop() without waiting for a disconnect.
                        if not self.running:
                            break
                        await self.on_message(json.loads(message))
            except websockets.ConnectionClosed:
                logger.info("WebSocket connection to %s closed", self.url)
            except Exception:
                # Best-effort client: record the failure, then retry.
                logger.exception("WebSocket error on %s", self.url)
            finally:
                # Never leave a dead connection for send() to write to.
                self.ws = None

            # Also wait after a clean close: iteration then ends without an
            # exception, and retrying immediately would spin in a tight loop.
            if self.running:
                await asyncio.sleep(self.reconnect_interval)

    async def send(self, data: dict):
        """Send ``data`` as a JSON message.

        The message is silently dropped when no connection is open.
        """
        if self.ws is not None:
            await self.ws.send(json.dumps(data))

    def stop(self):
        """Stop the client.

        Only clears the ``running`` flag; ``connect()`` notices it when the
        next message arrives, the connection drops, or a reconnect delay
        finishes.
        """
        self.running = False

Subscription Pattern

async def subscribe_to_channel(
    url: str,
    channel: str,
    handler: Callable[[Any], Awaitable[Any]],
) -> None:
    """Subscribe to a specific channel and forward its payloads.

    Sends ``{"action": "subscribe", "channel": channel}`` once connected,
    then awaits ``handler`` with the ``"payload"`` value of every incoming
    JSON object whose ``"channel"`` equals ``channel``. Other messages are
    ignored.

    Args:
        url: WebSocket endpoint to connect to.
        channel: Channel name to subscribe to and filter on.
        handler: Callable returning an awaitable; receives the message's
            ``"payload"`` value, or ``None`` when that key is absent.

    Raises:
        json.JSONDecodeError: If a received message is not valid JSON.
    """
    subscribe_request = json.dumps({
        "action": "subscribe",
        "channel": channel
    })

    async with websockets.connect(url) as ws:
        # Send subscription message
        await ws.send(subscribe_request)

        # Handle incoming messages
        async for message in ws:
            data = json.loads(message)
            # Valid JSON is not always an object (bare strings, lists, numbers);
            # such frames carry no channel, so skip them instead of crashing.
            if not isinstance(data, dict):
                continue
            if data.get("channel") == channel:
                await handler(data.get("payload"))

Heartbeat/Ping-Pong

async def websocket_with_heartbeat(url: str, heartbeat_interval: float = 30.0):
    """Maintain connection with periodic heartbeats.

    Async generator: connects to ``url``, starts a background task that
    sends a ping every ``heartbeat_interval`` seconds, and yields each
    incoming message decoded from JSON.

    Args:
        url: WebSocket endpoint to connect to.
        heartbeat_interval: Seconds between consecutive pings.

    Yields:
        The decoded JSON value of each received message.

    Raises:
        json.JSONDecodeError: If a received message is not valid JSON.
    """
    async with websockets.connect(url) as ws:
        async def send_heartbeat():
            while True:
                await asyncio.sleep(heartbeat_interval)
                await ws.ping()

        heartbeat_task = asyncio.create_task(send_heartbeat())
        try:
            async for message in ws:
                yield json.loads(message)
        finally:
            heartbeat_task.cancel()
            # Await the task so its outcome is always retrieved: either the
            # cancellation, or an error from ping() on an already-closed
            # connection, which would otherwise be reported by asyncio as
            # "Task exception was never retrieved". gather() collects it as
            # a result and still lets cancellation of the caller propagate.
            await asyncio.gather(heartbeat_task, return_exceptions=True)

Message Queue Pattern

from asyncio import Queue

class MessageBroker:
    """In-memory hub that fans messages out to per-subscriber queues."""

    def __init__(self):
        # Channel name -> queues of every subscriber registered on it.
        self.subscribers: dict[str, list[Queue]] = {}

    async def subscribe(self, channel: str) -> Queue:
        """Register a new subscriber on ``channel`` and return its queue."""
        inbox = Queue()
        self.subscribers.setdefault(channel, []).append(inbox)
        return inbox

    async def publish(self, channel: str, message: dict):
        """Put ``message`` on every queue subscribed to ``channel``.

        Publishing to a channel without subscribers does nothing.
        """
        for inbox in self.subscribers.get(channel, ()):
            await inbox.put(message)