Core Concepts
Understanding these core concepts will help you build robust WebSocket applications with ws-kit.
Recommended: Export-with-Helpers Pattern
Use the modern import pattern for optimal tree-shaking and simplicity:
import { z, message, createRouter } from "@ws-kit/zod";
// Use message() directly - no factory setup needed
const PingMessage = message("PING", { text: z.string() });
See Message Schemas (ADR-007) for details on the export-with-helpers pattern.
Message-Based Architecture
ws-kit uses a message-based architecture where all communication follows a consistent structure. This provides several benefits:
- Type Safety: Messages are validated against schemas before reaching handlers
- Predictability: All messages have the same structure, making debugging easier
- Routing: Messages are automatically routed based on their type
- Traceability: Built-in metadata helps track message flow
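To make the routing benefit concrete, here is a minimal, library-independent sketch of dispatch keyed on the type field. MiniRouter, Envelope, and dispatch are hypothetical names used only for illustration; this is not ws-kit's implementation, and it omits validation entirely.

```typescript
// Hypothetical sketch: route a raw frame to a handler based on its `type`.
type Envelope = { type: string; meta?: Record<string, unknown>; payload?: unknown };
type Handler = (msg: Envelope) => void;

class MiniRouter {
  private handlers = new Map<string, Handler>();

  on(type: string, handler: Handler): this {
    this.handlers.set(type, handler);
    return this;
  }

  // Returns true when a handler was found and invoked.
  dispatch(raw: string): boolean {
    let msg: unknown;
    try {
      msg = JSON.parse(raw);
    } catch {
      return false; // not valid JSON
    }
    if (typeof msg !== "object" || msg === null) return false;
    const type = (msg as { type?: unknown }).type;
    if (typeof type !== "string") return false; // no usable type field
    const handler = this.handlers.get(type);
    if (!handler) return false; // nothing registered for this type
    handler(msg as Envelope);
    return true;
  }
}

const seen: string[] = [];
const miniRouter = new MiniRouter().on("PING", (msg) => {
  seen.push(String((msg.payload as { text: string }).text));
});
miniRouter.dispatch(JSON.stringify({ type: "PING", meta: {}, payload: { text: "hi" } }));
```

Because every message carries the same envelope, the lookup is a single map access and unknown types can be rejected before any handler code runs.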
Message Structure
Every message consists of three parts:
interface Message<T = unknown> {
type: string; // Unique identifier for routing
meta: {
// Metadata (optional, auto-populated on send)
timestamp?: number; // Producer time (client clock, UI display only)
correlationId?: string; // Optional request tracking
};
payload?: T; // Optional validated data
}
Server Timestamp Usage
Server logic must use ctx.receivedAt (authoritative server time), not meta.timestamp (client clock, untrusted). See Timestamp Handling below for guidance.
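As a rough illustration of the two clocks, the sketch below builds an envelope on the producer side and makes an expiry decision on the server side. buildMessage and isExpired are hypothetical helpers, not part of ws-kit; the envelope shape simply mirrors the interface above.

```typescript
// Hypothetical helpers; only the envelope shape comes from the interface above.
interface Message<T = unknown> {
  type: string;
  meta: { timestamp?: number; correlationId?: string };
  payload?: T;
}

// Producer side: fill in meta.timestamp unless the caller supplied one.
function buildMessage<T>(type: string, payload?: T, meta: Message["meta"] = {}): Message<T> {
  const msg: Message<T> = { type, meta: { timestamp: Date.now(), ...meta } };
  if (payload !== undefined) msg.payload = payload;
  return msg;
}

// Server side: decide expiry from the server's own receive time,
// never from the client-supplied meta.timestamp.
function isExpired(receivedAt: number, now: number, ttlMs: number): boolean {
  return now - receivedAt > ttlMs;
}

const ping = buildMessage("PING", { text: "hello" }, { correlationId: "req-1" });
```

The client timestamp travels with the message for display purposes, while the expiry check takes only server-side values as input.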
Connection Lifecycle
1. Connection Opening
When a client connects, the router:
- Generates a unique clientId (UUID v7)
- Stores connection metadata in ws.data
- Calls your onOpen handler
router.onOpen((ctx) => {
// ctx.ws.data.clientId is always available (UUID v7)
console.log(`Client ${ctx.ws.data.clientId} connected`);
});
2. Message Handling
When a message arrives, the router processes it through a security-focused pipeline:
- Capture Timestamp - ctx.receivedAt = Date.now() (before parsing, authoritative server time)
- Parse - JSON.parse() the raw WebSocket message
- Type Check - Ensure type field exists
- Handler Lookup - Find registered handler for this message type
- Normalize (Security Boundary) - Strip reserved keys (clientId, receivedAt) to prevent client spoofing
- Validate - Schema validation on normalized message (strict mode rejects unknown keys)
- Handler Execution - Your handler receives validated message + server context
Security
Normalization is a security boundary that prevents clients from spoofing server-only fields. Handlers receive only validated, normalized messages.
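The normalize step can be pictured with the following sketch. It assumes the reserved keys are stripped from meta, which the text above does not state explicitly; normalize, Inbound, and RESERVED_META_KEYS are hypothetical names, and ws-kit's real implementation may differ.

```typescript
// Hypothetical sketch of the normalize step.
// Assumption: reserved keys are removed from `meta` before validation.
const RESERVED_META_KEYS = ["clientId", "receivedAt"] as const;

type Inbound = { type: string; meta: Record<string, unknown>; payload?: unknown };

function normalize(parsed: { type: string; meta?: unknown; payload?: unknown }): Inbound {
  // Copy meta so the caller's object is never mutated; default to {} when absent or malformed.
  const meta: Record<string, unknown> =
    typeof parsed.meta === "object" && parsed.meta !== null && !Array.isArray(parsed.meta)
      ? { ...(parsed.meta as Record<string, unknown>) }
      : {};
  for (const key of RESERVED_META_KEYS) {
    delete meta[key]; // drop anything the client tried to inject
  }
  const out: Inbound = { type: parsed.type, meta };
  if (parsed.payload !== undefined) out.payload = parsed.payload;
  return out;
}

const spoofed = {
  type: "CHAT",
  meta: { clientId: "someone-else", receivedAt: 0, correlationId: "c1" },
  payload: { text: "hi" },
};
const clean = normalize(spoofed);
```

Running normalization before validation means a strict schema never sees the spoofed fields, and the server can attach its own values afterwards.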
router.on(ChatMessage, async (ctx) => {
// ctx provides everything you need:
// - ctx.ws: The WebSocket instance
// - ctx.ws.data.clientId: Client identifier (UUID v7, auto-generated)
// - ctx.type: Message type literal from schema
// - ctx.meta: Validated metadata (timestamp, correlationId, custom fields)
// - ctx.payload: Validated message data (conditional - only if schema defines it)
// - ctx.receivedAt: Server receive timestamp (Date.now(), authoritative for server logic)
// - ctx.send: Type-safe send function (1-to-1, to current connection)
// - ctx.publish: Type-safe publish function (1-to-many, to topic subscribers)
// For broadcasting to topic subscribers:
await ctx.publish("chat", ChatMessage, ctx.payload);
// For subscriptions:
ctx.subscribe("room:123");
ctx.unsubscribe("room:456");
});
3. Connection Closing
When a client disconnects:
router.onClose((ctx) => {
console.log(
`Client ${ctx.ws.data.clientId} disconnected: ${ctx.code} ${ctx.reason || "N/A"}`,
);
// Clean up resources, notify other clients, etc.
});
Type Safety
The router provides full type inference from schema definition to handler:
import { z, message, createRouter } from "@ws-kit/zod";
const UpdateProfileMessage = message("UPDATE_PROFILE", {
name: z.string(),
avatar: z.url().optional(),
});
const router = createRouter();
router.on(UpdateProfileMessage, (ctx) => {
// TypeScript knows:
// - ctx.payload.name is string
// - ctx.payload.avatar is string | undefined
// - ctx.send() only accepts valid message schemas
});
Middleware
Middleware runs before handlers to provide cross-cutting concerns like authentication, logging, and rate limiting:
import { createRouter } from "@ws-kit/zod";
type AppData = { userId?: string; roles?: string[] };
const router = createRouter<AppData>();
// Global middleware: runs for all messages
router.use((ctx, next) => {
if (!ctx.ws.data?.userId && ctx.type !== "LOGIN") {
ctx.error("UNAUTHENTICATED", "Not authenticated");
return; // Skip handler
}
return next(); // Continue to handler
});
// Per-route middleware: runs only for specific message
router.use(SendMessage, (ctx, next) => {
if (isRateLimited(ctx.ws.data?.userId)) {
ctx.error("RESOURCE_EXHAUSTED", "Too many messages");
return;
}
return next();
});
router.on(SendMessage, (ctx) => {
// Handler runs if all middleware calls next()
processMessage(ctx.payload);
});
Key features:
- Global middleware: router.use(middleware) runs for all messages
- Per-route middleware: router.use(schema, middleware) runs only for specific messages
- Execution order: Global → per-route → handler
- Control flow: Call next() to continue; omit to skip handler
- Context mutation: Middleware can update ctx.ws.data via ctx.assignData()
- Error handling: Call ctx.error() to reject and stop execution
See Middleware Guide and ADR-008 for complete documentation.
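The execution order and the effect of omitting next() can be modeled with a small chain runner. This is a simplified, synchronous sketch with hypothetical names (runChain, Ctx, Middleware); ws-kit's middleware can be asynchronous and its internals are not shown here.

```typescript
// Hypothetical sketch of the ordering: global -> per-route -> handler.
type Ctx = { type: string; authed: boolean; trace: string[] };
type Middleware = (ctx: Ctx, next: () => void) => void;

function runChain(ctx: Ctx, chain: Middleware[], handler: (ctx: Ctx) => void): void {
  const step = (i: number): void => {
    const mw = chain[i];
    if (mw === undefined) {
      handler(ctx); // end of chain reached, run the handler
      return;
    }
    mw(ctx, () => step(i + 1)); // the handler only runs if every middleware calls next()
  };
  step(0);
}

const globalAuth: Middleware = (ctx, next) => {
  ctx.trace.push("global");
  if (!ctx.authed && ctx.type !== "LOGIN") return; // next() omitted: handler is skipped
  next();
};
const perRoute: Middleware = (ctx, next) => {
  ctx.trace.push("route");
  next();
};
const handler = (ctx: Ctx): void => {
  ctx.trace.push("handler");
};

const allowed: Ctx = { type: "SEND", authed: true, trace: [] };
runChain(allowed, [globalAuth, perRoute], handler);

const blocked: Ctx = { type: "SEND", authed: false, trace: [] };
runChain(blocked, [globalAuth, perRoute], handler);
```

For the authenticated context the trace is global, route, handler; for the unauthenticated one the chain stops after the global middleware.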
Error Handling
Error Boundaries
All handlers are wrapped in error boundaries to prevent crashes:
router.on(SomeMessage, (ctx) => {
throw new Error("Something went wrong");
// Router catches this and sends an error message to the client
});
Standard Error Codes
Use ctx.error() with standard error codes for consistent error handling. Clients automatically infer whether errors are retryable:
// Non-retryable error (client won't retry)
ctx.error("INVALID_ARGUMENT", "Invalid room ID");
// Transient error with backoff hint (client retries after 2s)
ctx.error("RESOURCE_EXHAUSTED", "Server busy", undefined, {
retryable: true,
retryAfterMs: 2000,
});
Available error codes (aligned with gRPC standards):
Terminal errors (non-retryable):
- UNAUTHENTICATED: Auth token missing, expired, or invalid
- PERMISSION_DENIED: Authenticated but lacks rights
- INVALID_ARGUMENT: Input validation or semantic violation
- FAILED_PRECONDITION: State requirement not met
- NOT_FOUND: Resource not found
- ALREADY_EXISTS: Uniqueness or idempotency violation
- UNIMPLEMENTED: Feature not supported or deployed
- CANCELLED: Call cancelled (client disconnect, timeout abort)
Transient errors (automatically retryable):
- DEADLINE_EXCEEDED: RPC timed out
- RESOURCE_EXHAUSTED: Rate limit, quota, or backpressure exceeded
- UNAVAILABLE: Transient infrastructure error
- ABORTED: Concurrency conflict (race condition)
Mixed (app-specific):
- INTERNAL: Unexpected server error (server decides retryability)
Retry Behavior
Clients infer retryability using these rules:
- If the retryable field is present: use its value
- If the retryable field is absent:
  - Transient codes (DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, UNAVAILABLE, ABORTED): infer true
  - Terminal codes (all others): infer false
  - INTERNAL: infer false (conservative: assume bug, don't retry)
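These inference rules translate directly into a small predicate. The sketch below is illustrative only: WireError is a hypothetical shape for an error as seen by a client, and isRetryable is not a ws-kit export.

```typescript
// Hypothetical client-side predicate implementing the inference rules above.
const TRANSIENT_CODES = new Set([
  "DEADLINE_EXCEEDED",
  "RESOURCE_EXHAUSTED",
  "UNAVAILABLE",
  "ABORTED",
]);

// Assumed shape of an error payload on the wire.
interface WireError {
  code: string;
  retryable?: boolean;
  retryAfterMs?: number;
}

function isRetryable(err: WireError): boolean {
  if (err.retryable !== undefined) return err.retryable; // an explicit field always wins
  return TRANSIENT_CODES.has(err.code); // INTERNAL and terminal codes fall through to false
}
```

Note that an explicit retryable: false on a transient code disables retries, and an explicit retryable: true on INTERNAL enables them, which is how the server "decides retryability" for that code.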
Use retryAfterMs to provide backoff hints for transient errors:
// Backoff hints are optional but recommended for rate-limited scenarios
ctx.error("RESOURCE_EXHAUSTED", undefined, undefined, {
retryAfterMs: 5000, // Client waits 5 seconds before retrying
});
See Error Handling Spec and ADR-015 for the complete error code taxonomy and semantics.
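On the receiving side, a client might turn the hint into a wait time along the following lines. This is a sketch of one possible policy, not the ws-kit client's behavior: retryDelayMs is a hypothetical helper, and the exponential fallback with its base and cap values is an assumption made for illustration.

```typescript
// Hypothetical client-side policy: prefer the server's retryAfterMs hint,
// otherwise fall back to capped exponential backoff (assumed defaults).
function retryDelayMs(
  attempt: number, // 0 for the first retry
  retryAfterMs?: number,
  baseMs = 250,
  maxMs = 10_000,
): number {
  if (retryAfterMs !== undefined && retryAfterMs >= 0) return retryAfterMs;
  return Math.min(maxMs, baseMs * 2 ** attempt);
}
```

With these defaults, the fallback delays are 250 ms, 500 ms, 1000 ms, and so on up to the 10 s cap, while a hint of 5000 yields exactly 5000 ms regardless of the attempt number.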
WebSocket Data
The router extends Bun's WebSocket data with typed metadata:
// An interface cannot be intersected with a type parameter, so this is a type alias
type WebSocketData<T = unknown> = {
clientId: string; // UUID v7, auto-generated by router
} & T;
Pass custom data during upgrade:
// During WebSocket upgrade (using platform adapter)
// Router auto-generates clientId (UUID v7)
serve(router, {
port: 3000,
authenticate(req) {
const token = req.headers.get("authorization");
if (!token) return undefined;
const decoded = decodeToken(token);
return {
userId: decoded.id,
roles: decoded.roles,
};
},
});
Context Object
Handler contexts provide access to message data and WebSocket operations:
interface MessageContext<TPayload, TData = unknown> {
ws: ServerWebSocket<TData>; // WebSocket instance
type: string; // Message type literal
meta: {
// Validated metadata
timestamp?: number; // Client timestamp (optional, for UI only)
correlationId?: string; // Optional correlation ID
[key: string]: unknown; // Custom metadata fields
};
receivedAt: number; // Server receive timestamp (authoritative)
// All handlers
send: SendFunction; // Type-safe send to current connection (1-to-1)
publish: PublishFunction; // Type-safe publish to topic subscribers (1-to-many)
error: ErrorFunction; // Type-safe error responses
assignData: AssignDataFunction; // Merge partial data into ctx.ws.data
subscribe: SubscribeFunction; // Subscribe to a channel
unsubscribe: UnsubscribeFunction; // Unsubscribe from a channel
timeRemaining: () => number; // ms until deadline (Infinity for events)
isRpc: boolean; // Flag: is this an RPC message?
payload?: TPayload; // Validated payload (conditional)
// RPC handlers only (when using router.rpc())
reply?: (schema: Schema, data: ResponseType) => void; // Terminal reply, one-shot guarded
progress?: (data?: unknown) => void; // Progress update (non-terminal)
abortSignal?: AbortSignal; // Fires on client cancel/disconnect
onCancel?: (cb: () => void) => () => void; // Register cancel callback
deadline?: number; // Server-derived deadline (epoch ms)
}
Key points:
- Access client ID via ctx.ws.data.clientId (not ctx.clientId)
- Use ctx.receivedAt for server-side logic (rate limiting, ordering, TTL, auditing)
- Use ctx.meta.timestamp only for UI display (not authoritative)
- Subscriptions: ctx.subscribe(topic) and ctx.unsubscribe(topic)
- Publishing: await ctx.publish(topic, schema, payload) (1-to-many) or await router.publish() outside handlers
- Sending: ctx.send(schema, payload) (1-to-1, to current connection)
- Custom data: Access ctx.ws.data directly or use ctx.assignData() to merge partial updates
- RPC: Use ctx.reply(schema, data) for terminal responses, ctx.progress(data) for streaming updates (only available in router.rpc() handlers)
Request-Response Pattern (RPC)
ws-kit provides first-class support for request-response messaging with automatic correlation tracking. Use router.rpc() for handlers that need guaranteed responses:
import { z, rpc, createRouter } from "@ws-kit/zod";
// Define RPC schema - binds request to response type
const GetUser = rpc("GET_USER", { id: z.string() }, "USER_RESPONSE", {
user: UserSchema,
});
const router = createRouter();
// Use router.rpc() for RPC handlers
router.rpc(GetUser, async (ctx) => {
const user = await db.users.findById(ctx.payload.id);
if (!user) {
ctx.error("NOT_FOUND", "User not found");
return;
}
// Terminal reply (type-safe to response schema)
ctx.reply(GetUser.response, { user });
});
Key RPC features:
- ctx.reply(schema, data): Terminal response, one-shot guarded (only called once)
- ctx.progress(data): Optional streaming updates before terminal reply
- ctx.abortSignal: AbortSignal for cancellation (integrates with fetch, etc.)
- ctx.onCancel(cb): Register cleanup callbacks for cancellation
- Automatic correlation: No manual tracking needed; client requests match responses
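The one-shot guard on the terminal reply can be sketched as a small closure. The names here (createRpcResponder, the frame shape) are hypothetical, and the choice to silently ignore calls after the terminal reply is an assumption; ws-kit may log or handle duplicates differently.

```typescript
// Hypothetical sketch of a one-shot guard around a terminal reply.
// Assumption: calls made after the terminal reply are ignored.
function createRpcResponder<TReply, TProgress>(
  emit: (frame: { kind: "progress" | "reply"; data: unknown }) => void,
) {
  let done = false;
  return {
    // Non-terminal update; allowed any number of times before the reply.
    progress(data: TProgress): boolean {
      if (done) return false;
      emit({ kind: "progress", data });
      return true;
    },
    // Terminal reply; only the first call is emitted.
    reply(data: TReply): boolean {
      if (done) return false;
      done = true;
      emit({ kind: "reply", data });
      return true;
    },
  };
}

const sentFrames: string[] = [];
const responder = createRpcResponder<{ user: string }, { stage: string }>((frame) => {
  sentFrames.push(frame.kind);
});
responder.progress({ stage: "fetching" });
responder.reply({ user: "ada" });
responder.reply({ user: "duplicate" }); // ignored by the guard
responder.progress({ stage: "late" }); // ignored: the call is already complete
```

Keeping the guard in one place means a handler with several exit paths cannot accidentally send two terminal responses for the same request.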
Client-side usage:
import { wsClient, message } from "@ws-kit/client/zod";
import { z } from "@ws-kit/zod"; // z is needed for the schema below
const client = wsClient({ url: "ws://localhost:3000" });
// Define a progress message schema (optional, for streaming updates)
const UserLoadProgress = message("USER_LOAD_PROGRESS", {
stage: z.enum(["fetching", "validating"]),
});
// Listen for progress updates (optional)
client.on(UserLoadProgress, (msg) => {
console.log("Progress:", msg.payload.stage);
});
// Make RPC call and await the terminal response
const response = await client.request(GetUser, { id: "123" });
const { user } = response.payload;
See RPC Guide and ADR-015 for complete RPC documentation.
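For intuition about what automatic correlation saves you from writing, here is a minimal sketch of request/response matching by correlationId. PendingRequests and WireFrame are hypothetical names; this is not the ws-kit client, and it leaves out timeouts, cancellation, and progress frames.

```typescript
// Hypothetical sketch of correlation tracking on the client side.
type WireFrame = { type: string; meta: { correlationId?: string }; payload?: unknown };

class PendingRequests {
  private pending = new Map<string, (frame: WireFrame) => void>();
  private nextId = 0;

  // Registers a waiter and returns the outbound frame plus a promise for the reply.
  request(type: string, payload: unknown): { frame: WireFrame; response: Promise<WireFrame> } {
    const correlationId = `req-${++this.nextId}`;
    const response = new Promise<WireFrame>((resolve) => {
      this.pending.set(correlationId, resolve); // runs synchronously inside the executor
    });
    return { frame: { type, meta: { correlationId }, payload }, response };
  }

  // Call for every inbound frame; returns true when it settled a pending request.
  handleInbound(frame: WireFrame): boolean {
    const id = frame.meta.correlationId;
    if (id === undefined) return false;
    const resolve = this.pending.get(id);
    if (!resolve) return false; // unknown or already-settled correlation ID
    this.pending.delete(id);
    resolve(frame);
    return true;
  }

  get size(): number {
    return this.pending.size;
  }
}

const pendingRequests = new PendingRequests();
const call = pendingRequests.request("GET_USER", { id: "123" });
// call.frame would be serialized and sent; call.response settles when the reply arrives.
```

A production client would additionally reject the promise on timeout or disconnect so that entries never linger in the map.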
Broadcasting and PubSub
Use type-safe publishing for efficient broadcasting to topic subscribers:
router.on(JoinRoomMessage, async (ctx) => {
const roomId = ctx.payload.roomId;
// Subscribe to room topic
ctx.subscribe(`room:${roomId}`);
// Broadcast to all subscribers with type-safe publish
await ctx.publish(`room:${roomId}`, UserJoinedMessage, {
username: ctx.payload.username,
});
});
router.on(LeaveRoomMessage, async (ctx) => {
const roomId = ctx.payload.roomId;
// Unsubscribe when leaving
ctx.unsubscribe(`room:${roomId}`);
// Notify others
await ctx.publish(`room:${roomId}`, UserLeftMessage, {
username: ctx.payload.username,
});
});
Key Distinction:
- ctx.send(schema, data): Sends to single connection (1-to-1)
- ctx.publish(topic, schema, data): Broadcasts to topic subscribers (1-to-many)
- router.publish(topic, schema, data): Use outside handlers (cron jobs, system events)
Both ctx.publish() and router.publish() return Promise<PublishResult> with subscription capability and matched count.
See Broadcasting and ADR-018/ADR-019 for complete documentation.
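The subscribe, unsubscribe, and publish semantics can be pictured with an in-memory topic registry. This is only a conceptual sketch: TopicRegistry is a hypothetical class, the returned object is a simplified stand-in for PublishResult, and real adapters delegate to the platform's native pub/sub instead.

```typescript
// Hypothetical in-memory topic registry illustrating 1-to-many delivery.
class TopicRegistry<TConn> {
  private topics = new Map<string, Set<TConn>>();

  subscribe(conn: TConn, topic: string): void {
    let subs = this.topics.get(topic);
    if (!subs) {
      subs = new Set<TConn>();
      this.topics.set(topic, subs);
    }
    subs.add(conn);
  }

  unsubscribe(conn: TConn, topic: string): void {
    const subs = this.topics.get(topic);
    if (!subs) return;
    subs.delete(conn);
    if (subs.size === 0) this.topics.delete(topic); // avoid leaking empty topics
  }

  // Delivers to every subscriber and reports how many were matched.
  publish(topic: string, deliver: (conn: TConn) => void): { matched: number } {
    const subs = this.topics.get(topic);
    if (!subs) return { matched: 0 };
    subs.forEach((conn) => deliver(conn));
    return { matched: subs.size };
  }
}

const registry = new TopicRegistry<string>();
registry.subscribe("alice", "room:1");
registry.subscribe("bob", "room:1");

const delivered: string[] = [];
const firstResult = registry.publish("room:1", (conn) => {
  delivered.push(conn);
});
```

A matched count of zero is a useful signal in practice, for example to detect that a room has emptied and its state can be cleaned up.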
Timestamp Handling
The router provides two timestamps with different trust levels:
- ctx.receivedAt - Server receive timestamp (authoritative, Date.now() captured before parsing)
  - Use for: Rate limiting, ordering, TTL, auditing, all server-side logic
- ctx.meta.timestamp - Producer time (client clock, untrusted, may be skewed/missing)
  - Use for: UI "sent at" display, optimistic ordering, lag calculation
Rule: Server logic MUST use ctx.receivedAt for all business logic (rate limiting, ordering, TTL, auditing).
router.on(ChatMessage, async (ctx) => {
// Rate limiting with server timestamp
const lastMessageTime = messageLog.get(ctx.ws.data.clientId);
if (lastMessageTime && ctx.receivedAt - lastMessageTime < 1000) {
ctx.error(
"RESOURCE_EXHAUSTED",
"Please wait before sending another message",
);
return;
}
messageLog.set(ctx.ws.data.clientId, ctx.receivedAt);
// Store both for different purposes
await saveMessage({
text: ctx.payload.text,
sentAt: ctx.meta.timestamp, // UI display
receivedAt: ctx.receivedAt, // Business logic
});
});
Performance Considerations
- Message Parsing: Messages are parsed once and cached
- Validation: Schema validation happens before handler execution
- Error Boundaries: Handlers are wrapped with minimal overhead
- PubSub: Uses platform-native implementations (Bun, Cloudflare DO, etc.) for maximum performance
- Type Safety: Static type checking adds zero runtime overhead; it all happens at compile time
- Modular Design: Tree-shakeable imports ensure minimal bundle size
For platform-specific optimizations, see the adapter documentation for your target platform.