
The Problem

"You wanted to zip three files. You wrote 600 lines of infrastructure."


The Scenario

A straightforward business requirement:

  1. A client uploads several large files via an API
  2. The files are stored on S3 (MinIO)
  3. A background worker downloads the files, creates a ZIP archive, and uploads it back to S3
  4. The client retrieves the ZIP path when the task completes

This is not exotic. Every enterprise application has some variant — PDF generation, CSV imports, image resizing, report compilation. The pattern is always the same: receive, store, dispatch, process, return.

Let's see what it takes to build this by hand.


Step 1: The API Controller

[ApiController]
[Route("api/files")]
public class FileProcessingController : ControllerBase
{
    private readonly IS3Client _s3;
    private readonly IQueuePublisher _queue;
    private readonly ITaskRepository _taskRepo;

    public FileProcessingController(
        IS3Client s3, IQueuePublisher queue, ITaskRepository taskRepo)
    {
        _s3 = s3;
        _queue = queue;
        _taskRepo = taskRepo;
    }

    [HttpPost("create-zip")]
    public async Task<IActionResult> CreateZipAsync(
        [FromForm] List<IFormFile> files, CancellationToken ct)
    {
        var taskId = Guid.NewGuid().ToString();
        var uploadedKeys = new List<string>();

        // Upload each file to S3
        foreach (var file in files)
        {
            var key = $"uploads/{taskId}/{file.FileName}";
            await using var stream = file.OpenReadStream();
            await _s3.UploadAsync("incoming-files", key, stream, ct);
            uploadedKeys.Add(key);
        }

        // Persist task record
        var task = new FileTask
        {
            Id = taskId,
            Status = "Submitted",
            UploadedKeys = uploadedKeys,
            CreatedAt = DateTimeOffset.UtcNow,
        };
        await _taskRepo.CreateAsync(task, ct);

        // Publish to queue
        await _queue.PublishAsync("file-processing", new CreateZipMessage
        {
            TaskId = taskId,
            S3Keys = uploadedKeys,
            Bucket = "incoming-files",
        }, ct);

        return Accepted(new { taskId });
    }

    [HttpGet("{taskId}/status")]
    public async Task<IActionResult> GetStatusAsync(string taskId, CancellationToken ct)
    {
        var task = await _taskRepo.FindByIdAsync(taskId, ct);
        return task is null ? NotFound() : Ok(new { task.Status, task.ErrorMessage });
    }

    [HttpGet("{taskId}/result")]
    public async Task<IActionResult> GetResultAsync(string taskId, CancellationToken ct)
    {
        var task = await _taskRepo.FindByIdAsync(taskId, ct);
        if (task is null) return NotFound();
        if (task.Status != "Completed") return BadRequest("Task not completed");
        return Ok(new { task.ZipS3Key, task.ZipS3Bucket });
    }
}

That's ~60 lines just for the API. And we haven't handled:

  • What happens if S3 upload fails mid-way (3 of 5 files uploaded)?
  • What happens if the queue publish fails after S3 uploads succeed?
  • How does the client know which step is currently running?
  • What if the client disconnects — polling? WebSocket? SSE?
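The second bullet is the nastiest: if the queue publish throws after the uploads succeeded, the files sit in S3 with no message to process them. A hedged sketch of the compensation you'd bolt on — reusing the controller's hypothetical IS3Client and ITaskRepository, not any real library API:

// Sketch: compensate the S3 uploads when the queue publish fails.
try
{
    await _queue.PublishAsync("file-processing", message, ct);
}
catch
{
    // Best-effort rollback: remove what we just uploaded, then fail the
    // task so the client isn't left polling a record that will never run.
    foreach (var key in uploadedKeys)
    {
        try { await _s3.DeleteAsync("incoming-files", key, ct); }
        catch { /* now it's an orphan for a cleanup job to find */ }
    }
    task.Status = "Failed";
    task.ErrorMessage = "Queue publish failed";
    await _taskRepo.UpdateAsync(task, ct);
    throw;
}

And that is one failure mode, handled in one place, in one controller.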

Step 2: The Worker

public class CreateZipConsumer : IQueueConsumer<CreateZipMessage>
{
    private readonly IS3Client _s3;
    private readonly ITaskRepository _taskRepo;
    private readonly ILogger<CreateZipConsumer> _logger;

    public async Task ConsumeAsync(CreateZipMessage message, CancellationToken ct)
    {
        var task = await _taskRepo.FindByIdAsync(message.TaskId, ct);
        if (task is null) return;

        task.Status = "Running";
        await _taskRepo.UpdateAsync(task, ct);

        var downloadDir = Path.Combine(Path.GetTempPath(), message.TaskId);
        Directory.CreateDirectory(downloadDir);

        try
        {
            // Download files from S3
            foreach (var key in message.S3Keys)
            {
                var localPath = Path.Combine(downloadDir, Path.GetFileName(key));
                await _s3.DownloadAsync(message.Bucket, key, localPath, ct);
            }

            // Create ZIP
            var zipPath = Path.Combine(Path.GetTempPath(), $"{message.TaskId}.zip");
            ZipFile.CreateFromDirectory(downloadDir, zipPath);

            // Upload ZIP to S3
            var zipKey = $"zips/{message.TaskId}/output.zip";
            await using var zipStream = File.OpenRead(zipPath);
            await _s3.UploadAsync("processed-files", zipKey, zipStream, ct);

            // Update task record
            task.Status = "Completed";
            task.ZipS3Key = zipKey;
            task.ZipS3Bucket = "processed-files";
            task.CompletedAt = DateTimeOffset.UtcNow;
            await _taskRepo.UpdateAsync(task, ct);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "ZIP creation failed for task {TaskId}", message.TaskId);
            task.Status = "Failed";
            task.ErrorMessage = ex.Message;
            await _taskRepo.UpdateAsync(task, ct);

            // Cleanup: delete uploaded source files from S3
            foreach (var key in message.S3Keys)
            {
                try { await _s3.DeleteAsync(message.Bucket, key, ct); }
                catch (Exception cleanupEx)
                {
                    _logger.LogWarning(cleanupEx,
                        "Failed to cleanup S3 key {Key}", key);
                }
            }

        }
        finally
        {
            // Cleanup: delete local files on both success and failure —
            // otherwise the temp directory leaks on every successful run
            if (Directory.Exists(downloadDir))
                Directory.Delete(downloadDir, recursive: true);
        }
    }
}

Another ~60 lines. And the problems keep piling up:

  • No retry logic — a transient S3 timeout kills the entire task
  • Cleanup is best-effort — if cleanup fails, orphaned files accumulate
  • No idempotency — if the message is delivered twice, we process twice
  • No concurrency control — two workers could pick up the same task
  • No per-step progress — the client only sees "Running" or "Failed"
  • No cancellation — once started, there's no stopping it
  • No audit trail — no record of what happened, when, or why
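Some of these gaps have small local fixes. Idempotency, for instance, can be a single atomic status transition at the top of the consumer — a sketch, assuming a hypothetical TryTransitionAsync that does a compare-and-swap on the task row:

// Sketch: drop duplicate deliveries. TryTransitionAsync is assumed to be
// an atomic "Submitted -> Running" update (e.g. UPDATE ... WHERE Status = ...).
var claimed = await _taskRepo.TryTransitionAsync(
    message.TaskId, from: "Submitted", to: "Running", ct);
if (!claimed)
    return; // another delivery already claimed this task

But each such fix is another repository method, another migration, another pattern to copy into the next consumer.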

Step 3: The Missing Infrastructure

To make this production-ready, you also need:

// Task entity + EF Core configuration
public class FileTask { ... }
public class FileTaskConfiguration : IEntityTypeConfiguration<FileTask> { ... }

// Repository
public interface ITaskRepository { ... }
public class TaskRepository : ITaskRepository { ... }

// DbContext
public class TaskDbContext : DbContext { ... }

// DI registration
services.AddDbContext<TaskDbContext>(...);
services.AddScoped<ITaskRepository, TaskRepository>();
services.AddScoped<IQueueConsumer<CreateZipMessage>, CreateZipConsumer>();

// Retry policies
services.AddSingleton<IAsyncPolicy>(Policy
    .Handle<HttpRequestException>()
    .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt))));

// Distributed lock (so two workers don't process the same task)
services.AddSingleton<IDistributedLock, RedisDistributedLock>();

// Progress tracking (Redis pub/sub for real-time updates)
services.AddSingleton<ITaskProgressTracker, RedisTaskProgressTracker>();

// Health checks
services.AddHealthChecks()
    .AddCheck<QueueHealthCheck>("queue")
    .AddCheck<S3HealthCheck>("s3");

// SignalR hub (if you want real-time updates)
public class TaskProgressHub : Hub { ... }
app.MapHub<TaskProgressHub>("/hubs/tasks");

// Background cleanup service (orphaned S3 objects)
public class OrphanCleanupService : BackgroundService { ... }
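Registering the distributed lock is the easy half; every consumer also has to acquire it. A sketch, assuming IDistributedLock hands back a disposable handle (a common shape in RedLock-style libraries) and null when the lock is held elsewhere:

// Sketch: per-task mutual exclusion inside ConsumeAsync.
// TryAcquireAsync and its signature are assumptions, not a real API.
await using var handle = await _lock.TryAcquireAsync(
    $"task:{message.TaskId}", timeout: TimeSpan.FromMinutes(10), ct);
if (handle is null)
    return; // another worker owns this task — let it finish
// ... proceed with download / zip / upload ...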

The Running Total

  Component                  Lines   Purpose
  API Controller             ~60     Receive files, upload to S3, publish to queue
  Worker Consumer            ~60     Download, ZIP, upload, update status
  Task Entity + Config       ~40     EF Core persistence
  Repository + Interface     ~30     Data access
  DbContext + Registration   ~20     EF Core wiring
  Retry Policies             ~15     Polly configuration
  Distributed Lock           ~30     Concurrency control
  Progress Tracker           ~40     Real-time status updates
  SignalR Hub                ~20     WebSocket push
  Cleanup Service            ~30     S3 orphan reconciliation
  DI Registration            ~20     Wire everything together
  Total                      ~365    One ZIP task

And this is the minimal version. No SSE, no webhooks, no cancellation, no idempotency, no OpenTelemetry, no audit trail, no archival, no graceful shutdown.


The Real Cost: The Second Task

The first task is annoying but manageable. The real cost hits when you build the second one.

A video transcoding pipeline. An invoice PDF generator. A CSV import processor. Each one needs the same infrastructure — saga orchestration, retry, compensation, progress tracking, locking, cleanup — but wired slightly differently.

You have two choices:

  1. Copy-paste the infrastructure and adapt it. Now you maintain N copies of the same patterns, each with subtle differences and independent bugs.

  2. Abstract the common parts into a framework. Now you maintain a framework — which is exactly what this DSL is, except it runs at compile time instead of runtime.


The Pattern Behind the Boilerplate

Look at the imperative code again. Every distributed task follows the same structure:

1. Receive a request (typed input)
2. Execute ordered steps (some on API, some on Worker)
3. Each step can fail → compensate previous steps in reverse
4. Track progress per step
5. Handle retries per step
6. Ensure only one worker processes a task
7. Produce a typed response
8. Notify the client (polling, SSE, WebSocket, SignalR, webhook)
9. Clean up on failure
10. Archive completed tasks

This structure is data, not logic. It can be declared, not coded.
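Concretely — and this is only an illustration, not the DSL's real types — the whole pipeline above fits in a handful of records:

// Illustration: the pipeline as plain data. All types here are hypothetical.
public enum StepHost { Api, Worker }

public sealed record StepSpec(
    string Name, int Order, StepHost Host, int MaxRetries = 0);

public static class CreateZipPipeline
{
    public static readonly StepSpec[] Steps =
    {
        new("UploadSourceFiles", 1, StepHost.Api),
        new("CreateZipArchive",  2, StepHost.Worker, MaxRetries: 3),
        new("UploadZip",         3, StepHost.Worker),
    };
}

Everything an orchestrator needs to run, retry, and compensate the task is in those three rows of data.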


The Declarative Alternative

What if you could write this instead?

[DistributedTask("CreateZipFromFiles",
    Queue = "file-processing",
    MaxRetries = 2,
    TimeoutSeconds = 600)]
[Cancellable]

[FileUploadStep("UploadSourceFiles", Order = 1,
    Bucket = "incoming-files", SourceProperty = "Files")]
[StepPlacement("UploadSourceFiles", Host = StepHost.Api)]

[CustomStep("CreateZipArchive", Order = 2)]
[StepPlacement("CreateZipArchive", Host = StepHost.Worker)]
[RetryPolicy("CreateZipArchive", MaxRetries = 3,
    BackoffType = BackoffType.Exponential, DelayMs = 500)]

[FileUploadStep("UploadZip", Order = 3,
    Bucket = "processed-files", SourceProperty = "ZipPath")]
[StepPlacement("UploadZip", Host = StepHost.Worker)]
public partial class CreateZipFromFilesTask { }

From these ~20 lines, the source generator produces:

  • A saga orchestrator kernel (shared retry, compensation, audit logic)
  • An API orchestrator (executes API-placed steps, publishes to queue)
  • A worker orchestrator (consumes messages, executes Worker-placed steps)
  • Typed step data ValueObjects (compile-time checked, not JSON bags)
  • A worker message (no IFormFile, just S3 keys)
  • A queue consumer with distributed locking
  • An API controller with submit, status, stream, WebSocket, result, and cancel endpoints
  • A SignalR hub
  • A webhook notifier
  • Polly retry policies per step
  • Domain events
  • An S3 reconciliation service
  • An archive service
  • DI registration

Twenty lines of attributes; 700+ lines of generated, production-ready infrastructure.

The developer writes one thing: the custom business logic for the ZIP creation step.

public partial class CreateZipFromFilesTask
{
    protected override async Task<StepResult> ExecuteCreateZipArchiveAsync(
        SagaContext<CreateZipRequest> context,
        CreateZipArchiveStepData stepData,
        CancellationToken ct)
    {
        // Typed output of the upload step, available to any later step:
        var uploadData = context.GetStepData<UploadSourceFilesStepData>();
        var zipPath = Path.Combine(Path.GetTempPath(),
            context.Request.OutputFileName ?? $"{context.TaskId}.zip");

        ZipFile.CreateFromDirectory(
            context.StepData.Get<string>("DownloadDirectory"), zipPath);

        stepData.ZipPath = zipPath;
        stepData.ZipSizeBytes = new FileInfo(zipPath).Length;
        return StepResult.Success();
    }
}

That's it. Everything else is generated.
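For completeness, here is what consuming the generated endpoints might look like from a client. The route shapes are assumptions — the generated controller's exact paths aren't shown here:

// Sketch of a polling client. Endpoint paths are guesses, not documented routes.
using System.Net.Http.Json;

var http = new HttpClient { BaseAddress = new Uri("https://localhost:5001") };

using var form = new MultipartFormDataContent
{
    { new ByteArrayContent(await File.ReadAllBytesAsync("report.pdf")),
      "Files", "report.pdf" }
};
var submit = await http.PostAsync("/api/tasks/create-zip-from-files", form);
var taskId = (await submit.Content.ReadFromJsonAsync<SubmitResponse>())!.TaskId;

// Poll until the task reaches a terminal state
while (true)
{
    var status = await http.GetFromJsonAsync<StatusResponse>(
        $"/api/tasks/{taskId}/status");
    if (status!.Status is "Completed" or "Failed") break;
    await Task.Delay(TimeSpan.FromSeconds(2));
}

record SubmitResponse(string TaskId);
record StatusResponse(string Status, string? ErrorMessage);

Polling is the lowest common denominator; the generated SignalR hub or SSE stream replaces the loop with a push.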


What's Next

Part II introduces the full attribute surface — every concept, every property, every constraint — and shows how each attribute maps to the M3 meta-metamodel.