Testing
"A saga is a state machine with pluggable I/O. Replace the I/O with in-memory fakes, and every edge case becomes a unit test."
Why Sagas Are Testable
Most distributed-system tests require standing up real infrastructure: queues, storage, lock services, databases. The tests are slow, flaky, and port-conflict-prone. CI pipelines grow Docker-in-Docker setups that take longer to boot than the tests take to run.
Sagas, as DistributedTask.Dsl defines them, are different. The saga class is a callback orchestrator. It calls IS3Client.UploadAsync, IQueuePublisher.PublishAsync, IDistributedLock.TryAcquireAsync, ITaskProgressTracker.UpdateAsync. It never calls MinIO, RabbitMQ, or Redis directly. The infrastructure is behind interfaces.
Replace the interfaces with in-memory implementations, and the saga runs in-process, single-threaded, deterministically. No Docker. No ports. No flake.
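The whole trick rests on those fakes being trivial to write. Part VII covers the real implementations; purely as a hedged illustration, a minimal in-memory object store might look like this. The IS3Client shape below is inferred from the calls made later in this chapter, so the real interface may differ:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch — Part VII's real InMemoryS3Client may differ.
public interface IS3Client
{
    Task UploadAsync(string bucket, string key, Stream data, CancellationToken ct = default);
    Task<Stream> DownloadAsync(string bucket, string key, CancellationToken ct = default);
    Task DeleteAsync(string bucket, string key, CancellationToken ct = default);
    Task<IReadOnlyList<string>> ListObjectsAsync(string bucket, CancellationToken ct = default);
}

public class InMemoryS3Client : IS3Client
{
    // Object store: (bucket, key) -> bytes. Thread-safe so concurrent steps can share it.
    private readonly ConcurrentDictionary<(string Bucket, string Key), byte[]> _objects = new();

    // Virtual, so test doubles can subclass and override a single method.
    public virtual async Task UploadAsync(string bucket, string key, Stream data,
        CancellationToken ct = default)
    {
        using var buffer = new MemoryStream();
        await data.CopyToAsync(buffer, ct);
        _objects[(bucket, key)] = buffer.ToArray();
    }

    public virtual Task<Stream> DownloadAsync(string bucket, string key,
        CancellationToken ct = default)
        => _objects.TryGetValue((bucket, key), out var bytes)
            ? Task.FromResult<Stream>(new MemoryStream(bytes))
            : throw new FileNotFoundException($"{bucket}/{key}");

    public virtual Task DeleteAsync(string bucket, string key, CancellationToken ct = default)
    {
        _objects.TryRemove((bucket, key), out _);
        return Task.CompletedTask;
    }

    public virtual Task<IReadOnlyList<string>> ListObjectsAsync(string bucket,
        CancellationToken ct = default)
        => Task.FromResult<IReadOnlyList<string>>(
            _objects.Keys.Where(k => k.Bucket == bucket).Select(k => k.Key).ToList());
}
```

Because every member is virtual, the test doubles later in this chapter (FailingS3Client, CountingS3Client, SlowS3Client) only need to override the one method whose behavior they manipulate.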
In-Memory Infrastructure
Part VII introduced the four in-memory implementations. Here is the test host that wires them up:
public sealed class SagaTestHost : IAsyncDisposable
{
private readonly ServiceProvider _provider;
public IS3Client S3 => _provider.GetRequiredService<IS3Client>();
public ITaskProgressTracker Progress
=> _provider.GetRequiredService<ITaskProgressTracker>();
public SagaTestHost(Action<IServiceCollection>? configure = null)
{
var services = new ServiceCollection();
// ── In-memory infrastructure ──
services.AddSingleton<InMemoryChannelRegistry>();
services.AddSingleton<IS3Client, InMemoryS3Client>();
services.AddSingleton<IQueuePublisher, InMemoryQueuePublisher>();
services.AddSingleton<IDistributedLock, InMemoryDistributedLock>();
services.AddSingleton<ITaskProgressTracker, InMemoryTaskProgressTracker>();
// ── Generated saga registrations ──
services.AddCreateZipFromFilesSaga();
services.AddVideoTranscodeSaga();
// ── Allow tests to replace services ──
configure?.Invoke(services);
_provider = services.BuildServiceProvider();
}
public T GetService<T>() where T : notnull
=> _provider.GetRequiredService<T>();
public async ValueTask DisposeAsync()
=> await _provider.DisposeAsync();
}
The configure callback is the key. Tests that need a FailingS3Client or a CountingS3Client swap out the real in-memory implementation with a test double. The saga never knows the difference.
Happy Path Test
The simplest test: submit a task, let it run, verify the result.
[Fact]
public async Task CreateZip_HappyPath_ReturnsZipKey()
{
// Arrange
await using var host = new SagaTestHost();
var orchestrator = host.GetService<ICreateZipFromFilesOrchestrator>();
var request = new CreateZipRequest
{
Files = CreateTestFiles("a.txt", "b.txt", "c.txt"),
OutputFileName = "archive.zip",
};
// Act
var result = await orchestrator.ExecuteAsync(request, CancellationToken.None);
// Assert
Assert.Equal(DistributedTaskStatus.Completed, result.Status);
Assert.NotNull(result.Response);
Assert.Equal("processed-files", result.Response.Bucket);
Assert.EndsWith(".zip", result.Response.Key);
// Verify the ZIP actually exists in the in-memory store
var s3 = host.S3;
var stream = await s3.DownloadAsync("processed-files", result.Response.Key);
Assert.True(stream.Length > 0);
// Verify all steps recorded as Completed
var progress = await host.Progress.GetAsync(result.TaskId);
Assert.All(progress!.Steps.Values, s => Assert.Equal(StepStatus.Completed, s));
}
private static List<IFormFile> CreateTestFiles(params string[] names)
=> names.Select(name =>
{
var bytes = Encoding.UTF8.GetBytes($"content of {name}");
return (IFormFile)new FormFile(
new MemoryStream(bytes), 0, bytes.Length, name, name);
}).ToList();
Nothing special here. The test runs in-process, synchronously (from the test's perspective), and completes in milliseconds. No Docker, no ports, no cleanup scripts.
Compensation Chain Test
Force a failure at step N. Verify that steps N-1 through 1 are compensated in reverse order.
The trick: a FailingS3Client that throws on the Nth upload and on every upload after it, so step-level retries cannot mask the failure.
public sealed class FailingS3Client : InMemoryS3Client
{
private int _uploadCount;
private readonly int _failOnUpload;
public FailingS3Client(int failOnUpload) => _failOnUpload = failOnUpload;
public override Task UploadAsync(string bucket, string key, Stream data,
CancellationToken ct = default)
{
// Fail from the Nth upload onward so retries of the step also fail
if (Interlocked.Increment(ref _uploadCount) >= _failOnUpload)
throw new IOException("Simulated S3 failure");
return base.UploadAsync(bucket, key, data, ct);
}
}
The test:
[Fact]
public async Task CreateZip_StepFails_CompensatesPreviousSteps()
{
// Arrange — fail on the 3rd upload (UploadZip step)
var failingS3 = new FailingS3Client(failOnUpload: 3);
await using var host = new SagaTestHost(services =>
{
services.AddSingleton<IS3Client>(failingS3);
});
var orchestrator = host.GetService<ICreateZipFromFilesOrchestrator>();
var request = new CreateZipRequest
{
Files = CreateTestFiles("a.txt", "b.txt"),
OutputFileName = "archive.zip",
};
// Act
var result = await orchestrator.ExecuteAsync(request, CancellationToken.None);
// Assert — task failed
Assert.Equal(DistributedTaskStatus.Failed, result.Status);
Assert.Contains("Simulated S3 failure", result.Error);
// Assert — compensation ran in reverse order
var progress = await host.Progress.GetAsync(result.TaskId);
var steps = progress!.Steps;
// UploadSourceFiles (step 1) — completed then compensated
Assert.Equal(StepStatus.Compensated, steps["UploadSourceFiles"]);
// CreateZipArchive (step 2) — completed then compensated
Assert.Equal(StepStatus.Compensated, steps["CreateZipArchive"]);
// UploadZip (step 3) — failed, not compensated (nothing to undo)
Assert.Equal(StepStatus.Failed, steps["UploadZip"]);
// Verify compensation actually deleted the uploaded source files
var objects = await failingS3.ListObjectsAsync("incoming-files");
Assert.Empty(objects);
}
The saga uploaded two source files (step 1), created the zip archive (step 2), then failed uploading the zip (step 3). Compensation ran in reverse: step 2's compensation deleted the temp archive, step 1's compensation deleted the uploaded source files. The in-memory S3 store is empty because every artifact was cleaned up.
Compensation Failure Test
What happens when compensation itself throws? The saga must record CompensationFailed status and preserve the TraceId for manual investigation.
public sealed class FailingCompensationS3Client : InMemoryS3Client
{
private int _deleteCount;
public override Task UploadAsync(string bucket, string key, Stream data,
CancellationToken ct = default)
{
// Fail every upload to "processed-files" (the UploadZip step), including
// retries, regardless of how many source files preceded it
if (bucket == "processed-files")
throw new IOException("Upload failed");
return base.UploadAsync(bucket, key, data, ct);
}
public override Task DeleteAsync(string bucket, string key,
CancellationToken ct = default)
{
// Fail on 1st delete (compensation of step 2)
if (Interlocked.Increment(ref _deleteCount) == 1)
throw new IOException("Compensation failed — disk is read-only");
return base.DeleteAsync(bucket, key, ct);
}
}
[Fact]
public async Task CreateZip_CompensationFails_RecordsErrorWithTraceId()
{
// Arrange
await using var host = new SagaTestHost(services =>
{
services.AddSingleton<IS3Client>(new FailingCompensationS3Client());
});
var orchestrator = host.GetService<ICreateZipFromFilesOrchestrator>();
var request = new CreateZipRequest
{
Files = CreateTestFiles("a.txt"),
OutputFileName = "archive.zip",
};
// Act
var result = await orchestrator.ExecuteAsync(request, CancellationToken.None);
// Assert
Assert.Equal(DistributedTaskStatus.CompensationFailed, result.Status);
Assert.NotEqual(Guid.Empty, result.TraceId);
// The error message includes both the original failure and the compensation failure
Assert.Contains("Upload failed", result.Error);
Assert.Contains("Compensation failed", result.CompensationError);
// Steps reflect the mixed state
var progress = await host.Progress.GetAsync(result.TaskId);
var steps = progress!.Steps;
Assert.Equal(StepStatus.Compensated, steps["UploadSourceFiles"]);
Assert.Equal(StepStatus.CompensationFailed, steps["CreateZipArchive"]);
Assert.Equal(StepStatus.Failed, steps["UploadZip"]);
}
CompensationFailed is the worst-case status. It means the system has dangling artifacts. The TraceId is logged at Error severity with enough context for an operator to find the orphaned objects and clean them up manually. The saga does not retry compensation — that is a design decision. Retrying a failing delete in a loop risks masking the real problem (permissions, quota, disk failure).
Retry Test
A step fails transiently, then succeeds. Verify the retry count and the final status.
public sealed class CountingS3Client : InMemoryS3Client
{
private int _uploadCount;
private int _zipUploadCount;
private readonly int _failCount;
public int TotalUploadAttempts => _uploadCount;
public CountingS3Client(int failCount) => _failCount = failCount;
public override Task UploadAsync(string bucket, string key, Stream data,
CancellationToken ct = default)
{
Interlocked.Increment(ref _uploadCount);
// Fail the first N uploads to "processed-files" (the UploadZip step).
// A separate per-bucket counter, so source-file uploads don't consume the failure budget.
if (bucket == "processed-files")
{
var zipAttempt = Interlocked.Increment(ref _zipUploadCount);
if (zipAttempt <= _failCount)
throw new IOException($"Transient failure (attempt {zipAttempt})");
}
return base.UploadAsync(bucket, key, data, ct);
}
}
[Fact]
public async Task CreateZip_TransientFailure_RetriesAndSucceeds()
{
// Arrange — fail twice, succeed on the 3rd attempt
// The UploadZip step has RetryPolicy(MaxRetries = 3)
var countingS3 = new CountingS3Client(failCount: 2);
await using var host = new SagaTestHost(services =>
{
services.AddSingleton<IS3Client>(countingS3);
});
var orchestrator = host.GetService<ICreateZipFromFilesOrchestrator>();
var request = new CreateZipRequest
{
Files = CreateTestFiles("a.txt"),
OutputFileName = "archive.zip",
};
// Act
var result = await orchestrator.ExecuteAsync(request, CancellationToken.None);
// Assert — task succeeded despite transient failures
Assert.Equal(DistributedTaskStatus.Completed, result.Status);
Assert.NotNull(result.Response);
// Assert — retries were recorded
var progress = await host.Progress.GetAsync(result.TaskId);
var uploadZipStep = progress!.Steps["UploadZip"];
Assert.Equal(StepStatus.Completed, uploadZipStep);
// The S3 client saw 2 failed uploads + 1 successful upload
// plus the initial source file upload = 4 total
Assert.Equal(4, countingS3.TotalUploadAttempts);
// Assert — retry metadata is captured in the audit trail
var auditEntries = result.AuditTrail
.Where(e => e.StepName == "UploadZip" && e.EventType == "Retry")
.ToList();
Assert.Equal(2, auditEntries.Count);
Assert.Equal(1, auditEntries[0].RetryAttempt);
Assert.Equal(2, auditEntries[1].RetryAttempt);
}
The CountingS3Client counts every upload attempt. The saga's generated retry logic (exponential backoff with DelayMs = 500, declared in Part III) kicks in automatically. In test mode, the delay is real but short enough that tests complete fast. If you want instant retries in tests, override the delay in the test host configuration:
services.Configure<CreateZipRetryOptions>(opts =>
{
opts.UploadZip.DelayMs = 0; // No backoff in tests
});
Concurrent Worker Test
Two workers race to pick up the same task. The distributed lock ensures only one executes.
[Fact]
public async Task CreateZip_ConcurrentWorkers_OnlyOneExecutes()
{
// Arrange — shared infrastructure, two orchestrators
var sharedServices = new ServiceCollection();
sharedServices.AddSingleton<InMemoryChannelRegistry>();
sharedServices.AddSingleton<IS3Client, InMemoryS3Client>();
sharedServices.AddSingleton<IQueuePublisher, InMemoryQueuePublisher>();
sharedServices.AddSingleton<IDistributedLock, InMemoryDistributedLock>();
sharedServices.AddSingleton<ITaskProgressTracker, InMemoryTaskProgressTracker>();
sharedServices.AddCreateZipFromFilesSaga();
await using var provider = sharedServices.BuildServiceProvider();
var orchestratorA = provider.GetRequiredService<ICreateZipFromFilesOrchestrator>();
var orchestratorB = provider.GetRequiredService<ICreateZipFromFilesOrchestrator>();
var request = new CreateZipRequest
{
Files = CreateTestFiles("a.txt"),
OutputFileName = "archive.zip",
};
// Act — both workers try to execute the same task concurrently
var taskId = Guid.NewGuid();
var workerA = orchestratorA.ExecuteAsync(request, taskId, CancellationToken.None);
var workerB = orchestratorB.ExecuteAsync(request, taskId, CancellationToken.None);
var results = await Task.WhenAll(workerA, workerB);
// Assert — exactly one succeeded, the other was rejected
var completed = results.Where(r =>
r.Status == DistributedTaskStatus.Completed).ToList();
var rejected = results.Where(r =>
r.Status == DistributedTaskStatus.Rejected).ToList();
Assert.Single(completed);
Assert.Single(rejected);
// The rejected result carries a clear reason
Assert.Equal("Lock not acquired — another worker is processing this task",
rejected[0].Error);
// Only one ZIP exists in storage
var objects = await provider.GetRequiredService<IS3Client>()
.ListObjectsAsync("processed-files");
Assert.Single(objects);
}
The InMemoryDistributedLock uses SemaphoreSlim(1, 1). The first TryAcquireAsync succeeds, the second returns null. The generated orchestrator treats a null lock as a rejection — no retry, no compensation, just a Rejected status.
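A sketch of the lock just described, hedged: the real Part VII implementation and the exact IDistributedLock signature may differ, but the SemaphoreSlim(1, 1) idea fits in a few lines:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch — TryAcquireAsync returning null on contention is inferred
// from this chapter's prose; the real interface may differ.
public interface IDistributedLock
{
    Task<IAsyncDisposable?> TryAcquireAsync(string key, CancellationToken ct = default);
}

public sealed class InMemoryDistributedLock : IDistributedLock
{
    // One SemaphoreSlim(1, 1) per lock key, created on first use.
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();

    public async Task<IAsyncDisposable?> TryAcquireAsync(string key,
        CancellationToken ct = default)
    {
        var gate = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        // Zero timeout: acquire only if the lock is free right now
        if (!await gate.WaitAsync(TimeSpan.Zero, ct))
            return null; // contention — the orchestrator maps null to Rejected
        return new Releaser(gate);
    }

    private sealed class Releaser : IAsyncDisposable
    {
        private readonly SemaphoreSlim _gate;
        public Releaser(SemaphoreSlim gate) => _gate = gate;
        public ValueTask DisposeAsync()
        {
            _gate.Release();
            return ValueTask.CompletedTask;
        }
    }
}
```

Disposing the returned handle releases the semaphore, so a worker that finishes (or compensates) frees the task for any later attempt.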
Cancellation Test
Submit a task, cancel it mid-saga, verify that completed steps are compensated and pending steps are skipped.
[Fact]
public async Task CreateZip_CancelMidSaga_CompensatesCompleted()
{
// Arrange — slow the final upload (processed-files bucket) so the
// saga cannot finish before the cancel lands
var slowS3 = new SlowS3Client(
delayOnBucket: "processed-files",
delay: TimeSpan.FromSeconds(5));
await using var host = new SagaTestHost(services =>
{
services.AddSingleton<IS3Client>(slowS3);
});
var orchestrator = host.GetService<ICreateZipFromFilesOrchestrator>();
var cts = new CancellationTokenSource();
var request = new CreateZipRequest
{
Files = CreateTestFiles("a.txt", "b.txt"),
OutputFileName = "archive.zip",
};
// Act — start the saga and cancel after step 1 completes
var executeTask = orchestrator.ExecuteAsync(request, cts.Token);
// Wait until step 1 is done (source files uploaded)
await host.Progress.WaitForStepAsync(
orchestrator.CurrentTaskId, "UploadSourceFiles", StepStatus.Completed,
timeout: TimeSpan.FromSeconds(2));
// Cancel while step 2 (CreateZipArchive) is in progress
cts.Cancel();
var result = await executeTask;
// Assert
Assert.Equal(DistributedTaskStatus.Cancelled, result.Status);
var progress = await host.Progress.GetAsync(result.TaskId);
var steps = progress!.Steps;
// Step 1 completed, then compensated (source files cleaned up)
Assert.Equal(StepStatus.Compensated, steps["UploadSourceFiles"]);
// Step 2 was interrupted mid-flight and recorded as Cancelled
Assert.Equal(StepStatus.Cancelled, steps["CreateZipArchive"]);
// Step 3 never started
Assert.Equal(StepStatus.Skipped, steps["UploadZip"]);
// Verify nothing remains in storage
var incoming = await slowS3.ListObjectsAsync("incoming-files");
var processed = await slowS3.ListObjectsAsync("processed-files");
Assert.Empty(incoming);
Assert.Empty(processed);
}
public sealed class SlowS3Client : InMemoryS3Client
{
private readonly string _delayOnBucket;
private readonly TimeSpan _delay;
public SlowS3Client(string delayOnBucket, TimeSpan delay)
{
_delayOnBucket = delayOnBucket;
_delay = delay;
}
public override async Task UploadAsync(string bucket, string key, Stream data,
CancellationToken ct = default)
{
if (bucket == _delayOnBucket)
await Task.Delay(_delay, ct); // Throws OperationCanceledException
await base.UploadAsync(bucket, key, data, ct);
}
}
The SlowS3Client introduces artificial delay on a specific bucket. The test cancels the CancellationTokenSource after step 1 completes. The saga's generated kernel catches OperationCanceledException, transitions to the Cancelled status, and runs compensation for every step that reached Completed. Steps that never started are marked Skipped.
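A hypothetical miniature of that kernel behavior. MiniKernel, SagaStep, and MiniStepStatus are illustrative stand-ins, not DistributedTask.Dsl types; the generated kernel is far more involved:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stand-in status enum for the sketch
public enum MiniStepStatus { Skipped, Completed, Cancelled, Compensated }

// A step is a forward action plus its compensation
public sealed record SagaStep(
    string Name,
    Func<CancellationToken, Task> Execute,
    Func<Task> Compensate);

public static class MiniKernel
{
    public static async Task<Dictionary<string, MiniStepStatus>> RunAsync(
        IReadOnlyList<SagaStep> steps, CancellationToken ct)
    {
        var status = steps.ToDictionary(s => s.Name, _ => MiniStepStatus.Skipped);
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            if (ct.IsCancellationRequested)
                break; // steps that never started stay Skipped
            try
            {
                await step.Execute(ct);
                status[step.Name] = MiniStepStatus.Completed;
                completed.Push(step);
            }
            catch (OperationCanceledException)
            {
                status[step.Name] = MiniStepStatus.Cancelled; // interrupted mid-flight
                break;
            }
        }
        if (ct.IsCancellationRequested)
        {
            // Compensate every completed step, most recent first
            while (completed.Count > 0)
            {
                var step = completed.Pop();
                await step.Compensate();
                status[step.Name] = MiniStepStatus.Compensated;
            }
        }
        return status;
    }
}
```

The three outcomes in the cancellation test fall out directly: the step running when the token fires ends Cancelled, earlier steps end Compensated, later steps stay Skipped.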
Condition Skip Test
From Part IV, the video pipeline has a conditional Transcode4K step that only runs when the source video is 4K or higher. When the condition evaluates to false, the step must be Skipped, not Failed.
[Fact]
public async Task VideoTranscode_LowResSource_Skips4KStep()
{
// Arrange — 1080p source video (below the 4K threshold)
await using var host = new SagaTestHost();
var orchestrator = host.GetService<IVideoTranscodeOrchestrator>();
var request = new VideoTranscodeRequest
{
VideoFile = CreateTestVideo("sample.mp4", width: 1920, height: 1080),
WebhookUrl = "https://example.com/callback",
};
// Act
var result = await orchestrator.ExecuteAsync(request, CancellationToken.None);
// Assert — task completed successfully
Assert.Equal(DistributedTaskStatus.Completed, result.Status);
var progress = await host.Progress.GetAsync(result.TaskId);
var steps = progress!.Steps;
// 720p and 1080p transcodes ran
Assert.Equal(StepStatus.Completed, steps["Transcode720p"]);
Assert.Equal(StepStatus.Completed, steps["Transcode1080p"]);
// 4K transcode was skipped — condition evaluated to false
Assert.Equal(StepStatus.Skipped, steps["Transcode4K"]);
// The condition evaluation is recorded in the audit trail
var conditionEntry = result.AuditTrail
.Single(e => e.StepName == "Transcode4K" && e.EventType == "ConditionEvaluated");
Assert.Equal("Source >= 4K", conditionEntry.ConditionExpression);
Assert.False(conditionEntry.ConditionResult);
// All other steps completed normally
Assert.Equal(StepStatus.Completed, steps["UploadVideo"]);
Assert.Equal(StepStatus.Completed, steps["ExtractMetadata"]);
Assert.Equal(StepStatus.Completed, steps["GenerateThumbnails"]);
Assert.Equal(StepStatus.Completed, steps["UpdateCatalog"]);
Assert.Equal(StepStatus.Completed, steps["NotifyWebhook"]);
}
private static IFormFile CreateTestVideo(string name, int width, int height)
{
// Minimal video metadata header — enough for ExtractMetadata step
var metadata = JsonSerializer.SerializeToUtf8Bytes(new { width, height });
return new FormFile(
new MemoryStream(metadata), 0, metadata.Length, name, name)
{
Headers = new HeaderDictionary(),
ContentType = "video/mp4",
};
}[Fact]
public async Task VideoTranscode_LowResSource_Skips4KStep()
{
// Arrange — 1080p source video (below the 4K threshold)
await using var host = new SagaTestHost();
var orchestrator = host.GetService<IVideoTranscodeOrchestrator>();
var request = new VideoTranscodeRequest
{
VideoFile = CreateTestVideo("sample.mp4", width: 1920, height: 1080),
WebhookUrl = "https://example.com/callback",
};
// Act
var result = await orchestrator.ExecuteAsync(request, CancellationToken.None);
// Assert — task completed successfully
Assert.Equal(DistributedTaskStatus.Completed, result.Status);
var progress = await host.Progress.GetAsync(result.TaskId);
var steps = progress!.Steps;
// 720p and 1080p transcodes ran
Assert.Equal(StepStatus.Completed, steps["Transcode720p"]);
Assert.Equal(StepStatus.Completed, steps["Transcode1080p"]);
// 4K transcode was skipped — condition evaluated to false
Assert.Equal(StepStatus.Skipped, steps["Transcode4K"]);
// The condition evaluation is recorded in the audit trail
var conditionEntry = result.AuditTrail
.Single(e => e.StepName == "Transcode4K" && e.EventType == "ConditionEvaluated");
Assert.Equal("Source >= 4K", conditionEntry.ConditionExpression);
Assert.False(conditionEntry.ConditionResult);
// All other steps completed normally
Assert.Equal(StepStatus.Completed, steps["UploadVideo"]);
Assert.Equal(StepStatus.Completed, steps["ExtractMetadata"]);
Assert.Equal(StepStatus.Completed, steps["GenerateThumbnails"]);
Assert.Equal(StepStatus.Completed, steps["UpdateCatalog"]);
Assert.Equal(StepStatus.Completed, steps["NotifyWebhook"]);
}
private static IFormFile CreateTestVideo(string name, int width, int height)
{
// Minimal video metadata header — enough for ExtractMetadata step
var metadata = JsonSerializer.SerializeToUtf8Bytes(new { width, height });
return new FormFile(
new MemoryStream(metadata), 0, metadata.Length, name, name)
{
Headers = new HeaderDictionary(),
ContentType = "video/mp4",
};
}
The critical assertion is StepStatus.Skipped, not StepStatus.Failed. A skipped step is not a failure. It does not trigger compensation. It does not count against retry limits. It is a normal, expected outcome recorded in the audit trail with the condition expression that evaluated to false.
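The same run can also be checked from the other direction: because the step was skipped rather than failed, no compensation should appear anywhere in the audit trail. A hedged fragment continuing the test above (the "CompensationExecuted" event-type name is an assumption, not something the article has shown; substitute whatever your audit trail actually records):

```csharp
// Hypothetical continuation of the assertions above.
// "CompensationExecuted" is an assumed event-type name.
Assert.DoesNotContain(result.AuditTrail,
    e => e.EventType == "CompensationExecuted");
```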
Snapshot Testing Generated Code
The tests above verify runtime behavior. Snapshot tests verify the generated source code itself. When a Roslyn source generator changes output unexpectedly, you want to know before it ships.
DistributedTask.Dsl uses the Verify library for snapshot testing. The approach is straightforward: run the generator, capture the output, compare it against a stored .verified.cs file.
[UsesVerify]
public class GeneratorSnapshotTests
{
[Fact]
public Task CreateZip_GeneratesExpectedOrchestrator()
{
// Arrange — the same source code from Part III
var source = """
using DistributedTask.Dsl;
[DistributedTask("CreateZipFromFiles",
Queue = "file-processing",
MaxRetries = 2,
TimeoutSeconds = 600)]
[Cancellable]
[FileUploadStep("UploadSourceFiles", Order = 1,
Bucket = "incoming-files", SourceProperty = "Files")]
[StepPlacement("UploadSourceFiles", Host = StepHost.Api)]
[CustomStep("CreateZipArchive", Order = 2)]
[StepPlacement("CreateZipArchive", Host = StepHost.Worker)]
[RetryPolicy("CreateZipArchive",
MaxRetries = 3, BackoffType = BackoffType.Exponential, DelayMs = 500)]
[FileUploadStep("UploadZip", Order = 3,
Bucket = "processed-files", SourceProperty = "ZipPath")]
[StepPlacement("UploadZip", Host = StepHost.Worker)]
public partial class CreateZipFromFilesTask { }
""";
// Act — run the generator through the Roslyn test harness
var driver = CSharpGeneratorDriver
.Create(new DistributedTaskGenerator())
.RunGenerators(CreateCompilation(source));
// Assert — compare output against stored snapshots
return Verify(driver);
}
[Fact]
public Task VideoTranscode_GeneratesConditionalStep()
{
var source = """
using DistributedTask.Dsl;
[DistributedTask("VideoTranscode",
Queue = "video-processing",
MaxRetries = 1,
TimeoutSeconds = 3600)]
[Cancellable]
[FileUploadStep("UploadVideo", Order = 1,
Bucket = "raw-videos", SourceProperty = "VideoFile")]
[StepPlacement("UploadVideo", Host = StepHost.Api)]
[CustomStep("ExtractMetadata", Order = 2)]
[StepPlacement("ExtractMetadata", Host = StepHost.Worker)]
[ParallelStepGroup("TranscodeAll", Order = 3, AwaitAll = true)]
[SagaStep("Transcode720p", Group = "TranscodeAll", Order = 3)]
[SagaStep("Transcode1080p", Group = "TranscodeAll", Order = 3)]
[SagaStep("Transcode4K", Group = "TranscodeAll", Order = 3)]
[StepCondition("Transcode4K", "Source >= 4K",
nameof(VideoTranscodeTask.IsSource4KOrHigher))]
[CustomStep("GenerateThumbnails", Order = 4)]
[CustomStep("UpdateCatalog", Order = 5)]
[CustomStep("NotifyWebhook", Order = 6)]
public partial class VideoTranscodeTask { }
""";
var driver = CSharpGeneratorDriver
.Create(new DistributedTaskGenerator())
.RunGenerators(CreateCompilation(source));
return Verify(driver);
}
private static CSharpCompilation CreateCompilation(string source)
=> CSharpCompilation.Create("Tests",
new[] { CSharpSyntaxTree.ParseText(source) },
new[]
{
MetadataReference.CreateFromFile(
typeof(object).Assembly.Location),
MetadataReference.CreateFromFile(
typeof(DistributedTaskAttribute).Assembly.Location),
},
new CSharpCompilationOptions(OutputKind.DynamicallyLinkedLibrary));
}
When you run this the first time, Verify creates .verified.cs files containing the full generated output. On subsequent runs, any diff causes a test failure with a clear side-by-side comparison. You review the diff, accept it (with dotnet verify accept), and the new snapshot becomes the baseline.
What snapshot tests catch that runtime tests cannot:
- Whitespace and formatting regressions — the generated code is part of the developer's project; unexpected formatting changes are noise in pull requests
- Using directive changes — a generator that adds an unnecessary `using System.Linq;` wastes compile time in large solutions
- Attribute propagation — `[GeneratedCode]` and `[EditorBrowsable(Never)]` attributes must be present on all generated types
- Comment quality — the generated XML doc comments must match the DSL attribute names, not stale placeholder text
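On disk, each generated hint name becomes its own snapshot file next to the test class, which is what makes generator diffs reviewable in pull requests. Assuming Verify's default naming convention and a hypothetical orchestrator hint name (the article does not specify what the generator calls its output files), the first test above would produce something like:

```
GeneratorSnapshotTests.CreateZip_GeneratesExpectedOrchestrator#CreateZipFromFilesOrchestrator.g.verified.cs
```

These .verified.cs files are committed to source control, so a change in generator output shows up as an ordinary diff in the pull request that caused it.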
Test Organization
The full test suite for a DistributedTask.Dsl project has three layers:
| Layer | What it tests | Speed | Infrastructure |
|---|---|---|---|
| Snapshot tests | Generated source code text | < 1s per test | None (Roslyn in-memory) |
| Saga unit tests | Orchestration logic, compensation, retries, cancellation | < 100ms per test | In-memory fakes |
| Integration tests | Real queues, real storage, real locks | Seconds per test | Docker Compose |
The first two layers run on every commit. The third runs nightly or on PR merge. The ratio should be roughly 80% snapshot + unit, 20% integration.
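One way to express that split on the command line, assuming the integration layer is tagged with an xUnit trait (the trait name here is hypothetical):

```shell
# Every commit: snapshot + saga unit tests, no infrastructure needed
dotnet test --filter "Category!=Integration"

# Nightly or on merge: bring up real infrastructure, run only the tagged layer
docker compose up -d
dotnet test --filter "Category=Integration"
docker compose down
```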
The in-memory infrastructure from Part VII is not a test-only convenience. It is the same code that powers TaskMode.InProcess in development. Tests and local development share the exact same code paths. If a test passes with InMemoryS3Client, the saga will behave identically in InProcess mode. The only thing integration tests add is confidence that the real MinioS3Client and RabbitMqQueuePublisher wire up correctly.
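To make that concrete, here is what such a fake can look like in miniature. This is a sketch, not the library's actual InMemoryS3Client, written against a deliberately simplified two-method IS3Client (the real interface in the article has more members):

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;

// Exercise the fake the same way a saga step would.
var s3 = new InMemoryS3Client();
await s3.UploadAsync("processed-files", "archive.zip", Encoding.UTF8.GetBytes("zip-bytes"));
var payload = await s3.DownloadAsync("processed-files", "archive.zip");
Console.WriteLine(payload is null ? "missing" : Encoding.UTF8.GetString(payload));

// Deliberately simplified stand-in for the article's IS3Client.
public interface IS3Client
{
    Task UploadAsync(string bucket, string key, byte[] content);
    Task<byte[]?> DownloadAsync(string bucket, string key);
}

// The fake is real, thread-safe code, not a mock: parallel saga steps can
// upload concurrently, and tests can inspect the stored objects afterwards.
public sealed class InMemoryS3Client : IS3Client
{
    private readonly ConcurrentDictionary<string, byte[]> _objects = new();

    public Task UploadAsync(string bucket, string key, byte[] content)
    {
        _objects[$"{bucket}/{key}"] = content;
        return Task.CompletedTask;
    }

    public Task<byte[]?> DownloadAsync(string bucket, string key)
        => Task.FromResult(_objects.TryGetValue($"{bucket}/{key}", out var bytes)
            ? (byte[]?)bytes
            : null);
}
```

A single registration in the test host, `services.AddSingleton<IS3Client, InMemoryS3Client>()`, puts a fake of this shape behind every saga that uploads files.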
What's Next
Part XII covers observability — structured logging, OpenTelemetry traces, and metrics dashboards for sagas in production.