Part 09: The Event Bus — Cross-Stage Communication Without Coupling
"
set -x is not events. Stop pretending."
Why
Part 07 showed the pipeline publishing StageStarted and StageCompleted to an IHomeLabEventBus. That sentence sounds innocent. It is, in fact, the second-most important architectural decision in the whole lib (the most important being Result<T> everywhere).
The reason is observability. A pipeline that logs to Console.WriteLine cannot be tested for what it logged. A pipeline that uses ILogger<T> directly couples every stage to a specific logging framework and produces unstructured output that downstream tools have to parse. A pipeline that publishes typed events into an in-process bus, on the other hand, makes every meaningful step interrogable — by tests, by progress reporters, by audit logs, by the observability stack, by plugins, by future use cases nobody has thought of yet.
Events are not logging. Events are not tracing. Events are not metrics. Events are facts about what the system did. Logs, traces, and metrics are projections of events. If you start with events, you get all three downstream surfaces for free. If you start with logs, you get logs and you have to work uphill for everything else.
The thesis of this part is: IHomeLabEventBus is the cross-stage communication mechanism. Stages publish typed events. Anything that wants to react subscribes. The bus does not know which stages exist. The stages do not know who is listening. The result is loose coupling, testability, and a free observability surface.
The shape
The event bus is a simple interface backed by FrenchExDev.Net.Reactive (which is itself a thin wrapper over System.Reactive):
public interface IHomeLabEvent
{
    DateTimeOffset Timestamp { get; }
}

public interface IHomeLabEventBus
{
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
        where TEvent : IHomeLabEvent;

    IDisposable Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler)
        where TEvent : IHomeLabEvent;

    IDisposable SubscribeAll(Func<IHomeLabEvent, CancellationToken, Task> handler);
}
Three methods. PublishAsync is called by stages, contributors, plugins, and anything else in the lib. Subscribe registers a handler for one specific event type (the loggers, progress reporters, and so on). SubscribeAll receives the full bus dump; the audit log and the test recording fixture use it.
The default implementation:
[Injectable(ServiceLifetime.Singleton)]
public sealed class HomeLabEventBus : IHomeLabEventBus
{
    private readonly IEventStream<IHomeLabEvent> _stream = new EventStream<IHomeLabEvent>();
    private readonly IClock _clock;

    public HomeLabEventBus(IClock clock) => _clock = clock;

    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
        where TEvent : IHomeLabEvent
    {
        _stream.Publish(@event);
        return Task.CompletedTask;
    }

    public IDisposable Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler)
        where TEvent : IHomeLabEvent
        => _stream.OfType<TEvent>().SubscribeAsync(handler);

    public IDisposable SubscribeAll(Func<IHomeLabEvent, CancellationToken, Task> handler)
        => _stream.SubscribeAsync(handler);
}
The bus is in-process and synchronous by default. Synchronous matters: when a stage publishes StageCompleted, every handler subscribed to it has finished running by the time the PublishAsync call returns. This guarantees that progress is rendered before the next stage starts, that audit entries are persisted before the pipeline moves on, and that tests can assert on events without race conditions.
Async-ready is the second part: handlers can be Func<TEvent, CancellationToken, Task>, so anyone who wants to do work asynchronously (e.g. publish to a remote audit endpoint) can await it without blocking. The bus does not care.
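The ordering guarantee can be made concrete with a minimal, dependency-free sketch. It is not the Rx-backed HomeLabEventBus shown above: InlineEventBus and StageDone are hypothetical names, and awaiting each handler in turn inside PublishAsync is an assumption about one way to provide the guarantee, not a description of how SubscribeAsync in FrenchExDev.Net.Reactive behaves.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Usage: one handler that completes synchronously, one that really awaits.
var bus = new InlineEventBus();
var log = new List<string>();

bus.Subscribe<StageDone>((e, ct) =>
{
    log.Add($"sync:{e.StageName}");
    return Task.CompletedTask;
});
bus.Subscribe<StageDone>(async (e, ct) =>
{
    await Task.Delay(10, ct);            // stands in for a remote audit call
    log.Add($"async:{e.StageName}");
});

await bus.PublishAsync(new StageDone("build", DateTimeOffset.UtcNow));
log.Add("after-publish");

Console.WriteLine(string.Join(", ", log));
// sync:build, async:build, after-publish

public interface IHomeLabEvent { DateTimeOffset Timestamp { get; } }

// Hypothetical event used only by this sketch.
public sealed record StageDone(string StageName, DateTimeOffset Timestamp) : IHomeLabEvent;

// Hypothetical bus: awaits every matching handler, in subscription order,
// before the task returned by PublishAsync completes.
public sealed class InlineEventBus
{
    private readonly List<Func<IHomeLabEvent, CancellationToken, Task>> _handlers = new();

    public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
        where TEvent : IHomeLabEvent
    {
        // Snapshot so a handler may subscribe or unsubscribe while we iterate.
        foreach (var handler in _handlers.ToArray())
            await handler(@event, ct);
    }

    public IDisposable Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler)
        where TEvent : IHomeLabEvent
        => SubscribeAll((e, ct) => e is TEvent typed ? handler(typed, ct) : Task.CompletedTask);

    public IDisposable SubscribeAll(Func<IHomeLabEvent, CancellationToken, Task> handler)
    {
        _handlers.Add(handler);
        return new Unsubscriber(() => _handlers.Remove(handler));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _dispose;
        public Unsubscriber(Action dispose) => _dispose = dispose;
        public void Dispose() => _dispose();
    }
}
```

In this sketch the caller that awaits PublishAsync observes both handlers as finished, which is the property the progress reporter and the audit log rely on. The trade-off is that one slow handler delays every publisher.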
The event taxonomy
Events are records. Every event is named after a fact, in the past tense. Every event is immutable.
public sealed record PipelineStarted(HomeLabRequest Request, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record PipelineCompleted(HomeLabRequest Request, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record PipelineFailed(HomeLabRequest Request, string FailedAtStage, IReadOnlyList<string> Errors, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record StageStarted(string StageName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record StageCompleted(string StageName, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record StageFailed(string StageName, IReadOnlyList<string> Errors, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record PackerBuildStarted(string ImageName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record PackerBuildCompleted(string ImageName, string BoxPath, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record VagrantBoxAdded(string BoxName, string BoxFile, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record VagrantBoxPublished(string BoxName, string Version, string RegistryUrl, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record VosUpStarted(string MachineName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record VosUpCompleted(string MachineName, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record VosHaltCompleted(string MachineName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record VosDestroyCompleted(string MachineName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record ComposeStackGenerated(string ProjectName, string FilePath, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record ComposeStackDeployed(string ProjectName, string MachineName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record ComposeStackTornDown(string ProjectName, string MachineName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record TlsCaGenerationStarted(string CaName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record TlsCaGenerated(string CaName, string CaPath, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record TlsCertIssued(string Domain, string CertPath, string KeyPath, DateTimeOffset Expiry, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record DnsEntryAdded(string Hostname, string Ip, string Provider, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record DnsEntryRemoved(string Hostname, string Provider, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record GitLabConfigured(string Url, string AdminEmail, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record GitLabRunnerRegistered(string RunnerName, string Executor, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record PluginLoaded(string PluginName, string Version, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record OpsConstraintViolated(string ConstraintName, string Subject, string Message, DateTimeOffset Timestamp) : IHomeLabEvent;
Roughly thirty events covering the meaningful steps of the lab lifecycle. The list grows as the lib grows. There is no central registry: events are discovered by their IHomeLabEvent marker. Adding a new event is one new record.
The naming convention is strict: noun + past-tense verb. Not PackerBuildEvent. Not OnPackerBuild. Not PackerBuildHandler. PackerBuildStarted and PackerBuildCompleted. The fact is in the name. The verb is past tense. The handler decides what to do with it.
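Adding an event under this convention might look like the sketch below. BackupCompleted is a hypothetical event, not one the lib ships; it only illustrates the naming rule and the value semantics that positional records provide.

```csharp
using System;

var at = DateTimeOffset.Parse("2026-04-08T12:00:00Z");
var first = new BackupCompleted("main-01", "/var/backups/main-01.tar.zst", at);
var same = new BackupCompleted("main-01", "/var/backups/main-01.tar.zst", at);

// Records compare by value, so tests can assert on whole events.
Console.WriteLine(first == same);              // True

// "Changing" an event produces a new value; the original is untouched.
var later = first with { Timestamp = at.AddMinutes(5) };
Console.WriteLine(first.Timestamp == at);      // True
Console.WriteLine(later == first);             // False

// Anything typed against the marker interface accepts the new event as-is.
IHomeLabEvent asEvent = first;
Console.WriteLine(asEvent.Timestamp.Year);     // 2026

public interface IHomeLabEvent { DateTimeOffset Timestamp { get; } }

// Hypothetical new event: noun + past-tense verb, positional, immutable.
public sealed record BackupCompleted(string MachineName, string ArchivePath, DateTimeOffset Timestamp)
    : IHomeLabEvent;
```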
Subscribers register themselves
A subscriber implements IHomeLabEventSubscriber, declares [Injectable(ServiceLifetime.Singleton)], and gets a chance to subscribe at startup:
public interface IHomeLabEventSubscriber
{
    void Subscribe(IHomeLabEventBus bus);
}

[Injectable(ServiceLifetime.Singleton)]
public sealed class ConsoleProgressReporter : IHomeLabEventSubscriber
{
    private readonly IHomeLabConsole _console;

    public ConsoleProgressReporter(IHomeLabConsole console) => _console = console;

    public void Subscribe(IHomeLabEventBus bus)
    {
        bus.Subscribe<StageStarted>(OnStageStarted);
        bus.Subscribe<StageCompleted>(OnStageCompleted);
        bus.Subscribe<StageFailed>(OnStageFailed);
    }

    private Task OnStageStarted(StageStarted e, CancellationToken ct)
    {
        _console.WriteLine($" → {e.StageName} ...");
        return Task.CompletedTask;
    }

    private Task OnStageCompleted(StageCompleted e, CancellationToken ct)
    {
        _console.WriteLine($" ✓ {e.StageName} ({e.Duration.TotalMilliseconds:N0} ms)");
        return Task.CompletedTask;
    }

    private Task OnStageFailed(StageFailed e, CancellationToken ct)
    {
        _console.WriteLine($" ✗ {e.StageName} ({e.Duration.TotalMilliseconds:N0} ms)");
        foreach (var err in e.Errors) _console.WriteLine($" {err}");
        return Task.CompletedTask;
    }
}
The composition root iterates over every IHomeLabEventSubscriber at startup and calls Subscribe:
// (Generated by [Injectable] / called once after BuildServiceProvider)
public static void WireEventSubscribers(IServiceProvider sp)
{
    var bus = sp.GetRequiredService<IHomeLabEventBus>();
    foreach (var sub in sp.GetServices<IHomeLabEventSubscriber>())
        sub.Subscribe(bus);
}
Adding a new subscriber is one new class with [Injectable] plus IHomeLabEventSubscriber. No registration list to edit. No central switch statement. No risk of forgetting.
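As an illustration of how small a new subscriber can be, here is a self-contained sketch. SlowStageWarner and TinyBus are hypothetical names; TinyBus is a stand-in that awaits handlers in order, and the literal array replaces the container lookup that the real composition root performs. The [Injectable] attribute is left out because it belongs to the lib's own tooling.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hand-rolled stand-in for WireEventSubscribers: iterate and call Subscribe.
var bus = new TinyBus();
var warner = new SlowStageWarner(TimeSpan.FromSeconds(30));
foreach (IHomeLabEventSubscriber sub in new IHomeLabEventSubscriber[] { warner })
    sub.Subscribe(bus);

var now = DateTimeOffset.UtcNow;
await bus.PublishAsync(new StageCompleted("packer", TimeSpan.FromSeconds(95), now));
await bus.PublishAsync(new StageCompleted("dns", TimeSpan.FromSeconds(2), now));

Console.WriteLine(string.Join(", ", warner.SlowStages));   // packer

public interface IHomeLabEvent { DateTimeOffset Timestamp { get; } }
public sealed record StageCompleted(string StageName, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;

public interface IHomeLabEventBus
{
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default) where TEvent : IHomeLabEvent;
    IDisposable Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler) where TEvent : IHomeLabEvent;
    IDisposable SubscribeAll(Func<IHomeLabEvent, CancellationToken, Task> handler);
}

public interface IHomeLabEventSubscriber { void Subscribe(IHomeLabEventBus bus); }

// Hypothetical subscriber: remembers stages that ran longer than a threshold.
public sealed class SlowStageWarner : IHomeLabEventSubscriber
{
    private readonly TimeSpan _threshold;
    private readonly List<string> _slow = new();

    public SlowStageWarner(TimeSpan threshold) => _threshold = threshold;
    public IReadOnlyList<string> SlowStages => _slow;

    public void Subscribe(IHomeLabEventBus bus) => bus.Subscribe<StageCompleted>(OnStageCompleted);

    private Task OnStageCompleted(StageCompleted e, CancellationToken ct)
    {
        if (e.Duration > _threshold) _slow.Add(e.StageName);
        return Task.CompletedTask;
    }
}

// Hypothetical minimal bus so the sketch runs without the Rx-backed one.
public sealed class TinyBus : IHomeLabEventBus
{
    private readonly List<Func<IHomeLabEvent, CancellationToken, Task>> _handlers = new();

    public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
        where TEvent : IHomeLabEvent
    {
        foreach (var handler in _handlers.ToArray()) await handler(@event, ct);
    }

    public IDisposable Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler)
        where TEvent : IHomeLabEvent
        => SubscribeAll((e, ct) => e is TEvent typed ? handler(typed, ct) : Task.CompletedTask);

    public IDisposable SubscribeAll(Func<IHomeLabEvent, CancellationToken, Task> handler)
    {
        _handlers.Add(handler);
        return new Removal(() => _handlers.Remove(handler));
    }

    private sealed class Removal : IDisposable
    {
        private readonly Action _undo;
        public Removal(Action undo) => _undo = undo;
        public void Dispose() => _undo();
    }
}
```

The stages that publish StageCompleted are not touched by this addition, which is the decoupling the section describes.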
Subscribers we ship by default
| Subscriber | Purpose |
|---|---|
| ConsoleProgressReporter | Renders stage progress to stdout |
| JsonLineLogger | Writes structured .jsonl log lines for homelab logs |
| AuditLogPersister | Appends events to .homelab/audit.jsonl for compliance |
| EventBusObservabilityBridge | Forwards events to Ops.Observability metrics (counters, timings) |
| PluginEventForwarder | Forwards events to plugins that registered for them |
Five subscribers, all built-in, all [Injectable]. Adding a sixth is trivial. Removing one is services.RemoveAll<JsonLineLogger>() in a test.
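To show what "forwards events to metrics" can mean in practice, the sketch below folds stage events into counters and a total duration. StageMetricsProjection is a hypothetical class, not the shipped EventBusObservabilityBridge, and the events are fed in directly instead of going through a bus so that the example stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var metrics = new StageMetricsProjection();
var at = DateTimeOffset.Parse("2026-04-08T12:00:00Z");

// In the lib, OnEventAsync would be handed to bus.SubscribeAll(...).
foreach (IHomeLabEvent e in new IHomeLabEvent[]
{
    new StageCompleted("packer", TimeSpan.FromSeconds(95), at),
    new StageCompleted("dns", TimeSpan.FromSeconds(2), at),
    new StageFailed("gitlab", new[] { "timeout" }, TimeSpan.FromSeconds(30), at),
})
    await metrics.OnEventAsync(e, CancellationToken.None);

Console.WriteLine(metrics.Completed);                    // 2
Console.WriteLine(metrics.Failed);                       // 1
Console.WriteLine(metrics.TotalDuration.TotalSeconds);   // 127

public interface IHomeLabEvent { DateTimeOffset Timestamp { get; } }
public sealed record StageCompleted(string StageName, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record StageFailed(string StageName, IReadOnlyList<string> Errors, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;

// Hypothetical projection: metrics derived purely from the event stream.
public sealed class StageMetricsProjection
{
    public int Completed { get; private set; }
    public int Failed { get; private set; }
    public TimeSpan TotalDuration { get; private set; }

    // Signature matches the SubscribeAll handler shape.
    public Task OnEventAsync(IHomeLabEvent e, CancellationToken ct)
    {
        switch (e)
        {
            case StageCompleted completed:
                Completed++;
                TotalDuration += completed.Duration;
                break;
            case StageFailed failed:
                Failed++;
                TotalDuration += failed.Duration;
                break;
            // Every other event type is ignored by this projection.
        }
        return Task.CompletedTask;
    }
}
```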
The test
Events are first-class assertion targets:
public sealed class RecordingEventBus : IHomeLabEventBus
{
    private readonly List<IHomeLabEvent> _recorded = new();
    public IReadOnlyList<IHomeLabEvent> Recorded => _recorded;

    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
        where TEvent : IHomeLabEvent
    {
        _recorded.Add(@event);
        return Task.CompletedTask;
    }

    // Subscriptions are ignored: this bus only records what was published.
    public IDisposable Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler)
        where TEvent : IHomeLabEvent => NoopDisposable.Instance;

    public IDisposable SubscribeAll(Func<IHomeLabEvent, CancellationToken, Task> handler)
        => NoopDisposable.Instance;
}

[Fact]
public async Task vos_up_publishes_started_and_completed_events_in_order()
{
    var bus = new RecordingEventBus();
    var clock = new FakeClock(DateTimeOffset.Parse("2026-04-08T12:00:00Z"));
    var vagrant = new FakeVagrantClient();
    var handler = new VosUpRequestHandler(vagrant, bus, clock);

    var result = await handler.HandleAsync(new VosUpRequest("main-01"), CancellationToken.None);

    result.IsSuccess.Should().BeTrue();
    bus.Recorded.OfType<VosUpStarted>().Should().ContainSingle()
        .Which.MachineName.Should().Be("main-01");
    bus.Recorded.OfType<VosUpCompleted>().Should().ContainSingle()
        .Which.MachineName.Should().Be("main-01");

    // IReadOnlyList<T> has no IndexOf, so copy to a List<T> before comparing positions.
    var events = bus.Recorded.ToList();
    events.IndexOf(events.OfType<VosUpStarted>().Single())
        .Should().BeLessThan(events.IndexOf(events.OfType<VosUpCompleted>().Single()));
}

[Fact]
public async Task pipeline_publishes_stage_events_in_order_and_pipeline_completed_at_the_end()
{
    var bus = new RecordingEventBus();
    var stages = new IHomeLabStage[]
    {
        new FakeStage("a", order: 0, returnFailure: false),
        new FakeStage("b", order: 1, returnFailure: false),
        new FakeStage("c", order: 2, returnFailure: false),
    };
    var pipeline = new HomeLabPipeline(stages, bus, new FakeClock(DateTimeOffset.UtcNow));

    await pipeline.RunAsync(new TestRequest(), CancellationToken.None);

    var events = bus.Recorded.ToList();
    events.OfType<StageStarted>().Select(e => e.StageName).Should().Equal("a", "b", "c");
    events.OfType<StageCompleted>().Select(e => e.StageName).Should().Equal("a", "b", "c");
    events.Last().Should().BeOfType<PipelineCompleted>();
}
Three things to notice:
- The recording bus is ten lines. Every test that needs to assert on events uses it. No mocking framework, no setup boilerplate.
- The assertions are typed. bus.Recorded.OfType<VosUpStarted>() is a compile-time-checked filter. If you rename the event, the test refuses to compile.
- Order is asserted explicitly. Because the bus is synchronous, order is deterministic. The test does not have to sleep, retry, or poll.
What this gives you that bash doesn't
Bash gives you set -x, which prints every command to stderr. There is no structure. There is no taxonomy. There is no test that can assert "the script ran the build before the publish". The closest you can come is grep build before grep publish in the captured output, which is pattern-matching on free-form text.
A typed event bus gives you, for the same surface area:
- Structured progress rendered by a swappable subscriber
- Structured logs in .jsonl that jq, lnav, Loki, and the observability stack can ingest
- Structured audit that compliance can read without parsing free-form English
- Structured tests that assert on events as values, not on log lines as strings
- A bridge to observability without adding metrics calls to every stage
- A plugin point: plugins subscribe to the same bus the lib publishes to
- Replay: dump events to a file, replay them in a test, assert the same outcomes
The bargain is the cheapest of all the architectural decisions in this series. Events are records, one line each. Recording is ten lines. Subscribing is ten lines. The total cost is maybe two hundred lines of infrastructure across the entire lib, and the payoff is that every observability surface (logs, metrics, audit, tests, progress, plugins) comes for free thereafter.
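The replay idea from the list above can be sketched with System.Text.Json alone. EventJournal and JournalEnvelope are hypothetical names, and the one-object-per-line envelope with a type name and a payload is an assumption made for this example; it is not claimed to be the format that AuditLogPersister or JsonLineLogger write.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

var at = DateTimeOffset.Parse("2026-04-08T12:00:00Z");
var original = new IHomeLabEvent[]
{
    new StageStarted("packer", at),
    new StageCompleted("packer", TimeSpan.FromSeconds(95), at.AddSeconds(95)),
};

// Dump: one JSON object per line.
var lines = original.Select(e => EventJournal.ToLine(e)).ToArray();

// Replay: rebuild typed events from the lines alone.
var replayed = lines.Select(line => EventJournal.FromLine(line)).ToArray();

// Records compare by value, so the round trip can be asserted directly.
Console.WriteLine(original.SequenceEqual(replayed));   // True

public interface IHomeLabEvent { DateTimeOffset Timestamp { get; } }
public sealed record StageStarted(string StageName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record StageCompleted(string StageName, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;

// Hypothetical envelope: the event's type name plus its serialized body.
public sealed record JournalEnvelope(string Type, JsonElement Payload);

// Hypothetical helper for writing and reading journal lines.
public static class EventJournal
{
    // Event types this journal knows how to rebuild, keyed by type name.
    private static readonly Dictionary<string, Type> Types = new[]
    {
        typeof(StageStarted), typeof(StageCompleted),
    }.ToDictionary(t => t.Name);

    public static string ToLine(IHomeLabEvent e)
    {
        // Serialize with the runtime type so every property is written.
        var payload = JsonSerializer.SerializeToElement(e, e.GetType());
        return JsonSerializer.Serialize(new JournalEnvelope(e.GetType().Name, payload));
    }

    public static IHomeLabEvent FromLine(string line)
    {
        var envelope = JsonSerializer.Deserialize<JournalEnvelope>(line)
            ?? throw new FormatException("Empty journal line.");
        if (!Types.TryGetValue(envelope.Type, out var type))
            throw new FormatException($"Unknown event type '{envelope.Type}'.");
        return (IHomeLabEvent)(envelope.Payload.Deserialize(type)
            ?? throw new FormatException("Empty event payload."));
    }
}
```

An explicit dictionary of known types keeps replay from instantiating arbitrary types named in a file. The cost is that each new event must be added to it, which is a deliberate departure from the lib's "no central registry" stance and applies to this sketch only.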
A note on IClock in events
Every event carries a DateTimeOffset Timestamp. The clock that produces the timestamp is IClock from FrenchExDev.Net.Clock. In production, IClock is SystemClock.Instance. In tests, IClock is a FakeClock that the test controls. This means tests can assert on event ordering and timing without flakiness, and the same code paths run in production and in test. We will see IClock again in Part 11 — it is the smallest and most underrated library in the toolbelt.
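A minimal sketch of why a controllable clock removes timing flakiness follows. The IClock shape used here (a single UtcNow property) and the Advance method on FakeClock are assumptions made for illustration; the real FrenchExDev.Net.Clock API may differ.

```csharp
using System;

var clock = new FakeClock(DateTimeOffset.Parse("2026-04-08T12:00:00Z"));

var started = new VosUpStarted("main-01", clock.UtcNow);
clock.Advance(TimeSpan.FromSeconds(42));                      // no real waiting
var completed = new VosUpCompleted("main-01", clock.UtcNow - started.Timestamp, clock.UtcNow);

Console.WriteLine(completed.Duration.TotalSeconds);           // 42
Console.WriteLine(completed.Timestamp > started.Timestamp);   // True

// Assumed shape of the clock abstraction; the real IClock may differ.
public interface IClock { DateTimeOffset UtcNow { get; } }

// Hypothetical test double: time moves only when the test says so.
public sealed class FakeClock : IClock
{
    public FakeClock(DateTimeOffset start) => UtcNow = start;
    public DateTimeOffset UtcNow { get; private set; }
    public void Advance(TimeSpan by) => UtcNow += by;
}

public interface IHomeLabEvent { DateTimeOffset Timestamp { get; } }
public sealed record VosUpStarted(string MachineName, DateTimeOffset Timestamp) : IHomeLabEvent;
public sealed record VosUpCompleted(string MachineName, TimeSpan Duration, DateTimeOffset Timestamp) : IHomeLabEvent;
```

Because the duration is computed from the fake clock, the assertion on 42 seconds holds on any machine and under any load.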
Cross-links
- Part 07: The Pipeline
- Part 08: SOLID and DRY in Practice
- Part 10: The Plugin System
- Part 11: The FrenchExDev Toolbelt
- Part 14: The Test Pyramid
- Part 44: The Observability Stack — where events get bridged to Prometheus
- Reactive (FrenchExDev.Net.Reactive) — the library the bus is built on