Skip to main content
Welcome. This site supports keyboard navigation and screen readers. Press ? at any time for keyboard shortcuts. Press [ to focus the sidebar, ] to focus the content. High-contrast themes are available via the toolbar.
serard@dev00:~/cv

Composable Middleware for Mediator and Event Bus

A handler that has to log its invocation, check authorisation, record an audit-trail entry, validate its input, capture metrics, and only then do its actual work has more lines of cross-cutting concern than business logic. @PipelineBehavior from @frenchexdev/ddd-pipeline-behavior reifies the cross-cutting concerns as ordered middleware so the handler stays focused on its single responsibility.


What @PipelineBehavior Reifies

The pattern, popularised by MediatR's IPipelineBehavior<TRequest, TResponse>, wraps a handler with a stack of behaviours. Each behaviour receives the request and a next continuation; the behaviour decides what to do before calling next, after next returns, or instead of next (short-circuit). The composition is a function-of-function chain: behaviour 1 wraps behaviour 2 wraps the handler. Order matters — outer behaviours see the request first and the response last.

The decorator carries scope ('mediator' | 'event-bus' | 'both') and optional order: number (lower runs outer). The scope controls where the behaviour applies — some make sense for commands (idempotency), some for events (retry), some everywhere (logging). The order makes the stacking explicit so a refactor that adds a new behaviour does not silently change the wrap order.


The Runtime: ddd-pipeline-behavior

pipeline.ts declares the surface in 48 lines. The Behavior<TInput, TOutput> interface — one method, invoke(input, next) — is the contract. The composeBehaviors helper folds an array of behaviours around a terminal function using reduceRight:

export function composeBehaviors<TInput, TOutput>(
  behaviors: readonly Behavior<TInput, TOutput>[],
  terminal:  (input: TInput) => Promise<TOutput>,
): (input: TInput) => Promise<TOutput> {
  return behaviors.reduceRight<(input: TInput) => Promise<TOutput>>(
    (next, behavior) => async (input) => behavior.invoke(input, next),
    terminal,
  );
}

The reduceRight direction is what makes the first behaviour in the array the outermost in the chain. Three behaviours [logging, idempotency, rbac] produce a chain logging(idempotency(rbac(handler))) — logging sees the request first, rbac sees it last, handler runs only if all three approve.

A typical logging behaviour:

@PipelineBehavior({ scope: 'mediator', order: -100 }) // outermost
export class LoggingBehavior implements Behavior<unknown, unknown> {
  constructor(private readonly logger: LoggerPort) {}

  async invoke(input: unknown, next: (input: unknown) => Promise<unknown>): Promise<unknown> {
    const name = (input as object).constructor.name;
    const t0 = performance.now();
    this.logger.info('dispatch.start', { name });
    try {
      const out = await next(input);
      this.logger.info('dispatch.end', { name, latencyMs: performance.now() - t0 });
      return out;
    } catch (err) {
      this.logger.error('dispatch.error', err, { name, latencyMs: performance.now() - t0 });
      throw err;
    }
  }
}

The behaviour wraps the handler, captures start time, invokes next, captures end time, logs success or failure. The handler itself does not log; the behaviour does it once, for every dispatch.


The Analyzer: ddd-pipeline-behavior-analyzer

The analyzer is hand-written legacy, in the same cohort as Mediator, Domain Event, and Value Object. codes.ts exports two diagnostic factories in the DDD0NNN namespace.

DDD0350_PIPELINE_DUPLICATE_ORDER is the non-determinism warning. Two behaviours in the same scope sharing the same order value have undefined execution order under Array.sort (not guaranteed stable across all engines). Often equivalent (two read-only logging behaviours), but a real concern worth surfacing — warning severity rather than error:

export const DDD0350_PIPELINE_DUPLICATE_ORDER = 'DDD0350';

export function pipelineDuplicateOrder(scope: string, order: number, behaviorA: string, behaviorB: string, file: string): Diagnostic {
  return {
    code: DDD0350_PIPELINE_DUPLICATE_ORDER,
    severity: 'warning',
    message: `Pipeline behaviors "${behaviorA}" and "${behaviorB}" share order ${order} in scope "${scope}". Execution order is then non-deterministic — assign distinct orders.`,
    file,
  };
}

DDD0351_PIPELINE_INVALID_SCOPE is the typo/stale-string check at error severity. The scope must be one of three literal values; anything else is a typo or a stale string after a rename, and the wireup codegen would silently skip the behaviour. Structural, not stylistic.

The hand-written shape places this analyzer on the same migration list as the mediator's — PROP-PIPELINE-001 (to file) will lift it to defineAnalyzerSpec. The two codes (DDD0350, DDD0351) are part of the contract that must survive the migration.


The Codegen: ddd-pipeline-behavior-codegen

The codegen is hand-written, like the analyzer. generator.ts exports generatePipelineWireup(input) taking a GeneratePipelineWireupInput (scope, behaviors: { behaviorClass, order }[]) and returning a banner-stamped pipelineBehaviors() function. The behaviours are sorted by order ascending before emission, so the consumer hands the resulting array directly to composeBehaviors:

// AUTO-GENERATED by ddd-pipeline-behavior-codegen@0.0.1 — do not edit.
/* eslint-disable */
// Pipeline wireup for scope "mediator". Behaviors sorted by order ascending.

export function pipelineBehaviors() {
  return [
    new LoggingBehavior(),
    new TracingBehavior(),
    new RbacBehavior(),
    new IdempotencyBehavior(),
  ];
}

The banner is emitted via withDddBanner from ddd-core-codegen — but no defineCodegenSpec ratifies the formal invariants yet. PROP-PIPELINE-002 (to file) will lift the codegen to spec-first with banner + idempotence invariants declared.


Back to the series index.

⬇ Download