The Scenario
A straightforward business requirement:
- A client uploads several large files via an API
- The files are stored on S3 (MinIO)
- A background worker downloads the files, creates a ZIP archive, and uploads it back to S3
- The client retrieves the ZIP path when the task completes
This is not exotic. Every enterprise application has some variant — PDF generation, CSV imports, image resizing, report compilation. The pattern is always the same: receive, store, dispatch, process, return.
Let's see what it takes to build this by hand.
Step 1: The API Controller
[ApiController]
[Route("api/files")]
public class FileProcessingController : ControllerBase
{
    // Bucket that receives the raw client uploads.
    private const string IncomingBucket = "incoming-files";

    private readonly IS3Client _s3;
    private readonly IQueuePublisher _queue;
    private readonly ITaskRepository _taskRepo;

    public FileProcessingController(
        IS3Client s3, IQueuePublisher queue, ITaskRepository taskRepo)
    {
        _s3 = s3;
        _queue = queue;
        _taskRepo = taskRepo;
    }

    /// <summary>
    /// Accepts a multipart upload, stores each file in S3 under
    /// uploads/{taskId}/, persists a FileTask record, and publishes a
    /// CreateZipMessage to the worker queue. Replies 202 with the task id.
    /// </summary>
    [HttpPost("create-zip")]
    public async Task<IActionResult> CreateZipAsync(
        [FromForm] List<IFormFile> files, CancellationToken ct)
    {
        // Robustness: reject an empty submission instead of enqueueing a no-op task.
        if (files is null || files.Count == 0)
            return BadRequest("At least one file is required.");

        var taskId = Guid.NewGuid().ToString();
        var uploadedKeys = new List<string>();

        // Upload each file to S3.
        foreach (var file in files)
        {
            // FIX: FileName is client-controlled; strip any path segments so a
            // name like "../../etc/passwd" cannot escape the uploads/{taskId}/ prefix.
            var safeName = System.IO.Path.GetFileName(file.FileName);
            var key = $"uploads/{taskId}/{safeName}";
            await using var stream = file.OpenReadStream();
            await _s3.UploadAsync(IncomingBucket, key, stream, ct);
            uploadedKeys.Add(key);
        }

        // Persist task record.
        var task = new FileTask
        {
            Id = taskId,
            Status = "Submitted",
            UploadedKeys = uploadedKeys,
            CreatedAt = DateTimeOffset.UtcNow,
        };
        await _taskRepo.CreateAsync(task, ct);

        // Publish to queue.
        // NOTE(review): if this publish fails, the uploaded objects and the task
        // record are left behind with no compensation.
        await _queue.PublishAsync("file-processing", new CreateZipMessage
        {
            TaskId = taskId,
            S3Keys = uploadedKeys,
            Bucket = IncomingBucket,
        }, ct);

        return Accepted(new { taskId });
    }

    /// <summary>Polling endpoint: 404 for unknown ids, else current status.</summary>
    [HttpGet("{taskId}/status")]
    public async Task<IActionResult> GetStatusAsync(string taskId, CancellationToken ct)
    {
        var task = await _taskRepo.FindByIdAsync(taskId, ct);
        return task is null ? NotFound() : Ok(new { task.Status, task.ErrorMessage });
    }

