Advanced Usage
Advanced patterns for building sophisticated WebSocket applications with ws-kit.
Router Composition
Organize your application into modules by composing multiple routers:
import { z, message, createRouter } from "@ws-kit/zod";
type AppData = { userId?: string };
// Define message schemas
const LoginMessage = message("LOGIN", {
email: z.string().email(),
password: z.string(),
});
const SendMessageMessage = message("SEND_MESSAGE", {
text: z.string(),
});
const BroadcastMessage = message("BROADCAST", {
message: z.string(),
});
// Authentication router
const authRouter = createRouter<AppData>();
authRouter.on(LoginMessage, (ctx) => {
// Verify credentials and update connection data
ctx.assignData({ userId: "user_123" });
});
// Chat router
const chatRouter = createRouter<AppData>();
chatRouter.on(SendMessageMessage, (ctx) => {
const userId = ctx.ws.data?.userId;
const roomId = "general"; // or from ctx.ws.data?.roomId
console.log(`Message from ${userId}: ${ctx.payload.text}`);
// Broadcast to topic subscribers (use ctx.publish for convenience)
ctx.publish(`room:${roomId}`, BroadcastMessage, {
message: ctx.payload.text,
});
});
// Compose routers together
const router = createRouter<AppData>();
router.merge(authRouter).merge(chatRouter);The merge() method combines handlers, lifecycle hooks, and middleware from multiple routers.
Middleware
Middleware runs before handlers—use it for authorization, validation, logging, and rate limiting:
import { z, message, createRouter } from "@ws-kit/zod";
type AppData = { userId?: string; roles?: string[] };
const router = createRouter<AppData>();
// Global middleware: authentication check
router.use((ctx, next) => {
if (!ctx.ws.data?.userId && ctx.type !== "LOGIN") {
ctx.error("UNAUTHENTICATED", "Not authenticated");
return; // Skip handler
}
return next();
});
// Per-message middleware: rate limiting
const rateLimiter = new Map<string, number>();
const RateLimitMessage = message("RATE_LIMIT_MESSAGE", {
text: z.string(),
});
router.use(RateLimitMessage, (ctx, next) => {
const userId = ctx.ws.data?.userId || "anon";
const count = (rateLimiter.get(userId) || 0) + 1;
if (count > 10) {
ctx.error("RESOURCE_EXHAUSTED", "Too many messages");
return;
}
rateLimiter.set(userId, count);
return next();
});
router.on(RateLimitMessage, (ctx) => {
console.log(`Message from ${ctx.ws.data?.userId}: ${ctx.payload.text}`);
});Middleware semantics:
router.use(middleware)— Global middleware (runs for all messages)router.use(schema, middleware)— Per-message middleware (runs only for that message)- Middleware can call
ctx.error()to reject or skip callingnext()to prevent handler execution - Middleware can modify connection data via
ctx.assignData()for handlers to access - Both sync and async middleware supported
- Execution order: Global middleware → per-route middleware → handler
Error Handling
Send type-safe error responses with predefined error codes:
import { z, message, createRouter } from "@ws-kit/zod";
const router = createRouter();
const LoginMessage = message("LOGIN", {
email: z.string().email(),
password: z.string(),
});
const QueryMessage = message("QUERY", {
id: z.string(),
});
const QueryResponse = message("QUERY_RESPONSE", {
data: z.any(),
});
router.on(LoginMessage, (ctx) => {
try {
const user = authenticateUser(ctx.payload);
ctx.assignData({ userId: user.id });
} catch (err) {
// Type-safe error code
ctx.error("UNAUTHENTICATED", "Invalid credentials", {
hint: "Check your email and password",
});
}
});
router.on(QueryMessage, (ctx) => {
try {
const result = queryDatabase(ctx.payload.id);
ctx.send(QueryResponse, { data: result });
} catch (err) {
ctx.error("INTERNAL", "Database query failed");
}
});
// Transient errors with retry hints
router.on(ProcessPaymentMessage, (ctx) => {
try {
processPayment(ctx.payload);
} catch (err) {
if (isRateLimited(err)) {
// Transient error: send backoff hint for client retry
ctx.error("RESOURCE_EXHAUSTED", "Rate limit exceeded", undefined, {
retryable: true,
retryAfterMs: 5000, // Client waits 5s before retry
});
} else if (isTemporarilyUnavailable(err)) {
// Infrastructure error: automatic client backoff
ctx.error("UNAVAILABLE", "Service temporarily unavailable");
} else {
// Unexpected error: don't retry
ctx.error("INTERNAL", "Payment processing failed");
}
}
});Standard error codes (gRPC-aligned, see ADR-015):
Clients automatically infer retry behavior from error codes. Use retryAfterMs option to provide backoff hints for transient errors.
Terminal errors (don't retry):
INVALID_ARGUMENT— Invalid payload or schema mismatchUNAUTHENTICATED— Authentication failed (missing or invalid token)PERMISSION_DENIED— Authenticated but lacks rightsNOT_FOUND— Resource not foundFAILED_PRECONDITION— State requirement not metALREADY_EXISTS— Uniqueness or idempotency violationUNIMPLEMENTED— Feature not supported or deployedCANCELLED— Request cancelled by client or peer
Transient errors (automatically retryable):
DEADLINE_EXCEEDED— RPC request timed outRESOURCE_EXHAUSTED— Rate limit, quota, or backpressure exceeded (useretryAfterMsfor backoff)UNAVAILABLE— Transient infrastructure errorABORTED— Concurrency conflict (race condition), automatically retried
Mixed (app-specific):
INTERNAL— Unexpected server error (server decides if retryable)
Discriminated Unions
Use Zod discriminated unions for type-safe message handlers:
import { z, message, createRouter } from "@ws-kit/zod";
// Define individual message schemas
const TextMessage = message("TEXT", {
content: z.string(),
channelId: z.string(),
});
const ImageMessage = message("IMAGE", {
url: z.url(),
channelId: z.string(),
width: z.number(),
height: z.number(),
});
const VideoMessage = message("VIDEO", {
url: z.url(),
channelId: z.string(),
duration: z.number(),
});
// Register handlers for each type
const router = createRouter();
router.on(TextMessage, (ctx) => {
console.log("Text:", ctx.payload.content);
ctx.publish(ctx.payload.channelId, TextMessage, ctx.payload);
});
router.on(ImageMessage, (ctx) => {
console.log(
"Image:",
ctx.payload.url,
ctx.payload.width,
"x",
ctx.payload.height,
);
ctx.publish(ctx.payload.channelId, ImageMessage, ctx.payload);
});
router.on(VideoMessage, (ctx) => {
console.log("Video:", ctx.payload.url, ctx.payload.duration, "s");
ctx.publish(ctx.payload.channelId, VideoMessage, ctx.payload);
});This pattern is useful for protocol versioning, command/query separation, event sourcing, and complex state machines.
Connection Data Type Safety
For large applications, declare your default connection data type once using TypeScript declaration merging:
// types/app-data.d.ts
declare module "@ws-kit/core" {
interface AppDataDefault {
userId?: string;
email?: string;
roles?: string[];
tenant?: string;
}
}Now throughout your app, omit the generic type:
// ✅ No generic needed—automatically uses AppDataDefault
const router = createRouter();
router.on(SecureMessage, (ctx) => {
// ✅ ctx.ws.data is properly typed
const userId = ctx.ws.data?.userId; // string | undefined
const email = ctx.ws.data?.email; // string | undefined
const roles = ctx.ws.data?.roles; // string[] | undefined
});Alternatively, specify the type explicitly for custom routers:
// ✅ Custom data for specific routers
type FeatureData = { feature: string; version: number };
const featureRouter = createRouter<FeatureData>();Heartbeat Configuration
Heartbeat is opt-in and only enabled when explicitly configured:
import { createRouter } from "@ws-kit/zod";
const router = createRouter({
heartbeat: {
intervalMs: 30_000, // Optional: defaults to 30s
timeoutMs: 5_000, // Optional: defaults to 5s
onStaleConnection: (clientId, ws) => {
console.log(`Connection ${clientId} is stale`);
ws.close(1011, "Connection timeout");
},
},
});When to use heartbeat:
- Long-lived connections that need liveness checks
- Detecting network partitions or idle connections
- Cleaning up zombie connections
When not to use:
- Short-lived request/response patterns
- Applications where reconnect is acceptable
- High-throughput scenarios (adds overhead)
Publishing and Broadcasting
Type-safe publish/subscribe with schema validation:
import { z, message, createRouter } from "@ws-kit/zod";
type AppData = { userId?: string; roomId?: string };
const JoinRoom = message("JOIN_ROOM", { roomId: z.string() });
const RoomUpdate = message("ROOM_UPDATE", {
roomId: z.string(),
users: z.number(),
message: z.string(),
});
const router = createRouter<AppData>();
router.on(JoinRoom, (ctx) => {
const { roomId } = ctx.payload;
const userId = ctx.ws.data?.userId;
// Store room ID and subscribe to topic
ctx.assignData({ roomId });
ctx.subscribe(`room:${roomId}`);
// Broadcast to room (type-safe!)
ctx.publish(`room:${roomId}`, RoomUpdate, {
roomId,
users: 5,
message: `User ${userId} joined`,
});
});All broadcast messages are validated against their schemas before being sent, providing the same type safety for broadcasts as for direct messaging.
Key points:
- Use
ctx.publish()in handlers for ergonomic broadcasting (most common) - Use
router.publish()outside handlers (cron jobs, system events) - Both enforce schema validation before transmission
- Topic naming conventions help organize broadcasts (e.g.,
room:${roomId})
RPC Pattern (Request-Response)
For guaranteed request-response patterns with correlation tracking and timeouts, use the rpc() helper to bind request and response schemas:
import { z, rpc, createRouter } from "@ws-kit/zod";
// Define RPC schema - binds request to response type
const GetUser = rpc("GET_USER", { id: z.string() }, "USER_RESPONSE", {
user: z.object({ id: z.string(), name: z.string() }),
});
const router = createRouter();
// Use router.rpc() for type-safe RPC handlers
router.rpc(GetUser, async (ctx) => {
// Optional progress updates before terminal reply
ctx.progress({ stage: "loading" });
const user = await db.users.findById(ctx.payload.id);
if (!user) {
ctx.error("NOT_FOUND", "User not found");
return;
}
// Terminal reply with response schema (type-safe, one-shot guaranteed)
ctx.reply(GetUser.response, { user });
});RPC-specific context methods:
ctx.reply(schema, data)— Terminal response (one-shot, schema-enforced)ctx.progress(data?)— Non-terminal progress updatesctx.abortSignal— Fires on client cancel/disconnectctx.onCancel(callback)— Register cancellation callbackctx.deadline— Server-derived deadline (epoch ms)ctx.timeRemaining()— Milliseconds until deadline
When to use RPC:
- Client needs guaranteed response
- Correlation tracking required
- Progress updates needed
- Timeout handling important
See ADR-015 for complete RPC design and error taxonomy.
Custom Validators
While ws-kit provides official adapters for Zod and Valibot, you can integrate any validation library by implementing the ValidatorAdapter interface:
import { WebSocketRouter, type ValidatorAdapter } from "@ws-kit/core";
// Example: Custom validator adapter
class CustomValidatorAdapter implements ValidatorAdapter {
validate(
schema: unknown,
data: unknown,
): {
success: boolean;
data?: unknown;
error?: { message: string; path?: string[] };
} {
// Your validation logic here
try {
const result = myValidator.parse(schema, data);
return { success: true, data: result };
} catch (err) {
return {
success: false,
error: { message: String(err), path: [] },
};
}
}
}
// Use custom validator with router
const router = new WebSocketRouter({
validator: new CustomValidatorAdapter(),
});Validator requirements:
- Strict mode: Reject unknown keys at all levels (root, meta, payload)
- Payload enforcement: Reject messages with
payloadkey when schema defines none - Type safety: Preserve TypeScript types through validation
- Error reporting: Provide clear error messages with paths
See docs/specs/validation.md for complete requirements and validation flow.
See Also
- Core Concepts — Message routing, lifecycle hooks
- Middleware — Detailed middleware design
- Error Handling — Complete error code taxonomy
- Broadcasting — Pub/sub patterns and throttling