| name | websocket-implementation |
| description | Implement real-time bidirectional communication with WebSockets including connection management, message routing, and scaling. Use when building real-time features, chat systems, live notifications, or collaborative applications. |
WebSocket Implementation
Overview
Build scalable WebSocket systems for real-time communication with proper connection management, message routing, error handling, and horizontal scaling support.
When to Use
- Building real-time chat and messaging
- Implementing live notifications
- Creating collaborative editing tools
- Broadcasting live data updates
- Building real-time dashboards
- Streaming events to clients
- Live multiplayer games
Instructions
1. Node.js WebSocket Server (Socket.IO)
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const redis = require('redis');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
cors: { origin: '*' },
transports: ['websocket', 'polling'],
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: 5
});
// Redis adapter for horizontal scaling
const redisClient = redis.createClient();
const { createAdapter } = require('@socket.io/redis-adapter');
io.adapter(createAdapter(redisClient, redisClient.duplicate()));
// Connection management
const connectedUsers = new Map();
io.on('connection', (socket) => {
console.log(`User connected: ${socket.id}`);
// Store user connection
socket.on('auth', (userData) => {
connectedUsers.set(socket.id, {
userId: userData.id,
username: userData.username,
socketId: socket.id,
connectedAt: new Date()
});
// Join user-specific room
socket.join(`user:${userData.id}`);
socket.join('authenticated_users');
// Notify others user is online
io.to('authenticated_users').emit('user:online', {
userId: userData.id,
username: userData.username,
timestamp: new Date()
});
console.log(`User authenticated: ${userData.username}`);
});
// Chat messaging
socket.on('chat:message', (message) => {
const user = connectedUsers.get(socket.id);
if (!user) {
socket.emit('error', { message: 'Not authenticated' });
return;
}
const chatMessage = {
id: `msg_${Date.now()}`,
senderId: user.userId,
senderName: user.username,
text: message.text,
roomId: message.roomId,
timestamp: new Date(),
status: 'delivered'
};
// Save to database
Message.create(chatMessage);
// Broadcast to room
io.to(`room:${message.roomId}`).emit('chat:message', chatMessage);
// Update message status
setTimeout(() => {
socket.emit('chat:message:ack', { messageId: chatMessage.id, status: 'read' });
}, 100);
});
// Room management
socket.on('room:join', (roomId) => {
socket.join(`room:${roomId}`);
const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit('room:user:joined', {
userId: user.userId,
username: user.username,
timestamp: new Date()
});
});
socket.on('room:leave', (roomId) => {
socket.leave(`room:${roomId}`);
const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit('room:user:left', {
userId: user.userId,
timestamp: new Date()
});
});
// Typing indicator
socket.on('typing:start', (roomId) => {
const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit('typing:indicator', {
userId: user.userId,
username: user.username,
isTyping: true
});
});
socket.on('typing:stop', (roomId) => {
const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit('typing:indicator', {
userId: user.userId,
isTyping: false
});
});
// Handle disconnection
socket.on('disconnect', () => {
const user = connectedUsers.get(socket.id);
if (user) {
connectedUsers.delete(socket.id);
io.to('authenticated_users').emit('user:offline', {
userId: user.userId,
timestamp: new Date()
});
console.log(`User disconnected: ${user.username}`);
}
});
// Error handling
socket.on('error', (error) => {
console.error(`Socket error: ${error}`);
socket.emit('error', { message: 'An error occurred' });
});
});
// Server methods
const broadcastUserUpdate = (userId, data) => {
io.to(`user:${userId}`).emit('user:update', data);
};
const notifyRoom = (roomId, event, data) => {
io.to(`room:${roomId}`).emit(event, data);
};
const sendDirectMessage = (userId, event, data) => {
io.to(`user:${userId}`).emit(event, data);
};
server.listen(3000, () => {
console.log('WebSocket server listening on port 3000');
});
2. Browser WebSocket Client
class WebSocketClient {
constructor(url, options = {}) {
this.url = url;
this.socket = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
this.reconnectDelay = options.reconnectDelay || 1000;
this.listeners = new Map();
this.messageQueue = [];
this.isAuthenticated = false;
this.connect();
}
connect() {
this.socket = io(this.url, {
reconnection: true,
reconnectionDelay: this.reconnectDelay,
reconnectionAttempts: this.maxReconnectAttempts
});
this.socket.on('connect', () => {
console.log('Connected to server');
this.reconnectAttempts = 0;
this.processMessageQueue();
});
this.socket.on('disconnect', () => {
console.log('Disconnected from server');
});
this.socket.on('error', (error) => {
console.error('Socket error:', error);
this.emit('error', error);
});
this.socket.on('connect_error', (error) => {
console.error('Connection error:', error);
});
}
authenticate(userData) {
this.socket.emit('auth', userData, (response) => {
if (response.success) {
this.isAuthenticated = true;
this.emit('authenticated');
}
});
}
on(event, callback) {
this.socket.on(event, callback);
if (!this.listeners.has(event)) {
this.listeners.set(event, []);
}
this.listeners.get(event).push(callback);
}
emit(event, data, callback) {
if (!this.socket.connected) {
this.messageQueue.push({ event, data, callback });
return;
}
this.socket.emit(event, data, callback);
}
processMessageQueue() {
while (this.messageQueue.length > 0) {
const { event, data, callback } = this.messageQueue.shift();
this.socket.emit(event, data, callback);
}
}
joinRoom(roomId) {
this.emit('room:join', roomId);
}
leaveRoom(roomId) {
this.emit('room:leave', roomId);
}
sendMessage(roomId, text) {
this.emit('chat:message', { roomId, text });
}
setTypingIndicator(roomId, isTyping) {
if (isTyping) {
this.emit('typing:start', roomId);
} else {
this.emit('typing:stop', roomId);
}
}
disconnect() {
this.socket.disconnect();
}
}
// Usage
const client = new WebSocketClient('http://localhost:3000');
client.on('chat:message', (message) => {
console.log('Received message:', message);
displayMessage(message);
});
client.on('typing:indicator', (data) => {
updateTypingIndicator(data);
});
client.on('user:online', (user) => {
updateUserStatus(user.userId, 'online');
});
client.authenticate({ id: 'user123', username: 'john' });
client.joinRoom('room1');
client.sendMessage('room1', 'Hello everyone!');
3. Python WebSocket Server (aiohttp)
from aiohttp import web
import aiohttp
import json
from datetime import datetime
from typing import Set
class WebSocketServer:
def __init__(self):
self.app = web.Application()
self.rooms = {}
self.users = {}
self.setup_routes()
def setup_routes(self):
self.app.router.add_get('/ws', self.websocket_handler)
self.app.router.add_post('/api/message', self.send_message_api)
async def websocket_handler(self, request):
ws = web.WebSocketResponse()
await ws.prepare(request)
user_id = None
room_id = None
async for msg in ws.iter_any():
if isinstance(msg, aiohttp.WSMessage):
data = json.loads(msg.data)
event_type = data.get('type')
try:
if event_type == 'auth':
user_id = data.get('userId')
self.users[user_id] = ws
await ws.send_json({
'type': 'authenticated',
'timestamp': datetime.now().isoformat()
})
elif event_type == 'join_room':
room_id = data.get('roomId')
if room_id not in self.rooms:
self.rooms[room_id] = set()
self.rooms[room_id].add(user_id)
# Notify others
await self.broadcast_to_room(room_id, {
'type': 'user_joined',
'userId': user_id,
'timestamp': datetime.now().isoformat()
}, exclude=user_id)
elif event_type == 'message':
message = {
'id': f'msg_{datetime.now().timestamp()}',
'userId': user_id,
'text': data.get('text'),
'roomId': room_id,
'timestamp': datetime.now().isoformat()
}
# Save to database
await self.save_message(message)
# Broadcast to room
await self.broadcast_to_room(room_id, message)
elif event_type == 'leave_room':
if room_id in self.rooms:
self.rooms[room_id].discard(user_id)
except Exception as error:
await ws.send_json({
'type': 'error',
'message': str(error)
})
# Cleanup on disconnect
if user_id:
del self.users[user_id]
if room_id and user_id:
if room_id in self.rooms:
self.rooms[room_id].discard(user_id)
return ws
async def broadcast_to_room(self, room_id, message, exclude=None):
if room_id not in self.rooms:
return
for user_id in self.rooms[room_id]:
if user_id != exclude and user_id in self.users:
try:
await self.users[user_id].send_json(message)
except Exception as error:
print(f'Error sending message: {error}')
async def save_message(self, message):
# Save to database
pass
async def send_message_api(self, request):
data = await request.json()
room_id = data.get('roomId')
await self.broadcast_to_room(room_id, {
'type': 'message',
'text': data.get('text'),
'timestamp': datetime.now().isoformat()
})
return web.json_response({'sent': True})
def create_app():
server = WebSocketServer()
return server.app
if __name__ == '__main__':
app = create_app()
web.run_app(app, port=3000)
4. Message Types and Protocols
// Authentication
{
"type": "auth",
"userId": "user123",
"token": "jwt_token_here"
}
// Chat Message
{
"type": "message",
"roomId": "room123",
"text": "Hello everyone!",
"timestamp": "2025-01-15T10:30:00Z"
}
// Typing Indicator
{
"type": "typing",
"roomId": "room123",
"isTyping": true
}
// Presence
{
"type": "presence",
"status": "online|away|offline"
}
// Notification
{
"type": "notification",
"title": "New message",
"body": "You have a new message",
"data": {}
}
5. Scaling with Redis
const redis = require('redis');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
const pubClient = createClient({ host: 'redis', port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter(pubClient, subClient));
// Publish to multiple servers
io.emit('user:action', { userId: 123, action: 'login' });
// Subscribe to events from other servers
redisClient.subscribe('notifications', (message) => {
const notification = JSON.parse(message);
io.to(`user:${notification.userId}`).emit('notification', notification);
});
Best Practices
✅ DO
- Implement proper authentication
- Handle reconnection gracefully
- Manage rooms/channels effectively
- Persist messages appropriately
- Monitor active connections
- Implement presence features
- Use Redis for scaling
- Add message acknowledgment
- Implement rate limiting
- Handle errors properly
❌ DON'T
- Send unencrypted sensitive data
- Keep unlimited message history in memory
- Allow arbitrary room/channel creation
- Forget to clean up disconnected connections
- Send large messages frequently
- Ignore network failures
- Store passwords in messages
- Skip authentication/authorization
- Create unbounded growth of connections
- Ignore scalability from day one
Monitoring
// Track active connections
io.engine.on('connection_error', (err) => {
console.log(err.req); // the request object
console.log(err.code); // the error code, e.g. 1
console.log(err.message); // the error message
console.log(err.context); // some additional error context
});
app.get('/metrics/websocket', (req, res) => {
res.json({
activeConnections: io.engine.clientsCount,
connectedSockets: io.sockets.sockets.size,
rooms: Object.keys(io.sockets.adapter.rooms)
});
});