Plugins

Complete reference for the plugin system: how plugins work, core plugins shipped with WS-Kit, and how to implement custom plugins.

Quick Reference: Plugins add capabilities to the router via the .plugin() method. Each plugin can:

  • Define TypeScript types (adding methods to context)
  • Register middleware
  • Initialize adapter backends
  • Gate features at compile-time and runtime

See ADR-031 for design rationale and ADR-032 for canonical import sources.

Plugin Locations & Canonical Imports

Core Framework Plugins (import from @ws-kit/plugins):

  • withMessaging() — Fire-and-forget unicast/broadcast
  • withRpc() — Request-response with streaming

Feature Plugins (import from feature packages):

  • withPubSub() — Topic-based pub/sub (canonical: @ws-kit/pubsub)
  • rateLimit() — Rate limiting middleware (canonical: @ws-kit/rate-limit)
  • Future: withTelemetry(), withCompression(), withCaching() (each has own package)

Validator Plugins (choose one validator, import from its package):

  • withZod() / withValibot() — All-in-one validation plugin

Convenience Re-exports (optional, from validators):

Core plugins and feature plugins are re-exported from @ws-kit/zod and @ws-kit/valibot for convenience. See ADR-032: Canonical Imports Design for complete rules and rationale.


Overview

What is a Plugin?

A plugin is a function that:

  1. Takes a router instance and optional configuration
  2. Returns a modified router (or void if modifying in-place)
  3. Enhances context with new methods, middleware, or capabilities
  4. Types the router at compile-time to gate method availability
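
The four properties above can be sketched with a toy router. This is a minimal, self-contained model and not WS-Kit's implementation: ToyRouter, ToyPlugin, withGreeting, and greet are hypothetical names used only for illustration.

```typescript
// Toy model of a plugin-capable router (hypothetical; not the WS-Kit source).
type Middleware = (ctx: any, next: () => void) => void;
type ToyPlugin<NewCaps extends object> = (router: ToyRouter<any>) => ToyRouter<NewCaps>;

class ToyRouter<Caps extends object = {}> {
  private middleware: Middleware[] = [];

  // Applying a plugin widens the router's capability type.
  plugin<NewCaps extends object>(p: ToyPlugin<NewCaps>): ToyRouter<Caps & NewCaps> {
    return p(this) as unknown as ToyRouter<Caps & NewCaps>;
  }

  use(mw: Middleware): this {
    this.middleware.push(mw);
    return this;
  }

  // Run every middleware in registration order, then the handler.
  dispatch(handler: (ctx: Caps) => void): void {
    const ctx = {} as Caps;
    const run = (i: number): void => {
      if (i === this.middleware.length) return handler(ctx);
      this.middleware[i](ctx, () => run(i + 1));
    };
    run(0);
  }
}

interface GreetingCaps {
  greet(name: string): string;
}

// Hypothetical plugin: takes optional config, enhances the context, returns the router.
function withGreeting(config: { prefix?: string } = {}): ToyPlugin<GreetingCaps> {
  const prefix = config.prefix ?? "Hello";
  return (router) => {
    router.use((ctx, next) => {
      ctx.greet = (name: string) => `${prefix}, ${name}!`;
      next();
    });
    return router;
  };
}

const toyRouter = new ToyRouter().plugin(withGreeting({ prefix: "Hi" }));

let greeting = "";
toyRouter.dispatch((ctx) => {
  greeting = ctx.greet("Ada"); // Type-checks only because withGreeting() was registered
});
console.log(greeting); // Hi, Ada!
```

Without the .plugin(withGreeting()) call, ctx.greet would be a compile-time error in this model, which is the gating behavior item 4 describes.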

Plugin Composition

Plugins compose sequentially. Each plugin can depend on earlier plugins.

typescript
const router = createRouter()
  .plugin(withValidation()) // Step 1: Enables type inference for payload
  .plugin(withMessaging()) // Step 2: Adds ctx.send()
  .plugin(withRpc()) // Step 3: Adds ctx.reply(), ctx.progress()
  .plugin(withPubSub()); // Step 4: Adds ctx.publish()

Dependency Model (withValidation() denotes the validation capability supplied by withZod() or withValibot()):

  • withValidation() has no dependencies
  • withMessaging() depends on withValidation() (type-level only)
  • withRpc() depends on withValidation() and withMessaging()
  • withPubSub() depends on withMessaging()
  • rateLimit() middleware has no dependencies (can apply anywhere)

Capability Gating (No Runtime Overwrites)

  • Router methods are never overwritten at runtime. Core owns on() and rpc(); plugins add capability markers (__caps) to expose gated APIs via conditional types.
  • Validation plugins (withZod/withValibot) set { validation: true } on __caps. They do not reassign router.on/router.rpc; type exposure is static through the Router type’s capability check.
  • Core plugins (withMessaging/withRpc) contribute context methods through enhancers and stash data in non-enumerable fields (__wskit), avoiding public surface pollution.
  • If adding a new plugin API, pick a unique namespace on ctx.extensions and add only capability markers on the router. Do not patch existing router methods.
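
As a rough illustration of gating without overwrites, the sketch below keeps rpc() on the router at all times and uses a conditional type on the capability marker to decide whether callers can see it. Only __caps and { validation: true } come from the text above; GatedRouter, RouterImpl, and withValidationMarker are hypothetical names, and the real Router type may be structured differently.

```typescript
// Hypothetical sketch of capability gating; only `__caps` and `validation` come from the docs.
interface Caps {
  validation?: true;
}

// rpc() always exists at runtime; its type is `never` until the marker is present.
interface GatedRouter<C extends Caps> {
  readonly __caps: C;
  on(type: string, handler: () => void): this;
  rpc: C extends { validation: true }
    ? (type: string, handler: () => void) => GatedRouter<C>
    : never;
}

class RouterImpl {
  readonly __caps: Caps = {};
  private handlers = new Map<string, () => void>();

  on(type: string, handler: () => void): this {
    this.handlers.set(type, handler);
    return this;
  }

  rpc(type: string, handler: () => void): this {
    this.handlers.set(`rpc:${type}`, handler);
    return this;
  }
}

function createGatedRouter(): GatedRouter<{}> {
  return new RouterImpl() as unknown as GatedRouter<{}>;
}

// A validation plugin only sets a marker; it never reassigns on() or rpc().
function withValidationMarker<C extends Caps>(
  router: GatedRouter<C>,
): GatedRouter<C & { validation: true }> {
  Object.assign(router.__caps, { validation: true });
  return router as unknown as GatedRouter<C & { validation: true }>;
}

const plain = createGatedRouter();
// plain.rpc("GET", () => {}); // Compile error: `rpc` has type `never`

const rpcBefore = (plain as any).rpc;
const validated = withValidationMarker(plain);
validated.rpc("GET", () => {}); // OK: the marker is present in the type

console.log(rpcBefore === (validated as any).rpc); // true: same function, nothing was patched
```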

Plugin Registration Styles

Style 1: Validator Plugins (Recommended for Most Apps)

typescript
import { createRouter, withZod } from "@ws-kit/zod"; // or @ws-kit/valibot
import { withPubSub } from "@ws-kit/pubsub";
import { redisPubSub } from "@ws-kit/redis";

const router = createRouter()
  .plugin(withZod()) // ✅ Includes validation + messaging + RPC
  .plugin(
    withPubSub({
      adapter: redisPubSub(redis),
    }),
  );

Style 2: Granular Plugins (For Advanced Composition)

typescript
import { createRouter } from "@ws-kit/core";
import { withZod } from "@ws-kit/zod"; // or withValibot from @ws-kit/valibot
import { withMessaging, withRpc } from "@ws-kit/plugins";
import { withPubSub } from "@ws-kit/pubsub";

const router = createRouter()
  .plugin(withZod())
  .plugin(withMessaging())
  .plugin(withRpc())
  .plugin(withPubSub());

Core Plugins

Validation Plugins

Location: @ws-kit/zod and @ws-kit/valibot (validator adapters)

Purpose: Enable type-safe schema validation and payload type inference in handlers.

Validation is provided by validator adapter plugins rather than a standalone plugin. Choose your validator:

Example (Zod):

typescript
import { z } from "zod";
import { createRouter, message, withZod } from "@ws-kit/zod";

const router = createRouter().plugin(withZod());

router.on(message("PING", { text: z.string() }), (ctx) => {
  ctx.payload.text; // ✅ Inferred as string
});

Example (Valibot):

typescript
import * as v from "valibot";
import { createRouter, message, withValibot } from "@ws-kit/valibot";

const router = createRouter().plugin(withValibot());

router.on(message("PING", { text: v.string() }), (ctx) => {
  ctx.payload.text; // ✅ Inferred as string
});

Type Effects:

typescript
interface ContextWithValidation<TPayload> {
  payload: TPayload; // Type inferred from schema
  error(code: string, message: string, details?: unknown): void | Promise<void>;
  // Plus messaging and RPC methods (composed from withMessaging, withRpc)
}

withMessaging()

Location: @ws-kit/plugins/src/messaging

Purpose: Enable fire-and-forget messaging to individual connections.

Configuration: None

Effects:

  • Adds ctx.send(schema, data, opts?) method
  • Allows sending typed messages to current connection
  • Works with or without validation plugin

Example:

typescript
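// Declare the reply schema so the payload passed to ctx.send() is typed
const Pong = message("PONG", { reply: z.string() });

router.on(message("PING", { text: z.string() }), (ctx) => {
  ctx.send(Pong, { reply: `Got: ${ctx.payload.text}` });
});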

Type Effects:

typescript
interface ContextWithMessaging {
  send<TPayload>(
    schema: MessageSchema<TPayload>,
    data: TPayload,
    opts?: SendOptions,
  ): void | Promise<void>;
}

SendOptions:

typescript
interface SendOptions {
  waitFor?: "drain" | "ack"; // Wait for send completion
  signal?: AbortSignal; // Cancel if aborted
  meta?: Record<string, unknown>; // Additional metadata
}

withRpc()

Location: @ws-kit/plugins/src/rpc

Purpose: Enable request-response patterns with streaming progress updates.

Configuration: None

Effects:

  • Adds .rpc(schema, handler) method to router
  • Adds ctx.reply() and ctx.progress() to handlers
  • Requires withValidation() plugin (enforced at type-level)
  • Auto-correlates responses via correlationId
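
To show what correlation involves, here is a hypothetical sketch of the bookkeeping a caller needs when several requests are in flight. The envelope shape and the names RpcTracker and Envelope are assumptions made for illustration; the docs above do not specify where WS-Kit carries correlationId on the wire.

```typescript
// Hypothetical sketch of request/response correlation; the envelope shape is an assumption.
interface Envelope {
  type: string;
  correlationId: string;
  payload: unknown;
}

class RpcTracker {
  private nextId = 0;
  private pending = new Map<string, (payload: unknown) => void>();

  // Tag an outgoing request with a fresh ID and remember who is waiting for it.
  request(type: string, payload: unknown, onReply: (payload: unknown) => void): Envelope {
    const correlationId = `req-${++this.nextId}`;
    this.pending.set(correlationId, onReply);
    return { type, correlationId, payload };
  }

  // Route an incoming response to its waiter; returns false for unknown or already-settled IDs.
  resolve(response: Envelope): boolean {
    const onReply = this.pending.get(response.correlationId);
    if (!onReply) return false;
    this.pending.delete(response.correlationId);
    onReply(response.payload);
    return true;
  }
}

const tracker = new RpcTracker();
const replies: unknown[] = [];
const first = tracker.request("FETCH_DATA", { id: "a" }, (p) => replies.push(p));
const second = tracker.request("FETCH_DATA", { id: "b" }, (p) => replies.push(p));

// Responses may arrive out of order; the ID pairs each one with its request.
tracker.resolve({ type: "FETCH_DATA_RESPONSE", correlationId: second.correlationId, payload: "B" });
tracker.resolve({ type: "FETCH_DATA_RESPONSE", correlationId: first.correlationId, payload: "A" });
console.log(replies); // [ 'B', 'A' ]
```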

Example:

typescript
const FetchDataRequest = message("FETCH_DATA", { id: z.string() });
const FetchDataResponse = message("FETCH_DATA_RESPONSE", { data: z.unknown() });

router.rpc(FetchDataRequest, FetchDataResponse, (ctx) => {
  ctx.reply({ data: fetchedData }); // fetchedData: placeholder for the looked-up value
});

// Alternative handler with streaming progress (partial1, partial2, complete are placeholders)
router.rpc(FetchDataRequest, FetchDataResponse, (ctx) => {
  ctx.progress({ data: partial1 });
  ctx.progress({ data: partial2 });
  ctx.reply({ data: complete });
});

Type Effects:

typescript
interface ContextWithRpc {
  reply<TResponse>(
    payload: TResponse,
    opts?: ReplyOptions,
  ): void | Promise<void>;

  progress<TResponse>(
    payload: TResponse,
    opts?: ProgressOptions,
  ): void | Promise<void>;

  error<TDetails = unknown>(
    code: string,
    message: string,
    details?: TDetails,
    opts?: ReplyOptions,
  ): void | Promise<void>;
}

interface ReplyOptions {
  waitFor?: "drain" | "ack";
  signal?: AbortSignal;
  meta?: Record<string, unknown>;
}

interface ProgressOptions extends ReplyOptions {
  throttleMs?: number; // Rate-limit rapid updates
}
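
The throttleMs option is described only as rate-limiting rapid updates. One plausible reading is that updates arriving within throttleMs of the last one sent are dropped. The sketch below models that reading with a hypothetical createProgressThrottle helper and an injectable clock; it is not taken from the WS-Kit source, and the library may coalesce updates differently.

```typescript
// Hypothetical sketch: drop progress updates that arrive within `throttleMs` of the last one sent.
function createProgressThrottle<T>(
  send: (payload: T) => void,
  throttleMs: number,
  now: () => number = Date.now,
) {
  let lastSentAt = Number.NEGATIVE_INFINITY;
  return (payload: T): boolean => {
    const t = now();
    if (t - lastSentAt < throttleMs) return false; // Too soon: skip this update
    lastSentAt = t;
    send(payload);
    return true;
  };
}

// Usage with a fake clock so the behavior is deterministic.
let clock = 0;
const sent: number[] = [];
const progress = createProgressThrottle<number>((pct) => sent.push(pct), 100, () => clock);

progress(10); // t=0: sent
clock = 50;
progress(20); // t=50: dropped
clock = 120;
progress(30); // t=120: sent
console.log(sent); // [ 10, 30 ]
```

Under this reading only intermediate updates are candidates for dropping; a final reply would bypass the throttle so the caller always receives the result.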

withPubSub(config?)

Location: @ws-kit/pubsub (canonical; also re-exported from @ws-kit/zod/@ws-kit/valibot for convenience)

Purpose: Enable topic-based broadcasting to multiple subscribers.

Configuration:

typescript
interface PubSubConfig {
  adapter?: PubSubAdapter; // Defaults to memoryPubSub()
}

Effects:

  • Adds ctx.publish(topic, schema, data) method
  • Adds ctx.topics.subscribe(topic) and ctx.topics.unsubscribe(topic)
  • Works with any backend via adapters (memory, Redis, Cloudflare, custom)
  • No validation dependency required

Example:

typescript
// Subscribe (in a handler or lifecycle hook)
await ctx.topics.subscribe("chat:room-123");

// Publish (ChatMessage is a message schema defined elsewhere)
await ctx.publish("chat:room-123", ChatMessage, { text: "Hello" });

// Unsubscribe
await ctx.topics.unsubscribe("chat:room-123");

Adapter Swap Pattern (only the adapter option changes; handler code stays the same):

typescript
// Development (memory adapter, zero config)
.plugin(withPubSub())  // Uses memoryPubSub() by default

// Production (Redis adapter)
import { redisPubSub } from "@ws-kit/redis";
.plugin(withPubSub({ adapter: redisPubSub(redis) }))

// Cloudflare Workers (Durable Objects)
import { DurablePubSub } from "@ws-kit/cloudflare";
.plugin(withPubSub({ adapter: new DurablePubSub(state) }))

Type Effects:

typescript
interface ContextWithPubSub {
  publish<TPayload>(
    topic: string,
    schema: MessageSchema<TPayload>,
    data: TPayload,
    opts?: SendOptions,
  ): void | Promise<void>;

  topics: {
    subscribe(topic: string): Promise<void>;
    unsubscribe(topic: string): Promise<void>;
    list(): Promise<string[]>; // List subscribed topics for this connection
  };
}

rateLimit(config)

Location: @ws-kit/rate-limit (canonical)

Purpose: Rate-limiting middleware for messages per connection, per user, or per type.

Note: Rate limiting is middleware (.use()), not a plugin (.plugin()). It doesn't add context methods; it gates handler execution.

Configuration:

typescript
interface RateLimitOptions {
  limiter: RateLimiter; // Required: memoryRateLimiter(), redisRateLimiter(), etc.
  key?: (ctx: MinimalContext) => string; // Custom key function (default: keyPerUserPerType)
  cost?: (ctx: MinimalContext) => number; // Token cost per message (default: 1)
}

Effects:

  • Registers global middleware that checks rate limits before handlers
  • Blocks over-limit messages with RESOURCE_EXHAUSTED error
  • Works with any backend via adapters (memory, Redis, Cloudflare)

Example:

typescript
import { rateLimit, keyPerUserPerType } from "@ws-kit/rate-limit";
import { redisRateLimiter } from "@ws-kit/redis";

const router = createRouter().use(
  rateLimit({
    limiter: redisRateLimiter(redis, { capacity: 1000, tokensPerSecond: 50 }),
    key: keyPerUserPerType, // Per-user-per-type isolation
    cost: (ctx) => (ctx.type === "Compute" ? 10 : 1),
  }),
);

Built-in Key Functions:

typescript
import { keyPerUser, keyPerUserPerType } from "@ws-kit/rate-limit";

// keyPerUserPerType (default) — tenant:user:type
// keyPerUser — tenant:user (lighter footprint)
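
The two key shapes in the comments above can be sketched as plain functions. The context fields (data.tenantId, data.userId) and the "public"/"anon" fallbacks are assumptions for illustration; the built-in functions may read different fields or handle missing values differently.

```typescript
// Hypothetical sketch of the two key shapes; field names and fallbacks are assumptions.
interface KeyContext {
  type: string;
  data: { tenantId?: string; userId?: string };
}

// tenant:user (one bucket per user, shared across message types)
const perUser = (ctx: KeyContext): string =>
  `${ctx.data.tenantId ?? "public"}:${ctx.data.userId ?? "anon"}`;

// tenant:user:type (one bucket per user per message type)
const perUserPerType = (ctx: KeyContext): string => `${perUser(ctx)}:${ctx.type}`;

const sampleCtx: KeyContext = {
  type: "SendMessage",
  data: { tenantId: "acme", userId: "u42" },
};

console.log(perUser(sampleCtx)); // acme:u42
console.log(perUserPerType(sampleCtx)); // acme:u42:SendMessage
```

The per-type variant isolates an expensive message type from cheap ones at the cost of more buckets per user, which is the "lighter footprint" trade-off noted above.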

Type Effects:

typescript
// Rate limiting applies globally; no context methods added
// Handlers run only if rate limit not exceeded
// If exceeded, automatic error response is sent

Adapter System

What is an Adapter?

An adapter is a backend implementation for a plugin. Adapters:

  • Implement the plugin's interface contract
  • Handle backend-specific concerns (Redis, memory, Cloudflare, etc.)
  • Are swappable without changes to handler code

Core Adapter Interfaces

PubSubAdapter

typescript
export interface PubSubAdapter {
  /**
   * Subscribe a connection to a topic.
   */
  subscribe(clientId: string, topic: string): Promise<void>;

  /**
   * Unsubscribe a connection from a topic.
   */
  unsubscribe(clientId: string, topic: string): Promise<void>;

  /**
   * Publish a message to all subscribers of a topic.
   * Returns publish result with matched subscriber count.
   */
  publish(topic: string, message: SerializedMessage): Promise<PublishResult>;

  /**
   * List all topics a connection is subscribed to.
   */
  list(clientId: string): Promise<string[]>;
}

export interface PublishResult {
  matched: number; // Number of subscribers that received message
  capability: "exact" | "prefix" | "regex"; // Matching mode used
}

RateLimiter

typescript
export interface RateLimiter {
  /**
   * Atomically consume tokens from a rate limit bucket.
   * @param key - Rate limit key (e.g., "user:123:SendMessage")
   * @param cost - Number of tokens to consume
   */
  consume(key: string, cost: number): Promise<RateLimitDecision>;

  /**
   * Get the policy configuration for this rate limiter.
   * Required for accurate error reporting in middleware.
   */
  getPolicy(): RateLimitPolicy;

  /**
   * Optional cleanup on app shutdown.
   */
  dispose?(): void | Promise<void>;
}

export type RateLimitDecision =
  | { allowed: true; remaining: number }
  | { allowed: false; remaining: number; retryAfterMs: number | null };
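
A token bucket is one common way to satisfy this contract. The sketch below is a hypothetical, synchronous core (createTokenBucket is not a WS-Kit export, and this is not the @ws-kit/memory implementation); a real adapter would expose it through an async consume(). Treating retryAfterMs: null as "retrying cannot help" is also an assumption.

```typescript
// Hypothetical token-bucket sketch; not the @ws-kit/memory implementation.
type Decision =
  | { allowed: true; remaining: number }
  | { allowed: false; remaining: number; retryAfterMs: number | null };

interface Policy {
  capacity: number;
  tokensPerSecond: number;
}

function createTokenBucket(policy: Policy, now: () => number = Date.now) {
  const buckets = new Map<string, { tokens: number; updatedAt: number }>();

  return {
    getPolicy: () => policy,
    // Synchronous core; an adapter would wrap this in `async consume(...)`.
    consume(key: string, cost: number): Decision {
      const t = now();
      const bucket = buckets.get(key) ?? { tokens: policy.capacity, updatedAt: t };
      // Refill for the elapsed time, capped at capacity.
      const refill = ((t - bucket.updatedAt) / 1000) * policy.tokensPerSecond;
      bucket.tokens = Math.min(policy.capacity, bucket.tokens + refill);
      bucket.updatedAt = t;
      buckets.set(key, bucket);

      if (cost > policy.capacity) {
        // Assumption: null signals that no amount of waiting will make this succeed.
        return { allowed: false, remaining: Math.floor(bucket.tokens), retryAfterMs: null };
      }
      if (bucket.tokens < cost) {
        const retryAfterMs = Math.ceil(((cost - bucket.tokens) / policy.tokensPerSecond) * 1000);
        return { allowed: false, remaining: Math.floor(bucket.tokens), retryAfterMs };
      }
      bucket.tokens -= cost;
      return { allowed: true, remaining: Math.floor(bucket.tokens) };
    },
  };
}

// Fake clock for a deterministic walk-through.
let nowMs = 0;
const limiter = createTokenBucket({ capacity: 2, tokensPerSecond: 1 }, () => nowMs);
const d1 = limiter.consume("user:123:SendMessage", 1); // allowed, remaining 1
const d2 = limiter.consume("user:123:SendMessage", 1); // allowed, remaining 0
const d3 = limiter.consume("user:123:SendMessage", 1); // denied, retryAfterMs 1000
nowMs = 1000;
const d4 = limiter.consume("user:123:SendMessage", 1); // allowed again after refill
console.log(d1, d2, d3, d4);
```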

Memory Adapters (Zero-Config Defaults)

Memory adapters are available in @ws-kit/memory for development and testing.

memoryPubSub():

typescript
import { memoryPubSub } from "@ws-kit/memory";

const router = createRouter().plugin(withPubSub({ adapter: memoryPubSub() }));

memoryRateLimiter():

typescript
import { rateLimit } from "@ws-kit/rate-limit";
import { memoryRateLimiter } from "@ws-kit/memory";

const router = createRouter().use(
  rateLimit({
    limiter: memoryRateLimiter({ capacity: 100, tokensPerSecond: 10 }),
  }),
);

External Adapters

External packages provide production-grade adapters.

@ws-kit/redis

Distributed pub/sub and rate-limiting via Redis.

typescript
import { createClient } from "redis";
import { redisPubSub, redisRateLimiter } from "@ws-kit/redis";
import { rateLimit } from "@ws-kit/rate-limit";

const redis = createClient();
await redis.connect();

const router = createRouter()
  .plugin(withPubSub({ adapter: redisPubSub(redis) }))
  .use(
    rateLimit({
      limiter: redisRateLimiter(redis, { capacity: 1000, tokensPerSecond: 50 }),
    }),
  );

@ws-kit/cloudflare

Cloudflare Workers adapters using Durable Objects and native rate limiting.

typescript
import { DurablePubSub, createDurableObjectHandler } from "@ws-kit/cloudflare";

// In your Durable Object class:
export class WebSocketDO {
  private pubsub: DurablePubSub;

  constructor(state: DurableObjectState, env: Env) {
    this.pubsub = new DurablePubSub(state);
    const router = createRouter().plugin(withPubSub({ adapter: this.pubsub }));
    // ... setup handler
  }
}

Custom Adapters

Users can implement custom adapters for proprietary backends.

Example: Kafka Pub/Sub:

typescript
import type { PubSubAdapter } from "@ws-kit/core";
import type { Producer } from "kafkajs"; // Assumes the kafkajs client

export function kafkaPubSub(producer: Producer): PubSubAdapter {
  // Kafka has no per-connection subscriptions, so track them locally: topic -> client IDs.
  // Keying by topic (not by a "topic:clientId" string) keeps topics containing ":" intact.
  const subscribers = new Map<string, Set<string>>();

  return {
    subscribe: async (clientId, topic) => {
      if (!subscribers.has(topic)) subscribers.set(topic, new Set());
      subscribers.get(topic)!.add(clientId);
    },
    unsubscribe: async (clientId, topic) => {
      subscribers.get(topic)?.delete(clientId);
    },
    publish: async (topic, message) => {
      await producer.send({
        topic,
        messages: [{ value: JSON.stringify(message) }],
      });
      return { matched: subscribers.get(topic)?.size ?? 0, capability: "exact" };
    },
    list: async (clientId) =>
      [...subscribers]
        .filter(([, clientIds]) => clientIds.has(clientId))
        .map(([topic]) => topic),
  };
}

Plugin Type Safety

Capability Gating

TypeScript ensures that context methods exist only when the plugin that provides them is registered.

typescript
const router = createRouter();

// ❌ Error: Property 'send' does not exist
router.on(PingMessage, (ctx) => {
  ctx.send(PongMessage, {}); // Missing withMessaging() plugin
});

// ✅ OK after adding plugin
const router2 = createRouter().plugin(withMessaging());

router2.on(PingMessage, (ctx) => {
  ctx.send(PongMessage, {}); // ✅ Available now
});

How It Works

Each plugin returns a router with enhanced type signatures:

typescript
// Type-level tracking
interface Router<
  TValidator extends ValidatorAdapter,
  TContext extends ConnectionData,
  TCapabilities extends Record<string, unknown>,
> {
  plugin<TNewCapability>(
    plugin: Plugin<TContext, TNewCapability>,
  ): Router<TValidator, TContext, TCapabilities & TNewCapability>;

  on<TPayload, TResponse>(
    schema: MessageSchema<TPayload>,
    handler: (ctx: MinimalContext & TCapabilities) => void,
  ): this;
}

Plugin Definition (Advanced)

Creating a Custom Plugin

Use definePlugin() helper for full type safety:

typescript
import { definePlugin } from "@ws-kit/core";
import type { ConnectionData, Plugin } from "@ws-kit/core";

export interface MyPluginCapability {
  myMethod(arg: string): void;
}

export function withMyPlugin(): Plugin<ConnectionData, MyPluginCapability> {
  return definePlugin<ConnectionData, MyPluginCapability>(
    "myPlugin",
    (router, config) => {
      // Register middleware
      router.use((ctx, next) => {
        // Enhance context
        (ctx as any).myMethod = (arg: string) => {
          console.log("My plugin method called with:", arg);
        };
        return next();
      });

      return router;
    },
  );
}

Usage:

typescript
const router = createRouter().plugin(withMyPlugin());

router.on(someSchema, (ctx) => {
  ctx.myMethod("hello"); // ✅ TypeScript knows about this
});

Plugin Composition Patterns

Pattern 1: Feature Flags

Enable/disable features with configuration:

typescript
export function withMessaging(config?: { enabled?: boolean }): Plugin {
  if (config?.enabled === false) {
    return (router) => router; // No-op
  }

  return (router) => {
    // Register messaging middleware
    router.use((ctx, next) => {
      ctx.send = (schema, data) => {
        /* ... */
      };
      return next();
    });
    return router;
  };
}

Pattern 2: Conditional Adapters

Choose adapters based on environment:

typescript
const adapter =
  process.env.NODE_ENV === "production" ? redisPubSub(redis) : memoryPubSub();

const router = createRouter().plugin(withPubSub({ adapter }));

Pattern 3: Plugin Dependencies

Register plugins in dependency order. The type system enforces this:

typescript
// ❌ Type error: withRpc() requires withValidation()
const router = createRouter().plugin(withRpc());

// ✅ OK: withValidation() before withRpc()
const router2 = createRouter().plugin(withValidation()).plugin(withRpc());
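
One way such ordering can be enforced is to let each plugin declare what it needs through its parameter type. The sketch below is a hypothetical model (OrderedRouter, validationCaps, and rpcCaps are illustrative names, not WS-Kit APIs), and it relies on TypeScript's strictFunctionTypes check.

```typescript
// Hypothetical sketch of type-level plugin ordering; not the WS-Kit types.
class OrderedRouter<Caps extends object> {
  readonly caps: Caps;

  constructor(caps: Caps) {
    this.caps = caps;
  }

  // A plugin declares what it needs via its parameter type and returns what it adds.
  plugin<Adds extends object>(p: (caps: Caps) => Adds): OrderedRouter<Caps & Adds> {
    return new OrderedRouter({ ...this.caps, ...p(this.caps) });
  }
}

// Needs nothing, adds the validation marker.
const validationCaps = (_caps: {}) => ({ validation: true as const });

// Needs the validation marker, adds the rpc marker.
const rpcCaps = (_caps: { validation: true }) => ({ rpc: true as const });

// new OrderedRouter({}).plugin(rpcCaps);
// Compile error under strictFunctionTypes: {} lacks the `validation` marker rpcCaps requires.

const ordered = new OrderedRouter({}).plugin(validationCaps).plugin(rpcCaps);
console.log(ordered.caps); // { validation: true, rpc: true }
```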

Best Practices

  1. Always validate input schemas

    typescript
    // ❌ No validation
    router.on((ctx) => {
      console.log(ctx.payload); // Any type
    });
    
    // ✅ With validation
    router.on(message("PING", { text: z.string() }), (ctx) => {
      console.log(ctx.payload.text); // string
    });
  2. Use memory adapters for development, external for production

    typescript
    const adapter =
      process.env.NODE_ENV === "production"
        ? redisPubSub(redis)
        : memoryPubSub();
  3. Extract adapter initialization to functions

    typescript
    // ❌ Adapter logic scattered
    function createApp() {
      return createRouter().plugin(
        withPubSub({
          adapter:
            process.env.NODE_ENV === "production"
              ? redisPubSub(redis)
              : memoryPubSub(),
        }),
      );
    }
    
    // ✅ Clean initialization
    function createAdapter() {
      return process.env.NODE_ENV === "production"
        ? redisPubSub(redis)
        : memoryPubSub();
    }
    
    function createApp() {
      return createRouter().plugin(withPubSub({ adapter: createAdapter() }));
    }
  4. Configure rate limits per use case

    typescript
    import { rateLimit } from "@ws-kit/rate-limit";
    import { memoryRateLimiter } from "@ws-kit/memory";
    
    // Strict (API key)
    .use(rateLimit({
      limiter: memoryRateLimiter({ capacity: 100, tokensPerSecond: 5 }),
      key: (ctx) => ctx.data.apiKey,
    }))
    
    // Relaxed (authenticated user)
    .use(rateLimit({
      limiter: memoryRateLimiter({ capacity: 1000, tokensPerSecond: 50 }),
      key: (ctx) => `user:${ctx.data.userId}`,
    }))
  5. Use topic namespacing for organization

    typescript
    // ❌ Flat namespace (collision risk)
    await ctx.topics.subscribe("messages");
    
    // ✅ Hierarchical (clear organization)
    await ctx.topics.subscribe("chat:room:123:messages");
    await ctx.topics.subscribe("notifications:user:456");
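
A small helper can keep hierarchical names consistent across a codebase. The topic() function below is a hypothetical sketch, not part of WS-Kit; it joins segments with ":" and rejects segments that would silently change the hierarchy.

```typescript
// Hypothetical helper: join segments with ":" and reject segments that would break the hierarchy.
function topic(...segments: Array<string | number>): string {
  for (const segment of segments) {
    const text = String(segment);
    if (text === "" || text.includes(":")) {
      throw new Error(`Invalid topic segment: "${text}"`);
    }
  }
  return segments.join(":");
}

const roomMessages = topic("chat", "room", 123, "messages");
const userNotifications = topic("notifications", "user", 456);

console.log(roomMessages); // chat:room:123:messages
console.log(userNotifications); // notifications:user:456
```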

Migration Guide

Canonical Import Sources (All Versions)

Always use canonical sources per ADR-032:

Recommended (validator convenience):

typescript
import { createRouter, withZod } from "@ws-kit/zod"; // or @ws-kit/valibot
import { withPubSub } from "@ws-kit/pubsub";
import { rateLimit } from "@ws-kit/rate-limit";
import { memoryRateLimiter } from "@ws-kit/memory";

const router = createRouter()
  .plugin(withZod())
  .plugin(withPubSub())
  .use(
    rateLimit({
      limiter: memoryRateLimiter({ capacity: 100, tokensPerSecond: 10 }),
    }),
  );

Explicit (each plugin imported from its canonical package):

typescript
import { createRouter, withZod } from "@ws-kit/zod";
import { withMessaging, withRpc } from "@ws-kit/plugins";
import { withPubSub } from "@ws-kit/pubsub";
import { rateLimit } from "@ws-kit/rate-limit";
import { memoryRateLimiter } from "@ws-kit/memory";

const router = createRouter()
  .plugin(withZod())
  .plugin(withMessaging())
  .plugin(withRpc())
  .plugin(withPubSub())
  .use(
    rateLimit({
      limiter: memoryRateLimiter({ capacity: 100, tokensPerSecond: 10 }),
    }),
  );

Note: Importing a plugin from a package that neither owns nor re-exports it (e.g., withPubSub from @ws-kit/plugins, when it lives in @ws-kit/pubsub) will fail. Use the canonical source for each feature, or the convenience re-exports from your validator package.


References