Request-Response Pattern (RPC) Guide
RPC (Remote Procedure Call) enables type-safe request-response communication with automatic correlation tracking. Unlike fire-and-forget messages, RPC calls wait for a response and handle timeouts, errors, and cancellation.
Overview
RPC in ws-kit differs from typical messaging:
- Fire-and-forget (
on(),send()): Unidirectional, no reply expected - Request-response (
rpc(),request()): Bidirectional, client waits for response
// Fire-and-forget: no response
router.on(UserLoggedIn, (ctx) => {
ctx.publish("notifications", NotifyMessage, { userId: ctx.payload.userId });
});
// Request-response: client waits for response
router.rpc(GetUser, (ctx) => {
ctx.reply(GetUser.response, { user: await db.findById(ctx.payload.id) });
});RPC Lifecycle:
sequenceDiagram
participant C as Client
participant S as Server
C->>S: request (with correlationId)
Note over S: Validation & handler
S->>C: progress (non-terminal)
S->>C: progress (non-terminal)
S->>C: reply (terminal)
Note over C: Matched correlationId<br/>Request resolvedServer-Side: Defining RPC Handlers
Basic RPC Handler
Use router.rpc() to register a handler that returns a response:
import { z, rpc, createRouter } from "@ws-kit/zod";
const GetUser = rpc("GET_USER", { id: z.string() }, "USER_RESPONSE", {
user: UserSchema,
});
const router = createRouter();
router.rpc(GetUser, async (ctx) => {
const user = await db.users.findById(ctx.payload.id);
if (!user) {
ctx.error("NOT_FOUND", "User not found");
return;
}
// Terminal response - resolves the client's request()
ctx.reply(GetUser.response, { user });
});The difference from on():
ctx.reply()is available (terminal response, one-shot guarded)ctx.progress()is available (non-terminal streaming updates)ctx.abortSignalis available (for client cancellation)ctx.deadlineis available (server-derived deadline from client timeout)
Progress Updates
Before sending a terminal reply, send progress updates to keep the client informed. Progress updates use the same schema shape as the response:
const LongOperation = rpc("LONG_OP", { query: z.string() }, "LONG_OP_RESULT", {
results: z.array(z.unknown()),
duration: z.number(),
});
router.rpc(LongOperation, async (ctx) => {
const start = Date.now();
const results = [];
for (let i = 0; i < 1000; i++) {
results.push(await processItem(i));
if (i % 100 === 0) {
// Progress uses same schema shape as response
ctx.progress({
results: results.slice(),
duration: Date.now() - start,
});
}
}
// Terminal reply: ends the request
ctx.reply({ results, duration: Date.now() - start });
});Error Handling in RPC
Use ctx.error() to reject requests with standard error codes. Clients automatically infer retry behavior and respect backoff hints:
router.rpc(GetUser, async (ctx) => {
// Validation error (non-retryable)
if (!isValidUserId(ctx.payload.id)) {
ctx.error("INVALID_ARGUMENT", "Invalid user ID format");
return;
}
// Not found (non-retryable)
const user = await db.users.findById(ctx.payload.id);
if (!user) {
ctx.error("NOT_FOUND", "User not found");
return;
}
// Precondition error (non-retryable)
if (!user.isActive) {
ctx.error("FAILED_PRECONDITION", "User account is inactive");
return;
}
ctx.reply(GetUser.response, { user });
});Transient errors with backoff hints:
router.rpc(ProcessData, async (ctx) => {
try {
const result = await expensiveComputation(ctx.payload);
ctx.reply(ProcessData.response, { result });
} catch (err) {
if (isRateLimited(err)) {
// Transient error: client will auto-retry with backoff
ctx.error("RESOURCE_EXHAUSTED", "Server busy", undefined, {
retryable: true,
retryAfterMs: 2000, // Client waits 2s before retry
});
} else if (isTemporary(err)) {
// Transient infrastructure error (auto-retryable)
ctx.error("UNAVAILABLE", "Database temporarily unavailable");
} else {
// Unexpected error (don't retry)
ctx.error("INTERNAL", "Computation failed");
}
}
});Key error codes:
Terminal (non-retryable):
INVALID_ARGUMENT: Input validation failedUNAUTHENTICATED: Authentication missing or invalidPERMISSION_DENIED: Authenticated but lacks permissionsNOT_FOUND: Resource doesn't existFAILED_PRECONDITION: State requirement not metUNIMPLEMENTED: Feature not supported
Transient (automatically retryable):
RESOURCE_EXHAUSTED: Rate limit or quota exceeded (useretryAfterMs)DEADLINE_EXCEEDED: Request timed outUNAVAILABLE: Service temporarily unavailableABORTED: Concurrency conflict
Mixed:
INTERNAL: Unexpected server error (server decides retryability)CANCELLED: Client cancelled or disconnected
See Error Handling Spec for the complete taxonomy and retry semantics.
Cancellation and Deadlines
Handle client cancellation using ctx.abortSignal:
router.rpc(LongQuery, async (ctx) => {
const controller = new AbortController();
try {
// Pass abort signal to fetch
const result = await fetch(`/api/query`, {
signal: ctx.abortSignal || controller.signal,
}).then((r) => r.json());
ctx.reply(LongQuery.response, result);
} catch (err) {
if (err.name === "AbortError") {
// Client cancelled or connection closed
console.log("Request cancelled");
return;
}
ctx.error("INTERNAL", "Query failed");
}
});Use ctx.timeRemaining() to check deadline:
router.rpc(GetUser, async (ctx) => {
// Check time budget before starting expensive work
if (ctx.timeRemaining() < 1000) {
ctx.error("DEADLINE_EXCEEDED", "Not enough time to process");
return;
}
const user = await db.users.findById(ctx.payload.id);
ctx.reply(GetUser.response, { user });
});Use ctx.onCancel() to register cleanup callbacks:
router.rpc(UploadFile, async (ctx) => {
const file = await openUploadStream();
// Clean up if cancelled
ctx.onCancel?.(() => {
file.close();
console.log("Upload cancelled");
});
await file.write(ctx.payload.data);
ctx.reply(UploadFile.response, { success: true });
});Client-Side: Making RPC Requests
Basic Request
Use client.request() to make RPC calls:
import { z, rpc } from "@ws-kit/zod";
import { wsClient } from "@ws-kit/client/zod";
const GetUser = rpc("GET_USER", { id: z.string() }, "USER_RESPONSE", {
user: UserSchema,
});
const client = wsClient({ url: "ws://localhost:3000" });
try {
const response = await client.request(GetUser, { id: "123" });
console.log("User:", response.payload.user);
} catch (err) {
if (err instanceof TimeoutError) {
console.warn("Request timed out");
} else if (err instanceof RpcError) {
console.error("Server error:", err.code, err.details);
}
}Server-Side Progress (Stream Pattern)
The server can send progress updates before the terminal reply. Progress updates are correlated with the original request and use the same schema shape as the response:
// Client receives progress updates via onProgress callback
try {
const result = await client.request(
LongOperation,
{ query: "search term" },
{
onProgress: (update) => {
// Progress payload matches response schema shape
console.log(`Processed ${update.payload.results.length} items`);
},
},
);
console.log("Results:", result.payload.results);
} catch (err) {
console.error("Failed:", err);
}Note: Progress updates share the same correlation ID as the request, so they are automatically matched to the correct onProgress callback.
Timeouts and Cancellation
Set a custom timeout and cancel requests:
const controller = new AbortController();
// Cancel after 5 seconds
const timeoutId = setTimeout(() => controller.abort(), 5000);
try {
const response = await client.request(
GetUser,
{ id: "123" },
{
timeoutMs: 30000, // Server timeout
signal: controller.signal, // Client-side abort
},
);
console.log("User:", response.payload.user);
} catch (err) {
if (err instanceof TimeoutError) {
console.warn("Request timed out");
}
} finally {
clearTimeout(timeoutId);
}Error Handling
Handle different error types:
import {
TimeoutError,
RpcError,
ValidationError,
WsDisconnectedError,
} from "@ws-kit/client";
try {
const response = await client.request(GetUser, { id: "123" });
} catch (err) {
if (err instanceof TimeoutError) {
console.warn(`Timeout after ${err.timeoutMs}ms`);
} else if (err instanceof RpcError) {
console.error(`RPC error: ${err.code}`);
// Some errors are retryable with backoff
if (err.retryable && err.retryAfterMs) {
console.log(`Retry after ${err.retryAfterMs}ms`);
}
} else if (err instanceof ValidationError) {
console.error("Invalid response from server");
} else if (err instanceof WsDisconnectedError) {
console.warn("Connection lost during request");
}
}Common Patterns
Request-Reply with Data Validation
Validate responses on the client:
const response = await client.request(GetUser, { id: "123" });
// Response is already validated against UserSchema
// TypeScript knows the structure
if (response.payload.user.email.includes("@")) {
console.log("Valid email");
}Idempotent Operations
For idempotent operations, include a correlation ID:
const correlationId = crypto.randomUUID();
const response = await client.request(
CreateUser,
{ name: "Alice", email: "alice@example.com" },
{
correlationId, // Server can use this for deduplication
},
);Chained Requests
Chain multiple RPC calls:
try {
// First request
const userResponse = await client.request(GetUser, { id: "123" });
const userId = userResponse.payload.user.id;
// Second request (depends on first)
const ordersResponse = await client.request(GetUserOrders, { userId });
console.log("Orders:", ordersResponse.payload.orders);
} catch (err) {
console.error("Chain failed:", err);
}Concurrent Requests
Run multiple RPC calls in parallel:
const [userResponse, ordersResponse, statsResponse] = await Promise.all([
client.request(GetUser, { id: "123" }),
client.request(GetUserOrders, { userId: "123" }),
client.request(GetUserStats, { userId: "123" }),
]);
console.log("User:", userResponse.payload.user);
console.log("Orders:", ordersResponse.payload.orders);
console.log("Stats:", statsResponse.payload.stats);Streaming Large Results
For large responses, use progress updates to stream data in batches:
// Define progress message for batch data
const DataBatch = message("DATA_BATCH", {
items: z.array(z.unknown()),
offset: z.number(),
total: z.number(),
});
// Server sends progress updates via ctx.progress(), which client receives as DATA_BATCH messages
router.rpc(StreamLargeData, async (ctx) => {
const batchSize = 100;
const total = 10000;
for (let i = 0; i < total; i += batchSize) {
const batch = await db.fetchBatch(i, batchSize);
ctx.progress?.({
items: batch,
offset: i,
total,
});
}
ctx.reply(StreamLargeData.response, { complete: true });
});
// Client listens for batch updates
client.on(DataBatch, (msg) => {
console.log(`Received ${msg.payload.items.length} items`);
processItems(msg.payload.items);
});
// Then makes the RPC request and waits for completion
const result = await client.request(StreamLargeData, {});
console.log("Transfer complete");Comparison: RPC vs Fire-and-Forget
| Aspect | RPC (router.rpc()) | Fire-and-Forget (router.on()) |
|---|---|---|
| Response | Required (via ctx.reply()) | None (via ctx.send() / ctx.publish()) |
| Client waiting | Yes, promises resolve | No, fire-and-forget |
| Timeout | Yes (server-derived) | N/A |
| Correlation | Automatic | Manual (via metadata) |
| One-shot reply | Guaranteed | N/A |
| Progress updates | Yes (ctx.progress()) | N/A |
| Use case | Query, command with response | Event, notification, broadcast |
Choose RPC when:
- Client needs a response
- You want automatic correlation
- Operation might timeout
- Progress updates are valuable
Choose fire-and-forget when:
- No response needed
- Publishing to multiple clients
- Pure event notification
Architecture Decisions
For design rationale, alternatives considered, and implementation details, see:
- ADR-012: RPC Minimal, Reliable
- ADR-013: RPC Reconnect & Idempotency
- ADR-014: RPC DX & Safety Improvements
- ADR-015: Unified RPC API Design
Troubleshooting
See the RPC Troubleshooting Guide for solutions to common issues.