Skip to content

Advanced Usage

This guide covers advanced patterns and techniques for building sophisticated WebSocket applications with Bun WebSocket Router.

Discriminated Unions

With the factory pattern (v0.4.0+), you can now use Zod's discriminated unions to create type-safe message handlers:

typescript
import { z } from "zod";
import { createMessageSchema } from "bun-ws-router/zod";

const { messageSchema } = createMessageSchema(z);

// Define individual message schemas
const TextMessage = messageSchema("TEXT", {
  content: z.string(),
  channelId: z.string(),
});

const ImageMessage = messageSchema("IMAGE", {
  url: z.url(),
  channelId: z.string(),
  width: z.number(),
  height: z.number(),
});

const VideoMessage = messageSchema("VIDEO", {
  url: z.url(),
  channelId: z.string(),
  duration: z.number(),
});

// Create a discriminated union
const MediaMessage = z.discriminatedUnion("type", [
  TextMessage,
  ImageMessage,
  VideoMessage,
]);

// Type-safe message handling
function handleMediaMessage(message: z.infer<typeof MediaMessage>) {
  switch (message.type) {
    case "TEXT":
      // TypeScript knows payload has { content, channelId }
      console.log("Text:", message.payload.content);
      break;

    case "IMAGE":
      // TypeScript knows payload has { url, channelId, width, height }
      console.log("Image:", message.payload.url, message.payload.width);
      break;

    case "VIDEO":
      // TypeScript knows payload has { url, channelId, duration }
      console.log("Video:", message.payload.url, message.payload.duration);
      break;
  }
}

// Use in router - register individual handlers
router
  .onMessage(TextMessage, (ctx) => {
    // Handle text specifically
  })
  .onMessage(ImageMessage, (ctx) => {
    // Handle images specifically
  })
  .onMessage(VideoMessage, (ctx) => {
    // Handle videos specifically
  });

This pattern is especially useful for:

  • Protocol versioning
  • Command/query separation
  • Event sourcing patterns
  • Complex state machines

Router Composition

Compose multiple routers to organize your application into modules:

typescript
import { z } from "zod";
import { WebSocketRouter, createMessageSchema } from "bun-ws-router/zod";

const { messageSchema } = createMessageSchema(z);

// Authentication router
const authRouter = new WebSocketRouter()
  .onMessage(LoginMessage, handleLogin)
  .onMessage(LogoutMessage, handleLogout)
  .onMessage(RefreshTokenMessage, handleRefresh);

// Chat router
const chatRouter = new WebSocketRouter()
  .onMessage(SendMessageMessage, handleSendMessage)
  .onMessage(EditMessageMessage, handleEditMessage)
  .onMessage(DeleteMessageMessage, handleDeleteMessage);

// Admin router
const adminRouter = new WebSocketRouter()
  .onMessage(KickUserMessage, handleKickUser)
  .onMessage(BanUserMessage, handleBanUser)
  .onMessage(MuteUserMessage, handleMuteUser);

// Main router combining all
const mainRouter = new WebSocketRouter()
  .addRoutes(authRouter)
  .addRoutes(chatRouter)
  .addRoutes(adminRouter)
  .onOpen(handleConnection)
  .onClose(handleDisconnection);

Middleware Pattern

Implement middleware for cross-cutting concerns:

typescript
// Middleware type
type Middleware<T = unknown> = (
  ctx: MessageContext<any, T>,
  next: () => void | Promise<void>,
) => void | Promise<void>;

// Authentication middleware
const requireAuth: Middleware = async (ctx, next) => {
  const userData = ctx.getData<{ authenticated?: boolean }>();

  if (!userData.authenticated) {
    ctx.send(ErrorMessage, {
      code: "AUTHENTICATION_FAILED",
      message: "Authentication required",
    });
    return;
  }

  await next();
};

// Logging middleware
const logMessages: Middleware = async (ctx, next) => {
  const start = Date.now();
  console.log(
    `[${ctx.clientId}] Received message type: ${ctx.ws.data.lastMessageType}`,
  );

  await next();

  const duration = Date.now() - start;
  console.log(`[${ctx.clientId}] Processed in ${duration}ms`);
};

// Apply middleware wrapper
function withMiddleware<T>(
  handler: (ctx: MessageContext<T>) => void,
  ...middleware: Middleware[]
): (ctx: MessageContext<T>) => void {
  return (ctx) => {
    let index = 0;

    const next = () => {
      if (index >= middleware.length) {
        return handler(ctx);
      }

      const mw = middleware[index++];
      return mw(ctx, next);
    };

    return next();
  };
}

// Use with router
router.onMessage(
  ProtectedMessage,
  withMiddleware(handleProtectedAction, requireAuth, logMessages),
);

Custom Context Extensions

Extend the context with custom functionality:

