
Modes & Infrastructure

"If you need RabbitMQ, Redis, and MinIO running just to debug a button click, your architecture has failed the developer."


The Problem

Distributed systems have a dirty secret: the local development experience is terrible.

Your production saga uses RabbitMQ for messaging, MinIO for file storage, Redis for progress tracking and distributed locks. So to run the thing locally you need a docker-compose.yml with five services, 2 GB of RAM just for infrastructure, and a README that starts with "first, install Docker."

Unit tests? They either hit real infrastructure (slow, flaky, port-conflict-prone) or you hand-roll mocks for every queue, every lock, every storage call. Integration tests? Good luck running those in CI without a Docker-in-Docker setup.

The core insight: the saga orchestrator doesn't care where the messages go. It calls IQueuePublisher.PublishAsync. It calls IS3Client.UploadAsync. It calls IDistributedLock.TryAcquireAsync. Whether those calls hit RabbitMQ or an in-memory channel is a deployment decision, not a design decision.

DistributedTask.Dsl bakes this into the DSL itself.


Two Modes

Every distributed task declares a default mode:

[DistributedTask("CreateZipFromFiles", Mode = TaskMode.InProcess)]
public partial class CreateZipFromFilesTask
{
    // Steps, conditions, retry policies — all the same
}

TaskMode is a simple enum:

public enum TaskMode
{
    InProcess,
    Distributed,
}

The attribute sets the default. The DI registration can override it. This means you can declare Mode = TaskMode.InProcess for local-first development and flip to Distributed in production configuration without touching the task definition.
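A minimal sketch of that flip, assuming a configuration key of your choosing (the key name `DistributedTask:UseDistributed` here is hypothetical; the builder methods are the ones shown later in this post):

```csharp
// Pick the mode from configuration at startup, overriding the
// attribute's default — the task definition itself never changes.
var builder = services.AddCreateZipFromFilesTask();

if (configuration.GetValue<bool>("DistributedTask:UseDistributed"))
    builder.UseDistributedMode(o => { /* bind real infrastructure */ });
else
    builder.UseInProcessMode();
```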


InProcess Mode

In InProcess mode, the API and the Worker run inside the same ASP.NET host. No external dependencies. No Docker. No ports to manage.

The Architecture

┌────────────────────────────────────────────────┐
│              Single ASP.NET Host               │
│                                                │
│  ┌──────────┐    Channel<T>    ┌────────────┐  │
│  │   API    │ ───────────────→ │ Background │  │
│  │Controller│                  │  Service   │  │
│  └────┬─────┘                  │  (Worker)  │  │
│       │                        └─────┬──────┘  │
│       │                              │         │
│  ┌────┴──────────────────────────────┴──────┐  │
│  │        ConcurrentDictionary<T>           │  │
│  │     (Files, Locks, Progress, State)      │  │
│  └──────────────────────────────────────────┘  │
└────────────────────────────────────────────────┘

The generated BackgroundService reads from a Channel<T> — .NET's built-in thread-safe, async producer-consumer primitive, bounded here so back-pressure applies. The API controller writes to the channel. The worker reads from it. Same process, same memory space, zero serialization overhead for the handoff.
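To make the handoff concrete, here is the primitive in isolation — not the generated code, just the Channel<T> pattern it is built on:

```csharp
// API → worker handoff via a bounded Channel<T>.
var channel = Channel.CreateBounded<string>(capacity: 100);

// Worker side: drain items as they arrive.
var worker = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
        Console.WriteLine($"processing {item}");
});

// API side: enqueue a work item. When the channel is full,
// WriteAsync waits — that's the back-pressure.
await channel.Writer.WriteAsync("task-42");
channel.Writer.Complete();
await worker;
```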

In-Memory Implementations

Each infrastructure abstraction gets a lightweight in-memory implementation:

InMemoryS3Client — file storage backed by a dictionary:

public sealed class InMemoryS3Client : IS3Client
{
    private readonly ConcurrentDictionary<string, byte[]> _store = new();

    public async Task UploadAsync(string bucket, string key, Stream data,
        CancellationToken ct = default)
    {
        using var ms = new MemoryStream();
        await data.CopyToAsync(ms, ct);
        _store[$"{bucket}/{key}"] = ms.ToArray();
    }

    public Task<Stream> DownloadAsync(string bucket, string key,
        CancellationToken ct = default)
    {
        if (!_store.TryGetValue($"{bucket}/{key}", out var bytes))
            throw new FileNotFoundException($"{bucket}/{key}");
        return Task.FromResult<Stream>(new MemoryStream(bytes));
    }

    public Task DeleteAsync(string bucket, string key,
        CancellationToken ct = default)
    {
        _store.TryRemove($"{bucket}/{key}", out _);
        return Task.CompletedTask;
    }

    public Task<IReadOnlyList<string>> ListObjectsAsync(string bucket,
        string? prefix = null, CancellationToken ct = default)
    {
        var keys = _store.Keys
            .Where(k => k.StartsWith($"{bucket}/"))
            .Select(k => k[($"{bucket}/".Length)..])
            .Where(k => prefix is null || k.StartsWith(prefix))
            .ToList();
        return Task.FromResult<IReadOnlyList<string>>(keys);
    }
}

InMemoryQueuePublisher / InMemoryQueueConsumer — backed by Channel<T>:

public sealed class InMemoryQueuePublisher : IQueuePublisher
{
    private readonly InMemoryChannelRegistry _registry;

    public InMemoryQueuePublisher(InMemoryChannelRegistry registry)
        => _registry = registry;

    public Task PublishAsync<T>(string queueName, T message,
        CancellationToken ct = default) where T : class
    {
        var channel = _registry.GetOrCreate<T>(queueName);
        return channel.Writer.WriteAsync(message, ct).AsTask();
    }
}

public sealed class InMemoryQueueConsumer<T> : IQueueConsumer<T> where T : class
{
    private readonly Channel<T> _channel;

    public InMemoryQueueConsumer(InMemoryChannelRegistry registry,
        string queueName)
        => _channel = registry.GetOrCreate<T>(queueName);

    public async IAsyncEnumerable<T> ConsumeAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(ct))
        {
            yield return item;
        }
    }
}

The InMemoryChannelRegistry is a singleton that maps queue names to Channel<T> instances. Bounded capacity is configurable — default is 100 — so back-pressure works even in-process.
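The post doesn't show the registry itself, but a plausible sketch is small: one bounded Channel<T> per (queue name, message type) pair, created on first use. The default capacity of 100 matches the text; everything else here is an assumption.

```csharp
public sealed class InMemoryChannelRegistry
{
    // Keyed by (queue name, message type) so the same name can carry
    // different message types without collisions.
    private readonly ConcurrentDictionary<(string, Type), object> _channels = new();
    private readonly int _capacity;

    public InMemoryChannelRegistry(int capacity = 100) => _capacity = capacity;

    public Channel<T> GetOrCreate<T>(string queueName) where T : class
        => (Channel<T>)_channels.GetOrAdd((queueName, typeof(T)),
            _ => Channel.CreateBounded<T>(new BoundedChannelOptions(_capacity)
            {
                FullMode = BoundedChannelFullMode.Wait, // back-pressure in-process
            }));
}
```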

InMemoryDistributedLock — semaphore-based:

public sealed class InMemoryDistributedLock : IDistributedLock
{
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();

    public async Task<IAsyncDisposable?> TryAcquireAsync(string resource,
        TimeSpan expiry, CancellationToken ct = default)
    {
        // In-process, a crashed holder takes the whole host down with it,
        // so expiry is not enforced here; RedLock honors it in Distributed mode.
        var semaphore = _locks.GetOrAdd(resource, _ => new SemaphoreSlim(1, 1));
        if (!await semaphore.WaitAsync(TimeSpan.Zero, ct))
            return null;

        return new LockRelease(semaphore);
    }

    private sealed class LockRelease : IAsyncDisposable
    {
        private readonly SemaphoreSlim _semaphore;
        public LockRelease(SemaphoreSlim semaphore) => _semaphore = semaphore;
        public ValueTask DisposeAsync()
        {
            _semaphore.Release();
            return ValueTask.CompletedTask;
        }
    }
}

InMemoryTaskProgressTracker — dictionary plus channel for streaming:

public sealed class InMemoryTaskProgressTracker : ITaskProgressTracker
{
    private readonly ConcurrentDictionary<Guid, TaskProgress> _state = new();
    private readonly ConcurrentDictionary<Guid, Channel<TaskProgress>> _streams = new();

    public Task InitializeAsync(Guid taskId, string taskName,
        int totalSteps, CancellationToken ct = default)
    {
        var progress = new TaskProgress(taskId, taskName, totalSteps);
        _state[taskId] = progress;
        return Task.CompletedTask;
    }

    public Task UpdateAsync(Guid taskId, int completedSteps,
        string? message = null, CancellationToken ct = default)
    {
        if (!_state.TryGetValue(taskId, out var progress))
            throw new InvalidOperationException($"Task {taskId} not initialized.");

        progress = progress with
        {
            CompletedSteps = completedSteps,
            Message = message,
            UpdatedAt = DateTimeOffset.UtcNow,
        };
        _state[taskId] = progress;

        if (_streams.TryGetValue(taskId, out var channel))
            channel.Writer.TryWrite(progress);

        return Task.CompletedTask;
    }

    public Task CompleteAsync(Guid taskId, bool success,
        string? message = null, CancellationToken ct = default)
    {
        if (_state.TryGetValue(taskId, out var progress))
        {
            progress = progress with
            {
                IsComplete = true,
                Success = success,
                Message = message,
                UpdatedAt = DateTimeOffset.UtcNow,
            };
            _state[taskId] = progress;

            if (_streams.TryGetValue(taskId, out var channel))
            {
                channel.Writer.TryWrite(progress);
                channel.Writer.TryComplete();
            }
        }
        return Task.CompletedTask;
    }

    public Task<TaskProgress?> GetAsync(Guid taskId,
        CancellationToken ct = default)
    {
        _state.TryGetValue(taskId, out var progress);
        return Task.FromResult(progress);
    }

    public async IAsyncEnumerable<TaskProgress> StreamAsync(Guid taskId,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var channel = _streams.GetOrAdd(taskId,
            _ => Channel.CreateUnbounded<TaskProgress>());

        await foreach (var p in channel.Reader.ReadAllAsync(ct))
        {
            yield return p;
        }
    }
}

What This Gives You

  • Zero external dependencies — dotnet run and you're working
  • Deterministic tests — no network, no timing, no port conflicts
  • Fast feedback — no container startup, no connection handshakes
  • Debuggable — set a breakpoint in the worker, it hits in the same process
  • Demos — show the system working without infrastructure prerequisites
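The determinism claim is easy to demonstrate. A sketch of a unit test against the in-memory S3 client, assuming xUnit (the test framework is my choice, not the post's):

```csharp
[Fact]
public async Task Upload_then_download_round_trips()
{
    // No Docker, no ports, no network — just the in-memory implementation.
    IS3Client s3 = new InMemoryS3Client();
    var payload = Encoding.UTF8.GetBytes("hello");

    await s3.UploadAsync("bucket", "file.txt", new MemoryStream(payload));

    using var result = await s3.DownloadAsync("bucket", "file.txt");
    using var ms = new MemoryStream();
    await result.CopyToAsync(ms);
    Assert.Equal(payload, ms.ToArray());
}
```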

Distributed Mode

In Distributed mode, the abstractions bind to real infrastructure:

┌──────────────┐         ┌──────────────┐
│   API Host   │         │ Worker Host  │
│              │         │              │
│  Controller  │         │ Background   │
│      │       │         │  Service     │
│      ▼       │         │      ▲       │
│  QueuePub    │         │  QueueCon    │
└──────┬───────┘         └──────┬───────┘
       │                        │
       ▼                        │
┌──────────────┐         ┌──────┴───────┐
│  RabbitMQ /  │────────→│  RabbitMQ /  │
│  Redis Strm  │         │  Redis Strm  │
└──────────────┘         └──────────────┘
       │                        │
       ▼                        ▼
┌──────────────┐         ┌──────────────┐
│    MinIO     │         │    Redis     │
│  (S3 files)  │         │  (progress,  │
│              │         │   locks)     │
└──────────────┘         └──────────────┘

The implementations change. The generated orchestrator code does not.

Abstraction           InProcess                             Distributed
IS3Client             ConcurrentDictionary<string, byte[]>  MinIO / AWS S3
IQueuePublisher       Channel<T>.Writer                     RabbitMQ / Redis Streams / Azure Service Bus
IQueueConsumer<T>     Channel<T>.Reader                     RabbitMQ consumer / Redis XREADGROUP
IDistributedLock      SemaphoreSlim                         RedLock (Redis)
ITaskProgressTracker  ConcurrentDictionary + Channel<T>     Redis hash + pub/sub

The API and Worker are separate processes (or containers, or Kubernetes pods). They share nothing except the message contract and the infrastructure services.


The Abstraction Layer

Six interfaces form the infrastructure boundary. The generated code depends only on these — never on a concrete client.

IS3Client

public interface IS3Client
{
    Task UploadAsync(string bucket, string key, Stream data,
        CancellationToken ct = default);

    Task<Stream> DownloadAsync(string bucket, string key,
        CancellationToken ct = default);

    Task DeleteAsync(string bucket, string key,
        CancellationToken ct = default);

    Task<IReadOnlyList<string>> ListObjectsAsync(string bucket,
        string? prefix = null, CancellationToken ct = default);
}

Upload and download use Stream — not byte[] — to support large files without buffering everything in memory. The in-memory implementation cheats (it copies to a byte array), but the MinIO implementation streams directly.

IQueuePublisher

public interface IQueuePublisher
{
    Task PublishAsync<T>(string queueName, T message,
        CancellationToken ct = default) where T : class;
}

One method. The queue name is a string — the source generator emits it as a constant (CreateZipFromFilesQueues.WorkerQueue). The message type T is the generated worker message class.
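A plausible shape for that generated constants class — the class name comes from the post; the queue-name string itself is an assumption:

```csharp
// Generated alongside the task; call sites never hard-code the string.
public static class CreateZipFromFilesQueues
{
    public const string WorkerQueue = "create-zip-from-files.worker";
}
```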

IQueueConsumer<T>

public interface IQueueConsumer<T> where T : class
{
    IAsyncEnumerable<T> ConsumeAsync(CancellationToken ct = default);
}

Returns an IAsyncEnumerable<T> — the BackgroundService does await foreach over it. Clean, cancellable, back-pressure-aware.
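A sketch of how a generated worker service might drain that enumerable. The post names CreateZipFromFilesWorkerService; the body and the ZipWorkerMessage type here are illustrative, not the generated code:

```csharp
public sealed class ZipWorkerService : BackgroundService
{
    private readonly IQueueConsumer<ZipWorkerMessage> _consumer;

    public ZipWorkerService(IQueueConsumer<ZipWorkerMessage> consumer)
        => _consumer = consumer;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Host shutdown cancels the token, which ends the enumeration cleanly.
        await foreach (var message in _consumer.ConsumeAsync(stoppingToken))
        {
            // ...dispatch to the saga orchestrator...
        }
    }
}
```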

ITaskProgressTracker

public interface ITaskProgressTracker
{
    Task InitializeAsync(Guid taskId, string taskName,
        int totalSteps, CancellationToken ct = default);

    Task UpdateAsync(Guid taskId, int completedSteps,
        string? message = null, CancellationToken ct = default);

    Task CompleteAsync(Guid taskId, bool success,
        string? message = null, CancellationToken ct = default);

    Task<TaskProgress?> GetAsync(Guid taskId,
        CancellationToken ct = default);

    IAsyncEnumerable<TaskProgress> StreamAsync(Guid taskId,
        CancellationToken ct = default);
}

StreamAsync powers the SSE/WebSocket/SignalR listening strategies. In distributed mode it subscribes to a Redis pub/sub channel keyed by task ID. In InProcess mode it reads from a Channel<TaskProgress>.
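A hypothetical minimal-API endpoint built on StreamAsync shows how little glue SSE needs — the route and event shape are illustrative, not the generated controller:

```csharp
app.MapGet("/tasks/{taskId:guid}/progress", async (
    Guid taskId, ITaskProgressTracker tracker, HttpContext ctx) =>
{
    ctx.Response.ContentType = "text/event-stream";

    // Each progress update becomes one SSE "data:" frame.
    await foreach (var p in tracker.StreamAsync(taskId, ctx.RequestAborted))
    {
        await ctx.Response.WriteAsync(
            $"data: {JsonSerializer.Serialize(p)}\n\n", ctx.RequestAborted);
        await ctx.Response.Body.FlushAsync(ctx.RequestAborted);
    }
});
```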

IDistributedLock

public interface IDistributedLock
{
    Task<IAsyncDisposable?> TryAcquireAsync(string resource,
        TimeSpan expiry, CancellationToken ct = default);
}

Returns null if the lock is already held. Returns an IAsyncDisposable that releases the lock on dispose. The expiry prevents deadlocks if the holder crashes. Usage:

await using var @lock = await _lock.TryAcquireAsync(
    $"task:{taskId}", TimeSpan.FromMinutes(5), ct);

if (@lock is null)
{
    _logger.LogWarning("Task {TaskId} already being processed.", taskId);
    return;
}

// Safe to proceed — we hold the lock
await ExecuteStepAsync(taskId, ct);

ISagaStateSerializer

public interface ISagaStateSerializer
{
    byte[] Serialize<T>(T value) where T : class;
    T Deserialize<T>(byte[] data) where T : class;
}

Used for step data round-tripping through the queue and for saga state snapshots. The default implementation uses System.Text.Json, but you can swap in MessagePack or Protobuf.
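A round-trip usage sketch, using the default JsonSagaStateSerializer shown later in this post; the StepData record is a hypothetical step payload:

```csharp
public sealed record StepData(Guid TaskId, string FileName);

ISagaStateSerializer serializer = new JsonSagaStateSerializer();

// Serialize on the publish side, deserialize on the consume side —
// the orchestrator never sees the wire format.
byte[] bytes = serializer.Serialize(new StepData(Guid.NewGuid(), "report.pdf"));
StepData back = serializer.Deserialize<StepData>(bytes);
```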


Switching Modes

The generated DI extension method returns a builder that chains mode configuration:

// Development — everything in-process, zero dependencies
services.AddCreateZipFromFilesTask()
    .UseInProcessMode();

// Production — real infrastructure
services.AddCreateZipFromFilesTask()
    .UseDistributedMode(options =>
    {
        options.UseRabbitMq("amqp://guest:guest@rabbitmq:5672");
        options.UseMinIO("minio:9000", "minioadmin", "minioadmin");
        options.UseRedis("redis:6379");
    });

Under the hood, UseInProcessMode() registers all the in-memory implementations:

public static TaskBuilder UseInProcessMode(this TaskBuilder builder)
{
    builder.Services.AddSingleton<InMemoryChannelRegistry>();
    builder.Services.AddSingleton<IS3Client, InMemoryS3Client>();
    builder.Services.AddSingleton<IQueuePublisher, InMemoryQueuePublisher>();
    builder.Services.AddSingleton(typeof(IQueueConsumer<>),
        typeof(InMemoryQueueConsumer<>));
    builder.Services.AddSingleton<IDistributedLock, InMemoryDistributedLock>();
    builder.Services.AddSingleton<ITaskProgressTracker,
        InMemoryTaskProgressTracker>();
    builder.Services.AddSingleton<ISagaStateSerializer,
        JsonSagaStateSerializer>();

    // The worker runs as a BackgroundService in the same host
    builder.Services.AddHostedService<CreateZipFromFilesWorkerService>();

    return builder;
}

And UseDistributedMode registers the real clients:

public static TaskBuilder UseDistributedMode(this TaskBuilder builder,
    Action<DistributedModeOptions> configure)
{
    var options = new DistributedModeOptions();
    configure(options);

    builder.Services.AddSingleton<IS3Client>(
        new MinIOClient(options.S3Endpoint, options.S3AccessKey,
            options.S3SecretKey));
    builder.Services.AddSingleton<IQueuePublisher>(
        new RabbitMqPublisher(options.RabbitMqConnectionString));
    builder.Services.AddSingleton(typeof(IQueueConsumer<>),
        typeof(RabbitMqConsumer<>));
    builder.Services.AddSingleton<IDistributedLock>(
        new RedLockDistributedLock(options.RedisConnectionString));
    builder.Services.AddSingleton<ITaskProgressTracker>(
        new RedisTaskProgressTracker(options.RedisConnectionString));
    builder.Services.AddSingleton<ISagaStateSerializer,
        JsonSagaStateSerializer>();

    return builder;
}

The generated orchestrator, the generated controller, the generated worker service — none of them know which mode they're in. They depend on the interfaces. The container resolves the right implementation.

You can also override individual services. Want MinIO in development but in-memory queues?

services.AddCreateZipFromFilesTask()
    .UseInProcessMode()
    .OverrideS3(new MinIOClient("localhost:9000", "dev", "dev"));

Or swap the serializer:

services.AddCreateZipFromFilesTask()
    .UseDistributedMode(options => { /* ... */ })
    .UseSerializer<MessagePackSagaStateSerializer>();

Serialization

The ISagaStateSerializer interface controls how step data, worker messages, and saga state snapshots are serialized. Three implementations ship out of the box:

JSON (System.Text.Json) — The Default

public sealed class JsonSagaStateSerializer : ISagaStateSerializer
{
    private static readonly JsonSerializerOptions Options = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
        Converters = { new JsonStringEnumConverter() },
    };

    public byte[] Serialize<T>(T value) where T : class
        => JsonSerializer.SerializeToUtf8Bytes(value, Options);

    public T Deserialize<T>(byte[] data) where T : class
        => JsonSerializer.Deserialize<T>(data, Options)
           ?? throw new InvalidOperationException(
               $"Failed to deserialize {typeof(T).Name}.");
}

Human-readable. Debuggable — you can inspect queue messages in the RabbitMQ management UI. Good enough for most workloads.

MessagePack — Speed and Size

public sealed class MessagePackSagaStateSerializer : ISagaStateSerializer
{
    private static readonly MessagePackSerializerOptions Options =
        MessagePackSerializerOptions.Standard
            .WithResolver(ContractlessStandardResolver.Instance);

    public byte[] Serialize<T>(T value) where T : class
        => MessagePackSerializer.Serialize(value, Options);

    public T Deserialize<T>(byte[] data) where T : class
        => MessagePackSerializer.Deserialize<T>(data, Options);
}

3-5x faster serialization, 30-50% smaller payloads compared to JSON. The contractless resolver means you don't need [Key] attributes on your step data — it uses property names. Good for high-throughput pipelines where the queue is the bottleneck.

Protobuf — Schema Evolution and Cross-Language

public sealed class ProtobufSagaStateSerializer : ISagaStateSerializer
{
    // The interface constraint (where T : class) can't be strengthened to
    // IMessage<T> in an implementation, so the Protobuf requirement is
    // enforced at runtime instead of by the compiler.
    public byte[] Serialize<T>(T value) where T : class
    {
        if (value is not IMessage message)
            throw new InvalidOperationException(
                $"{typeof(T).Name} must be a generated Protobuf message.");
        return message.ToByteArray();
    }

    public T Deserialize<T>(byte[] data) where T : class
    {
        // Generated Protobuf messages expose a static Parser property.
        var parser = (MessageParser?)typeof(T)
            .GetProperty("Parser", BindingFlags.Public | BindingFlags.Static)
            ?.GetValue(null)
            ?? throw new InvalidOperationException(
                $"{typeof(T).Name} must be a generated Protobuf message.");
        return (T)parser.ParseFrom(data);
    }
}

Protobuf gives you forward/backward compatible schema evolution and cross-language interop. If your Worker is in Go or Python and your API is in C#, Protobuf is the natural choice. The trade-off: your step data types must implement IMessage<T> (generated from .proto files), which couples you to the Protobuf toolchain.

Choosing

Concern              JSON        MessagePack      Protobuf
Human-readable       Yes         No               No
Serialization speed  Baseline    3-5x faster      2-3x faster
Payload size         Largest     ~50-70% of JSON  ~30-50% of JSON
Schema evolution     Fragile     Fragile          Built-in
Cross-language       Yes (text)  Yes (binary)     Yes (binary + codegen)
Setup cost           Zero        NuGet package    .proto files + codegen
Debuggability        Excellent   Poor             Poor

Default to JSON. Switch to MessagePack when profiling shows serialization as a bottleneck. Use Protobuf when you need cross-language consumers or guaranteed schema evolution.


Summary

The two-mode architecture is not a convenience feature — it's a correctness feature. InProcess mode ensures that every developer, every CI run, and every demo exercises the same saga logic that runs in production. The abstraction layer (IS3Client, IQueuePublisher, IQueueConsumer<T>, ITaskProgressTracker, IDistributedLock, ISagaStateSerializer) is the contract between the generated orchestrator and the outside world. The generated code never reaches past these interfaces.

The DSL declares the task. The mode declares the infrastructure. They are orthogonal.


What's Next

Part VIII: Resilience — Polly integration per step, idempotency keys, RedLock distributed locking, cancellation with compensation, and S3 orphan reconciliation. The saga can survive anything except kill -9 on every node simultaneously (and even then, reconciliation picks up the pieces).