Claude Code Plugins

Community-maintained marketplace

Feedback

implementing-websocket-realtime

@miethe/family-shopping-dashboard
0
0

Use this skill when implementing real-time features with WebSockets, including connection lifecycle management, pub/sub event patterns, state synchronization between client and server, event broadcasting, and integration with frontend state management (React Query, SWR, etc.). Covers FastAPI/Node.js/Express backends and React/Next.js/Vue frontends with fallback strategies and error recovery.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name implementing-websocket-realtime
description Use this skill when implementing real-time features with WebSockets, including connection lifecycle management, pub/sub event patterns, state synchronization between client and server, event broadcasting, and integration with frontend state management (React Query, SWR, etc.). Covers FastAPI/Node.js/Express backends and React/Next.js/Vue frontends with fallback strategies and error recovery.

WebSocket Real-Time Implementation

Decision Tree: Real-Time Protocol Selection

Requirement Protocol Why
Bidirectional, instant updates WebSocket Full duplex, low latency
Server → client only SSE Simpler, auto-reconnect
Infrequent updates (<1/min) Polling Simpler, no persistent connection
Mobile/unstable network WS + Fallback Graceful degradation

Decision: Use WebSocket for real-time collaboration, SSE for notifications, polling as last resort.

Generic Event Structure

interface WebSocketEvent {
  topic: string;              // "resource-type:identifier" (e.g., "gift-list:family-123")
  event: EventType;           // ADDED | UPDATED | DELETED | STATUS_CHANGED | CUSTOM
  data: {
    entity_id: string;        // Resource ID
    payload: unknown;         // Event-specific data (DTO)
    user_id?: string;         // Who triggered (optional)
    tenant_id?: string;       // Multi-tenant context (optional)
    timestamp: string;        // ISO 8601
  };
  trace_id?: string;          // For observability
}

type EventType = "ADDED" | "UPDATED" | "DELETED" | "STATUS_CHANGED" | string;

Validation: See ./scripts/validate-ws-event.js

Connection Lifecycle

1. Initial Connection

// Client
const ws = new WebSocket(`${WS_URL}?token=${authToken}`);

ws.onopen = () => {
  console.log('Connected');
  subscribeToTopics(['gift-list:123', 'user:456']);
};
# Server (FastAPI)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str):
    await websocket.accept()
    user = authenticate(token)  # Validate on connect
    connection_manager.connect(websocket, user.id)

2. Authentication

Options:

  • Query param: ?token=jwt (simplest)
  • First message: Send auth message after connect
  • Cookie: Use existing HTTP session

Recommendation: Query param for stateless, first message for flexible auth.

3. Subscription Management

// Subscribe to topics
function subscribe(topics: string[]) {
  ws.send(JSON.stringify({
    type: 'subscribe',
    topics: ['gift-list:123', 'user:456']
  }));
}

// Unsubscribe on unmount
function unsubscribe(topics: string[]) {
  ws.send(JSON.stringify({
    type: 'unsubscribe',
    topics: ['gift-list:123']
  }));
}

4. Heartbeat/Keepalive

// Client: Ping every 30s
const heartbeat = setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'ping' }));
  }
}, 30000);

// Server responds with pong
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'pong') {
    lastPong = Date.now();
  }
};

5. Reconnection Logic

let reconnectAttempts = 0;
const MAX_RECONNECT_ATTEMPTS = 5;
const INITIAL_DELAY = 1000;

function reconnect() {
  if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
    console.error('Max reconnection attempts reached');
    return;
  }

  const delay = INITIAL_DELAY * Math.pow(2, reconnectAttempts); // Exponential backoff
  setTimeout(() => {
    reconnectAttempts++;
    connect();
  }, delay);
}

ws.onclose = () => {
  console.log('Connection closed, reconnecting...');
  reconnect();
};

6. Cleanup

// On component unmount
useEffect(() => {
  return () => {
    if (ws) {
      ws.close();
      clearInterval(heartbeat);
    }
  };
}, []);

Details: See ./connection-lifecycle.md

State Synchronization Pattern

Flow

1. Load initial data    → REST API (React Query)
2. Subscribe to updates → WebSocket (on mount)
3. Receive event        → Invalidate cache → React Query refetches
4. Optimistic update    → Update UI immediately, rollback on error
5. Unsubscribe          → WebSocket (on unmount)
6. Fallback             → Poll every 10s if WS fails

Implementation

// 1. Load initial data
const { data, isLoading } = useQuery({
  queryKey: ['gift-list', listId],
  queryFn: () => fetchGiftList(listId),
});

