Composable Middleware for Mediator and Event Bus
A handler that has to log its invocation, check authorisation, record an audit-trail entry, validate its input, capture metrics, and only then do its actual work has more lines of cross-cutting concern than business logic. @PipelineBehavior from @frenchexdev/ddd-pipeline-behavior reifies the cross-cutting concerns as ordered middleware so the handler stays focused on its single responsibility.
What @PipelineBehavior Reifies
The pattern, popularised by MediatR's IPipelineBehavior<TRequest, TResponse>, wraps a handler with a stack of behaviours. Each behaviour receives the request and a next continuation; the behaviour decides what to do before calling next, after next returns, or instead of next (short-circuit). The composition is a function-of-function chain: behaviour 1 wraps behaviour 2 wraps the handler. Order matters — outer behaviours see the request first and the response last.
The decorator carries scope ('mediator' | 'event-bus' | 'both') and optional order: number (lower runs outer). The scope controls where the behaviour applies — some make sense for commands (idempotency), some for events (retry), some everywhere (logging). The order makes the stacking explicit so a refactor that adds a new behaviour does not silently change the wrap order.
The Runtime: ddd-pipeline-behavior
pipeline.ts declares the surface in 48 lines. The Behavior<TInput, TOutput> interface — one method, invoke(input, next) — is the contract. The composeBehaviors helper folds an array of behaviours around a terminal function using reduceRight:
export function composeBehaviors<TInput, TOutput>(
behaviors: readonly Behavior<TInput, TOutput>[],
terminal: (input: TInput) => Promise<TOutput>,
): (input: TInput) => Promise<TOutput> {
return behaviors.reduceRight<(input: TInput) => Promise<TOutput>>(
(next, behavior) => async (input) => behavior.invoke(input, next),
terminal,
);
}export function composeBehaviors<TInput, TOutput>(
behaviors: readonly Behavior<TInput, TOutput>[],
terminal: (input: TInput) => Promise<TOutput>,
): (input: TInput) => Promise<TOutput> {
return behaviors.reduceRight<(input: TInput) => Promise<TOutput>>(
(next, behavior) => async (input) => behavior.invoke(input, next),
terminal,
);
}The reduceRight direction is what makes the first behaviour in the array the outermost in the chain. Three behaviours [logging, idempotency, rbac] produce a chain logging(idempotency(rbac(handler))) — logging sees the request first, rbac sees it last, handler runs only if all three approve.
A typical logging behaviour:
@PipelineBehavior({ scope: 'mediator', order: -100 }) // outermost
export class LoggingBehavior implements Behavior<unknown, unknown> {
constructor(private readonly logger: LoggerPort) {}
async invoke(input: unknown, next: (input: unknown) => Promise<unknown>): Promise<unknown> {
const name = (input as object).constructor.name;
const t0 = performance.now();
this.logger.info('dispatch.start', { name });
try {
const out = await next(input);
this.logger.info('dispatch.end', { name, latencyMs: performance.now() - t0 });
return out;
} catch (err) {
this.logger.error('dispatch.error', err, { name, latencyMs: performance.now() - t0 });
throw err;
}
}
}@PipelineBehavior({ scope: 'mediator', order: -100 }) // outermost
export class LoggingBehavior implements Behavior<unknown, unknown> {
constructor(private readonly logger: LoggerPort) {}
async invoke(input: unknown, next: (input: unknown) => Promise<unknown>): Promise<unknown> {
const name = (input as object).constructor.name;
const t0 = performance.now();
this.logger.info('dispatch.start', { name });
try {
const out = await next(input);
this.logger.info('dispatch.end', { name, latencyMs: performance.now() - t0 });
return out;
} catch (err) {
this.logger.error('dispatch.error', err, { name, latencyMs: performance.now() - t0 });
throw err;
}
}
}The behaviour wraps the handler, captures start time, invokes next, captures end time, logs success or failure. The handler itself does not log; the behaviour does it once, for every dispatch.
The Analyzer: ddd-pipeline-behavior-analyzer
The analyzer is hand-written legacy, in the same cohort as Mediator, Domain Event, and Value Object. codes.ts exports two diagnostic factories in the DDD0NNN namespace.
DDD0350_PIPELINE_DUPLICATE_ORDER is the non-determinism warning. Two behaviours in the same scope sharing the same order value have undefined execution order under Array.sort (not guaranteed stable across all engines). Often equivalent (two read-only logging behaviours), but a real concern worth surfacing — warning severity rather than error:
export const DDD0350_PIPELINE_DUPLICATE_ORDER = 'DDD0350';
export function pipelineDuplicateOrder(scope: string, order: number, behaviorA: string, behaviorB: string, file: string): Diagnostic {
return {
code: DDD0350_PIPELINE_DUPLICATE_ORDER,
severity: 'warning',
message: `Pipeline behaviors "${behaviorA}" and "${behaviorB}" share order ${order} in scope "${scope}". Execution order is then non-deterministic — assign distinct orders.`,
file,
};
}export const DDD0350_PIPELINE_DUPLICATE_ORDER = 'DDD0350';
export function pipelineDuplicateOrder(scope: string, order: number, behaviorA: string, behaviorB: string, file: string): Diagnostic {
return {
code: DDD0350_PIPELINE_DUPLICATE_ORDER,
severity: 'warning',
message: `Pipeline behaviors "${behaviorA}" and "${behaviorB}" share order ${order} in scope "${scope}". Execution order is then non-deterministic — assign distinct orders.`,
file,
};
}DDD0351_PIPELINE_INVALID_SCOPE is the typo/stale-string check at error severity. The scope must be one of three literal values; anything else is a typo or a stale string after a rename, and the wireup codegen would silently skip the behaviour. Structural, not stylistic.
The hand-written shape places this analyzer on the same migration list as the mediator's — PROP-PIPELINE-001 (to file) will lift it to defineAnalyzerSpec. The two codes (DDD0350, DDD0351) are part of the contract that must survive the migration.
The Codegen: ddd-pipeline-behavior-codegen
The codegen is hand-written, like the analyzer. generator.ts exports generatePipelineWireup(input) taking a GeneratePipelineWireupInput (scope, behaviors: { behaviorClass, order }[]) and returning a banner-stamped pipelineBehaviors() function. The behaviours are sorted by order ascending before emission, so the consumer hands the resulting array directly to composeBehaviors:
// AUTO-GENERATED by ddd-pipeline-behavior-codegen@0.0.1 — do not edit.
/* eslint-disable */
// Pipeline wireup for scope "mediator". Behaviors sorted by order ascending.
export function pipelineBehaviors() {
return [
new LoggingBehavior(),
new TracingBehavior(),
new RbacBehavior(),
new IdempotencyBehavior(),
];
}// AUTO-GENERATED by ddd-pipeline-behavior-codegen@0.0.1 — do not edit.
/* eslint-disable */
// Pipeline wireup for scope "mediator". Behaviors sorted by order ascending.
export function pipelineBehaviors() {
return [
new LoggingBehavior(),
new TracingBehavior(),
new RbacBehavior(),
new IdempotencyBehavior(),
];
}The banner is emitted via withDddBanner from ddd-core-codegen — but no defineCodegenSpec ratifies the formal invariants yet. PROP-PIPELINE-002 (to file) will lift the codegen to spec-first with banner + idempotence invariants declared.
Cross-Links
- Wraps
@Mediatordispatches (single-dispatch with response) and@EventBuspublications (broadcast with failure aggregation). - Common behaviours invoke
@Log,@Metrics,@Tracing,@RBAC,@AuditTrail. - An idempotency behaviour reads the
idempotencyKeydeclared by@CommandHandler.
Back to the series index.