typescript
// Extended context with utilities
class ExtendedContext<T, TData = unknown> {
  constructor(private ctx: MessageContext<T, TData>) {}

  // Broadcast to all connected clients
  broadcast(message: Message) {
    this.ctx.publish("global", message);
  }

  // Send error with standard format
  sendError(code: string, message: string, context?: any) {
    this.ctx.send(ErrorMessage, {
      code,
      message,
      context,
    });
  }

  // Reply to a message with correlation
  reply(type: string, payload?: any) {
    this.ctx.send({
      type,
      meta: {
        clientId: this.ctx.clientId,
        timestamp: Date.now(),
        correlationId: this.ctx.ws.data.lastCorrelationId,
      },
      payload,
    });
  }

  // Check if user has role
  hasRole(role: string): boolean {
    const userData = this.ctx.getData<{ roles?: string[] }>();
    return userData.roles?.includes(role) ?? false;
  }
}

// Helper to wrap handlers
function extendedHandler<T>(
  handler: (ctx: ExtendedContext<T>) => void,
): (ctx: MessageContext<T>) => void {
  return (ctx) => handler(new ExtendedContext(ctx));
}

// Use in router
router.onMessage(
  AdminMessage,
  extendedHandler((ctx) => {
    if (!ctx.hasRole("admin")) {
      ctx.sendError("AUTHORIZATION_FAILED", "Admin access required");
      return;
    }

    ctx.broadcast({
      type: "ADMIN_ANNOUNCEMENT",
      payload: ctx.ctx.payload,
    });
  }),
);

State Management

Implement centralized state management:

typescript
// State store
class StateStore {
  private state = new Map<string, any>();
  private subscribers = new Map<string, Set<(state: any) => void>>();

  get<T>(key: string): T | undefined {
    return this.state.get(key);
  }

  set(key: string, value: any) {
    this.state.set(key, value);
    this.notify(key, value);
  }

  update<T>(key: string, updater: (current: T) => T) {
    const current = this.get<T>(key);
    const updated = updater(current!);
    this.set(key, updated);
  }

  subscribe(key: string, callback: (state: any) => void) {
    if (!this.subscribers.has(key)) {
      this.subscribers.set(key, new Set());
    }
    this.subscribers.get(key)!.add(callback);

    // Return unsubscribe function
    return () => {
      this.subscribers.get(key)?.delete(callback);
    };
  }

  private notify(key: string, value: any) {
    this.subscribers.get(key)?.forEach((cb) => cb(value));
  }
}

// Global state
const store = new StateStore();

// Room state example
interface RoomState {
  users: Map<string, { username: string; joinedAt: number }>;
  messages: Array<{ id: string; text: string; userId: string }>;
}

router.onMessage(JoinRoomMessage, (ctx) => {
  const { roomId, username } = ctx.payload;

  // Update room state
  store.update<RoomState>(
    `room:${roomId}`,
    (
      room = {
        users: new Map(),
        messages: [],
      },
    ) => ({
      ...room,
      users: new Map(room.users).set(ctx.clientId, {
        username,
        joinedAt: Date.now(),
      }),
    }),
  );

  // Subscribe to room state changes
  const unsubscribe = store.subscribe(`room:${roomId}`, (roomState) => {
    ctx.send({
      type: "ROOM_STATE_UPDATE",
      payload: {
        userCount: roomState.users.size,
        latestMessages: roomState.messages.slice(-10),
      },
    });
  });

  // Clean up on disconnect
  ctx.ws.data.cleanupCallbacks = [
    ...(ctx.ws.data.cleanupCallbacks || []),
    unsubscribe,
  ];
});

Message Queuing

Implement message queuing for reliability:

typescript
class MessageQueue {
  private queues = new Map<string, Message[]>();
  private processing = new Set<string>();

  enqueue(clientId: string, message: Message) {
    if (!this.queues.has(clientId)) {
      this.queues.set(clientId, []);
    }
    this.queues.get(clientId)!.push(message);
  }

  async process(
    clientId: string,
    handler: (message: Message) => Promise<void>,
  ) {
    if (this.processing.has(clientId)) {
      return; // Already processing
    }

    this.processing.add(clientId);
    const queue = this.queues.get(clientId) || [];

    while (queue.length > 0) {
      const message = queue.shift()!;
      try {
        await handler(message);
      } catch (error) {
        console.error(`Failed to process message:`, error);
        // Could implement retry logic here
      }
    }

    this.processing.delete(clientId);
  }

  clear(clientId: string) {
    this.queues.delete(clientId);
    this.processing.delete(clientId);
  }
}

const messageQueue = new MessageQueue();