// 2. Subscribe to WebSocket updates
useEffect(() => {
  const ws = connectWebSocket();

  ws.onmessage = (event) => {
    const wsEvent: WebSocketEvent = JSON.parse(event.data);

    // 3. Invalidate cache on event
    if (wsEvent.topic === `gift-list:${listId}`) {
      queryClient.invalidateQueries(['gift-list', listId]);
    }
  };

  ws.send(JSON.stringify({
    type: 'subscribe',
    topics: [`gift-list:${listId}`]
  }));

  return () => {
    ws.send(JSON.stringify({
      type: 'unsubscribe',
      topics: [`gift-list:${listId}`]
    }));
    ws.close();
  };
}, [listId]);

// 4. Optimistic update
const mutation = useMutation({
  mutationFn: updateGift,
  onMutate: async (newGift) => {
    await queryClient.cancelQueries(['gift-list', listId]);
    const previous = queryClient.getQueryData(['gift-list', listId]);
    queryClient.setQueryData(['gift-list', listId], (old) => ({
      ...old,
      gifts: old.gifts.map(g => g.id === newGift.id ? newGift : g)
    }));
    return { previous };
  },
  onError: (err, newGift, context) => {
    queryClient.setQueryData(['gift-list', listId], context.previous);
  },
});

Details: See ./state-sync-strategies.md

Implementation Checklist

Backend Setup

  • WebSocket server endpoint
    • FastAPI: @app.websocket("/ws")
    • Node.js: ws or socket.io library
  • Authentication on connect
  • Connection manager (track active connections)
  • Topic subscription logic
  • Event broadcasting
    • Per-topic subscribers
    • Per-user filtering (if multi-tenant)
  • Heartbeat/pong handler
  • Error handling & logging
  • Trace IDs for observability

Frontend Setup

  • WebSocket connection hook
  • Auto-reconnection logic
  • Subscription management
  • Event handlers
  • State management integration
    • React Query invalidation
    • SWR revalidation
    • Custom state updates
  • Optimistic updates
  • Connection status UI
  • Fallback polling (if WS fails)
  • Cleanup on unmount

Testing

  • Connection establishment
  • Authentication flow
  • Subscription/unsubscription
  • Event delivery
  • Reconnection logic
  • Fallback behavior
  • Load testing (concurrent connections)
  • Network failure scenarios

Details: See ./backend-patterns.md and ./frontend-patterns.md

Backend Patterns

FastAPI Connection Manager

from fastapi import WebSocket
from typing import Dict, Set

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.subscriptions: Dict[str, Set[str]] = {}  # topic -> set of user_ids

    async def connect(self, websocket: WebSocket, user_id: str):
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    def subscribe(self, user_id: str, topic: str):
        if topic not in self.subscriptions:
            self.subscriptions[topic] = set()
        self.subscriptions[topic].add(user_id)

    async def broadcast(self, topic: str, event: dict):
        if topic not in self.subscriptions:
            return

        for user_id in self.subscriptions[topic]:
            if user_id in self.active_connections:
                ws = self.active_connections[user_id]
                await ws.send_json(event)

manager = ConnectionManager()

Full examples: See ./backend-patterns.md

Frontend Patterns

React Hook: useWebSocket

function useWebSocket(topics: string[]) {
  const [isConnected, setIsConnected] = useState(false);
  const wsRef = useRef<WebSocket | null>(null);
  const queryClient = useQueryClient();

  useEffect(() => {
    const ws = new WebSocket(`${WS_URL}?token=${getToken()}`);

    ws.onopen = () => {
      setIsConnected(true);
      ws.send(JSON.stringify({ type: 'subscribe', topics }));
    };

    ws.onmessage = (event) => {
      const wsEvent: WebSocketEvent = JSON.parse(event.data);

      // Invalidate relevant queries
      queryClient.invalidateQueries([wsEvent.topic.split(':')[0]]);
    };

    ws.onclose = () => {
      setIsConnected(false);
      // Reconnect logic here
    };

    wsRef.current = ws;

    return () => {
      ws.send(JSON.stringify({ type: 'unsubscribe', topics }));
      ws.close();
    };
  }, [topics.join(',')]);

  return { isConnected };
}

Full examples: See ./frontend-patterns.md

Progressive Disclosure References

For detailed patterns and examples:

  • Connection Lifecycle: ./connection-lifecycle.md
  • Event Structure & Validation: ./event-structure-patterns.md
  • State Sync Strategies: ./state-sync-strategies.md
  • Backend Implementations: ./backend-patterns.md
  • Frontend Implementations: ./frontend-patterns.md
  • Fallback & Recovery: ./fallback-strategies.md
  • Event Validator Script: ./scripts/validate-ws-event.js