Modes & Infrastructure
"If you need RabbitMQ, Redis, and MinIO running just to debug a button click, your architecture has failed the developer."
The Problem
Distributed systems have a dirty secret: the local development experience is terrible.
Your production saga uses RabbitMQ for messaging, MinIO for file storage, Redis for progress tracking and distributed locks. So to run the thing locally you need a docker-compose.yml with five services, 2 GB of RAM just for infrastructure, and a README that starts with "first, install Docker."
Unit tests? They either hit real infrastructure (slow, flaky, port-conflict-prone) or you hand-roll mocks for every queue, every lock, every storage call. Integration tests? Good luck running those in CI without a Docker-in-Docker setup.
The core insight: the saga orchestrator doesn't care where the messages go. It calls IQueuePublisher.PublishAsync. It calls IS3Client.UploadAsync. It calls IDistributedLock.TryAcquireAsync. Whether those calls hit RabbitMQ or an in-memory channel is a deployment decision, not a design decision.
DistributedTask.Dsl bakes this into the DSL itself.
Two Modes
Every distributed task declares a default mode:
[DistributedTask("CreateZipFromFiles", Mode = TaskMode.InProcess)]
public partial class CreateZipFromFilesTask
{
// Steps, conditions, retry policies — all the same
}
TaskMode is a simple enum:
public enum TaskMode
{
InProcess,
Distributed,
}
The attribute sets the default. The DI registration can override it. This means you can declare Mode = TaskMode.InProcess for local-first development and flip to Distributed in production configuration without touching the task definition.
InProcess Mode
In InProcess mode, the API and the Worker run inside the same ASP.NET host. No external dependencies. No Docker. No ports to manage.
The Architecture
┌─────────────────────────────────────────────────┐
│ Single ASP.NET Host │
│ │
│ ┌──────────┐ Channel<T> ┌─────────────┐ │
│ │ API │ ──────────────→ │ Background │ │
│ │Controller│ │ Service │ │
│ └──────────┘ │ (Worker) │ │
│ │ └──────┬───────┘ │
│ │ │ │
│ ┌────┴────────────────────────────────┴──────┐ │
│ │ ConcurrentDictionary<T> │ │
│ │ (Files, Locks, Progress, State) │ │
│ └────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
The generated BackgroundService reads from a Channel<T> — .NET's built-in bounded, thread-safe, async producer-consumer primitive. The API controller writes to the channel. The worker reads from it. Same process, same memory space, zero serialization overhead for the handoff.
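The handoff can be sketched with nothing but the BCL. A minimal example, assuming the documented default capacity of 100; the string "task-42" stands in for the generated worker message type:

```csharp
using System;
using System.Threading.Channels;

// Minimal sketch of the API→worker handoff through a bounded channel.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait, // writers wait when full: back-pressure
});

// Producer side (what the API controller does).
await channel.Writer.WriteAsync("task-42");
channel.Writer.Complete();

// Consumer side (what the BackgroundService does).
await foreach (var taskId in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"processing {taskId}"); // prints "processing task-42"
}
```

With FullMode = Wait, a flood of API requests slows down at the channel boundary instead of exhausting memory, which is the same back-pressure behavior a real queue would give you.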
In-Memory Implementations
Each infrastructure abstraction gets a lightweight in-memory implementation:
InMemoryS3Client — file storage backed by a dictionary:
public sealed class InMemoryS3Client : IS3Client
{
private readonly ConcurrentDictionary<string, byte[]> _store = new();
public Task UploadAsync(string bucket, string key, Stream data,
CancellationToken ct = default)
{
using var ms = new MemoryStream();
data.CopyTo(ms);
_store[$"{bucket}/{key}"] = ms.ToArray();
return Task.CompletedTask;
}
public Task<Stream> DownloadAsync(string bucket, string key,
CancellationToken ct = default)
{
if (!_store.TryGetValue($"{bucket}/{key}", out var bytes))
throw new FileNotFoundException($"{bucket}/{key}");
return Task.FromResult<Stream>(new MemoryStream(bytes));
}
public Task DeleteAsync(string bucket, string key,
CancellationToken ct = default)
{
_store.TryRemove($"{bucket}/{key}", out _);
return Task.CompletedTask;
}
public Task<IReadOnlyList<string>> ListObjectsAsync(string bucket,
string? prefix = null, CancellationToken ct = default)
{
var keys = _store.Keys
.Where(k => k.StartsWith($"{bucket}/"))
.Select(k => k[($"{bucket}/".Length)..])
.Where(k => prefix is null || k.StartsWith(prefix))
.ToList();
return Task.FromResult<IReadOnlyList<string>>(keys);
}
}
InMemoryQueuePublisher / InMemoryQueueConsumer — backed by Channel<T>:
public sealed class InMemoryQueuePublisher : IQueuePublisher
{
private readonly InMemoryChannelRegistry _registry;
public InMemoryQueuePublisher(InMemoryChannelRegistry registry)
=> _registry = registry;
public Task PublishAsync<T>(string queueName, T message,
CancellationToken ct = default) where T : class
{
var channel = _registry.GetOrCreate<T>(queueName);
return channel.Writer.WriteAsync(message, ct).AsTask();
}
}
public sealed class InMemoryQueueConsumer<T> : IQueueConsumer<T> where T : class
{
private readonly Channel<T> _channel;
public InMemoryQueueConsumer(InMemoryChannelRegistry registry,
string queueName)
=> _channel = registry.GetOrCreate<T>(queueName);
public async IAsyncEnumerable<T> ConsumeAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
await foreach (var item in _channel.Reader.ReadAllAsync(ct))
{
yield return item;
}
}
}
The InMemoryChannelRegistry is a singleton that maps queue names to Channel<T> instances. Bounded capacity is configurable — default is 100 — so back-pressure works even in-process.
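The registry itself is not shown above; a plausible sketch (an assumption, not the library's actual code) keyed by queue name and message type, with the documented default capacity:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;

// Hypothetical shape for InMemoryChannelRegistry: one bounded Channel<T> per
// (queue name, message type) pair, created lazily on first use.
public sealed class InMemoryChannelRegistry
{
    private readonly ConcurrentDictionary<(string Queue, Type Message), object> _channels = new();
    private readonly int _capacity;

    public InMemoryChannelRegistry(int capacity = 100) => _capacity = capacity;

    public Channel<T> GetOrCreate<T>(string queueName) =>
        (Channel<T>)_channels.GetOrAdd((queueName, typeof(T)),
            _ => Channel.CreateBounded<T>(new BoundedChannelOptions(_capacity)
            {
                FullMode = BoundedChannelFullMode.Wait, // publishers block when full
            }));
}
```

Keying by both queue name and type means publisher and consumer resolve the same channel instance, and two message types accidentally sharing a queue name can never cause an invalid cast.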
InMemoryDistributedLock — semaphore-based:
public sealed class InMemoryDistributedLock : IDistributedLock
{
private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();
public async Task<IAsyncDisposable?> TryAcquireAsync(string resource,
TimeSpan expiry, CancellationToken ct = default)
{
var semaphore = _locks.GetOrAdd(resource, _ => new SemaphoreSlim(1, 1));
if (!await semaphore.WaitAsync(TimeSpan.Zero, ct))
return null;
return new LockRelease(semaphore);
}
private sealed class LockRelease : IAsyncDisposable
{
private readonly SemaphoreSlim _semaphore;
public LockRelease(SemaphoreSlim semaphore) => _semaphore = semaphore;
public ValueTask DisposeAsync()
{
_semaphore.Release();
return ValueTask.CompletedTask;
}
}
}
InMemoryTaskProgressTracker — dictionary plus channel for streaming:
public sealed class InMemoryTaskProgressTracker : ITaskProgressTracker
{
private readonly ConcurrentDictionary<Guid, TaskProgress> _state = new();
private readonly ConcurrentDictionary<Guid, Channel<TaskProgress>> _streams = new();
public Task InitializeAsync(Guid taskId, string taskName,
int totalSteps, CancellationToken ct = default)
{
var progress = new TaskProgress(taskId, taskName, totalSteps);
_state[taskId] = progress;
return Task.CompletedTask;
}
public Task UpdateAsync(Guid taskId, int completedSteps,
string? message = null, CancellationToken ct = default)
{
if (!_state.TryGetValue(taskId, out var progress))
throw new InvalidOperationException($"Task {taskId} not initialized.");
progress = progress with
{
CompletedSteps = completedSteps,
Message = message,
UpdatedAt = DateTimeOffset.UtcNow,
};
_state[taskId] = progress;
if (_streams.TryGetValue(taskId, out var channel))
channel.Writer.TryWrite(progress);
return Task.CompletedTask;
}
public Task CompleteAsync(Guid taskId, bool success,
string? message = null, CancellationToken ct = default)
{
if (_state.TryGetValue(taskId, out var progress))
{
progress = progress with
{
IsComplete = true,
Success = success,
Message = message,
UpdatedAt = DateTimeOffset.UtcNow,
};
_state[taskId] = progress;
if (_streams.TryGetValue(taskId, out var channel))
{
channel.Writer.TryWrite(progress);
channel.Writer.TryComplete();
}
}
return Task.CompletedTask;
}
public Task<TaskProgress?> GetAsync(Guid taskId,
CancellationToken ct = default)
{
_state.TryGetValue(taskId, out var progress);
return Task.FromResult(progress);
}
public async IAsyncEnumerable<TaskProgress> StreamAsync(Guid taskId,
[EnumeratorCancellation] CancellationToken ct = default)
{
var channel = _streams.GetOrAdd(taskId,
_ => Channel.CreateUnbounded<TaskProgress>());
await foreach (var p in channel.Reader.ReadAllAsync(ct))
{
yield return p;
}
}
}
What This Gives You
- Zero external dependencies — dotnet run and you're working
- Deterministic tests — no network, no timing, no port conflicts
- Fast feedback — no container startup, no connection handshakes
- Debuggable — set a breakpoint in the worker, it hits in the same process
- Demos — show the system working without infrastructure prerequisites
Distributed Mode
In Distributed mode, the abstractions bind to real infrastructure:
┌──────────────┐ ┌──────────────┐
│ API Host │ │ Worker Host │
│ │ │ │
│ Controller │ │ Background │
│ │ │ │ Service │
│ ▼ │ │ ▲ │
│ QueuePub │ │ QueueCon │
└──────┬───────┘ └──────┬───────┘
│ │
▼ │
┌──────────────┐ ┌──────┴───────┐
│ RabbitMQ / │────────→│ RabbitMQ / │
│ Redis Strm │ │ Redis Strm │
└──────────────┘ └──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ MinIO │ │ Redis │
│ (S3 files) │ │ (progress, │
│ │ │ locks) │
└──────────────┘ └──────────────┘
The implementations change. The generated orchestrator code does not.
| Abstraction | InProcess | Distributed |
|---|---|---|
| IS3Client | ConcurrentDictionary<string, byte[]> | MinIO / AWS S3 |
| IQueuePublisher | Channel<T>.Writer | RabbitMQ / Redis Streams / Azure Service Bus |
| IQueueConsumer<T> | Channel<T>.Reader | RabbitMQ consumer / Redis XREADGROUP |
| IDistributedLock | SemaphoreSlim | RedLock (Redis) |
| ITaskProgressTracker | ConcurrentDictionary + Channel<T> | Redis hash + pub/sub |
The API and Worker are separate processes (or containers, or Kubernetes pods). They share nothing except the message contract and the infrastructure services.
The Abstraction Layer
Six interfaces form the infrastructure boundary. The generated code depends only on these — never on a concrete client.
IS3Client
public interface IS3Client
{
Task UploadAsync(string bucket, string key, Stream data,
CancellationToken ct = default);
Task<Stream> DownloadAsync(string bucket, string key,
CancellationToken ct = default);
Task DeleteAsync(string bucket, string key,
CancellationToken ct = default);
Task<IReadOnlyList<string>> ListObjectsAsync(string bucket,
string? prefix = null, CancellationToken ct = default);
}
Upload and download use Stream — not byte[] — to support large files without buffering everything in memory. The in-memory implementation cheats (it copies to a byte array), but the MinIO implementation streams directly.
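To make the Stream rationale concrete, here is a fixed-buffer async copy (essentially what Stream.CopyToAsync already does internally; the buffer size is illustrative). Memory use stays at the buffer size no matter how large the source is, which is the behavior a streaming IS3Client implementation relies on:

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Copy in fixed-size chunks: peak memory is bufferSize, not the file size.
static async Task CopyInChunksAsync(Stream source, Stream destination,
    int bufferSize = 81920, CancellationToken ct = default)
{
    var buffer = new byte[bufferSize];
    int read;
    while ((read = await source.ReadAsync(buffer.AsMemory(0, bufferSize), ct)) > 0)
    {
        await destination.WriteAsync(buffer.AsMemory(0, read), ct);
    }
}

// Usage: a 1 MB payload copied through an 8 KB buffer.
var src = new MemoryStream(new byte[1_048_576]);
var dst = new MemoryStream();
await CopyInChunksAsync(src, dst, bufferSize: 8192);
Console.WriteLine(dst.Length); // 1048576
```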
IQueuePublisher
public interface IQueuePublisher
{
Task PublishAsync<T>(string queueName, T message,
CancellationToken ct = default) where T : class;
}
One method. The queue name is a string — the source generator emits it as a constant (CreateZipFromFilesQueues.WorkerQueue). The message type T is the generated worker message class.
IQueueConsumer<T>
public interface IQueueConsumer<T> where T : class
{
IAsyncEnumerable<T> ConsumeAsync(CancellationToken ct = default);
}
Returns an IAsyncEnumerable<T> — the BackgroundService does await foreach over it. Clean, cancellable, back-pressure-aware.
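A self-contained sketch of that loop, with a channel reader standing in for ConsumeAsync, shows how cancellation ends the await foreach cleanly (the 100 ms delay is just to let the sketch drain before cancelling):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Drain an IAsyncEnumerable<T> until the host's token fires.
var channel = Channel.CreateUnbounded<int>();
using var cts = new CancellationTokenSource();

var worker = Task.Run(async () =>
{
    var handled = new List<int>();
    try
    {
        await foreach (var msg in channel.Reader.ReadAllAsync(cts.Token))
        {
            handled.Add(msg); // stand-in for HandleAsync(msg, ct)
        }
    }
    catch (OperationCanceledException)
    {
        // Graceful shutdown: the host cancelled the token mid-wait.
    }
    return handled;
});

await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);
await Task.Delay(100); // let the worker drain the two messages
cts.Cancel();
var processed = await worker;
Console.WriteLine(string.Join(",", processed)); // "1,2"
```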
ITaskProgressTracker
public interface ITaskProgressTracker
{
Task InitializeAsync(Guid taskId, string taskName,
int totalSteps, CancellationToken ct = default);
Task UpdateAsync(Guid taskId, int completedSteps,
string? message = null, CancellationToken ct = default);
Task CompleteAsync(Guid taskId, bool success,
string? message = null, CancellationToken ct = default);
Task<TaskProgress?> GetAsync(Guid taskId,
CancellationToken ct = default);
IAsyncEnumerable<TaskProgress> StreamAsync(Guid taskId,
CancellationToken ct = default);
}
StreamAsync powers the SSE/WebSocket/SignalR listening strategies. In distributed mode it subscribes to a Redis pub/sub channel keyed by task ID. In InProcess mode it reads from a Channel<TaskProgress>.
IDistributedLock
public interface IDistributedLock
{
Task<IAsyncDisposable?> TryAcquireAsync(string resource,
TimeSpan expiry, CancellationToken ct = default);
}
Returns null if the lock is already held. Returns an IAsyncDisposable that releases the lock on dispose. The expiry prevents deadlocks if the holder crashes. Usage:
await using var @lock = await _lock.TryAcquireAsync(
$"task:{taskId}", TimeSpan.FromMinutes(5), ct);
if (@lock is null)
{
_logger.LogWarning("Task {TaskId} already being processed.", taskId);
return;
}
// Safe to proceed — we hold the lock
await ExecuteStepAsync(taskId, ct);
ISagaStateSerializer
public interface ISagaStateSerializer
{
byte[] Serialize<T>(T value) where T : class;
T Deserialize<T>(byte[] data) where T : class;
}
Used for step data round-tripping through the queue and for saga state snapshots. The default implementation uses System.Text.Json, but you can swap in MessagePack or Protobuf.
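A round-trip sketch of the default JSON path. The record is an illustrative payload, not a library type, and the calls mirror what a System.Text.Json-based implementation of the interface does:

```csharp
using System;
using System.Text.Json;

// Step data out through the queue and back: serialize to UTF-8 bytes,
// deserialize on the worker side, compare by value.
var original = new StepData(Guid.NewGuid(), "report.pdf", 3);

byte[] wire = JsonSerializer.SerializeToUtf8Bytes(original);
var restored = JsonSerializer.Deserialize<StepData>(wire)
    ?? throw new InvalidOperationException("deserialization returned null");

Console.WriteLine(restored == original); // records compare by value: True

public sealed record StepData(Guid TaskId, string FileName, int Attempt);
```

Because step data types are records, value equality makes round-trip tests like this trivial to write.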
Switching Modes
The generated DI extension method returns a builder that chains mode configuration:
// Development — everything in-process, zero dependencies
services.AddCreateZipFromFilesTask()
.UseInProcessMode();
// Production — real infrastructure
services.AddCreateZipFromFilesTask()
.UseDistributedMode(options =>
{
options.UseRabbitMq("amqp://guest:guest@rabbitmq:5672");
options.UseMinIO("minio:9000", "minioadmin", "minioadmin");
options.UseRedis("redis:6379");
});
Under the hood, UseInProcessMode() registers all the in-memory implementations:
public static TaskBuilder UseInProcessMode(this TaskBuilder builder)
{
builder.Services.AddSingleton<InMemoryChannelRegistry>();
builder.Services.AddSingleton<IS3Client, InMemoryS3Client>();
builder.Services.AddSingleton<IQueuePublisher, InMemoryQueuePublisher>();
builder.Services.AddSingleton(typeof(IQueueConsumer<>),
typeof(InMemoryQueueConsumer<>));
builder.Services.AddSingleton<IDistributedLock, InMemoryDistributedLock>();
builder.Services.AddSingleton<ITaskProgressTracker,
InMemoryTaskProgressTracker>();
builder.Services.AddSingleton<ISagaStateSerializer,
JsonSagaStateSerializer>();
// The worker runs as a BackgroundService in the same host
builder.Services.AddHostedService<CreateZipFromFilesWorkerService>();
return builder;
}
And UseDistributedMode registers the real clients:
public static TaskBuilder UseDistributedMode(this TaskBuilder builder,
Action<DistributedModeOptions> configure)
{
var options = new DistributedModeOptions();
configure(options);
builder.Services.AddSingleton<IS3Client>(
new MinIOClient(options.S3Endpoint, options.S3AccessKey,
options.S3SecretKey));
builder.Services.AddSingleton<IQueuePublisher>(
new RabbitMqPublisher(options.RabbitMqConnectionString));
builder.Services.AddSingleton(typeof(IQueueConsumer<>),
typeof(RabbitMqConsumer<>));
builder.Services.AddSingleton<IDistributedLock>(
new RedLockDistributedLock(options.RedisConnectionString));
builder.Services.AddSingleton<ITaskProgressTracker>(
new RedisTaskProgressTracker(options.RedisConnectionString));
builder.Services.AddSingleton<ISagaStateSerializer,
JsonSagaStateSerializer>();
return builder;
}
The generated orchestrator, the generated controller, the generated worker service — none of them know which mode they're in. They depend on the interfaces. The container resolves the right implementation.
You can also override individual services. Want MinIO in development but in-memory queues?
services.AddCreateZipFromFilesTask()
.UseInProcessMode()
.OverrideS3(new MinIOClient("localhost:9000", "dev", "dev"));
Or swap the serializer:
services.AddCreateZipFromFilesTask()
.UseDistributedMode(options => { /* ... */ })
.UseSerializer<MessagePackSagaStateSerializer>();
Serialization
The ISagaStateSerializer interface controls how step data, worker messages, and saga state snapshots are serialized. Three implementations ship out of the box:
JSON (System.Text.Json) — The Default
public sealed class JsonSagaStateSerializer : ISagaStateSerializer
{
private static readonly JsonSerializerOptions Options = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
Converters = { new JsonStringEnumConverter() },
};
public byte[] Serialize<T>(T value) where T : class
=> JsonSerializer.SerializeToUtf8Bytes(value, Options);
public T Deserialize<T>(byte[] data) where T : class
=> JsonSerializer.Deserialize<T>(data, Options)
?? throw new InvalidOperationException(
$"Failed to deserialize {typeof(T).Name}.");
}
Human-readable. Debuggable — you can inspect queue messages in the RabbitMQ management UI. Good enough for most workloads.
MessagePack — Speed and Size
public sealed class MessagePackSagaStateSerializer : ISagaStateSerializer
{
private static readonly MessagePackSerializerOptions Options =
MessagePackSerializerOptions.Standard
.WithResolver(ContractlessStandardResolver.Instance);
public byte[] Serialize<T>(T value) where T : class
=> MessagePackSerializer.Serialize(value, Options);
public T Deserialize<T>(byte[] data) where T : class
=> MessagePackSerializer.Deserialize<T>(data, Options);
}
3-5x faster serialization, 30-50% smaller payloads compared to JSON. The contractless resolver means you don't need [Key] attributes on your step data — it uses property names. Good for high-throughput pipelines where the queue is the bottleneck.
Protobuf — Schema Evolution and Cross-Language
public sealed class ProtobufSagaStateSerializer : ISagaStateSerializer
{
    // ISagaStateSerializer constrains T only to class, and an implementing
    // method cannot add an IMessage<T> constraint — so the Protobuf
    // requirement is enforced at runtime instead of by the compiler.
    public byte[] Serialize<T>(T value) where T : class
    {
        if (value is not IMessage message)
            throw new InvalidOperationException(
                $"{typeof(T).Name} must be a Protobuf-generated type.");
        return message.ToByteArray();
    }
    public T Deserialize<T>(byte[] data) where T : class
    {
        // Every Protobuf-generated type exposes a static Parser property.
        var parser = (MessageParser?)typeof(T)
            .GetProperty("Parser", BindingFlags.Public | BindingFlags.Static)
            ?.GetValue(null)
            ?? throw new InvalidOperationException(
                $"{typeof(T).Name} must be a Protobuf-generated type.");
        return (T)parser.ParseFrom(data);
    }
}
Protobuf gives you forward/backward compatible schema evolution and cross-language interop. If your Worker is in Go or Python and your API is in C#, Protobuf is the natural choice. The trade-off: your step data types must implement IMessage<T> (generated from .proto files), which couples you to the Protobuf toolchain — and since the interface only constrains T to class, that requirement surfaces as a runtime check rather than a compile error.
Choosing
| Concern | JSON | MessagePack | Protobuf |
|---|---|---|---|
| Human-readable | Yes | No | No |
| Serialization speed | Baseline | 3-5x faster | 2-3x faster |
| Payload size | Largest | ~50-70% of JSON | ~30-50% of JSON |
| Schema evolution | Fragile | Fragile | Built-in |
| Cross-language | Yes (text) | Yes (binary) | Yes (binary + codegen) |
| Setup cost | Zero | NuGet package | .proto files + codegen |
| Debuggability | Excellent | Poor | Poor |
Default to JSON. Switch to MessagePack when profiling shows serialization as a bottleneck. Use Protobuf when you need cross-language consumers or guaranteed schema evolution.
Summary
The two-mode architecture is not a convenience feature — it's a correctness feature. InProcess mode ensures that every developer, every CI run, and every demo exercises the same saga logic that runs in production. The abstraction layer (IS3Client, IQueuePublisher, IQueueConsumer<T>, ITaskProgressTracker, IDistributedLock, ISagaStateSerializer) is the contract between the generated orchestrator and the outside world. The generated code never reaches past these interfaces.
The DSL declares the task. The mode declares the infrastructure. They are orthogonal.
What's Next
Part VIII: Resilience — Polly integration per step, idempotency keys, RedLock distributed locking, cancellation with compensation, and S3 orphan reconciliation. The saga can survive anything except kill -9 on every node simultaneously (and even then, reconciliation picks up the pieces).