// Queue messages when client is processing
router.onMessage(QueuedMessage, async (ctx) => {
  const userData = ctx.getData<{ isProcessing?: boolean }>();

  if (userData.isProcessing) {
    // Queue message for later
    messageQueue.enqueue(ctx.clientId, {
      type: QueuedMessage.type,
      meta: {
        clientId: ctx.clientId,
        timestamp: Date.now(),
      },
      payload: ctx.payload,
    });
    return;
  }

  // Process immediately
  ctx.setData({ ...userData, isProcessing: true });

  try {
    await processMessage(ctx.payload);

    // Process any queued messages
    await messageQueue.process(ctx.clientId, async (msg) => {
      await processMessage(msg.payload);
    });
  } finally {
    ctx.setData({ ...userData, isProcessing: false });
  }
});

Connection Pooling

Manage groups of connections efficiently:

typescript
class ConnectionPool {
  private pools = new Map<string, Set<ServerWebSocket>>();

  add(poolId: string, ws: ServerWebSocket) {
    if (!this.pools.has(poolId)) {
      this.pools.set(poolId, new Set());
    }
    this.pools.get(poolId)!.add(ws);
  }

  remove(poolId: string, ws: ServerWebSocket) {
    this.pools.get(poolId)?.delete(ws);

    // Clean up empty pools
    if (this.pools.get(poolId)?.size === 0) {
      this.pools.delete(poolId);
    }
  }

  broadcast(poolId: string, message: Message, exclude?: string) {
    const pool = this.pools.get(poolId);
    if (!pool) return;

    const messageStr = JSON.stringify(message);

    for (const ws of pool) {
      if (ws.data.clientId !== exclude) {
        ws.send(messageStr);
      }
    }
  }

  getSize(poolId: string): number {
    return this.pools.get(poolId)?.size || 0;
  }

  getAll(poolId: string): ServerWebSocket[] {
    return Array.from(this.pools.get(poolId) || []);
  }
}

const connectionPool = new ConnectionPool();

// Use in router
router
  .onOpen((ws) => {
    // Add to global pool
    connectionPool.add("global", ws);
  })

  .onMessage(JoinPoolMessage, (ctx) => {
    const { poolId } = ctx.payload;

    // Add to specific pool
    connectionPool.add(poolId, ctx.ws);

    // Notify pool members
    connectionPool.broadcast(
      poolId,
      {
        type: "USER_JOINED_POOL",
        payload: {
          userId: ctx.clientId,
          poolSize: connectionPool.getSize(poolId),
        },
      },
      ctx.clientId,
    );
  })

  .onClose((ws) => {
    // Remove from all pools
    connectionPool.remove("global", ws);
    // Remove from other pools...
  });

Schema Versioning

Handle message schema evolution:

typescript
// Version 1 schema
const UserMessageV1 = messageSchema("USER_UPDATE", {
  name: z.string(),
  email: z.email(),
});

// Version 2 schema with additional field
const UserMessageV2 = messageSchema("USER_UPDATE", {
  name: z.string(),
  email: z.email(),
  avatar: z.url().optional(),
  version: z.literal(2).default(2),
});

// Migration function
function migrateUserMessage(data: any): z.infer<typeof UserMessageV2.schema> {
  if (!data.version || data.version === 1) {
    return {
      ...data,
      avatar: undefined,
      version: 2,
    };
  }
  return data;
}

// Versioned handler
router.onMessage(UserMessageV2, (ctx) => {
  const migrated = migrateUserMessage(ctx.payload);

  // Process with latest schema
  updateUser(migrated);
});

Performance Optimization

Tips for optimizing WebSocket performance:

typescript
// 1. Batch operations
class BatchProcessor {
  private batch: Array<() => void> = [];
  private timer?: Timer;

  add(operation: () => void) {
    this.batch.push(operation);

    if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), 10);
    }
  }

  flush() {
    const operations = this.batch;
    this.batch = [];
    this.timer = undefined;

    // Process all at once
    operations.forEach((op) => op());
  }
}

// 2. Message compression (for large payloads)
import { gzipSync, gunzipSync } from "zlib";

const CompressedMessage = messageSchema("COMPRESSED", {
  encoding: z.literal("gzip"),
  data: z.string(), // Base64 encoded
});

function compressMessage(message: Message): Message {
  const json = JSON.stringify(message);
  const compressed = gzipSync(json);

  return {
    type: "COMPRESSED",
    payload: {
      encoding: "gzip",
      data: compressed.toString("base64"),
    },
  };
}

// 3. Connection pooling for broadcasts
const BROADCAST_CHUNK_SIZE = 100;

async function broadcastInChunks(
  connections: ServerWebSocket[],
  message: string,
) {
  for (let i = 0; i < connections.length; i += BROADCAST_CHUNK_SIZE) {
    const chunk = connections.slice(i, i + BROADCAST_CHUNK_SIZE);

    // Send to chunk
    chunk.forEach((ws) => ws.send(message));

    // Yield to event loop
    if (i + BROADCAST_CHUNK_SIZE < connections.length) {
      await new Promise((resolve) => setImmediate(resolve));
    }
  }
}

Next Steps