Skip to content

WebSockets Guide

Learn how to build real-time applications with WebSocket support in Veloce-TS v0.2.6.

Veloce-TS provides comprehensive WebSocket support with:

  • Decorator-based Handlers for clean WebSocket code
  • Authentication Integration with JWT tokens
  • Room Management for group communications
  • Message Broadcasting to multiple clients
  • Type-safe Events with Zod validation
  • Automatic Reconnection handling
import { VeloceTS } from 'veloce-ts';
// Create the application instance that the WebSocket plugin is installed on.
const app = new VeloceTS();

// Minimal setup: only the endpoint path is supplied.
const basicWebSocketOptions = { path: '/ws' };
app.install('websocket', basicWebSocketOptions);

// Full setup: the same plugin with every option this guide demonstrates.
const fullWebSocketOptions = {
  path: '/ws', // WebSocket endpoint path
  cors: true, // Enable CORS
  authentication: true, // Enable JWT authentication
  heartbeatInterval: 30000, // Heartbeat interval in ms
  maxConnections: 1000 // Maximum concurrent connections
};
app.install('websocket', fullWebSocketOptions);
import { WebSocket, OnConnect, OnMessage, OnDisconnect } from 'veloce-ts';
/**
 * Basic chat handler registered at `/chat`.
 *
 * Greets each client on connect, relays incoming `message` events as
 * `new-message`, and logs connects and disconnects.
 */
@WebSocket('/chat')
export class ChatHandler {
  /** Logs the new connection and emits a `welcome` event to that socket. */
  @OnConnect()
  async onConnect(socket: any) {
    console.log('Client connected:', socket.id);

    // Greeting carries the time it was produced as an ISO-8601 string.
    const greeting = {
      message: 'Welcome to the chat!',
      timestamp: new Date().toISOString()
    };
    socket.emit('welcome', greeting);
  }

  /**
   * Logs an incoming `message` event and re-emits it as `new-message`
   * through `socket.broadcast`.
   *
   * @param socket - The socket that sent the message.
   * @param data - Client payload; `content` and `sender` are forwarded as-is.
   */
  @OnMessage('message')
  async onMessage(socket: any, data: any) {
    console.log('Received message:', data);

    // Outgoing envelope: fresh id and timestamp around the client's fields.
    const outgoing = {
      id: generateId(),
      content: data.content,
      sender: data.sender,
      timestamp: new Date().toISOString()
    };
    socket.broadcast.emit('new-message', outgoing);
  }

  /** Logs the id of the socket that disconnected. */
  @OnDisconnect()
  async onDisconnect(socket: any) {
    console.log('Client disconnected:', socket.id);
  }
}
import { WebSocket, OnConnect, OnMessage, OnDisconnect } from 'veloce-ts';
import { z } from 'zod';
// Fields of a room-membership payload: the target room and the acting user.
const joinRoomShape = {
  roomId: z.string(),
  userId: z.string()
};
const JoinRoomSchema = z.object(joinRoomShape);

// Fields of a room chat message; `content` must contain at least one character.
const chatMessageShape = {
  roomId: z.string(),
  content: z.string().min(1),
  senderId: z.string()
};
const ChatMessageSchema = z.object(chatMessageShape);
/**
 * Room-based handler registered at `/rooms`.
 *
 * Clients join rooms, leave rooms, and exchange messages scoped to a room.
 * Every client payload is validated with a Zod schema before it is used.
 */
@WebSocket('/rooms')
export class RoomHandler {
  /** Logs each new connection. */
  @OnConnect()
  async onConnect(socket: any) {
    console.log('Client connected to rooms:', socket.id);
  }

  /**
   * Adds the socket to a room, announces the new member to that room, and
   * confirms the join to the caller together with the room's member count.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload, validated against `JoinRoomSchema`.
   * @throws ZodError when the payload does not match the schema.
   */
  @OnMessage('join-room')
  async onJoinRoom(socket: any, data: z.infer<typeof JoinRoomSchema>) {
    const { roomId, userId } = JoinRoomSchema.parse(data);

    // Join the room
    socket.join(roomId);

    // Notify room members
    socket.to(roomId).emit('user-joined', {
      userId,
      roomId,
      timestamp: new Date().toISOString()
    });

    // Send current room info
    socket.emit('room-joined', {
      roomId,
      memberCount: await this.getRoomMemberCount(roomId)
    });
  }