    /// <summary>Exposes the ZIP's S3 location once the task has completed.</summary>
    [HttpGet("{taskId}/result")]
    public async Task<IActionResult> GetResultAsync(string taskId, CancellationToken ct)
    {
        var task = await _taskRepo.FindByIdAsync(taskId, ct);
        if (task is null) return NotFound();
        if (task.Status != "Completed") return BadRequest("Task not completed");
        return Ok(new { task.ZipS3Key, task.ZipS3Bucket });
    }
}
[ApiController]
[Route("api/files")]
// Thin API layer: stores uploads in S3, records a FileTask, enqueues work.
public class FileProcessingController : ControllerBase
{
private readonly IS3Client _s3;
private readonly IQueuePublisher _queue;
private readonly ITaskRepository _taskRepo;
// All collaborators injected; no state beyond the three dependencies.
public FileProcessingController(
IS3Client s3, IQueuePublisher queue, ITaskRepository taskRepo)
{
_s3 = s3;
_queue = queue;
_taskRepo = taskRepo;
}
// Accepts a multipart upload, writes each file to the incoming-files bucket
// under uploads/{taskId}/, persists the task, publishes a CreateZipMessage,
// and replies 202 with the task id.
// NOTE(review): file.FileName is client-supplied and used verbatim in the S3
// key — confirm it is sanitized upstream, otherwise path-like names leak into keys.
// NOTE(review): a failure after some uploads (or after persist, before publish)
// leaves orphaned objects/records — there is no compensation here.
[HttpPost("create-zip")]
public async Task<IActionResult> CreateZipAsync(
[FromForm] List<IFormFile> files, CancellationToken ct)
{
var taskId = Guid.NewGuid().ToString();
var uploadedKeys = new List<string>();
// Upload each file to S3
foreach (var file in files)
{
var key = $"uploads/{taskId}/{file.FileName}";
await using var stream = file.OpenReadStream();
await _s3.UploadAsync("incoming-files", key, stream, ct);
uploadedKeys.Add(key);
}
// Persist task record
var task = new FileTask
{
Id = taskId,
Status = "Submitted",
UploadedKeys = uploadedKeys,
CreatedAt = DateTimeOffset.UtcNow,
};
await _taskRepo.CreateAsync(task, ct);
// Publish to queue
await _queue.PublishAsync("file-processing", new CreateZipMessage
{
TaskId = taskId,
S3Keys = uploadedKeys,
Bucket = "incoming-files",
}, ct);
return Accepted(new { taskId });
}
// Polling endpoint: 404 for unknown ids, else current Status/ErrorMessage.
[HttpGet("{taskId}/status")]
public async Task<IActionResult> GetStatusAsync(string taskId, CancellationToken ct)
{
var task = await _taskRepo.FindByIdAsync(taskId, ct);
return task is null ? NotFound() : Ok(new { task.Status, task.ErrorMessage });
}
// Result endpoint: only exposes the ZIP location once Status == "Completed".
[HttpGet("{taskId}/result")]
public async Task<IActionResult> GetResultAsync(string taskId, CancellationToken ct)
{
var task = await _taskRepo.FindByIdAsync(taskId, ct);
if (task is null) return NotFound();
if (task.Status != "Completed") return BadRequest("Task not completed");
return Ok(new { task.ZipS3Key, task.ZipS3Bucket });
}
}
That's ~60 lines just for the API. And we haven't handled:
- What happens if S3 upload fails mid-way (3 of 5 files uploaded)?
- What happens if the queue publish fails after S3 uploads succeed?
- How does the client know which step is currently running?
- What if the client disconnects — polling? WebSocket? SSE?
Step 2: The Worker
public class CreateZipConsumer : IQueueConsumer<CreateZipMessage>
{
    private readonly IS3Client _s3;
    private readonly ITaskRepository _taskRepo;
    private readonly ILogger<CreateZipConsumer> _logger;

