API Reference
Complete API documentation for WS-Kit.
Core Exports
@ws-kit/zod
import { z, message, rpc, createRouter } from "@ws-kit/zod";
- z: Re-exported Zod instance (canonical import source)
- message(): Create type-safe message schemas
- rpc(): Create request-response (RPC) schemas
- createRouter(): Create a type-safe WebSocket router
@ws-kit/valibot
import { v, message, rpc, createRouter } from "@ws-kit/valibot";
- v: Re-exported Valibot instance (canonical import source)
- message(): Create type-safe message schemas
- rpc(): Create request-response (RPC) schemas
- createRouter(): Create a type-safe WebSocket router
@ws-kit/client/zod and @ws-kit/client/valibot
import { wsClient } from "@ws-kit/client/zod";
// or
import { wsClient } from "@ws-kit/client/valibot";
- wsClient(): Create a type-safe WebSocket client
@ws-kit/bun
import { serve, createBunHandler } from "@ws-kit/bun";
- serve(): High-level server function
- createBunHandler(): Low-level handler for custom setups
Schema Creation
message()
Create a type-safe message schema.
Signatures:
// Simple message (no payload)
function message<T extends string>(type: T): MessageSchema<T>;
// With payload (Zod object or raw shape)
function message<
T extends string,
P extends ZodObject<ZodRawShape> | ZodRawShape,
>(type: T, payload: P): MessageSchema<T, P>;
// With payload and custom metadata
function message<
T extends string,
P extends ZodObject<ZodRawShape> | ZodRawShape,
M extends ZodRawShape,
>(type: T, payload: P, meta: M): MessageSchema<T, P, M>;
// With config object (payload and/or response)
function message<T extends string>(
type: T,
config: { payload?: P; response?: R; meta?: M },
): MessageSchema<T>;
Parameters:
- type: Unique message type identifier (string literal)
- payload: Zod object, raw shape, or config object for payload validation (optional)
- meta: Custom metadata schema fields (optional; cannot use the reserved keys clientId or receivedAt)
Returns: MessageSchema with full TypeScript inference
Examples:
import { z, message } from "@ws-kit/zod";
// Simple message (no payload)
const PingMessage = message("PING");
// With payload (raw shape)
const ChatMessage = message("CHAT_MESSAGE", {
text: z.string(),
roomId: z.string(),
});
// With Zod object (equivalent to the raw-shape form; renamed to avoid redeclaring ChatMessage)
const ChatMessageFromObject = message(
"CHAT_MESSAGE",
z.object({
text: z.string(),
roomId: z.string(),
}),
);
// With custom metadata
const TrackedMessage = message(
"TRACKED_ACTION",
{ action: z.string() },
{ traceId: z.string() },
);
Reserved Meta Keys: clientId and receivedAt cannot be used in custom metadata. They are server-controlled fields injected after validation.
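To make the reserved-key rule concrete, here is a minimal sketch of a check that a schema helper could run against a custom meta shape. The names `RESERVED_META_KEYS` and `findReservedMetaKeys` are hypothetical and are not WS-Kit exports; the library's actual enforcement may work differently.

```typescript
// Hypothetical helper, not part of WS-Kit: illustrates the reserved-key rule only.
const RESERVED_META_KEYS = new Set<string>(["clientId", "receivedAt"]);

// Returns every key of a custom meta shape that collides with a reserved key.
function findReservedMetaKeys(metaShape: Record<string, unknown>): string[] {
  return Object.keys(metaShape).filter((key) => RESERVED_META_KEYS.has(key));
}

console.log(findReservedMetaKeys({ traceId: "string" })); // []
console.log(findReservedMetaKeys({ traceId: "string", clientId: "string" })); // ["clientId"]
```

A caller could reject the schema whenever the returned array is non-empty.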
rpc()
Create a request-response (RPC) schema that binds request and response types.
Signature:
function rpc<
ReqT extends string,
ReqP extends ZodType | ValibotSchema | undefined,
ResT extends string,
ResP extends ZodType | ValibotSchema | undefined,
>(
requestType: ReqT,
requestPayload: ReqP,
responseType: ResT,
responsePayload: ResP,
): RpcSchema<ReqT, ReqP, ResT, ResP>;
Parameters:
- requestType: Message type for the request
- requestPayload: Validation schema for request payload (use undefined for no payload)
- responseType: Message type for the response
- responsePayload: Validation schema for response payload (use undefined for no payload)
Returns: RPC schema with .response property for type inference
Examples:
import { z, rpc } from "@ws-kit/zod";
// Simple RPC
const Ping = rpc("PING", { text: z.string() }, "PONG", { reply: z.string() });
// RPC with no payloads
const Heartbeat = rpc("HEARTBEAT", undefined, "HEARTBEAT_ACK", undefined);
// Complex RPC
const GetUser = rpc("GET_USER", { userId: z.string() }, "USER_RESPONSE", {
user: UserSchema,
});
Router API
createRouter()
Create a type-safe WebSocket router.
Signature:
function createRouter<TData extends WebSocketData = WebSocketData>(
options?: WebSocketRouterOptions<ZodAdapter, TData>,
): WebSocketRouter<ZodAdapter, TData>;
Type Parameters:
- TData: Custom connection data type (extends { clientId: string })
Options:
interface WebSocketRouterOptions<V, TData> {
// Core adapters
validator?: V; // Validator adapter (auto-configured by createRouter)
platform?: PlatformAdapter; // Platform adapter (optional)
pubsub?: PubSub; // PubSub implementation (lazy: MemoryPubSub on first use)
// Lifecycle hooks
hooks?: {
onOpen?: OpenHandler<TData>;
onClose?: CloseHandler<TData>;
onAuth?: AuthHandler<TData>;
onError?: ErrorHandler<TData>;
};
// Heartbeat configuration (opt-in, not initialized by default)
heartbeat?: {
intervalMs?: number; // Default: 30000
timeoutMs?: number; // Default: 5000
onStaleConnection?: (clientId: string, ws: ServerWebSocket<TData>) => void;
};
// Limits and configuration
limits?: {
maxPayloadBytes?: number; // Default: 1000000 (1MB)
};
socketBufferLimitBytes?: number; // Default: 1000000 (backpressure threshold)
rpcTimeoutMs?: number; // Default: 30000
dropProgressOnBackpressure?: boolean; // Default: true
maxInflightRpcsPerSocket?: number; // Default: 1000
rpcIdleTimeoutMs?: number; // Default: rpcTimeoutMs + 10000
// Error handling
autoSendErrorOnThrow?: boolean; // Default: true
exposeErrorDetails?: boolean; // Default: false
warnIncompleteRpc?: boolean; // Default: true (dev mode only)
}
Example:
import { createRouter } from "@ws-kit/zod";
type AppData = { userId?: string; roles?: string[] };
const router = createRouter<AppData>({
heartbeat: {
intervalMs: 30000,
timeoutMs: 5000,
},
limits: {
maxPayloadBytes: 1_000_000,
},
autoSendErrorOnThrow: true,
exposeErrorDetails: false,
});
Router Methods
on(schema, handler)
Register a handler for fire-and-forget messages or pub/sub events.
on<Schema extends MessageSchemaType>(
schema: Schema,
handler: EventHandler<Schema, TData>
): this;
Parameters:
- schema: Message schema created with message()
- handler: Event handler function (receives EventMessageContext)
Returns: Router instance for chaining
Example:
router.on(ChatMessage, (ctx) => {
// ctx.payload is fully typed
console.log(`Message from ${ctx.ws.data.userId}: ${ctx.payload.text}`);
// Publish to room subscribers
ctx.publish(`room:${ctx.payload.roomId}`, ChatMessage, ctx.payload);
});
rpc(schema, handler)
Register a handler for request-response (RPC) messages.
rpc<Schema extends MessageSchemaType>(
schema: Schema,
handler: RpcHandler<Schema, TData>
): this;
Parameters:
- schema: RPC message schema created with rpc()
- handler: RPC handler function (receives RpcMessageContext)
Returns: Router instance for chaining
Example:
const GetUser = rpc("GET_USER", { userId: z.string() }, "USER_RESPONSE", {
user: UserSchema,
});
router.rpc(GetUser, async (ctx) => {
const user = await db.users.findById(ctx.payload.userId);
if (!user) {
ctx.error("NOT_FOUND", "User not found");
return;
}
// Terminal response (one-shot, correlation tracked)
ctx.reply(GetUser.response, { user });
});
off(schema)
Unregister a handler for a specific message type.
off<Schema extends MessageSchemaType>(schema: Schema): this;
Parameters:
- schema: Message schema to unregister
Returns: Router instance for chaining
use(middleware) and use(schema, middleware)
Register global or per-route middleware.
// Global middleware
use(middleware: Middleware<TData>): this;
// Per-route middleware
use<Schema extends MessageSchemaType>(
schema: Schema,
middleware: Middleware<TData>
): this;
Parameters:
- middleware: Middleware function (ctx, next) => void | Promise<void>
- schema: Message schema for per-route middleware
Returns: Router instance for chaining
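The (ctx, next) contract can be pictured with a small, synchronous chain runner. This is only an illustration of the general pattern: `SketchContext`, `SketchMiddleware`, `runChain`, and `requireUser` are hypothetical names, async middleware is left out for brevity, and WS-Kit's real dispatcher is not shown here.

```typescript
// Illustrative only: a tiny middleware runner, not WS-Kit's dispatcher.
type SketchContext = { type: string; userId?: string; log: string[] };
type SketchMiddleware = (ctx: SketchContext, next: () => void) => void;

function runChain(
  ctx: SketchContext,
  middlewares: SketchMiddleware[],
  handler: (ctx: SketchContext) => void,
): void {
  // dispatch(i) runs middleware i; calling next() advances to i + 1.
  // The handler runs only after every middleware has called next().
  const dispatch = (index: number): void => {
    const middleware = middlewares[index];
    if (middleware === undefined) {
      handler(ctx);
      return;
    }
    middleware(ctx, () => dispatch(index + 1));
  };
  dispatch(0);
}

const requireUser: SketchMiddleware = (ctx, next) => {
  if (!ctx.userId && ctx.type !== "LOGIN") {
    ctx.log.push("UNAUTHENTICATED");
    return; // Skipping next() stops the chain before the handler.
  }
  return next();
};

const anonymous: SketchContext = { type: "CHAT_MESSAGE", log: [] };
runChain(anonymous, [requireUser], (ctx) => {
  ctx.log.push("handled");
});
console.log(anonymous.log); // ["UNAUTHENTICATED"]
```

The key point is that a middleware that returns without calling next() short-circuits everything after it.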
Examples:
// Global authentication middleware
router.use((ctx, next) => {
if (!ctx.ws.data?.userId && ctx.type !== "LOGIN") {
ctx.error("UNAUTHENTICATED", "Not authenticated");
return;
}
return next();
});
// Per-route rate limiting
router.use(SendMessage, (ctx, next) => {
if (isRateLimited(ctx.ws.data?.userId)) {
ctx.error("RESOURCE_EXHAUSTED", "Too many messages");
return;
}
return next();
});
onOpen(handler), onClose(handler), onAuth(handler), onError(handler)
Register lifecycle hooks.
onOpen(handler: OpenHandler<TData>): this;
onClose(handler: CloseHandler<TData>): this;
onAuth(handler: AuthHandler<TData>): this;
onError(handler: ErrorHandler<TData>): this;
Handler Signatures:
type OpenHandler<TData> = (
ctx: OpenHandlerContext<TData>,
) => void | Promise<void>;
type CloseHandler<TData> = (
ctx: CloseHandlerContext<TData>,
) => void | Promise<void>;
type AuthHandler<TData> = (
ctx: MessageContext<any, TData>,
) => boolean | Promise<boolean>;
type ErrorHandler<TData> = (
error: WsKitError,
ctx: MessageContext<any, TData>,
) => boolean | void;
Examples:
router.onOpen((ctx) => {
console.log(`Client ${ctx.ws.data.clientId} connected`);
ctx.send(WelcomeMessage, { text: "Welcome!" });
});
router.onClose((ctx) => {
console.log(`Client ${ctx.ws.data.clientId} disconnected: ${ctx.code}`);
});
router.onAuth((ctx) => {
// Return false to reject connection
return ctx.ws.data?.userId !== undefined;
});
router.onError((error, ctx) => {
console.error("Error:", error.code, error.message, error.details);
// Return false to suppress automatic error response
return false; // Suppress auto-send
});
merge(router)
Merge routes from another router.
merge(router: WebSocketRouter<V, TData>): this;
Parameters:
- router: Another router to merge (same validator and data type)
Returns: Router instance for chaining
Example:
const authRouter = createRouter();
authRouter.on(LoginMessage, handleLogin);
const chatRouter = createRouter();
chatRouter.on(MessageMessage, handleMessage);
const mainRouter = createRouter().merge(authRouter).merge(chatRouter);
publish(channel, schema, payload, options?)
Publish a typed message to a channel (broadcasts to all subscribers).
publish(
channel: string,
schema: MessageSchemaType,
payload: unknown,
options?: PublishOptions
): Promise<PublishResult>;
Parameters:
- channel: Channel/topic name (e.g., "room:123", "user:456")
- schema: Message schema for validation
- payload: Message payload (validated against schema)
- options: Publish options (optional)
Options:
interface PublishOptions {
excludeSelf?: boolean; // Exclude sender (default: false)
partitionKey?: string; // For sharding (future use)
meta?: Record<string, unknown>; // Custom metadata
}
Returns: Promise<PublishResult> with subscriber match count and capability info
Example:
// Inside a handler
router.on(UserCreated, async (ctx) => {
const result = await ctx.publish(
`org:${ctx.payload.orgId}:users`,
UserListInvalidated,
{ orgId: ctx.payload.orgId },
);
if (result.ok && result.matched !== undefined) {
console.log(`Notified ${result.matched} subscribers`);
}
});
// Outside handlers (cron, queue, lifecycle)
const result = await router.publish("system:announcements", Announcement, {
text: "Server maintenance at 02:00 UTC",
});
if (result.ok) {
console.log("Announcement published");
}
reset()
Clear all registered handlers, middleware, and state.
reset(): this;
Returns: Router instance for chaining
Note: Preserves configuration (validator, platform, limits). Does not reset active connection heartbeat states.
Context API
Context objects are passed to message handlers and provide type-safe access to the WebSocket connection, payload, and various utility methods.
Event Message Context
Context for fire-and-forget messages (via router.on()).
interface EventMessageContext<TSchema, TData> {
// Connection and message info
ws: ServerWebSocket<TData>; // WebSocket connection
type: string; // Message type
payload?: unknown; // Validated payload (only if schema defines it)
meta: MessageMeta; // Message metadata
receivedAt: number; // Server receive timestamp (ms)
// Type information
isRpc: false; // Always false for events
timeRemaining(): number; // Returns Infinity for events
// Sending messages
send: SendFunction; // Send message to this connection
error(code: string, message?: string, details?: Record<string, unknown>, options?: { retryable?: boolean; retryAfterMs?: number }): void;
// Connection data management
assignData(partial: Partial<TData>): void;
// Pub/sub
subscribe(channel: string): void;
unsubscribe(channel: string): void;
publish(
channel: string,
schema: MessageSchemaType,
payload: unknown,
options?: PublishOptions,
): Promise<PublishResult>;
}
Note: For messages without a payload (e.g., message("PING")), ctx.payload will be undefined.
RPC Message Context
Context for request-response messages (via router.rpc()).
interface RpcMessageContext<TSchema, TData> {
// Connection and message info
ws: ServerWebSocket<TData>; // WebSocket connection
type: string; // Message type
payload?: unknown; // Validated payload (only if schema defines it)
meta: MessageMeta; // Message metadata (includes correlationId)
receivedAt: number; // Server receive timestamp (ms)
// Type information
isRpc: true; // Always true for RPC
deadline: number; // Request deadline (ms since epoch)
timeRemaining(): number; // Milliseconds until deadline
// Sending messages
send: SendFunction; // Send side-effect messages
error(code: string, message?: string, details?: Record<string, unknown>, options?: { retryable?: boolean; retryAfterMs?: number }): void;
// RPC-specific methods
reply(
responseSchema: MessageSchemaType,
data: unknown,
options?: Record<string, unknown>,
): void;
progress(data?: unknown): void;
onCancel(cb: () => void): () => void;
abortSignal: AbortSignal;
// Connection data management
assignData(partial: Partial<TData>): void;
// Pub/sub
subscribe(channel: string): void;
unsubscribe(channel: string): void;
publish(
channel: string,
schema: MessageSchemaType,
payload: unknown,
options?: PublishOptions,
): Promise<PublishResult>;
}
RPC Requirements:
- Must call ctx.reply() or ctx.error() for the terminal response
- Can call ctx.progress() multiple times before the terminal response
- correlationId is auto-synthesized if missing (belt-and-suspenders approach)
- One-shot guarantee: multiple reply() calls are guarded (only the first succeeds)
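The one-shot guarantee can be pictured as a guard around the function that delivers the terminal response. The sketch below is a generic illustration under that assumption; `createOneShot` is a hypothetical helper, not a WS-Kit API, and it says nothing about how the library tracks correlation internally.

```typescript
// Illustrative one-shot guard; the names here are hypothetical.
function createOneShot<T>(deliver: (value: T) => void): (value: T) => boolean {
  let settled = false;
  return (value: T): boolean => {
    if (settled) return false; // Later calls are ignored.
    settled = true;
    deliver(value);
    return true;
  };
}

const delivered: string[] = [];
const replyOnce = createOneShot<string>((value) => {
  delivered.push(value);
});

replyOnce("first"); // Delivered.
replyOnce("second"); // Ignored: a terminal response was already sent.
console.log(delivered); // ["first"]
```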
Context Methods
ctx.send(schema, payload, options?)
Send a validated message to the current client.
send(schema: MessageSchemaType, payload: unknown, options?: SendOptions): void;
Parameters:
- schema: Message schema
- payload: Message payload (validated against schema)
- options: Optional metadata (timestamp auto-added, can include correlationId)
Auto-injected metadata:
- timestamp: Producer timestamp (milliseconds since epoch)
- Does NOT inject clientId or receivedAt (outbound messages use client time)
Example:
router.on(PingMessage, (ctx) => {
ctx.send(PongMessage, { reply: "pong" });
});
Note: For RPC handlers, use ctx.reply() instead of ctx.send() for terminal responses to ensure the one-shot guarantee and proper correlation tracking.
ctx.error(code, message?, details?, options?)
Send a type-safe error response with optional retry semantics.
error(
code: string,
message?: string,
details?: Record<string, unknown>,
options?: {
retryable?: boolean;
retryAfterMs?: number;
}
): void;
Parameters:
- code: Standard error code (see Error Codes section)
- message: Optional human-readable error description
- details: Optional error context
- options: Optional retry semantics:
  - retryable: Whether the error is retryable (auto-inferred from code if omitted)
  - retryAfterMs: Backoff interval hint for transient errors
Examples:
Non-retryable error with details:
router.on(JoinRoom, (ctx) => {
if (!roomExists(ctx.payload.roomId)) {
ctx.error("NOT_FOUND", "Room not found", { roomId: ctx.payload.roomId });
return;
}
});
Transient error with backoff hint:
router.on(ProcessPayment, (ctx) => {
try {
processPayment(ctx.payload);
} catch (err) {
if (isRateLimited(err)) {
ctx.error("RESOURCE_EXHAUSTED", "Rate limited", undefined, {
retryable: true,
retryAfterMs: 5000,
});
} else {
ctx.error("INTERNAL", "Payment processing failed");
}
}
});
ctx.reply(responseSchema, data, options?) (RPC only)
Send a terminal reply for an RPC request.
reply(responseSchema: MessageSchemaType, data: unknown, options?: Record<string, unknown>): void;
Parameters:
- responseSchema: Response message schema
- data: Response data (validated)
- options: Optional metadata
Example:
router.rpc(GetUser, async (ctx) => {
const user = await db.users.findById(ctx.payload.userId);
ctx.reply(GetUser.response, { user });
});
ctx.progress(data?) (RPC only)
Send a non-terminal progress update.
progress(data?: unknown): void;
Parameters:
- data: Optional progress data
Example:
router.rpc(LongQuery, async (ctx) => {
for (const batch of largeBatches) {
ctx.progress({ processed: batch.count, total: largeBatches.length });
await processBatch(batch);
}
ctx.reply(LongQuery.response, { result: finalResult });
});
ctx.onCancel(callback) (RPC only)
Register a callback for when the RPC is cancelled.
onCancel(cb: () => void): () => void;
Parameters:
- cb: Callback invoked on cancellation (client abort or disconnect)
Returns: Unregister function
Example:
router.rpc(LongOperation, async (ctx) => {
const timer = setTimeout(() => doWork(), 1000);
ctx.onCancel(() => {
clearTimeout(timer);
console.log("Operation cancelled");
});
const result = await doWork();
ctx.reply(LongOperation.response, { result });
});
ctx.abortSignal (RPC only)
Standard AbortSignal that fires when the RPC is cancelled.
abortSignal: AbortSignal;
Example:
router.rpc(FetchData, async (ctx) => {
const response = await fetch(url, { signal: ctx.abortSignal });
const data = await response.json();
ctx.reply(FetchData.response, { data });
});
ctx.subscribe(channel) and ctx.unsubscribe(channel)
Subscribe/unsubscribe to pub/sub channels.
subscribe(channel: string): void;
unsubscribe(channel: string): void;
Example:
router.on(JoinRoom, (ctx) => {
ctx.subscribe(`room:${ctx.payload.roomId}`);
// Record the room through assignData() rather than mutating ctx.ws.data directly
ctx.assignData({ currentRoom: ctx.payload.roomId });
});
router.on(LeaveRoom, (ctx) => {
ctx.unsubscribe(`room:${ctx.ws.data.currentRoom}`);
});
ctx.publish(channel, schema, payload, options?)
Publish a message to a channel (convenience method, delegates to router.publish()).
publish(channel: string, schema: MessageSchemaType, payload: unknown, options?: PublishOptions): Promise<PublishResult>;
Example:
router.on(SendMessage, async (ctx) => {
const result = await ctx.publish(`room:${ctx.payload.roomId}`, ChatMessage, {
text: ctx.payload.text,
roomId: ctx.payload.roomId,
});
if (result.ok) {
console.log(`Published to ${result.matched ?? "?"} subscribers`);
}
});
ctx.assignData(partial)
Merge partial data into connection data.
assignData(partial: Partial<TData>): void;
Example:
router.on(LoginMessage, (ctx) => {
const user = authenticate(ctx.payload);
ctx.assignData({ userId: user.id, roles: user.roles });
});
Client API
wsClient()
Create a type-safe WebSocket client.
Signature:
function wsClient<TRouter = unknown>(options: ClientOptions): WebSocketClient;
Options:
interface ClientOptions {
url: string | URL; // WebSocket URL
protocols?: string | string[]; // WebSocket subprotocols
// Reconnection
reconnect?: {
enabled?: boolean; // Default: true
maxAttempts?: number; // Default: Infinity
initialDelayMs?: number; // Default: 300
maxDelayMs?: number; // Default: 10000
jitter?: "full" | "none"; // Default: "full"
};
// Queue configuration
queue?: "drop-oldest" | "drop-newest" | "off"; // Default: "drop-newest"
queueSize?: number; // Default: 1000
// Behavior
autoConnect?: boolean; // Default: false
pendingRequestsLimit?: number; // Default: 1000
// Authentication
auth?: {
attach?: "query" | "protocol"; // Default: "query"
queryParam?: string; // Default: "access_token"
protocolPrefix?: string; // Default: "bearer."
protocolPosition?: "prepend" | "append"; // Default: "append"
getToken?: () => string | Promise<string> | null | Promise<null>;
};
// Advanced
wsFactory?: (url: string | URL, protocols?: string | string[]) => WebSocket;
}
Returns: WebSocketClient instance
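The reconnect options name an initial delay, a maximum delay, and a jitter mode, but the exact delay formula is not stated here. As a rough sketch, assuming exponential doubling per attempt and "full" jitter meaning a uniformly random delay between 0 and the capped value, the schedule could look like this. `reconnectDelayMs` is a hypothetical function, and both assumptions are illustrative rather than confirmed client behavior.

```typescript
type JitterMode = "full" | "none";

// Hypothetical schedule: exponential backoff capped at maxDelayMs (an assumption).
function reconnectDelayMs(
  attempt: number, // 0 for the first retry
  initialDelayMs = 300,
  maxDelayMs = 10_000,
  jitter: JitterMode = "full",
  random: () => number = Math.random, // Injectable for deterministic tests.
): number {
  const ceiling = Math.min(maxDelayMs, initialDelayMs * Math.pow(2, attempt));
  // "full" jitter picks a delay in [0, ceiling); "none" uses the ceiling as-is.
  return jitter === "full" ? Math.floor(random() * ceiling) : ceiling;
}

console.log(reconnectDelayMs(0, 300, 10_000, "none")); // 300
console.log(reconnectDelayMs(3, 300, 10_000, "none")); // 2400
console.log(reconnectDelayMs(10, 300, 10_000, "none")); // 10000 (capped)
```

Jitter spreads reconnect attempts out so that many clients dropped at once do not all retry at the same instant.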
Example:
import { wsClient } from "@ws-kit/client/zod";
const client = wsClient({
url: "ws://localhost:3000",
reconnect: {
enabled: true,
maxAttempts: 10,
},
auth: {
attach: "query",
getToken: () => localStorage.getItem("token"),
},
});
Client Methods
client.connect()
Manually connect to the server.
connect(): Promise<void>;
Returns: Promise that resolves when connected
client.close(options?)
Close the connection.
close(options?: { code?: number; reason?: string }): Promise<void>;
Parameters:
- options.code: WebSocket close code (default: 1000)
- options.reason: Close reason string
Returns: Promise that resolves when closed
client.on(schema, handler)
Register a message handler.
on<S extends MessageSchema>(schema: S, handler: (payload: InferPayload<S>) => void): () => void;
Parameters:
- schema: Message schema
- handler: Message handler function
Returns: Unregister function
Example:
const RoomUpdated = message("ROOM_UPDATED", {
roomId: z.string(),
users: z.number(),
});
client.on(RoomUpdated, (payload) => {
console.log(`Room ${payload.roomId} has ${payload.users} users`);
});
client.send(schema, payload, options?)
Send a fire-and-forget message.
send<S extends MessageSchema>(
schema: S,
payload: InferPayload<S>,
options?: { meta?: Record<string, unknown>; correlationId?: string }
): boolean;
Parameters:
- schema: Message schema
- payload: Message payload
- options: Optional metadata
Returns: true if sent immediately, false if queued
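When a message cannot be sent immediately, the queue and queueSize options decide what happens to it. The sketch below shows one plausible reading of the three policies: "drop-newest" rejects the incoming message when the backlog is full, "drop-oldest" evicts the head to make room, and "off" never queues. `enqueueWhileOffline` is a hypothetical helper and these semantics are assumptions drawn from the option names, not confirmed client internals.

```typescript
type QueuePolicy = "drop-oldest" | "drop-newest" | "off";

// Returns true when the item was stored in the backlog (hypothetical helper).
function enqueueWhileOffline<T>(
  pending: T[],
  item: T,
  policy: QueuePolicy,
  queueSize: number,
): boolean {
  if (policy === "off" || queueSize <= 0) return false; // Queueing disabled.
  if (pending.length >= queueSize) {
    if (policy === "drop-newest") return false; // Keep the backlog, reject the new item.
    pending.shift(); // drop-oldest: evict the head to make room.
  }
  pending.push(item);
  return true;
}

const backlog: string[] = ["a", "b"];
enqueueWhileOffline(backlog, "c", "drop-oldest", 2);
console.log(backlog); // ["b", "c"]
```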
Example:
const success = client.send(ChatMessage, {
text: "Hello!",
roomId: "general",
});
client.request(schema, payload, options?)
Send a request and wait for response (RPC).
// RPC-style (auto-detected response schema)
request<S extends RpcSchema>(
schema: S,
payload: InferPayload<S>,
options?: RequestOptions
): Promise<InferResponse<S>>;
// Traditional style (explicit response schema)
request<S extends MessageSchema, R extends MessageSchema>(
schema: S,
payload: InferPayload<S>,
reply: R,
options?: RequestOptions
): Promise<InferMessage<R>>;
Options:
interface RequestOptions {
timeoutMs?: number; // Default: 30000
meta?: Record<string, unknown>; // Custom metadata (correlationId auto-generated if not provided)
correlationId?: string; // Explicit correlation ID (auto-generated if not provided)
signal?: AbortSignal; // Cancellation signal
}
Returns: Promise resolving to the full response message object {type, meta, payload} with the response type and validated payload
Behavior:
- Auto-generates correlationId if not provided
- Strips reserved meta keys (clientId, receivedAt) from user meta
- Auto-connects if autoConnect: true and no connection has been attempted yet
- Queues the request if disconnected (unless queue: "off")
- Rejects with ValidationError if outbound validation fails
- Rejects with a timeout error if no response arrives within timeoutMs
Examples:
// RPC-style with auto-detected response
const Ping = rpc("PING", { text: z.string() }, "PONG", { reply: z.string() });
const response = await client.request(Ping, { text: "hello" });
console.log(response.payload.reply); // Fully typed - access via payload
// Traditional style with explicit response schema
const pongResponse = await client.request(
PingMessage,
{ text: "hello" },
PongMessage,
{ timeoutMs: 5000 },
);
// With AbortSignal
const controller = new AbortController();
const promise = client.request(
Ping,
{ text: "hello" },
{
signal: controller.signal,
},
);
setTimeout(() => controller.abort(), 1000); // Cancel after 1s
client.onState(callback)
Register a state change listener.
onState(callback: (state: ClientState) => void): () => void;
States:
- "closed": Connection is closed (initial state)
- "connecting": Connection attempt in progress
- "open": Connected and ready
- "closing": Close initiated, waiting for close event
- "reconnecting": Reconnect attempt scheduled or in progress
Returns: Unregister function
client.onceOpen()
Wait for the next "open" state.
onceOpen(): Promise<void>;
Returns: Promise that resolves when connected
client.onUnhandled(callback)
Register a handler for unhandled messages.
onUnhandled(callback: (msg: AnyInboundMessage) => void): () => void;
Returns: Unregister function
client.onError(callback)
Register an error handler.
onError(callback: (error: Error, context: ErrorContext) => void): () => void;
Error Context:
interface ErrorContext {
type: "parse" | "validation" | "overflow" | "unknown";
details?: unknown;
}
Returns: Unregister function
Client Properties
client.state: ClientState; // Current connection state
client.isConnected: boolean; // True if state === "open"
client.protocol: string; // Selected WebSocket subprotocol
Platform Adapters
Bun Adapter
serve(router, options)
High-level server function for Bun.
function serve<TData extends { clientId: string }>(
router: WebSocketRouter<any, TData>,
options?: ServeOptions<TData>,
): Promise<void>;
Options:
interface ServeOptions<TData> {
port?: number; // Default: 3000
authenticate?: (
req: Request,
) => TData | Promise<TData> | undefined | Promise<undefined>;
onError?: (error: Error, ctx: MessageContext) => void;
onBroadcast?: (channel: string, message: unknown) => void;
onUpgrade?: (req: Request, ws: ServerWebSocket<TData>) => void;
onOpen?: (ws: ServerWebSocket<TData>) => void;
onClose?: (ws: ServerWebSocket<TData>, code: number, reason: string) => void;
context?: Record<string, unknown>;
clientIdHeader?: string;
}
Example:
import { serve } from "@ws-kit/bun";
import { createRouter } from "@ws-kit/zod";
const router = createRouter();
serve(router, {
port: 3000,
authenticate(req) {
const token = req.headers.get("authorization")?.replace("Bearer ", "");
return token ? { userId: "123", roles: ["admin"] } : undefined;
},
});
createBunHandler(router, options)
Low-level handler creation for custom setups.
function createBunHandler<TData extends { clientId: string }>(
router: WebSocketRouter<any, TData>,
options?: BunHandlerOptions<TData>,
): { fetch: FetchHandler; websocket: WebSocketHandler<TData> };
Returns: Object with fetch and websocket handlers for Bun.serve()
Example:
import { createBunHandler } from "@ws-kit/bun";
const { fetch, websocket } = createBunHandler(router, {
authenticate(req) {
// Custom auth logic
},
});
Bun.serve({
port: 3000,
fetch,
websocket,
});
Error Handling
Standard Error Codes
13 standard error codes aligned with gRPC conventions (from ErrorCode enum):
Terminal errors (don't retry):
- UNAUTHENTICATED: Not authenticated / auth token missing, expired, or invalid
- PERMISSION_DENIED: Permission denied / authenticated but lacks rights (authZ)
- INVALID_ARGUMENT: Invalid argument / input validation or semantic violation
- FAILED_PRECONDITION: Failed precondition / state requirement not met
- NOT_FOUND: Not found / target resource absent
- ALREADY_EXISTS: Already exists / uniqueness or idempotency replay violation
- ABORTED: Aborted / concurrency conflict (race condition)
Transient errors (retry with backoff):
- DEADLINE_EXCEEDED: Deadline exceeded / RPC timed out
- RESOURCE_EXHAUSTED: Resource exhausted / rate limit, quota, or buffer overflow
- UNAVAILABLE: Unavailable / transient infrastructure error
Server/evolution:
- UNIMPLEMENTED: Unimplemented / feature not supported or deployed
- INTERNAL: Internal / unexpected server error (bug)
- CANCELLED: Cancelled / call cancelled (client disconnect, timeout abort)
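The grouping above suggests how retryability can be inferred from a code when no explicit flag is given. The sketch below treats only the three transient codes as retryable and lets an explicit value override the inference. `TRANSIENT_CODES` and `inferRetryable` are hypothetical names, and treating the server/evolution codes as non-retryable is an assumption made for this example.

```typescript
// Codes listed under "Transient errors (retry with backoff)".
const TRANSIENT_CODES = new Set<string>([
  "DEADLINE_EXCEEDED",
  "RESOURCE_EXHAUSTED",
  "UNAVAILABLE",
]);

// Hypothetical helper: an explicit retryable flag wins, otherwise infer from the code.
function inferRetryable(code: string, explicit?: boolean): boolean {
  return explicit ?? TRANSIENT_CODES.has(code);
}

console.log(inferRetryable("UNAVAILABLE")); // true
console.log(inferRetryable("NOT_FOUND")); // false
console.log(inferRetryable("INTERNAL", true)); // true (explicit override)
```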
Error Response Methods
Send a type-safe error response using ctx.error() with automatic retry inference:
error(
code: ErrorCode,
message?: string,
details?: Record<string, unknown>,
options?: { retryable?: boolean; retryAfterMs?: number }
): void;
Clients automatically infer whether errors are retryable based on the error code. Use retryAfterMs to provide backoff hints for transient errors.
Non-retryable error:
router.on(JoinRoom, (ctx) => {
if (!roomExists(ctx.payload.roomId)) {
ctx.error("NOT_FOUND", "Room not found", { roomId: ctx.payload.roomId });
return;
}
});
Transient error with backoff:
router.on(QueryData, (ctx) => {
if (isSaturated()) {
ctx.error("RESOURCE_EXHAUSTED", "Server busy", undefined, {
retryable: true,
retryAfterMs: 1000,
});
}
});
WsKitError
Standardized error object for structured error handling. Follows the standard ECMAScript (ES2022) Error cause convention for error chaining.
class WsKitError extends Error {
code: string;
message: string;
details: Record<string, unknown>;
cause?: unknown; // Standard Error cause (ES2022): original error
// Convenience accessor
get originalError(): Error | undefined;
static from(
code: string,
message: string,
details?: Record<string, unknown>,
): WsKitError;
static wrap(
error: unknown,
code: string,
message?: string,
details?: Record<string, unknown>,
): WsKitError;
static isWsKitError(value: unknown): value is WsKitError;
toJSON(): {
code: string;
message: string;
details: Record<string, unknown>;
stack: string | undefined;
cause?:
| { name: string; message: string; stack: string | undefined }
| string;
};
toPayload(): ErrorPayload;
}
Static Methods:
- from(): Create a new WsKitError
- wrap(): Wrap an existing error (preserved as cause). If the value is already a WsKitError, it is returned as-is
- isWsKitError(): Type guard to check if a value is a WsKitError
Instance Methods:
- toJSON(): Serialize to a plain object for structured logging (includes cause and stack)
- toPayload(): Create an error payload for client transmission (excludes cause and stack)
Example:
router.on(CreateUser, async (ctx) => {
try {
const user = await db.users.create(ctx.payload);
ctx.send(UserCreated, user);
} catch (err) {
throw WsKitError.wrap(err, "INTERNAL", "Failed to create user", {
email: ctx.payload.email,
});
}
});
Error Handler Example:
router.onError((error, ctx) => {
// Error is always a WsKitError (wrapped if needed)
logger.error({
code: error.code,
message: error.message,
details: error.details,
cause: error.originalError, // Access wrapped error
clientId: ctx.ws.data?.clientId,
});
});
Type Utilities
Infer Types from Schemas
import type { InferPayload, InferMessage } from "@ws-kit/zod";
// Infer payload type
type ChatPayload = InferPayload<typeof ChatMessage>;
// Infer full message type
type ChatMsg = InferMessage<typeof ChatMessage>;
Connection Data Type (Declaration Merging)
Declare default connection data type once using TypeScript declaration merging:
// types/app-data.d.ts
declare module "@ws-kit/core" {
interface AppDataDefault {
userId?: string;
roles?: string[];
tenant?: string;
}
}
// Now in any module (no generic needed):
const router = createRouter(); // Automatically uses AppDataDefault
router.on(SecureMessage, (ctx) => {
// ctx.ws.data is properly typed with userId, roles, tenant
const userId = ctx.ws.data?.userId;
});
Message Metadata
All messages include a meta object with standard fields:
interface MessageMeta {
// Server-controlled (injected after validation)
clientId: string; // Client identifier (UUID v7)
receivedAt: number; // Server ingress timestamp (ms)
// Client-provided (validated, not trusted for server logic)
timestamp?: number; // Client timestamp (for UI display)
correlationId?: string; // Request/response correlation
timeoutMs?: number; // RPC timeout
// Custom metadata (via schema)
[key: string]: unknown;
}
Which timestamp to use:
- Server logic (rate limiting, ordering, TTL): Use ctx.receivedAt (authoritative, server time)
- UI display (relative timestamps): Use ctx.meta.timestamp (client time, for display only)
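As an example of server logic keyed to the authoritative timestamp, here is a minimal fixed-window rate limiter that takes the server receive time as input. `allowMessage` and `rateWindows` are hypothetical names, and the fixed-window algorithm is just one simple choice; in a handler, the `receivedAt` argument would come from ctx.receivedAt rather than from the client-supplied ctx.meta.timestamp.

```typescript
type WindowState = { windowStart: number; count: number };
const rateWindows = new Map<string, WindowState>();

// Hypothetical fixed-window limiter driven by the server receive time (ms).
function allowMessage(
  clientId: string,
  receivedAt: number,
  limit = 10,
  windowMs = 1000,
): boolean {
  const state = rateWindows.get(clientId);
  if (!state || receivedAt - state.windowStart >= windowMs) {
    // First message, or the previous window expired: start a new window.
    rateWindows.set(clientId, { windowStart: receivedAt, count: 1 });
    return true;
  }
  state.count += 1;
  return state.count <= limit;
}

console.log(allowMessage("client-a", 0, 2)); // true
console.log(allowMessage("client-a", 10, 2)); // true
console.log(allowMessage("client-a", 20, 2)); // false (third message in the window)
console.log(allowMessage("client-a", 1000, 2)); // true (new window)
```

Because the window is computed from server time, a client cannot widen its own limit by sending a skewed timestamp.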
Reserved meta keys: clientId and receivedAt are server-controlled and injected after validation. Never trust client-provided values for these fields.
WebSocket Connection Data
Every connection has a data object with custom application data:
type WebSocketData<T = unknown> = {
clientId: string; // Auto-generated UUID v7 (always present)
} & T;
Access via ctx.ws.data:
router.on(SecureMessage, (ctx) => {
const userId = ctx.ws.data?.userId;
const clientId = ctx.ws.data.clientId; // Always present
});
Modify via ctx.assignData():
router.on(LoginMessage, (ctx) => {
ctx.assignData({ userId: "123", roles: ["admin"] });
});
Reserved Message Type Prefix
Message types starting with $ws: are reserved for internal control messages and cannot be registered with router.on() or router.rpc().
Reserved types:
- $ws:rpc-progress: RPC progress updates (sent by ctx.progress())
- $ws:abort: RPC abort signal (sent by client to cancel RPC)
Enforcement:
- Design-time: message() and rpc() throw if the message type uses the reserved prefix
- Runtime: Messages with the reserved prefix are handled internally and never dispatched to user handlers
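The design-time check amounts to a prefix test on the message type. A minimal sketch, using a hypothetical `assertTypeAllowed` helper (not a WS-Kit export, and the real error message may differ):

```typescript
const RESERVED_PREFIX = "$ws:";

// Hypothetical guard: throws for reserved types, otherwise returns the type unchanged.
function assertTypeAllowed(messageType: string): string {
  if (messageType.startsWith(RESERVED_PREFIX)) {
    throw new Error(`Reserved prefix "${RESERVED_PREFIX}" not allowed: ${messageType}`);
  }
  return messageType;
}

console.log(assertTypeAllowed("CHAT_MESSAGE")); // "CHAT_MESSAGE"

try {
  assertTypeAllowed("$ws:custom");
} catch (err) {
  console.log((err as Error).message); // Reserved prefix "$ws:" not allowed: $ws:custom
}
```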
Example (will throw):
// ❌ Throws: Reserved prefix "$ws:" not allowed
const BadMessage = message("$ws:custom", { data: z.string() });
const BadRpc = rpc("$ws:query", payload, "$ws:response", response);
Best Practices
Type Safety
// ✅ Use message() for full type inference
const ChatMessage = message("CHAT_MESSAGE", {
text: z.string(),
roomId: z.string(),
});
router.on(ChatMessage, (ctx) => {
// ctx.payload.text is typed as string
console.log(ctx.payload.text);
});
// ✅ Use rpc() for request-response patterns
const GetUser = rpc("GET_USER", { userId: z.string() }, "USER_RESPONSE", {
user: UserSchema,
});
router.rpc(GetUser, async (ctx) => {
const user = await db.users.findById(ctx.payload.userId);
ctx.reply(GetUser.response, { user }); // Fully typed
});
Error Handling Patterns
// ✅ Use standard error codes
ctx.error("NOT_FOUND", "User not found", { userId });
// ✅ Wrap exceptions with WsKitError
try {
await db.users.create(data);
} catch (err) {
throw WsKitError.wrap(err, "INTERNAL", "Database error");
}
// ✅ Register error handlers for observability
router.onError((error, ctx) => {
logger.error({
code: error.code,
message: error.message,
details: error.details,
clientId: ctx.ws.data?.clientId,
});
});
Middleware Patterns
// ✅ Global authentication middleware
router.use((ctx, next) => {
if (!ctx.ws.data?.userId && ctx.type !== "LOGIN") {
ctx.error("UNAUTHENTICATED", "Not authenticated");
return;
}
return next();
});
// ✅ Per-route rate limiting
const rateLimiter = new Map<string, number>();
router.use(SendMessage, (ctx, next) => {
const userId = ctx.ws.data?.userId || "anon";
const count = (rateLimiter.get(userId) || 0) + 1;
if (count > 10) {
ctx.error("RESOURCE_EXHAUSTED", "Too many messages");
return;
}
rateLimiter.set(userId, count);
return next();
});
Broadcasting
// ✅ Use ctx.publish() for ergonomics
router.on(SendMessage, (ctx) => {
ctx.publish(`room:${ctx.payload.roomId}`, ChatMessage, ctx.payload);
});
// ✅ Use router.publish() outside handlers
await router.publish("system:announcements", Announcement, {
text: "Server maintenance at 02:00 UTC",
});
Client Patterns
// ✅ Use RPC for request-response
const response = await client.request(GetUser, { userId: "123" });
// ✅ Use send() for fire-and-forget
client.send(ChatMessage, { text: "Hello!", roomId: "general" });
// ✅ Handle connection states
client.onState((state) => {
if (state === "open") {
console.log("Connected!");
} else if (state === "reconnecting") {
console.log("Reconnecting...");
}
});
// ✅ Use AbortSignal for cancellation
const controller = new AbortController();
const promise = client.request(
LongQuery,
{ query: "..." },
{
signal: controller.signal,
},
);
setTimeout(() => controller.abort(), 5000);