
In-Process Broadcast with Isolated Failures

A domain event without a bus has nowhere to go. @frenchexdev/ddd-event-bus is the simplest part of the integration backbone: an in-process broadcaster that fans an event out to every subscriber registered for its constructor, isolates failures so one bad subscriber cannot abort the others, and returns the failures rather than swallowing them. Three responsibilities, around forty lines of code, and a contract sharp enough that adapters for asynchronous or distributed delivery can satisfy the same shape later.


What EventBus Reifies

In a layered architecture, handler resolution is the missing piece between the aggregate and its consumers. The aggregate raises a SubscriptionStarted; somebody has to notice; the consumers — a metrics counter, a webhook dispatcher, a read-model projection — each need to be wired up to it. The naive shape is direct method calls from the aggregate to each consumer, which couples the aggregate to every downstream concern and turns adding a fourth consumer into a refactor of the aggregate. The clean shape is a bus: the aggregate publishes, the bus distributes, the consumers subscribe.

The bus has two non-obvious responsibilities that distinguish it from a Map<EventCtor, Subscriber[]> you might write in a hurry. First, subscriber isolation: a subscriber that throws on a particular event must not prevent the other subscribers from receiving the same event. The publisher gets back a list of failures and decides what to do with them — log, retry, escalate — but it never sees an unhandled exception from the bus itself. Second, typed dispatch by constructor: subscribers register against the event class, not against a string name, so renaming the event class is a compile error rather than a silent runtime mis-route.

The in-process bus is the simplest member of a family. The same interface — subscribe(ctor, sub) + publish(event) — works behind an adapter that buffers to an outbox table, ships across a Kafka topic, or batches into a single async commit. The bus's job is to be the dispatch shape; the adapter's job is to make that shape distributed.
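To make the family idea concrete, here is a minimal sketch of one possible adapter. Apart from the subscribe/publish pair, everything in it is an assumption made for illustration: the EventBusPort name, the BufferingBus class, its flush method, and the in-memory array standing in for an outbox table are hypothetical and are not part of @frenchexdev/ddd-event-bus.

```typescript
// Shapes assumed from the article's description of the bus surface.
type EventCtor<TEvent> = new (...args: never[]) => TEvent;
interface EventSubscriber<TEvent> {
  on(event: TEvent): Promise<void> | void;
}
interface SubscriberFailure {
  subscriberName: string;
  error: unknown;
}

// Hypothetical name for the shared dispatch shape: subscribe + publish.
interface EventBusPort {
  subscribe<TEvent>(ctor: EventCtor<TEvent>, sub: EventSubscriber<TEvent>): void;
  publish<TEvent>(event: TEvent): Promise<readonly SubscriberFailure[]>;
}

// Hypothetical adapter: publish() only buffers; flush() delivers the buffered
// events through an inner bus, in the order they were published.
class BufferingBus implements EventBusPort {
  private readonly pending: unknown[] = [];
  private readonly inner: EventBusPort;

  constructor(inner: EventBusPort) {
    this.inner = inner;
  }

  subscribe<TEvent>(ctor: EventCtor<TEvent>, sub: EventSubscriber<TEvent>): void {
    this.inner.subscribe(ctor, sub);
  }

  async publish<TEvent>(event: TEvent): Promise<readonly SubscriberFailure[]> {
    this.pending.push(event); // nothing is delivered yet, so nothing can fail yet
    return [];
  }

  async flush(): Promise<readonly SubscriberFailure[]> {
    const failures: SubscriberFailure[] = [];
    // splice(0) empties the buffer and returns its previous contents.
    for (const event of this.pending.splice(0)) {
      failures.push(...(await this.inner.publish(event)));
    }
    return failures;
  }
}
```

Callers keep the same two calls; only the moment of delivery changes. A real outbox adapter would persist the buffer rather than hold it in memory, which this sketch does not attempt.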


The Runtime: ddd-event-bus

event-bus.ts declares the surface in 45 working lines. Three exported types and one class.

EventCtor<TEvent> is the typed event-class reference: new (...args: never[]) => TEvent. The never[] for arguments is deliberate: the bus does not need to know how the event was constructed, only that it has a constructor it can use as a dispatch key. EventSubscriber<TEvent> is the subscriber shape: a single method on(event: TEvent): Promise<void> | void, sync or async, returning nothing. SubscriberFailure is the failure record, { subscriberName, error }, which publish returns rather than throws.
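Written out, the three types might look like the sketch below. The type of the error field (unknown here) and the SubscriptionStarted constructor are assumptions; the article names the fields but does not show the declarations.

```typescript
// Declared without `export` so the snippet runs as a standalone script.
type EventCtor<TEvent> = new (...args: never[]) => TEvent;

interface EventSubscriber<TEvent> {
  on(event: TEvent): Promise<void> | void;
}

interface SubscriberFailure {
  readonly subscriberName: string;
  readonly error: unknown; // assumption: the field's declared type is not shown
}

// Hypothetical event class with a non-trivial constructor.
class SubscriptionStarted {
  readonly planId: string;
  constructor(planId: string) {
    this.planId = planId;
  }
}

// never[] lets a class with any constructor signature serve as a dispatch key.
const dispatchKey: EventCtor<SubscriptionStarted> = SubscriptionStarted;

// A synchronous subscriber; an async one satisfies the same interface.
const seenPlans: string[] = [];
const recorder: EventSubscriber<SubscriptionStarted> = {
  on(event) {
    seenPlans.push(event.planId);
  },
};

// A failure is plain data that the caller can inspect.
const sampleFailure: SubscriberFailure = {
  subscriberName: 'Recorder',
  error: new Error('boom'),
};
```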

The EventBus class itself holds a Map<Function, Array<EventSubscriber<unknown>>> indexed by event constructor. subscribe pushes the subscriber into the list for that constructor. publish looks up the list, walks it, calls await sub.on(event) for each, catches any throw, and returns the failures at the end:

async publish<TEvent>(event: TEvent): Promise<readonly SubscriberFailure[]> {
  const ctor = (event as object).constructor as Function;
  const list = this.subs.get(ctor) ?? [];
  const failures: SubscriberFailure[] = [];
  for (const sub of list) {
    try {
      await sub.on(event);
    } catch (err) {
      failures.push({ subscriberName: sub.constructor.name, error: err });
    }
  }
  return failures;
}

The contract is the loop itself. Subscribers receive the event in registration order, one at a time; because each call is awaited, a slow subscriber delays those registered after it. A subscriber that throws, or whose returned promise rejects, produces a SubscriberFailure entry, and the loop continues with the next subscriber. The caller of publish reads the failures array and decides what to do: log them, surface them, accept them, retry them. There is no unhandled promise rejection, no swallowed error, no implicit failure-mode contract; the bus tells you what failed, and the application service decides what that means.
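The isolation guarantee can be exercised with a small stand-in bus. This is a sketch, not the package source: the publish body follows the listing above, while the subscribe body, the demoIsolation helper, and the SubscriptionStarted constructor are assumptions.

```typescript
type EventCtor<TEvent> = new (...args: never[]) => TEvent;
interface EventSubscriber<TEvent> {
  on(event: TEvent): Promise<void> | void;
}
interface SubscriberFailure {
  subscriberName: string;
  error: unknown;
}

class EventBus {
  private readonly subs = new Map<Function, Array<EventSubscriber<unknown>>>();

  // Assumed implementation: append to the list kept for this constructor.
  subscribe<TEvent>(ctor: EventCtor<TEvent>, sub: EventSubscriber<TEvent>): void {
    const list = this.subs.get(ctor) ?? [];
    list.push(sub as EventSubscriber<unknown>);
    this.subs.set(ctor, list);
  }

  async publish<TEvent>(event: TEvent): Promise<readonly SubscriberFailure[]> {
    const ctor = (event as object).constructor as Function;
    const list = this.subs.get(ctor) ?? [];
    const failures: SubscriberFailure[] = [];
    for (const sub of list) {
      try {
        await sub.on(event);
      } catch (err) {
        failures.push({ subscriberName: sub.constructor.name, error: err });
      }
    }
    return failures;
  }
}

// Hypothetical event class.
class SubscriptionStarted {
  readonly planId: string;
  constructor(planId: string) {
    this.planId = planId;
  }
}

// Registers three subscribers; the second one always throws.
async function demoIsolation(): Promise<{
  delivered: string[];
  failures: readonly SubscriberFailure[];
}> {
  const bus = new EventBus();
  const delivered: string[] = [];
  bus.subscribe(SubscriptionStarted, {
    on: (e) => { delivered.push(`first:${e.planId}`); },
  });
  bus.subscribe(SubscriptionStarted, {
    on: () => { throw new Error('endpoint down'); },
  });
  bus.subscribe(SubscriptionStarted, {
    on: async (e) => { delivered.push(`third:${e.planId}`); },
  });
  const failures = await bus.publish(new SubscriptionStarted('pro'));
  return { delivered, failures };
}
```

In this sketch the first and third subscribers both run and exactly one failure comes back. Because the subscribers are object literals, sub.constructor.name reports "Object" for them; class-based subscribers would yield a more useful subscriberName.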

In our invented architecture, a metrics subscriber and a webhook dispatcher both subscribe to SubscriptionStarted:

import { EventBus } from '@frenchexdev/ddd-event-bus';
import { SubscriptionStarted } from '../events/subscription-started.js';

// metrics, webhooks, and log are application-level collaborators assumed to be in scope.

const bus = new EventBus();

bus.subscribe(SubscriptionStarted, {
  async on(e) {
    metrics.counter('subscription.started').increment({ plan: e.planId });
  },
});

bus.subscribe(SubscriptionStarted, {
  async on(e) {
    await webhooks.dispatch('subscription.started', e);
  },
});

const failures = await bus.publish(new SubscriptionStarted(/* ... */));
// An empty failures array means every subscriber completed.
for (const f of failures) log.error({ f }, 'subscriber failed');

Both subscribers receive the event. If the webhook dispatch throws because the customer's endpoint is down, the metrics counter still increments and the failure shows up in the returned array. The aggregate's emission does not care which subscribers exist — that is the bus's responsibility — and the bus does not care what the subscribers do.

subscriberCount(eventCtor) is the only inspection method on the bus. It returns the number of subscribers currently registered for an event class — useful for tests that want to verify wireup, and for the analyzer that wants to detect publications with zero subscribers.
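A wireup check built on subscriberCount might look like the following sketch. The stand-in bus is reduced to registration and inspection, and the body of subscriberCount is an assumption; only its name and purpose come from the text above. The event classes and the wireForTest helper are hypothetical.

```typescript
type EventCtor<TEvent> = new (...args: never[]) => TEvent;
interface EventSubscriber<TEvent> {
  on(event: TEvent): Promise<void> | void;
}

// Stand-in bus: subscribe and subscriberCount only.
class EventBus {
  private readonly subs = new Map<Function, Array<EventSubscriber<unknown>>>();

  subscribe<TEvent>(ctor: EventCtor<TEvent>, sub: EventSubscriber<TEvent>): void {
    const list = this.subs.get(ctor) ?? [];
    list.push(sub as EventSubscriber<unknown>);
    this.subs.set(ctor, list);
  }

  // Assumed implementation: an unknown event class counts as zero subscribers.
  subscriberCount<TEvent>(ctor: EventCtor<TEvent>): number {
    return this.subs.get(ctor)?.length ?? 0;
  }
}

// Hypothetical event classes.
class SubscriptionStarted {}
class PaymentCaptured {}

// Hypothetical wiring under test: two subscribers for one event, none for the other.
function wireForTest(): EventBus {
  const bus = new EventBus();
  bus.subscribe(SubscriptionStarted, { on() {} });
  bus.subscribe(SubscriptionStarted, { on() {} });
  return bus;
}
```

A test would then assert that wireForTest().subscriberCount(SubscriptionStarted) is 2 and that the count for PaymentCaptured is 0.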


The Analyzer: ddd-event-bus-analyzer

The analyzer is hand-written and ships a single diagnostic, DDD0340_EVENT_BUS_NO_SUBSCRIBER, defined in codes.ts. The rule walks the workspace, collects every call to bus.publish(new SomeEvent(...)) and every call to bus.subscribe(SomeEvent, ...), and flags any event class that is published but has no registered subscriber:

export function eventBusNoSubscriber(eventClass: string, file: string): Diagnostic {
  return {
    code: DDD0340_EVENT_BUS_NO_SUBSCRIBER,
    severity: 'warning',
    message: `@DomainEvent "${eventClass}" is published but has no subscriber. The event will be silently dropped at runtime.`,
    file,
  };
}

The severity is warning, not error. The reason is practical: a team may deliberately publish events for which no in-process subscriber exists yet. Perhaps the only consumer is an outbox-fed external system, or the subscriber is registered conditionally in production but not in dev. Surfacing the orphan publication makes the situation visible without blocking. A team that wants the rule at error severity overrides it in its build configuration; the analyzer ships the default that fits most projects.
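Once the publish and subscribe call sites have been collected, the rule reduces to a set difference. The sketch below assumes that collection has already happened and models only the comparison. The PublishSite shape, the findOrphanPublications name, the severity parameter used to model an override, and the string value given to the code constant are all hypothetical.

```typescript
type Severity = 'warning' | 'error';

interface Diagnostic {
  code: string;
  severity: Severity;
  message: string;
  file: string;
}

// Placeholder value; the real constant is defined in codes.ts.
const DDD0340_EVENT_BUS_NO_SUBSCRIBER = 'DDD0340';

// Hypothetical record of one bus.publish(new X(...)) call site.
interface PublishSite {
  eventClass: string;
  file: string;
}

// Flags every publication whose event class never appears in a subscribe call.
function findOrphanPublications(
  publishes: readonly PublishSite[],
  subscribedClasses: ReadonlySet<string>,
  severity: Severity = 'warning',
): Diagnostic[] {
  return publishes
    .filter((site) => !subscribedClasses.has(site.eventClass))
    .map((site) => ({
      code: DDD0340_EVENT_BUS_NO_SUBSCRIBER,
      severity,
      message: `@DomainEvent "${site.eventClass}" is published but has no subscriber. The event will be silently dropped at runtime.`,
      file: site.file,
    }));
}
```

Passing 'error' as the third argument stands in for the build-configuration override; how the real analyzer reads that configuration is not shown in the article.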

The hand-written shape of this analyzer puts it in the same migration cohort as ddd-domain-event-analyzer — a future milestone will rewrite it on top of defineAnalyzerSpec so the rule list is editable from the spec rather than from a code change.


The Codegen: ddd-event-bus-codegen

The codegen emits the bus wireup. generator.ts consumes a list of { eventClass, subscriberClass } pairs and emits a wireEventBus(bus) function that calls bus.subscribe(EventCtor, new SubscriberCtor()) for each. The generated function is what the composition root imports — application bootstrap calls wireEventBus(bus) once, after constructing the bus, and every declared subscription is registered in one place:

// AUTO-GENERATED by ddd-event-bus-codegen@0.0.1 — do not edit.
/* eslint-disable */
// EventBus wireup: registers every (Event, Subscriber) pair at composition root.

export function wireEventBus(bus: { subscribe(eventCtor: unknown, sub: unknown): void }): void {
  bus.subscribe(SubscriptionStarted,   new MetricsCounterSubscriber());
  bus.subscribe(SubscriptionStarted,   new WebhookDispatcherSubscriber());
  bus.subscribe(SubscriptionCancelled, new CancellationAuditSubscriber());
  bus.subscribe(PaymentCaptured,       new ReadModelProjectionSubscriber());
}

The bus parameter type is intentionally minimal — { subscribe(eventCtor: unknown, sub: unknown): void } — so the wireup works against the real EventBus, against a test fake, and against any future adapter that conforms to the same subscribe shape. The composition root passes its real bus instance, the test harness passes a recording fake, and the same wireEventBus call produces the same wiring graph in both.
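A recording fake that satisfies the minimal parameter type could be sketched as follows. RecordingBus and the SubscribeOnly alias are hypothetical names, and the one-line wireEventBus here is a stand-in for the generated function so that the snippet is self-contained.

```typescript
// The minimal shape the generated wireup accepts.
type SubscribeOnly = { subscribe(eventCtor: unknown, sub: unknown): void };

// Hypothetical test fake: records every subscribe call instead of dispatching.
class RecordingBus implements SubscribeOnly {
  readonly calls: Array<{ eventCtor: unknown; sub: unknown }> = [];

  subscribe(eventCtor: unknown, sub: unknown): void {
    this.calls.push({ eventCtor, sub });
  }
}

// Hypothetical stand-ins for an event class and a subscriber class.
class SubscriptionStarted {}
class MetricsCounterSubscriber {
  on(_event: SubscriptionStarted): void {}
}

// Stand-in for the generated function, reduced to one registration.
function wireEventBus(bus: SubscribeOnly): void {
  bus.subscribe(SubscriptionStarted, new MetricsCounterSubscriber());
}
```

A test passes a RecordingBus to wireEventBus and inspects calls to confirm which pairs were registered, without constructing a real bus or publishing anything.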

The lack of fixture-derived invariant tests on this triplet — and the directly-emitted wireup file rather than the spec-driven *.generated.ts shape of the strategic patterns — is again a marker of the earlier hand-written era. The contract (the shape of the emitted function, the codes the analyzer surfaces) is what the migration to spec-first must preserve.
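One way to pin the emitted shape is a small emitter that a fixture test can compare against. This is a sketch under stated assumptions, not the code in generator.ts: the emitWireEventBus name and the Wiring type are hypothetical, the banner omits the version string, and import emission is left out because the article does not show how the real generator resolves module paths.

```typescript
// One declared subscription, as described for the generator's input.
interface Wiring {
  eventClass: string;
  subscriberClass: string;
}

// Emits the source text of a wireEventBus function, one subscribe call per pair.
function emitWireEventBus(pairs: readonly Wiring[]): string {
  const calls = pairs.map(
    (p) => `  bus.subscribe(${p.eventClass}, new ${p.subscriberClass}());`,
  );
  return [
    '// AUTO-GENERATED: do not edit.',
    '/* eslint-disable */',
    '',
    'export function wireEventBus(bus: { subscribe(eventCtor: unknown, sub: unknown): void }): void {',
    ...calls,
    '}',
    '',
  ].join('\n');
}
```

A fixture test could feed a fixed list of pairs through the emitter and compare the result with a checked-in file, which is the kind of invariant the paragraph above notes is currently missing.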


The event bus is the in-process delivery surface for the integration backbone.

  • It dispatches the events emitted by @AggregateRoot and decorated with @DomainEvent.
  • For cross-process delivery, the bus is fronted by an @Outbox so events become transactional with the aggregate's persistence.
  • A bus subscriber may write the event to an @EventStore — that is the wireup that makes the system event-sourced.
  • A bus subscriber may also be a @Projection that maintains a read-optimised view.
  • An @ApplicationService typically owns the bus instance and calls wireEventBus(bus) during construction.