    /// <summary>
    /// Handles one CreateZipMessage: downloads the source objects, zips them,
    /// uploads the archive to the processed-files bucket, and updates the task
    /// record. On failure the task is marked Failed and the source objects are
    /// deleted best-effort; local temp files are always removed.
    /// </summary>
    public async Task ConsumeAsync(CreateZipMessage message, CancellationToken ct)
    {
        var task = await _taskRepo.FindByIdAsync(message.TaskId, ct);
        if (task is null) return; // Unknown task id: nothing to do.

        task.Status = "Running";
        await _taskRepo.UpdateAsync(task, ct);

        var downloadDir = Path.Combine(Path.GetTempPath(), message.TaskId);
        var zipPath = Path.Combine(Path.GetTempPath(), $"{message.TaskId}.zip");
        Directory.CreateDirectory(downloadDir);
        try
        {
            // Download files from S3.
            foreach (var key in message.S3Keys)
            {
                var localPath = Path.Combine(downloadDir, Path.GetFileName(key));
                await _s3.DownloadAsync(message.Bucket, key, localPath, ct);
            }

            // Create ZIP. FIX: CreateFromDirectory throws if the target exists,
            // which would make a redelivered message fail — remove any leftover first.
            if (File.Exists(zipPath))
                File.Delete(zipPath);
            ZipFile.CreateFromDirectory(downloadDir, zipPath);

            // Upload ZIP to S3 (scoped using so the stream is closed before cleanup).
            var zipKey = $"zips/{message.TaskId}/output.zip";
            await using (var zipStream = File.OpenRead(zipPath))
            {
                await _s3.UploadAsync("processed-files", zipKey, zipStream, ct);
            }

            // Update task record.
            task.Status = "Completed";
            task.ZipS3Key = zipKey;
            task.ZipS3Bucket = "processed-files";
            task.CompletedAt = DateTimeOffset.UtcNow;
            await _taskRepo.UpdateAsync(task, ct);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ZIP creation failed for task {TaskId}", message.TaskId);
            task.Status = "Failed";
            task.ErrorMessage = ex.Message;
            await _taskRepo.UpdateAsync(task, ct);

            // Cleanup: delete uploaded source files from S3 (best effort, logged only).
            foreach (var key in message.S3Keys)
            {
                try { await _s3.DeleteAsync(message.Bucket, key, ct); }
                catch (Exception cleanupEx)
                {
                    _logger.LogWarning(cleanupEx,
                        "Failed to cleanup S3 key {Key}", key);
                }
            }
        }
        finally
        {
            // FIX: the original deleted local temp files only on failure, so every
            // successful task leaked downloadDir and the temp zip. Always clean up.
            TryDeleteLocalFiles(downloadDir, zipPath);
        }
    }

