ADR-023: Split PubSubDriver and BrokerConsumer for Clean Responsibility Separation
Date: 2025-01-XX Status: Accepted Replaces: ADR-022 (integration details) Related: docs/specs/pubsub.md, docs/specs/adapters.md
Context
The previous pub/sub adapter design included an optional onRemotePublished hook within the PubSubDriver interface. This created ambiguity about adapter responsibilities:
- Adapters were documented as "local subscription index + local fan-out only"
- Yet the interface included broker ingress wiring (
onRemotePublished) - This blurred the line between adapter concerns (subscription tracking) and platform concerns (broker consumption)
Distributed deployments (Redis, Cloudflare DO, Kafka) need to:
- Maintain a local subscription index
- Publish to a broker
- Consume broker messages and invoke router callbacks
The combined onRemotePublished hook didn't clearly separate these concerns, making it hard to test, compose, or understand each layer's responsibility.
Decision
Split the pub/sub layer into two explicit interfaces:
PubSubDriver— Pure local responsibility:- Maintains subscription index
- Tracks per-client topic subscriptions
- Broadcasts router-materialized messages to matching local subscribers
- Returns local subscriber stats (capability + matchedLocal count)
- Never consumes broker messages or calls back into router
BrokerConsumer— Pure inbound responsibility:- Consumes messages from broker (Redis SUBSCRIBE, Kafka, Cloudflare DO callbacks, etc.)
- Invokes router/platform callback with
PublishEnvelope - Returns teardown function for cleanup
- Never maintains subscription state or delivers WebSocket frames
Design Rationale
Single Responsibility
Each interface has one clear job:
- PubSubDriver: "Where are my local subscribers for this topic?"
- BrokerConsumer: "Tell me when messages arrive from the broker"
Testing and mocking become straightforward—mock each concern independently.
Composability
Distributed adapters export both utilities, but they're loosely coupled:
const adapter = redisPubSub(redis); // Local index + egress
const ingress = redisConsumer(redis); // Broker ingress only
// Router/platform wires them together
ingress.start((envelope) => deliverLocally(adapter, envelope));Advanced deployments can:
- Use multiple broker sources (Redis + Kafka)
- Apply custom filtering/transformation between ingress and delivery
- Mock either piece independently for testing
- Scale subscription index separately from broker consumption
Backward Compatibility
Memory adapter needs no changes—it's purely local, has no ingress:
const adapter = memoryPubSub(); // Local only, no ingress neededRouter Integration
Router's responsibility is clear:
- Validate schema and build
PublishEnvelope - Call
adapter.publish(envelope)→ returns local stats - Iterate
adapter.getLocalSubscribers(topic)and deliver to WebSockets - (Separately) Initialize
ingress.start(handler)to wire broker → local delivery
This keeps the router's role orthogonal to adapter concerns.
Implementation
Types (core/pubsub)
/**
* Pub/Sub adapter: subscription index + local fan-out only.
* Never consumes broker messages or calls back into router.
*/
export interface PubSubDriver {
publish(
envelope: PublishEnvelope,
opts?: PublishOptions,
): Promise<PublishResult>;
subscribe(clientId: string, topic: string): Promise<void>;
unsubscribe(clientId: string, topic: string): Promise<void>;
getLocalSubscribers(topic: string): AsyncIterable<string>;
listTopics?(): Promise<readonly string[]>;
hasTopic?(topic: string): Promise<boolean>;
close?(): Promise<void>;
}
/**
* Broker ingress: message consumption only.
* Decoupled from subscription indexing and delivery.
*/
export interface BrokerConsumer {
start(
onMessage: (envelope: PublishEnvelope) => void | Promise<void>,
): () => void;
}Memory Adapter (unchanged)
const adapter = memoryPubSub(); // No ingress; local-only
// Works as-isRedis Adapter + Ingress
// Adapter: local index + Redis egress
const adapter = redisPubSub(redis, { channelPrefix: "ws:" });
// Ingress: Redis inbound
const ingress = redisConsumer(redis, { channelPrefix: "ws:" });
// Wire together (platform/router responsibility)
ingress.start((envelope) => deliverLocally(adapter, envelope));Cloudflare DO Adapter + Ingress
// Adapter: local index + DO egress
const adapter = durableObjectsPubSub(env.DO_NAMESPACE);
// Ingress: DO callback handler
const ingress = durableObjectsConsumer();
// Router/DO calls ingress when messages arrive
ingress.start((envelope) => deliverLocally(adapter, envelope));
// In your DO's fetch() handler:
if (request.method === "POST" && request.url.endsWith("/publish")) {
const envelope = await request.json();
await ingressHandler(envelope);
}Consequences
Benefits
- Clarity: Each layer has one job, clearly documented
- Testability: Mock adapter and ingress independently
- Composability: Multiple brokers, selective fan-in, custom filters
- Flexibility: Apps choose how to wire ingress (HTTP callback, alarms, queues, etc.)
Drawbacks
- One extra import for distributed setups (adapter + ingress)
- Router integration changes needed (from monolithic adapter to split design)
- Slight boilerplate for wiring ingress (mitigated by examples)
Migration Path
- Define
BrokerConsumerinterface (✓ completed) - Remove
BrokerConsumerfromPubSubDriver(✓ completed) - Create Redis adapter + ingress (✓ completed)
- Create Cloudflare DO adapter + ingress (✓ completed)
- Update router to use new adapter interface (pending)
- Deprecate old
BrokerConsumerhook (deferred) - Update documentation and examples (pending)
Related Standards
- publish(): Returns
PublishResultdiscriminated union with capability level - getLocalSubscribers(): Lazy
AsyncIterable<string>for backpressure support - matchedLocal: Always present on success (0 if no subscribers)
- capability: "exact" (memory), "estimate" (distributed with lower-bound), "unknown" (can't count)