
The ZIP Example

"35 lines of attributes. 20 lines of business logic. Everything else is generated."


The Declaration

This is the complete task definition. Every attribute was introduced in Part II — here they come together for a real use case.

[DistributedTask("CreateZipFromFiles",
    Queue = "file-processing",
    MaxRetries = 2,
    TimeoutSeconds = 600)]
[Cancellable]

// Step 1: Upload source files to S3 (runs in the API process)
[FileUploadStep("UploadSourceFiles", Order = 1,
    Bucket = "incoming-files",
    SourceProperty = "Files")]
[StepPlacement("UploadSourceFiles", Host = StepHost.Api)]

// Step 2: Create the ZIP archive (runs in the Worker process)
[CustomStep("CreateZipArchive", Order = 2)]
[StepPlacement("CreateZipArchive", Host = StepHost.Worker)]
[RetryPolicy("CreateZipArchive",
    MaxRetries = 3,
    BackoffType = BackoffType.Exponential,
    DelayMs = 500)]

// Step 3: Upload the ZIP back to S3 (runs in the Worker process)
[FileUploadStep("UploadZip", Order = 3,
    Bucket = "processed-files",
    SourceProperty = "ZipPath")]
[StepPlacement("UploadZip", Host = StepHost.Worker)]
public partial class CreateZipFromFilesTask { }

Thirty-five lines. Three steps. Two processes. One saga.

The [StepPlacement] attributes split the pipeline: Step 1 runs in the API process (the files arrive as IFormFile from the HTTP request), Steps 2 and 3 run in the Worker process (where the heavy I/O happens). The source generator reads these placements and produces two separate orchestrators — but both share the same kernel.


The Request and Response

Every distributed task needs a typed input and a typed output. These are plain C# classes decorated with [TaskRequest] and [TaskResponse].

TaskRequest

[TaskRequest("CreateZipFromFiles")]
public partial class CreateZipRequest
{
    public List<IFormFile> Files { get; set; } = new();
    public string? OutputFileName { get; set; }
}

Files is the property referenced by SourceProperty = "Files" on the UploadSourceFiles step. The generator validates this reference at compile time — if you rename the property without updating the attribute, you get diagnostic DST005.

The partial keyword is required. The generator injects the listening-strategy properties:

// Generated: CreateZipRequest.g.cs
public partial class CreateZipRequest
{
    public ListeningStrategy Listening { get; set; } = ListeningStrategy.Polling;
    public string? WebhookUrl { get; set; }
}
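How a caller might use the injected properties, as a self-contained sketch. The stub types below stand in for the real ones (plain strings instead of IFormFile), and the Webhook enum member is an assumption; only Polling appears in the generated code above.

```csharp
using System;
using System.Collections.Generic;

// Stubs mirroring the request shape above, so the sketch runs anywhere.
// The Webhook member is assumed for illustration.
public enum ListeningStrategy { Polling, Webhook }

public class CreateZipRequest
{
    public List<string> FileNames { get; set; } = new();
    public string? OutputFileName { get; set; }
    public ListeningStrategy Listening { get; set; } = ListeningStrategy.Polling;
    public string? WebhookUrl { get; set; }
}

public static class Demo
{
    public static void Main()
    {
        // Opt into webhook delivery instead of the default polling.
        var request = new CreateZipRequest
        {
            OutputFileName = "bundle.zip",
            Listening = ListeningStrategy.Webhook,
            WebhookUrl = "https://example.com/hooks/zip-done",
        };

        Console.WriteLine(request.Listening);  // Webhook
    }
}
```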

TaskResponse

[TaskResponse("CreateZipFromFiles")]
public partial class CreateZipResponse
{
    public string ZipS3Key { get; set; } = "";
    public string ZipS3Bucket { get; set; } = "";
    public long ZipSizeBytes { get; set; }
    public int FileCount { get; set; }
}

The response is what the client receives when the task completes. The developer populates it in the MapResponse override — the generator calls that method after all steps succeed.


Typed Step Data ValueObjects

Each step produces typed inter-step data. Instead of passing values through a Dictionary<string, object> and praying the keys match, the generator emits a ValueObject per step.

UploadSourceFilesStepData

Generated from the [FileUploadStep] definition. The generator knows that a file upload step produces a list of S3 keys and the bucket name.

// Generated: UploadSourceFilesStepData.g.cs
public sealed class UploadSourceFilesStepData
{
    public List<string> UploadedKeys { get; set; } = new();
    public string Bucket { get; set; } = "incoming-files";
}

CreateZipArchiveStepData

Generated from the [CustomStep] definition. For custom steps, the generator analyzes the properties written in the developer's override method and generates the corresponding ValueObject.

// Generated: CreateZipArchiveStepData.g.cs
public sealed class CreateZipArchiveStepData
{
    public string ZipPath { get; set; } = "";
    public long ZipSizeBytes { get; set; }
}

UploadZipStepData

Generated from the second [FileUploadStep]. This step uploads a single file (the ZIP), so the data contains the resulting S3 key and bucket.

// Generated: UploadZipStepData.g.cs
public sealed class UploadZipStepData
{
    public string ZipS3Key { get; set; } = "";
    public string ZipS3Bucket { get; set; } = "processed-files";
}

Accessing Step Data

Step data flows forward. A later step can read the data produced by an earlier step through the SagaContext:

// In the CreateZipArchive step, read the upload results from Step 1
var uploadData = context.GetStepData<UploadSourceFilesStepData>();
var keys = uploadData.UploadedKeys;  // Compile-time checked, IDE auto-complete

No string keys. No casts. No runtime errors from typos. The generic type parameter is the only key, and the compiler enforces it.
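The mechanism behind GetStepData<T> is easy to sketch in isolation: a minimal, hypothetical store keyed by the runtime type. This is illustrative only, not the library's actual implementation.

```csharp
using System;
using System.Collections.Generic;

// Minimal sketch: the generic type parameter acts as the dictionary key,
// so Get and Set are guaranteed by the compiler to agree on the stored type.
public sealed class StepDataStore
{
    private readonly Dictionary<Type, object> _data = new();

    public void SetStepData<T>(T data) where T : notnull
        => _data[typeof(T)] = data;

    public T GetStepData<T>()
        => (T)_data[typeof(T)];
}

public sealed class UploadSourceFilesStepData
{
    public List<string> UploadedKeys { get; set; } = new();
}

public static class Demo
{
    public static void Main()
    {
        var store = new StepDataStore();
        store.SetStepData(new UploadSourceFilesStepData
        {
            UploadedKeys = { "uploads/t1/a.txt", "uploads/t1/b.txt" }
        });

        // Typed retrieval: no string key, no cast at the call site.
        var data = store.GetStepData<UploadSourceFilesStepData>();
        Console.WriteLine(data.UploadedKeys.Count);  // 2
    }
}
```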


The WorkerMessage

The request class contains List<IFormFile> Files — but IFormFile is an ASP.NET Core abstraction tied to the HTTP request. It cannot be serialized and sent through a queue. The generator solves this by producing a WorkerMessage — a serializable projection of the request where IFormFile properties are replaced by the S3 keys from the upload steps.

// Generated: CreateZipFromFilesWorkerMessage.g.cs
public sealed class CreateZipFromFilesWorkerMessage
{
    public string TaskId { get; set; } = "";

    // IFormFile replaced by S3 keys from UploadSourceFiles step
    public List<string> FileS3Keys { get; set; } = new();
    public string FileS3Bucket { get; set; } = "";

    // Non-file properties carried through as-is
    public string? OutputFileName { get; set; }

    // Step data from completed API steps (serialized)
    public UploadSourceFilesStepData UploadSourceFilesData { get; set; } = new();
}

The API orchestrator builds this message after executing all StepHost.Api steps and publishes it to the queue. The Worker orchestrator deserializes it, restores the saga context, and continues from where the API left off.

The developer never defines this class. The generator infers it entirely from the request type and the step placements.


The Orchestrator Kernel

The kernel is the shared base class containing the saga logic common to both API and Worker. It is generated as an abstract class that both orchestrators inherit from.

// Generated: CreateZipOrchestratorKernel.g.cs
public abstract class CreateZipOrchestratorKernel
{
    protected readonly IS3Client _s3;
    protected readonly ITaskRepository _taskRepo;
    protected readonly IDistributedLock _lock;
    protected readonly ITaskProgressTracker _progress;
    protected readonly ISagaStateSerializer _serializer;
    protected readonly ILogger _logger;

    // ── Step execution with retry and audit ──

    protected async Task<StepResult> ExecuteStepWithPoliciesAsync<TStepData>(
        SagaContext<CreateZipRequest> context,
        string stepName,
        int stepOrder,
        Func<SagaContext<CreateZipRequest>, TStepData, CancellationToken, Task<StepResult>> execute,
        IAsyncPolicy? retryPolicy,
        CancellationToken ct)
        where TStepData : new()
    {
        var stepData = new TStepData();
        var record = context.BeginStep(stepName, stepOrder);

        try
        {
            var result = retryPolicy is not null
                ? await retryPolicy.ExecuteAsync(() => execute(context, stepData, ct))
                : await execute(context, stepData, ct);

            record.Complete(result);
            context.SetStepData(stepData);
            await _progress.UpdateAsync(context.TaskId, stepName, StepStatus.Completed);
            await AuditAsync(context, stepName, "Completed");
            return result;
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            record.Cancel();
            await AuditAsync(context, stepName, "Cancelled");
            throw;
        }
        catch (Exception ex)
        {
            record.Fail(ex);
            await AuditAsync(context, stepName, "Failed", ex.Message);
            throw;
        }
    }

    // ── Compensation: reverse-order execution ──

    protected async Task CompensateAsync(
        SagaContext<CreateZipRequest> context,
        CancellationToken ct)
    {
        var completedSteps = context.CompletedSteps
            .OrderByDescending(s => s.Order)
            .ToList();

        foreach (var step in completedSteps)
        {
            try
            {
                await CompensateStepAsync(context, step.Name, ct);
                await AuditAsync(context, step.Name, "Compensated");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Compensation failed for step {Step} in task {TaskId}",
                    step.Name, context.TaskId);
                await AuditAsync(context, step.Name, "CompensationFailed", ex.Message);
            }
        }
    }

    // ── Step-specific compensation (virtual, overridable) ──

    protected virtual Task CompensateUploadSourceFilesAsync(
        SagaContext<CreateZipRequest> context, CancellationToken ct)
    {
        // Built-in: delete uploaded S3 keys
        var data = context.GetStepData<UploadSourceFilesStepData>();
        return Task.WhenAll(data.UploadedKeys
            .Select(key => _s3.DeleteAsync(data.Bucket, key, ct)));
    }

    protected virtual Task CompensateCreateZipArchiveAsync(
        SagaContext<CreateZipRequest> context, CancellationToken ct)
        => Task.CompletedTask;  // No-op by default; developer may override

    protected virtual Task CompensateUploadZipAsync(
        SagaContext<CreateZipRequest> context, CancellationToken ct)
    {
        // Built-in: delete uploaded ZIP from S3
        var data = context.GetStepData<UploadZipStepData>();
        return _s3.DeleteAsync(data.ZipS3Bucket, data.ZipS3Key, ct);
    }

    // ── Audit trail ──

    protected async Task AuditAsync(
        SagaContext<CreateZipRequest> context,
        string stepName,
        string action,
        string? detail = null)
    {
        await _taskRepo.AddAuditEntryAsync(context.TaskId, new TaskAuditEntry
        {
            StepName = stepName,
            Action = action,
            Detail = detail,
            Timestamp = DateTimeOffset.UtcNow,
        });
    }

    // ── Abstract methods the developer must implement ──

    protected abstract Task<StepResult> ExecuteCreateZipArchiveAsync(
        SagaContext<CreateZipRequest> context,
        CreateZipArchiveStepData stepData,
        CancellationToken ct);

    // ── Response mapping ──

    protected abstract CreateZipResponse MapResponse(
        SagaContext<CreateZipRequest> context);
}

Key design decisions:

  • ExecuteStepWithPoliciesAsync wraps every step execution with retry, progress tracking, and audit. The Polly policy is injected per step — only CreateZipArchive has a retry policy in this example.
  • Compensation runs in reverse order. Built-in steps (FileUploadStep) get generated compensation logic (delete the uploaded files). Custom steps get an empty virtual method — the developer overrides it if cleanup is needed.
  • ExecuteCreateZipArchiveAsync is abstract because [CustomStep] means the generator cannot know the business logic. The developer must provide it.
  • MapResponse is abstract because only the developer knows how to assemble the response from the step data.

The API Orchestrator

The API orchestrator executes the steps placed on StepHost.Api, builds the worker message, and publishes it to the queue.

// Generated: CreateZipApiOrchestrator.g.cs
public class CreateZipApiOrchestrator : CreateZipOrchestratorKernel
{
    private readonly IQueuePublisher _queue;

    public async Task<string> SubmitAsync(
        CreateZipRequest request,
        CancellationToken ct)
    {
        var taskId = Guid.NewGuid().ToString();
        var context = SagaContext<CreateZipRequest>.Create(taskId, request);

        await _taskRepo.CreateAsync(context.TaskInstance, ct);
        await _progress.UpdateAsync(taskId, "Task", StepStatus.Submitted);

        try
        {
            // ── Step 1: UploadSourceFiles (StepHost.Api) ──

            await ExecuteStepWithPoliciesAsync<UploadSourceFilesStepData>(
                context,
                stepName: "UploadSourceFiles",
                stepOrder: 1,
                execute: ExecuteUploadSourceFilesAsync,
                retryPolicy: null,
                ct);

            // ── Dispatch to Worker ──

            var uploadData = context.GetStepData<UploadSourceFilesStepData>();
            var message = new CreateZipFromFilesWorkerMessage
            {
                TaskId = taskId,
                FileS3Keys = uploadData.UploadedKeys,
                FileS3Bucket = uploadData.Bucket,
                OutputFileName = request.OutputFileName,
                UploadSourceFilesData = uploadData,
            };

            await _queue.PublishAsync("file-processing", message, ct);
            await _progress.UpdateAsync(taskId, "Task", StepStatus.Dispatched);
            await AuditAsync(context, "Dispatch", "Published to queue");

            return taskId;
        }
        catch (Exception)
        {
            await CompensateAsync(context, ct);
            await _taskRepo.UpdateStatusAsync(taskId, TaskStatus.Failed, ct);
            throw;
        }
    }

    // Built-in: FileUploadStep has a generated implementation
    private async Task<StepResult> ExecuteUploadSourceFilesAsync(
        SagaContext<CreateZipRequest> context,
        UploadSourceFilesStepData stepData,
        CancellationToken ct)
    {
        var files = context.Request.Files;
        var keys = new List<string>();

        foreach (var file in files)
        {
            var key = $"uploads/{context.TaskId}/{file.FileName}";
            await using var stream = file.OpenReadStream();
            await _s3.UploadAsync("incoming-files", key, stream, ct);
            keys.Add(key);
        }

        stepData.UploadedKeys = keys;
        stepData.Bucket = "incoming-files";
        return StepResult.Success();
    }
}

Notice what the developer did not write:

  • The S3 upload loop for source files — generated from [FileUploadStep]
  • The worker message construction — generated from the request type and step data
  • The queue publish — generated from Queue = "file-processing"
  • The compensation on failure — generated from the kernel
  • The progress tracking — generated from the step structure

The Worker Orchestrator

The Worker orchestrator consumes the message from the queue and executes the remaining steps.

// Generated: CreateZipWorkerOrchestrator.g.cs
public class CreateZipWorkerOrchestrator : CreateZipOrchestratorKernel
{
    private readonly IAsyncPolicy _createZipArchiveRetryPolicy;

    public CreateZipWorkerOrchestrator(/* ... */)
    {
        // Polly policy from [RetryPolicy("CreateZipArchive", ...)]
        _createZipArchiveRetryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: attempt =>
                    TimeSpan.FromMilliseconds(500 * Math.Pow(2, attempt - 1)));
    }

    public async Task ProcessAsync(
        CreateZipFromFilesWorkerMessage message,
        CancellationToken ct)
    {
        await using var lockHandle = await _lock.AcquireAsync(
            $"task:{message.TaskId}", TimeSpan.FromSeconds(600), ct);

        var context = await RestoreContextAsync(message, ct);

        try
        {
            // ── Step 2: CreateZipArchive (StepHost.Worker) ──

            await ExecuteStepWithPoliciesAsync<CreateZipArchiveStepData>(
                context,
                stepName: "CreateZipArchive",
                stepOrder: 2,
                execute: ExecuteCreateZipArchiveAsync,  // Developer override
                retryPolicy: _createZipArchiveRetryPolicy,
                ct);

            // ── Step 3: UploadZip (StepHost.Worker) ──

            await ExecuteStepWithPoliciesAsync<UploadZipStepData>(
                context,
                stepName: "UploadZip",
                stepOrder: 3,
                execute: ExecuteUploadZipAsync,
                retryPolicy: null,
                ct);

            // ── Map response and complete ──

            var response = MapResponse(context);
            await _taskRepo.CompleteAsync(context.TaskId, response, ct);
            await _progress.UpdateAsync(context.TaskId, "Task", StepStatus.Completed);
            await AuditAsync(context, "Task", "Completed");
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            await CompensateAsync(context, ct);
            await _taskRepo.UpdateStatusAsync(message.TaskId, TaskStatus.Cancelled, ct);
        }
        catch (Exception)
        {
            await CompensateAsync(context, ct);
            await _taskRepo.UpdateStatusAsync(message.TaskId, TaskStatus.Failed, ct);
            throw;
        }
    }

    // Built-in: FileUploadStep has a generated implementation
    private async Task<StepResult> ExecuteUploadZipAsync(
        SagaContext<CreateZipRequest> context,
        UploadZipStepData stepData,
        CancellationToken ct)
    {
        var zipArchiveData = context.GetStepData<CreateZipArchiveStepData>();
        var zipKey = $"zips/{context.TaskId}/output.zip";

        await using var stream = File.OpenRead(zipArchiveData.ZipPath);
        await _s3.UploadAsync("processed-files", zipKey, stream, ct);

        stepData.ZipS3Key = zipKey;
        stepData.ZipS3Bucket = "processed-files";
        return StepResult.Success();
    }

    private async Task<SagaContext<CreateZipRequest>> RestoreContextAsync(
        CreateZipFromFilesWorkerMessage message,
        CancellationToken ct)
    {
        var instance = await _taskRepo.FindByIdAsync(message.TaskId, ct);
        var context = SagaContext<CreateZipRequest>.Restore(instance!);

        // Restore step data from API steps
        context.SetStepData(message.UploadSourceFilesData);

        return context;
    }
}

The Worker orchestrator acquires a distributed lock before processing — two workers cannot process the same task. The lock timeout matches TimeoutSeconds = 600 from the task declaration.

The retry policy for CreateZipArchive is compiled from the [RetryPolicy] attribute: 3 retries with exponential backoff starting at 500ms (500ms, 1000ms, 2000ms). If all retries fail, the default OnRetryExhausted = RetryExhaustedAction.Compensate applies: the exception propagates out of the step and triggers the compensation chain.
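The delay sequence follows directly from the attribute values. A small, self-contained sketch of the arithmetic (the real Polly wiring is shown in the constructor above):

```csharp
using System;
using System.Linq;

public static class BackoffDemo
{
    // Exponential backoff: DelayMs * 2^(attempt - 1), mirroring the generated policy.
    public static int[] Delays(int delayMs, int maxRetries)
        => Enumerable.Range(1, maxRetries)
            .Select(attempt => delayMs * (int)Math.Pow(2, attempt - 1))
            .ToArray();

    public static void Main()
    {
        // [RetryPolicy(MaxRetries = 3, BackoffType = Exponential, DelayMs = 500)]
        Console.WriteLine(string.Join(", ", Delays(500, 3)));  // 500, 1000, 2000
    }
}
```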


The Developer Override

This is the only code the developer writes. Everything above is generated.

public partial class CreateZipFromFilesTask
{
    protected override async Task<StepResult> ExecuteCreateZipArchiveAsync(
        SagaContext<CreateZipRequest> context,
        CreateZipArchiveStepData stepData,
        CancellationToken ct)
    {
        var uploadData = context.GetStepData<UploadSourceFilesStepData>();
        var downloadDir = Path.Combine(Path.GetTempPath(), context.TaskId);
        Directory.CreateDirectory(downloadDir);

        // Download source files from S3
        foreach (var key in uploadData.UploadedKeys)
        {
            var localPath = Path.Combine(downloadDir, Path.GetFileName(key));
            await _s3.DownloadAsync(uploadData.Bucket, key, localPath, ct);
        }

        // Create the ZIP
        var zipPath = Path.Combine(Path.GetTempPath(),
            context.Request.OutputFileName ?? $"{context.TaskId}.zip");
        ZipFile.CreateFromDirectory(downloadDir, zipPath);

        // Populate typed step data
        stepData.ZipPath = zipPath;
        stepData.ZipSizeBytes = new FileInfo(zipPath).Length;
        return StepResult.Success();
    }

    protected override CreateZipResponse MapResponse(
        SagaContext<CreateZipRequest> context)
    {
        var zipData = context.GetStepData<UploadZipStepData>();
        var archiveData = context.GetStepData<CreateZipArchiveStepData>();

        return new CreateZipResponse
        {
            ZipS3Key = zipData.ZipS3Key,
            ZipS3Bucket = zipData.ZipS3Bucket,
            ZipSizeBytes = archiveData.ZipSizeBytes,
            FileCount = context.GetStepData<UploadSourceFilesStepData>()
                .UploadedKeys.Count,
        };
    }
}

Twenty lines of business logic. The developer:

  1. Downloads the source files using S3 keys from the previous step's typed data
  2. Creates the ZIP archive
  3. Populates the step data ValueObject (ZipPath, ZipSizeBytes)
  4. Maps the response from the accumulated step data

Everything else — retry, compensation, progress tracking, queue dispatch, distributed locking, audit trail, S3 upload/download for built-in steps — is generated.


The Sequence

[Sequence diagram]

The blue region is the API process. The orange region is the Worker process. The queue is the boundary between them. The database is the single source of truth for task state — both sides read and write to it.


The State Machine

[State-machine diagram]

Every transition is recorded as an audit entry. Every step result is persisted as a SagaStepRecord. The Compensating state runs compensation in reverse order — Step 3 (delete ZIP from S3), Step 2 (developer override, if provided), Step 1 (delete source files from S3).

Cancellation follows the same path: a DELETE /api/tasks/{id} request (enabled by [Cancellable]) cancels the task's CancellationToken, which interrupts the currently running step and triggers the compensation chain.


What the Developer Writes vs. What Is Generated

Artifact                                   Who         Lines
──────────────────────────────────────────────────────────────
Task declaration (attributes)              Developer   ~35
CreateZipRequest                           Developer   ~5
CreateZipResponse                          Developer   ~7
ExecuteCreateZipArchiveAsync override      Developer   ~16
MapResponse override                       Developer   ~12
UploadSourceFilesStepData                  Generator   ~5
CreateZipArchiveStepData                   Generator   ~5
UploadZipStepData                          Generator   ~5
CreateZipFromFilesWorkerMessage            Generator   ~12
CreateZipOrchestratorKernel                Generator   ~120
CreateZipApiOrchestrator                   Generator   ~60
CreateZipWorkerOrchestrator                Generator   ~80
CreateZipConsumer (queue consumer)         Generator   ~25
CreateZipController (API endpoints)        Generator   ~80
Polly retry policy                         Generator   ~10
DI registration                            Generator   ~15
Domain events                              Generator   ~20
SignalR hub integration                    Generator   ~15
──────────────────────────────────────────────────────────────
Developer total                                        ~75
Generated total                                        ~450+

The amplification ratio for this example: roughly 1:6. The developer declares the structure and writes the business logic. The generator handles the distributed systems plumbing.


What's Next

Part IV takes the same DSL and applies it to a more complex scenario — video transcoding with [ParallelStepGroup] for fan-out/fan-in, [StepCondition] for conditional 4K encoding, and differentiated retry policies per transcode resolution.