  /**
   * Removes the socket from a room and announces the departure to that room.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload with `roomId` and `userId`.
   * @throws ZodError when the payload does not match the schema.
   */
  @OnMessage('leave-room')
  async onLeaveRoom(socket: any, data: { roomId: string; userId: string }) {
    // A leave payload has the same shape as a join payload, so it is validated
    // with the same schema. Without this, a malformed payload would reach
    // `socket.leave` / `socket.to` with undefined or non-string values.
    const { roomId, userId } = JoinRoomSchema.parse(data);

    socket.leave(roomId);

    // Notify room members
    socket.to(roomId).emit('user-left', {
      userId,
      roomId,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Relays a chat message to the room and then persists it.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload, validated against `ChatMessageSchema`.
   * @throws ZodError when the payload does not match the schema.
   */
  @OnMessage('room-message')
  async onRoomMessage(socket: any, data: z.infer<typeof ChatMessageSchema>) {
    const { roomId, content, senderId } = ChatMessageSchema.parse(data);

    // Broadcast message to room members
    socket.to(roomId).emit('room-message', {
      id: generateId(),
      roomId,
      content,
      senderId,
      timestamp: new Date().toISOString()
    });

    // Store message in database
    // NOTE(review): `messageService` is not declared on this class in the
    // snippet; it is presumably injected elsewhere — confirm.
    await this.messageService.create({
      roomId,
      content,
      senderId,
      timestamp: new Date()
    });
  }

  /** Logs the disconnect and runs per-socket cleanup. */
  @OnDisconnect()
  async onDisconnect(socket: any) {
    console.log('Client disconnected from rooms:', socket.id);

    // Handle cleanup if needed
    await this.handleUserDisconnect(socket.id);
  }

  /**
   * Returns the number of members in a room.
   *
   * @param roomId - Room to inspect (unused by this placeholder).
   * @returns Always `1` in this placeholder implementation.
   */
  private async getRoomMemberCount(roomId: string): Promise<number> {
    // Implementation depends on your WebSocket library
    // This is a placeholder
    return 1;
  }

  /**
   * Cleanup hook invoked when a socket disconnects; intentionally empty here.
   *
   * @param socketId - Id of the socket that disconnected.
   */
  private async handleUserDisconnect(socketId: string) {
    // Clean up user sessions, notify rooms, etc.
  }
}
| Decorator | Purpose | Example |
| --- | --- | --- |
| `@WebSocket(path)` | Marks a class as a WebSocket handler | `@WebSocket('/chat')` |
| `@OnConnect()` | Handles client connections | `@OnConnect()` |
| `@OnMessage(event)` | Handles specific message events | `@OnMessage('message')` |
| `@OnDisconnect()` | Handles client disconnections | `@OnDisconnect()` |
import { WebSocket, OnMessage } from 'veloce-ts';
import { z } from 'zod';
// Fields of a typing-indicator payload: room, user, and current typing state.
const typingShape = {
  roomId: z.string(),
  userId: z.string(),
  isTyping: z.boolean()
};
const TypingSchema = z.object(typingShape);

// Fields describing a file a client wants to share with a room.
const fileUploadShape = {
  roomId: z.string(),
  fileName: z.string(),
  fileSize: z.number(),
  fileType: z.string()
};
const FileUploadSchema = z.object(fileUploadShape);
/**
 * Chat handler registered at `/chat` covering typing indicators, file
 * sharing, and online-user lookups.
 */
@WebSocket('/chat')
export class AdvancedChatHandler {
  /** Largest accepted `fileSize` value for `file-upload` events (10MB). */
  private static readonly MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024;

  /**
   * Relays a user's typing status to the room as `user-typing`.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload, validated against `TypingSchema`.
   * @throws ZodError when the payload does not match the schema.
   */
  @OnMessage('typing')
  async onTyping(socket: any, data: z.infer<typeof TypingSchema>) {
    const { roomId, userId, isTyping } = TypingSchema.parse(data);

    // Broadcast typing status to room members (except sender)
    socket.to(roomId).emit('user-typing', {
      userId,
      isTyping,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Validates a file-upload request, uploads the file, and announces it to
   * the room as `file-shared`. Oversized files are rejected with an
   * `upload-error` event sent to the caller only.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload, validated against `FileUploadSchema`.
   * @throws ZodError when the payload does not match the schema.
   */
  @OnMessage('file-upload')
  async onFileUpload(socket: any, data: z.infer<typeof FileUploadSchema>) {
    // Keep the parsed object: it is the only copy of the payload that is
    // known to match the schema.
    const upload = FileUploadSchema.parse(data);
    const { roomId, fileName, fileSize, fileType } = upload;

    // Validate file
    if (fileSize > AdvancedChatHandler.MAX_FILE_SIZE_BYTES) {
      socket.emit('upload-error', {
        message: 'File too large. Maximum size is 10MB.'
      });
      return;
    }

    // Process file upload. Pass the validated payload rather than the raw
    // client `data`, so the file service never sees unvalidated input.
    // NOTE(review): `fileService` is not declared on this class in the
    // snippet; it is presumably injected elsewhere — confirm.
    const fileUrl = await this.fileService.uploadFile(upload);

    // Broadcast file info to room
    socket.to(roomId).emit('file-shared', {
      id: generateId(),
      roomId,
      fileName,
      fileSize,
      fileType,
      fileUrl,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Sends the caller the list of online users for a room as `online-users`.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload naming the room to inspect.
   */
  @OnMessage('get-online-users')
  async onGetOnlineUsers(socket: any, data: { roomId: string }) {
    const { roomId } = data;

    // NOTE(review): `getOnlineUsers` is not defined in this snippet.
    const onlineUsers = await this.getOnlineUsers(roomId);

    socket.emit('online-users', {
      roomId,
      users: onlineUsers,
      timestamp: new Date().toISOString()
    });
  }
}
/**
 * Live chat handler registered at `/chat`.
 *
 * Keeps an in-memory map from socket id to the user who joined on that
 * socket, and relays join, message, and leave events through
 * `socket.broadcast`.
 */
@WebSocket('/chat')
export class LiveChatHandler {
  // Socket id -> { username, joinedAt } for every socket that has joined.
  private activeUsers = new Map<string, any>();

  /** Logs each new connection. */
  @OnConnect()
  async onConnect(socket: any) {
    console.log('User connected:', socket.id);
  }

  /**
   * Registers the caller under a username, announces the join, and replies
   * with the usernames of everyone currently registered.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload carrying the chosen username.
   */
  @OnMessage('join-chat')
  async onJoinChat(socket: any, data: { username: string }) {
    const username = data.username;

    // Remember who is behind this socket.
    this.activeUsers.set(socket.id, { username, joinedAt: new Date() });

    // Announce the newcomer.
    const joinedAnnouncement = {
      username,
      timestamp: new Date().toISOString()
    };
    socket.broadcast.emit('user-joined', joinedAnnouncement);

    // Reply with the current roster (includes the caller, registered above).
    const users: any[] = [];
    for (const entry of this.activeUsers.values()) {
      users.push(entry.username);
    }
    socket.emit('users-list', { users });
  }

  /**
   * Relays a chat message from a registered user and persists it.
   * Messages from sockets that never joined are ignored.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload carrying the message text.
   */
  @OnMessage('chat-message')
  async onChatMessage(socket: any, data: { message: string }) {
    const sender = this.activeUsers.get(socket.id);
    if (!sender) {
      return;
    }

    const messageData = {
      id: generateId(),
      username: sender.username,
      message: data.message,
      timestamp: new Date().toISOString()
    };

    // Relay first, then persist.
    socket.broadcast.emit('chat-message', messageData);
    await this.messageService.create(messageData);
  }

  /** Announces the departure of a registered user and forgets the socket. */
  @OnDisconnect()
  async onDisconnect(socket: any) {
    const leaving = this.activeUsers.get(socket.id);
    if (!leaving) {
      // The socket never joined, so there is nothing to announce or remove.
      return;
    }

    socket.broadcast.emit('user-left', {
      username: leaving.username,
      timestamp: new Date().toISOString()
    });
    this.activeUsers.delete(socket.id);
  }
}
/**
 * Notification handler registered at `/notifications`.
 *
 * Each subscriber joins a room named `user-<userId>`; notifications are then
 * delivered to that room or emitted to everyone.
 */
@WebSocket('/notifications')
export class NotificationHandler {
  /** Logs each new connection. */
  @OnConnect()
  async onConnect(socket: any) {
    console.log('Notification client connected:', socket.id);
  }

  /**
   * Subscribes the socket to a user's notification room and sends the
   * notifications that are still pending for that user.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload naming the user to subscribe to.
   */
  @OnMessage('subscribe-notifications')
  async onSubscribeNotifications(socket: any, data: { userId: string }) {
    const userId = data.userId;

    // One room per user keeps delivery targeted.
    const userRoom = `user-${userId}`;
    socket.join(userRoom);

    // Catch the client up on anything it missed.
    const notifications = await this.notificationService.getPending(userId);
    socket.emit('pending-notifications', { notifications });
  }

  /**
   * Marks a notification as read and confirms it to the caller.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload; only `notificationId` is used.
   */
  @OnMessage('mark-as-read')
  async onMarkAsRead(socket: any, data: { notificationId: string; userId: string }) {
    const notificationId = data.notificationId;

    await this.notificationService.markAsRead(notificationId);

    const acknowledgement = {
      notificationId,
      timestamp: new Date().toISOString()
    };
    socket.emit('notification-read', acknowledgement);
  }

  /**
   * Emits `new-notification` to one user's room.
   *
   * @param userId - Recipient; selects the `user-<userId>` room.
   * @param notification - Fields to send; a `timestamp` is added on top.
   */
  async sendNotificationToUser(userId: string, notification: any) {
    const server = this.getSocketIO();
    const userRoom = `user-${userId}`;
    server.to(userRoom).emit('new-notification', {
      ...notification,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Emits `broadcast-notification` through the server instance.
   *
   * @param notification - Fields to send; a `timestamp` is added on top.
   */
  async broadcastNotification(notification: any) {
    const server = this.getSocketIO();
    server.emit('broadcast-notification', {
      ...notification,
      timestamp: new Date().toISOString()
    });
  }
}
/**
 * Collaborative-editing handler registered at `/collaboration`.
 *
 * Holds the latest state of each document in memory, accepts a change only
 * when its version is exactly one above the stored version, and relays
 * accepted changes and cursor moves to the document's room.
 */
@WebSocket('/collaboration')
export class CollaborationHandler {
  // Document id -> latest known state (content, version, last-modified info).
  private documentStates = new Map<string, any>();

  /**
   * Adds the socket to a document's room, sends it the current state, and
   * tells the room that a collaborator joined.
   *
   * @param socket - The connected client socket.
   * @param data - Client payload with the document and user ids.
   */
  @OnMessage('join-document')
  async onJoinDocument(socket: any, data: { documentId: string; userId: string }) {
    const { documentId, userId } = data;
    const room = `document-${documentId}`;

    socket.join(room);

    // Documents nobody has edited yet start empty at version 0.
    const currentState = this.documentStates.get(documentId) || { content: '', version: 0 };
    socket.emit('document-state', currentState);

    socket.to(room).emit('collaborator-joined', {
      userId,
      documentId,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Applies a batch of changes to a document and relays it to the room.
   * When the version is not the stored version plus one, the caller gets a
   * `version-conflict` event and nothing is changed.
   *
   * @param socket - The connected client socket.
   * @param data - Document id, author, changes, and the version they produce.
   */
  @OnMessage('document-change')
  async onDocumentChange(socket: any, data: {
    documentId: string;
    userId: string;
    changes: any;
    version: number
  }) {
    const { documentId, userId, changes, version } = data;
    const previous = this.documentStates.get(documentId);

    // Only the immediate successor of the stored version is accepted.
    const currentVersion = previous?.version || 0;
    if (version !== currentVersion + 1) {
      socket.emit('version-conflict', {
        currentVersion,
        receivedVersion: version
      });
      return;
    }

    // Build and store the next state on top of whatever was there before.
    const newState = {
      ...previous,
      content: this.applyChanges(previous?.content || '', changes),
      version: version,
      lastModifiedBy: userId,
      lastModifiedAt: new Date()
    };
    this.documentStates.set(documentId, newState);

    socket.to(`document-${documentId}`).emit('document-updated', {
      documentId,
      changes,
      version,
      modifiedBy: userId,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Relays a collaborator's cursor position to the document's room.
   *
   * @param socket - The connected client socket.
   * @param data - Document id, user id, and the cursor offset.
   */
  @OnMessage('cursor-position')
  async onCursorPosition(socket: any, data: {
    documentId: string;
    userId: string;
    position: number
  }) {
    const { documentId, userId, position } = data;
    const cursorUpdate = {
      documentId,
      userId,
      position,
      timestamp: new Date().toISOString()
    };
    socket.to(`document-${documentId}`).emit('cursor-moved', cursorUpdate);
  }

  /**
   * Applies changes to `content` one after another and returns the result.
   * `insert` splices `text` in at `position`; `delete` removes `length`
   * characters starting at `position`; any other type is ignored.
   *
   * @param content - Text to start from.
   * @param changes - Changes to apply, in order.
   * @returns The text after all changes.
   */
  private applyChanges(content: string, changes: any[]): string {
    // Simplified example: each change acts on the output of the previous one.
    let result = content;
    for (const change of changes) {
      switch (change.type) {
        case 'insert': {
          const head = result.slice(0, change.position);
          const inserted = change.text;
          const tail = result.slice(change.position);
          result = head + inserted + tail;
          break;
        }
        case 'delete': {
          const head = result.slice(0, change.position);
          const tail = result.slice(change.position + change.length);
          result = head + tail;
          break;
        }
      }
    }
    return result;
  }
}
import { WebSocket, OnConnect, Auth } from 'veloce-ts';
/**
 * Handler registered at `/secure` whose methods are all decorated with
 * `@Auth()` and receive a `user` argument.
 */
@WebSocket('/secure')
export class SecureHandler {
  /**
   * Logs the user, joins the socket to the `user-<id>` room, and confirms
   * authentication with the user's id, username, and role.
   *
   * @param socket - The connected client socket.
   * @param user - The user passed to the handler alongside the socket.
   */
  @OnConnect()
  @Auth() // Require authentication
  async onConnect(socket: any, user: any) {
    console.log('Authenticated user connected:', user.username);

    // A per-user room lets other handlers address this user directly.
    const userRoom = `user-${user.id}`;
    socket.join(userRoom);

    const profile = {
      id: user.id,
      username: user.username,
      role: user.role
    };
    socket.emit('authenticated', {
      message: 'Successfully authenticated',
      user: profile
    });
  }

  /**
   * Delivers a private message to the target user's room.
   *
   * @param socket - The connected client socket.
   * @param data - Target user id and message text.
   * @param user - The sender; its id and username go in the `from` field.
   */
  @OnMessage('private-message')
  @Auth()
  async onPrivateMessage(socket: any, data: { targetUserId: string; message: string }, user: any) {
    const { targetUserId, message } = data;

    // Address the recipient through the server instance and their room.
    const server = this.getSocketIO();
    const recipientRoom = `user-${targetUserId}`;
    server.to(recipientRoom).emit('private-message', {
      from: {
        id: user.id,
        username: user.username
      },
      message,
      timestamp: new Date().toISOString()
    });
  }
}
/**
 * Chat handler registered at `/chat` that demonstrates error handling:
 * failures while validating or processing a message are reported to the
 * client as an `error` event and logged, instead of escaping the handler.
 */
@WebSocket('/chat')
export class ChatHandler {
  /**
   * Validates and processes an incoming `message` event.
   *
   * @param socket - The connected client socket.
   * @param data - Untrusted client payload.
   */
  @OnMessage('message')
  async onMessage(socket: any, data: any) {
    try {
      // Validate data
      const { content, roomId } = this.validateMessage(data);

      // Process message
      // NOTE(review): `processMessage` is not defined in this snippet.
      await this.processMessage(socket, { content, roomId });
    } catch (error: unknown) {
      // Send error to client. A caught value may be anything, so it is
      // narrowed before `message` and `code` are read.
      socket.emit('error', this.toClientError(error));

      // Log error for debugging
      console.error('WebSocket error:', error);
    }
  }

  /**
   * Checks that the payload carries a non-empty string `content` and
   * `roomId`.
   *
   * @param data - Untrusted client payload; may be null, undefined, or a
   *   primitive.
   * @returns The two validated fields.
   * @throws Error when either field is missing, empty, or not a string.
   */
  private validateMessage(data: unknown): { content: string; roomId: string } {
    // `?? {}` turns a null/undefined payload into a normal validation error
    // instead of a TypeError from dereferencing it.
    const { content, roomId } = (data ?? {}) as { content?: unknown; roomId?: unknown };

    if (!content || typeof content !== 'string') {
      throw new Error('Invalid message content');
    }
    if (!roomId || typeof roomId !== 'string') {
      throw new Error('Invalid room ID');
    }
    return { content, roomId };
  }

  /**
   * Converts a caught value into the payload of the `error` event.
   *
   * @param error - Whatever was thrown.
   * @returns `message` and `code`; `code` falls back to `'UNKNOWN_ERROR'`.
   */
  private toClientError(error: unknown): { message: string; code: unknown } {
    if (typeof error === 'object' && error !== null) {
      // Covers `Error` instances and error-like objects.
      const { message, code } = error as { message?: unknown; code?: unknown };
      return {
        message: typeof message === 'string' ? message : 'Unknown error',
        code: code || 'UNKNOWN_ERROR'
      };
    }
    // Primitives (for example a thrown string) have no `message` property.
    return { message: String(error), code: 'UNKNOWN_ERROR' };
  }
}
/**
 * Chat handler registered at `/chat` that demonstrates per-user rate
 * limiting using fixed one-minute windows.
 */
@WebSocket('/chat')
export class ChatHandler {
  /** Maximum number of messages one user may send per window. */
  private static readonly MAX_MESSAGES_PER_WINDOW = 60;

  /** Length of one rate-limit window in milliseconds. */
  private static readonly WINDOW_MS = 60000;

  // `<userId>-<window index>` -> messages accepted in that window.
  private messageCounts = new Map<string, number>();

  // Index of the window the counters above belong to.
  private currentWindow = -1;

  /**
   * Processes a `message` event unless the sender has already used up the
   * current window's allowance, in which case the sender receives
   * `rate-limit-exceeded` and the message is dropped.
   *
   * @param socket - The connected client socket; `socket.userId` keys the
   *   counter.
   * @param data - Client payload, passed through to `processMessage`.
   */
  @OnMessage('message')
  async onMessage(socket: any, data: any) {
    const userId = socket.userId;
    const windowIndex = Math.floor(Date.now() / ChatHandler.WINDOW_MS);
    const userKey = `${userId}-${windowIndex}`; // Per minute

    // Keys embed the window index, so counters from earlier windows are never
    // read again. Drop them when the window rolls over; otherwise the map
    // grows by one entry per user per minute forever.
    if (windowIndex !== this.currentWindow) {
      this.messageCounts.clear();
      this.currentWindow = windowIndex;
    }

    // Check rate limit. `>=` makes message 61 the first one rejected; `>`
    // would let 61 messages through.
    const messageCount = this.messageCounts.get(userKey) ?? 0;
    if (messageCount >= ChatHandler.MAX_MESSAGES_PER_WINDOW) {
      socket.emit('rate-limit-exceeded', {
        message: 'Too many messages. Please slow down.'
      });
      return;
    }

    // Update count
    this.messageCounts.set(userKey, messageCount + 1);

    // Process message
    // NOTE(review): `processMessage` is not defined in this snippet.
    await this.processMessage(socket, data);
  }
}
/**
 * Chat handler registered at `/chat` that demonstrates connection
 * management: it tracks every open connection, pings it on a timer, and
 * releases the record and the timer when the connection ends.
 */
@WebSocket('/chat')
export class ChatHandler {
  // Socket id -> { connectedAt, userId, rooms, heartbeatInterval }.
  private connections = new Map<string, any>();

  /**
   * Records the new connection and starts its heartbeat timer.
   *
   * @param socket - The connected client socket.
   */
  @OnConnect()
  async onConnect(socket: any) {
    // Set up heartbeat
    const heartbeatInterval = setInterval(() => {
      if (!socket.connected) {
        this.handleDisconnection(socket.id);
        return;
      }
      socket.emit('ping');
    }, 30000);

    // Store connection info. The timer handle is kept in the record because
    // that is where `handleDisconnection` looks for it; storing it only on
    // the socket meant it was never cleared.
    this.connections.set(socket.id, {
      connectedAt: new Date(),
      userId: socket.userId,
      rooms: new Set(),
      heartbeatInterval
    });

    // Still exposed on the socket for code that reads it from there.
    socket.heartbeatInterval = heartbeatInterval;
  }

  /** Releases everything held for the socket that disconnected. */
  @OnDisconnect()
  async onDisconnect(socket: any) {
    this.handleDisconnection(socket.id);
  }

  /**
   * Stops the heartbeat, forgets the connection, and notifies its rooms.
   * Does nothing when the socket id is unknown, so it can safely run from
   * both the heartbeat callback and the disconnect handler.
   *
   * @param socketId - Id of the socket to clean up.
   */
  private handleDisconnection(socketId: string) {
    const connection = this.connections.get(socketId);
    if (connection) {
      // Clean up
      if (connection.heartbeatInterval) {
        clearInterval(connection.heartbeatInterval);
      }
      this.connections.delete(socketId);

      // Notify rooms
      // NOTE(review): `notifyUserDisconnected` is not defined in this snippet.
      this.notifyUserDisconnected(connection.userId, connection.rooms);
    }
  }
}