| name | realtime-systems |
| description | WebSocket, real-time communication, and event-driven architectures |
| domain | development-stacks |
| version | 1.0.0 |
| tags | websocket, socket-io, sse, pubsub, real-time, events |
| triggers | [object Object] |
Real-time Systems
Overview
Building real-time applications with WebSocket, Server-Sent Events, and event-driven architectures.
WebSocket
Server Implementation (Node.js)
import { WebSocketServer, WebSocket } from 'ws';
import { createServer } from 'http';
import { v4 as uuid } from 'uuid';
const server = createServer();
const wss = new WebSocketServer({ server });
interface Client {
id: string;
ws: WebSocket;
userId?: string;
rooms: Set<string>;
}
const clients = new Map<string, Client>();
const rooms = new Map<string, Set<string>>();
wss.on('connection', (ws, req) => {
const clientId = uuid();
const client: Client = {
id: clientId,
ws,
rooms: new Set(),
};
clients.set(clientId, client);
console.log(`Client connected: ${clientId}`);
// Handle messages
ws.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
handleMessage(client, message);
} catch (error) {
console.error('Invalid message:', error);
}
});
// Handle disconnection
ws.on('close', () => {
// Leave all rooms
client.rooms.forEach(room => leaveRoom(client, room));
clients.delete(clientId);
console.log(`Client disconnected: ${clientId}`);
});
// Send connection confirmation
send(ws, { type: 'connected', clientId });
});
function handleMessage(client: Client, message: any) {
switch (message.type) {
case 'authenticate':
client.userId = message.userId;
break;
case 'join':
joinRoom(client, message.room);
break;
case 'leave':
leaveRoom(client, message.room);
break;
case 'message':
broadcastToRoom(message.room, {
type: 'message',
from: client.userId,
content: message.content,
timestamp: Date.now(),
}, client.id);
break;
case 'ping':
send(client.ws, { type: 'pong' });
break;
}
}
function joinRoom(client: Client, room: string) {
if (!rooms.has(room)) {
rooms.set(room, new Set());
}
rooms.get(room)!.add(client.id);
client.rooms.add(room);
// Notify room members
broadcastToRoom(room, {
type: 'user_joined',
userId: client.userId,
room,
}, client.id);
}
function leaveRoom(client: Client, room: string) {
rooms.get(room)?.delete(client.id);
client.rooms.delete(room);
// Notify room members
broadcastToRoom(room, {
type: 'user_left',
userId: client.userId,
room,
});
}
function broadcastToRoom(room: string, message: any, excludeClientId?: string) {
const roomClients = rooms.get(room);
if (!roomClients) return;
roomClients.forEach(clientId => {
if (clientId !== excludeClientId) {
const client = clients.get(clientId);
if (client?.ws.readyState === WebSocket.OPEN) {
send(client.ws, message);
}
}
});
}
function send(ws: WebSocket, message: any) {
ws.send(JSON.stringify(message));
}
server.listen(8080);
Client Implementation
class WebSocketClient {
private ws: WebSocket | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 1000;
private messageHandlers = new Map<string, Set<Function>>();
private messageQueue: any[] = [];
constructor(private url: string) {}
connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
this.flushMessageQueue();
resolve();
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.ws.onclose = (event) => {
console.log('WebSocket closed:', event.code, event.reason);
this.attemptReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
reject(error);
};
});
}
private attemptReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnection attempts reached');
return;
}
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
this.connect().catch(() => {});
}, delay);
}
send(message: any) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
// Queue message for when connection is restored
this.messageQueue.push(message);
}
}
private flushMessageQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
this.send(message);
}
}
private handleMessage(message: any) {
const handlers = this.messageHandlers.get(message.type);
handlers?.forEach(handler => handler(message));
// Also emit to wildcard handlers
const wildcardHandlers = this.messageHandlers.get('*');
wildcardHandlers?.forEach(handler => handler(message));
}
on(type: string, handler: Function) {
if (!this.messageHandlers.has(type)) {
this.messageHandlers.set(type, new Set());
}
this.messageHandlers.get(type)!.add(handler);
// Return unsubscribe function
return () => {
this.messageHandlers.get(type)?.delete(handler);
};
}
// Convenience methods
joinRoom(room: string) {
this.send({ type: 'join', room });
}
leaveRoom(room: string) {
this.send({ type: 'leave', room });
}
sendMessage(room: string, content: string) {
this.send({ type: 'message', room, content });
}
disconnect() {
this.ws?.close();
this.ws = null;
}
}
// Usage
const ws = new WebSocketClient('wss://api.example.com/ws');
ws.on('connected', (msg) => {
console.log('Connected with ID:', msg.clientId);
ws.joinRoom('general');
});
ws.on('message', (msg) => {
console.log(`[${msg.from}]: ${msg.content}`);
});
await ws.connect();
Socket.IO
Server
import { Server } from 'socket.io';
import { createServer } from 'http';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
const httpServer = createServer();
const io = new Server(httpServer, {
cors: {
origin: process.env.ALLOWED_ORIGINS?.split(',') || '*',
credentials: true,
},
});
// Redis adapter for horizontal scaling
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
io.adapter(createAdapter(pubClient, subClient));
});
// Authentication middleware
io.use(async (socket, next) => {
const token = socket.handshake.auth.token;
try {
const user = await verifyToken(token);
socket.data.user = user;
next();
} catch (err) {
next(new Error('Authentication failed'));
}
});
// Namespace for chat
const chatNamespace = io.of('/chat');
chatNamespace.on('connection', (socket) => {
const user = socket.data.user;
console.log(`User connected: ${user.name}`);
// Join user's personal room
socket.join(`user:${user.id}`);
// Join a chat room
socket.on('join_room', async (roomId: string) => {
// Verify access
const hasAccess = await checkRoomAccess(user.id, roomId);
if (!hasAccess) {
socket.emit('error', { message: 'Access denied' });
return;
}
socket.join(roomId);
// Notify room members
socket.to(roomId).emit('user_joined', {
userId: user.id,
userName: user.name,
});
// Send recent messages
const messages = await getRecentMessages(roomId, 50);
socket.emit('room_history', { roomId, messages });
});
// Leave room
socket.on('leave_room', (roomId: string) => {
socket.leave(roomId);
socket.to(roomId).emit('user_left', {
userId: user.id,
userName: user.name,
});
});
// Send message
socket.on('message', async (data: { roomId: string; content: string }) => {
const message = {
id: uuid(),
roomId: data.roomId,
userId: user.id,
userName: user.name,
content: data.content,
timestamp: new Date(),
};
// Persist message
await saveMessage(message);
// Broadcast to room
chatNamespace.to(data.roomId).emit('message', message);
});
// Typing indicator
socket.on('typing_start', (roomId: string) => {
socket.to(roomId).emit('user_typing', {
userId: user.id,
userName: user.name,
});
});
socket.on('typing_stop', (roomId: string) => {
socket.to(roomId).emit('user_stopped_typing', {
userId: user.id,
});
});
// Disconnect
socket.on('disconnect', () => {
console.log(`User disconnected: ${user.name}`);
});
});
// Send to specific user (from anywhere in the app)
function sendToUser(userId: string, event: string, data: any) {
chatNamespace.to(`user:${userId}`).emit(event, data);
}
httpServer.listen(3000);
Client (React)
import { io, Socket } from 'socket.io-client';
import { createContext, useContext, useEffect, useState } from 'react';
// Socket context
const SocketContext = createContext<Socket | null>(null);
export function SocketProvider({ children }: { children: React.ReactNode }) {
const [socket, setSocket] = useState<Socket | null>(null);
const { token } = useAuth();
useEffect(() => {
if (!token) return;
const newSocket = io(`${API_URL}/chat`, {
auth: { token },
transports: ['websocket'],
});
newSocket.on('connect', () => {
console.log('Socket connected');
});
newSocket.on('connect_error', (error) => {
console.error('Socket connection error:', error);
});
setSocket(newSocket);
return () => {
newSocket.close();
};
}, [token]);
return (
<SocketContext.Provider value={socket}>
{children}
</SocketContext.Provider>
);
}
export function useSocket() {
return useContext(SocketContext);
}
// Chat room hook
function useChatRoom(roomId: string) {
const socket = useSocket();
const [messages, setMessages] = useState<Message[]>([]);
const [typingUsers, setTypingUsers] = useState<Set<string>>(new Set());
useEffect(() => {
if (!socket || !roomId) return;
// Join room
socket.emit('join_room', roomId);
// Listen for messages
socket.on('message', (message: Message) => {
setMessages(prev => [...prev, message]);
});
// Room history
socket.on('room_history', ({ messages }: { messages: Message[] }) => {
setMessages(messages);
});
// Typing indicators
socket.on('user_typing', ({ userId }: { userId: string }) => {
setTypingUsers(prev => new Set(prev).add(userId));
});
socket.on('user_stopped_typing', ({ userId }: { userId: string }) => {
setTypingUsers(prev => {
const next = new Set(prev);
next.delete(userId);
return next;
});
});
return () => {
socket.emit('leave_room', roomId);
socket.off('message');
socket.off('room_history');
socket.off('user_typing');
socket.off('user_stopped_typing');
};
}, [socket, roomId]);
const sendMessage = (content: string) => {
socket?.emit('message', { roomId, content });
};
const startTyping = () => {
socket?.emit('typing_start', roomId);
};
const stopTyping = () => {
socket?.emit('typing_stop', roomId);
};
return { messages, typingUsers, sendMessage, startTyping, stopTyping };
}
Server-Sent Events (SSE)
Server
import express from 'express';
const app = express();
// SSE endpoint
app.get('/events', (req, res) => {
// Set SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Send initial connection event
res.write(`event: connected\ndata: ${JSON.stringify({ time: Date.now() })}\n\n`);
// Keep-alive interval
const keepAlive = setInterval(() => {
res.write(`: keep-alive\n\n`);
}, 30000);
// Subscribe to events
const unsubscribe = eventEmitter.on('update', (data) => {
res.write(`event: update\ndata: ${JSON.stringify(data)}\n\n`);
});
// Handle client disconnect
req.on('close', () => {
clearInterval(keepAlive);
unsubscribe();
});
});
// With user-specific events
app.get('/events/user/:userId', authenticate, (req, res) => {
const { userId } = req.params;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Subscribe to user-specific channel
const channel = `user:${userId}`;
const unsubscribe = pubsub.subscribe(channel, (message) => {
res.write(`event: ${message.type}\ndata: ${JSON.stringify(message.data)}\n\n`);
});
req.on('close', () => {
unsubscribe();
});
});
Client
class EventSourceClient {
private eventSource: EventSource | null = null;
private handlers = new Map<string, Set<Function>>();
connect(url: string) {
this.eventSource = new EventSource(url);
this.eventSource.onopen = () => {
console.log('SSE connected');
};
this.eventSource.onerror = (error) => {
console.error('SSE error:', error);
// EventSource auto-reconnects
};
// Handle named events
this.handlers.forEach((handlers, eventType) => {
this.eventSource!.addEventListener(eventType, (event: MessageEvent) => {
const data = JSON.parse(event.data);
handlers.forEach(handler => handler(data));
});
});
}
on(eventType: string, handler: Function) {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, new Set());
// Add listener if already connected
if (this.eventSource) {
this.eventSource.addEventListener(eventType, (event: MessageEvent) => {
const data = JSON.parse(event.data);
this.handlers.get(eventType)?.forEach(h => h(data));
});
}
}
this.handlers.get(eventType)!.add(handler);
return () => {
this.handlers.get(eventType)?.delete(handler);
};
}
close() {
this.eventSource?.close();
this.eventSource = null;
}
}
// Usage
const sse = new EventSourceClient();
sse.on('update', (data) => console.log('Update:', data));
sse.on('notification', (data) => showNotification(data));
sse.connect('/events');
Pub/Sub with Redis
import Redis from 'ioredis';
const publisher = new Redis(process.env.REDIS_URL);
const subscriber = new Redis(process.env.REDIS_URL);
// Publish event
async function publishEvent(channel: string, event: any) {
await publisher.publish(channel, JSON.stringify(event));
}
// Subscribe to channel
function subscribe(channel: string, handler: (event: any) => void) {
subscriber.subscribe(channel);
subscriber.on('message', (ch, message) => {
if (ch === channel) {
handler(JSON.parse(message));
}
});
}
// Pattern subscription
function subscribePattern(pattern: string, handler: (channel: string, event: any) => void) {
subscriber.psubscribe(pattern);
subscriber.on('pmessage', (pat, channel, message) => {
if (pat === pattern) {
handler(channel, JSON.parse(message));
}
});
}
// Usage
subscribe('notifications', (event) => {
console.log('Notification:', event);
});
subscribePattern('room:*', (channel, event) => {
const roomId = channel.split(':')[1];
console.log(`Room ${roomId}:`, event);
});
publishEvent('notifications', { type: 'alert', message: 'New message' });
publishEvent('room:123', { type: 'message', content: 'Hello!' });
Related Skills
- [[backend]] - Server implementation
- [[system-design]] - Event-driven architecture
- [[cloud-platforms]] - Managed pub/sub services