    // Best-effort removal of the per-task temp directory and zip file.
    private void TryDeleteLocalFiles(string downloadDir, string zipPath)
    {
        try
        {
            if (Directory.Exists(downloadDir))
                Directory.Delete(downloadDir, recursive: true);
            if (File.Exists(zipPath))
                File.Delete(zipPath);
        }
        catch (Exception cleanupEx)
        {
            _logger.LogWarning(cleanupEx,
                "Failed to cleanup local temp files in {Dir}", downloadDir);
        }
    }
}
public class CreateZipConsumer : IQueueConsumer<CreateZipMessage>
{
private readonly IS3Client _s3;
private readonly ITaskRepository _taskRepo;
private readonly ILogger<CreateZipConsumer> _logger;
// Processes one CreateZipMessage end-to-end: download sources, zip, upload,
// then flip the task record to Completed (or Failed with best-effort cleanup).
// NOTE(review): local temp files (downloadDir and the zip) are deleted only in
// the catch block — a successful run leaves them on disk.
// NOTE(review): no idempotency or locking visible here — a redelivered message
// is processed again, and ZipFile.CreateFromDirectory throws if the zip exists.
public async Task ConsumeAsync(CreateZipMessage message, CancellationToken ct)
{
var task = await _taskRepo.FindByIdAsync(message.TaskId, ct);
if (task is null) return;
task.Status = "Running";
await _taskRepo.UpdateAsync(task, ct);
var downloadDir = Path.Combine(Path.GetTempPath(), message.TaskId);
Directory.CreateDirectory(downloadDir);
try
{
// Download files from S3
foreach (var key in message.S3Keys)
{
var localPath = Path.Combine(downloadDir, Path.GetFileName(key));
await _s3.DownloadAsync(message.Bucket, key, localPath, ct);
}
// Create ZIP
var zipPath = Path.Combine(Path.GetTempPath(), $"{message.TaskId}.zip");
ZipFile.CreateFromDirectory(downloadDir, zipPath);
// Upload ZIP to S3
var zipKey = $"zips/{message.TaskId}/output.zip";
await using var zipStream = File.OpenRead(zipPath);
await _s3.UploadAsync("processed-files", zipKey, zipStream, ct);
// Update task record
task.Status = "Completed";
task.ZipS3Key = zipKey;
task.ZipS3Bucket = "processed-files";
task.CompletedAt = DateTimeOffset.UtcNow;
await _taskRepo.UpdateAsync(task, ct);
}
catch (Exception ex)
{
// Any failure marks the task Failed; the raw exception message is what the
// client later sees via the status endpoint.
_logger.LogError(ex, "ZIP creation failed for task {TaskId}", message.TaskId);
task.Status = "Failed";
task.ErrorMessage = ex.Message;
await _taskRepo.UpdateAsync(task, ct);
// Cleanup: delete uploaded source files from S3
foreach (var key in message.S3Keys)
{
try { await _s3.DeleteAsync(message.Bucket, key, ct); }
catch (Exception cleanupEx)
{
// Deliberately swallowed: cleanup is best-effort, failure is only logged.
_logger.LogWarning(cleanupEx,
"Failed to cleanup S3 key {Key}", key);
}
}
// Cleanup: delete local files
if (Directory.Exists(downloadDir))
Directory.Delete(downloadDir, recursive: true);
}
}
}
Another ~60 lines. And the problems keep piling up:
- No retry logic — a transient S3 timeout kills the entire task
- Cleanup is best-effort — if cleanup fails, orphaned files accumulate
- No idempotency — if the message is delivered twice, we process twice
- No concurrency control — two workers could pick up the same task
- No per-step progress — the client only sees "Running" or "Failed"
- No cancellation — once started, there's no stopping it
- No audit trail — no record of what happened, when, or why
Step 3: The Missing Infrastructure
To make this production-ready, you also need:
// Task entity + EF Core configuration
public class FileTask { ... }
public class FileTaskConfiguration : IEntityTypeConfiguration<FileTask> { ... }
// Repository
public interface ITaskRepository { ... }
public class TaskRepository : ITaskRepository { ... }
// DbContext
public class TaskDbContext : DbContext { ... }
// DI registration
services.AddDbContext<TaskDbContext>(...);
services.AddScoped<ITaskRepository, TaskRepository>();
services.AddScoped<IQueueConsumer<CreateZipMessage>, CreateZipConsumer>();
// Retry policies
services.AddSingleton<IAsyncPolicy>(Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt))));
// Distributed lock (so two workers don't process the same task)
services.AddSingleton<IDistributedLock, RedisDistributedLock>();
// Progress tracking (Redis pub/sub for real-time updates)
services.AddSingleton<ITaskProgressTracker, RedisTaskProgressTracker>();
// Health checks
services.AddHealthChecks()
.AddCheck<QueueHealthCheck>("queue")
.AddCheck<S3HealthCheck>("s3");
// SignalR hub (if you want real-time updates)
public class TaskProgressHub : Hub { ... }
app.MapHub<TaskProgressHub>("/hubs/tasks");
// Background cleanup service (orphaned S3 objects)
public class OrphanCleanupService : BackgroundService { ... }
// Task entity + EF Core configuration
public class FileTask { ... }
public class FileTaskConfiguration : IEntityTypeConfiguration<FileTask> { ... }
// Repository
public interface ITaskRepository { ... }
public class TaskRepository : ITaskRepository { ... }
// DbContext
public class TaskDbContext : DbContext { ... }
// DI registration
services.AddDbContext<TaskDbContext>(...);
services.AddScoped<ITaskRepository, TaskRepository>();
services.AddScoped<IQueueConsumer<CreateZipMessage>, CreateZipConsumer>();
// Retry policies
services.AddSingleton<IAsyncPolicy>(Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt))));
// Distributed lock (so two workers don't process the same task)
services.AddSingleton<IDistributedLock, RedisDistributedLock>();
// Progress tracking (Redis pub/sub for real-time updates)
services.AddSingleton<ITaskProgressTracker, RedisTaskProgressTracker>();
// Health checks
services.AddHealthChecks()
.AddCheck<QueueHealthCheck>("queue")
.AddCheck<S3HealthCheck>("s3");
// SignalR hub (if you want real-time updates)
public class TaskProgressHub : Hub { ... }
app.MapHub<TaskProgressHub>("/hubs/tasks");
// Background cleanup service (orphaned S3 objects)
public class OrphanCleanupService : BackgroundService { ... }
The Running Total
| Component | Lines | Purpose |
|---|---|---|
| API Controller | ~60 | Receive files, upload to S3, publish to queue |
| Worker Consumer | ~60 | Download, ZIP, upload, update status |
| Task Entity + Config | ~40 | EF Core persistence |
| Repository + Interface | ~30 | Data access |
| DbContext + Registration | ~20 | EF Core wiring |
| Retry Policies | ~15 | Polly configuration |
| Distributed Lock | ~30 | Concurrency control |
| Progress Tracker | ~40 | Real-time status updates |
| SignalR Hub | ~20 | WebSocket push |
| Cleanup Service | ~30 | S3 orphan reconciliation |
| DI Registration | ~20 | Wire everything together |
| Total | ~365 | One ZIP task |
And this is the minimal version. No SSE, no webhooks, no cancellation, no idempotency, no OpenTelemetry, no audit trail, no archival, no graceful shutdown.
The Real Cost: The Second Task
The first task is annoying but manageable. The real cost hits when you build the second one.
A video transcoding pipeline. An invoice PDF generator. A CSV import processor. Each one needs the same infrastructure — saga orchestration, retry, compensation, progress tracking, locking, cleanup — but wired slightly differently.
You have two choices:
Copy-paste the infrastructure and adapt it. Now you maintain N copies of the same patterns, each with subtle differences and independent bugs.
Abstract the common parts into a framework. Now you maintain a framework — which is exactly what this DSL is, except it runs at compile time instead of runtime.
The Pattern Behind the Boilerplate
Look at the imperative code again. Every distributed task follows the same structure:
1. Receive a request (typed input)
2. Execute ordered steps (some on API, some on Worker)
3. Each step can fail → compensate previous steps in reverse
4. Track progress per step
5. Handle retries per step
6. Ensure only one worker processes a task
7. Produce a typed response
8. Notify the client (polling, SSE, WebSocket, SignalR, webhook)
9. Clean up on failure
10. Archive completed tasks
1. Receive a request (typed input)
2. Execute ordered steps (some on API, some on Worker)
3. Each step can fail → compensate previous steps in reverse
4. Track progress per step
5. Handle retries per step
6. Ensure only one worker processes a task
7. Produce a typed response
8. Notify the client (polling, SSE, WebSocket, SignalR, webhook)
9. Clean up on failure
10. Archive completed tasks
This structure is data, not logic. It can be declared, not coded.
The Declarative Alternative
What if you could write this instead?
[DistributedTask("CreateZipFromFiles",
Queue = "file-processing",
MaxRetries = 2,
TimeoutSeconds = 600)]
[Cancellable]
[FileUploadStep("UploadSourceFiles", Order = 1,
Bucket = "incoming-files", SourceProperty = "Files")]
[StepPlacement("UploadSourceFiles", Host = StepHost.Api)]
[CustomStep("CreateZipArchive", Order = 2)]
[StepPlacement("CreateZipArchive", Host = StepHost.Worker)]
[RetryPolicy("CreateZipArchive", MaxRetries = 3,
BackoffType = BackoffType.Exponential, DelayMs = 500)]
[FileUploadStep("UploadZip", Order = 3,
Bucket = "processed-files", SourceProperty = "ZipPath")]
[StepPlacement("UploadZip", Host = StepHost.Worker)]
public partial class CreateZipFromFilesTask { }
[DistributedTask("CreateZipFromFiles",
Queue = "file-processing",
MaxRetries = 2,
TimeoutSeconds = 600)]
[Cancellable]
[FileUploadStep("UploadSourceFiles", Order = 1,
Bucket = "incoming-files", SourceProperty = "Files")]
[StepPlacement("UploadSourceFiles", Host = StepHost.Api)]
[CustomStep("CreateZipArchive", Order = 2)]
[StepPlacement("CreateZipArchive", Host = StepHost.Worker)]
[RetryPolicy("CreateZipArchive", MaxRetries = 3,
BackoffType = BackoffType.Exponential, DelayMs = 500)]
[FileUploadStep("UploadZip", Order = 3,
Bucket = "processed-files", SourceProperty = "ZipPath")]
[StepPlacement("UploadZip", Host = StepHost.Worker)]
public partial class CreateZipFromFilesTask { }
From these ~20 lines, the source generator produces:
- A saga orchestrator kernel (shared retry, compensation, audit logic)
- An API orchestrator (executes API-placed steps, publishes to queue)
- A worker orchestrator (consumes messages, executes Worker-placed steps)
- Typed step data ValueObjects (compile-time checked, not JSON bags)
- A worker message (no IFormFile, just S3 keys)
- A queue consumer with distributed locking
- An API controller with submit, status, stream, WebSocket, result, and cancel endpoints
- A SignalR hub
- A webhook notifier
- Polly retry policies per step
- Domain events
- An S3 reconciliation service
- An archive service
- DI registration
~20 lines of attributes. ~700+ lines of generated, production-ready infrastructure.
The developer writes one thing: the custom business logic for the ZIP creation step.
public partial class CreateZipFromFilesTask
{
    /// <summary>
    /// Business logic for the CreateZipArchive step: zips the directory the
    /// framework downloaded the source files into, then records the archive's
    /// path and size on the step data for the following upload step.
    /// </summary>
    protected override Task<StepResult> ExecuteCreateZipArchiveAsync(
        SagaContext<CreateZipRequest> context,
        CreateZipArchiveStepData stepData,
        CancellationToken ct)
    {
        // FIX: honor cooperative cancellation (ct was previously ignored).
        ct.ThrowIfCancellationRequested();

        // Earlier steps expose typed data, e.g.
        // context.GetStepData<UploadSourceFilesStepData>(); the original fetched
        // it into an unused local — removed here.

        // Archive name falls back to "{TaskId}.zip" when the request names no output.
        var zipPath = Path.Combine(Path.GetTempPath(),
            context.Request.OutputFileName ?? $"{context.TaskId}.zip");
        ZipFile.CreateFromDirectory(
            context.StepData.Get<string>("DownloadDirectory"), zipPath);

        // Record the archive's location and size for the generated UploadZip step.
        stepData.ZipPath = zipPath;
        stepData.ZipSizeBytes = new FileInfo(zipPath).Length;

        // FIX: the original was `async` with no awaits (compiler warning CS1998);
        // the work here is synchronous, so return a completed task instead.
        return Task.FromResult(StepResult.Success());
    }
}
public partial class CreateZipFromFilesTask
{
// Custom business logic for the CreateZipArchive step — the only hand-written
// code; orchestration, retries, compensation, and transport are generated.
// NOTE(review): declared async but contains no await (compiler warning CS1998).
// NOTE(review): uploadData is retrieved but never used — illustration only? confirm.
protected override async Task<StepResult> ExecuteCreateZipArchiveAsync(
SagaContext<CreateZipRequest> context,
CreateZipArchiveStepData stepData,
CancellationToken ct)
{
var uploadData = context.GetStepData<UploadSourceFilesStepData>();
// Archive name falls back to "{TaskId}.zip" when the request names no output file.
var zipPath = Path.Combine(Path.GetTempPath(),
context.Request.OutputFileName ?? $"{context.TaskId}.zip");
ZipFile.CreateFromDirectory(
context.StepData.Get<string>("DownloadDirectory"), zipPath);
// Record the archive's location and size for the generated UploadZip step.
stepData.ZipPath = zipPath;
stepData.ZipSizeBytes = new FileInfo(zipPath).Length;
return StepResult.Success();
}
}
That's it. Everything else is generated.
What's Next
Part II introduces the full attribute surface — every concept, every property, every constraint — and shows how each attribute maps to the M3 meta-metamodel.