serard@dev00:~/cv

The Video Pipeline

"Upload once. Transcode in parallel. Compensate only what succeeded."


Why a Second Example

Part III walked through a linear saga — ordered steps, one at a time, straightforward compensation. Real-world pipelines are rarely that simple.

A video transcoding pipeline hits every advanced feature in the DSL at once:

  • Fan-out — multiple transcode jobs running concurrently
  • Conditional execution — skip 4K if the source is 1080p
  • Differentiated retry — transcode steps get 5 retries with exponential backoff, webhook notification gets 1
  • Partial compensation — if one transcode fails, delete outputs from the ones that succeeded, but leave skipped steps alone
  • Cancellation — a one-hour job needs a kill switch
  • Mixed placement — upload runs on the API, everything else on the Worker

All of this from attributes on a single partial class.


The Declaration

[DistributedTask("VideoTranscode",
    Queue = "video-processing",
    MaxRetries = 1,
    TimeoutSeconds = 3600)]
[Cancellable]

// ── Step 1: Upload the source video (API-side) ──
[FileUploadStep("UploadVideo", Order = 1,
    Bucket = "raw-videos", SourceProperty = "VideoFile")]
[StepPlacement("UploadVideo", Host = StepHost.Api)]

// ── Step 2: Extract metadata with FFprobe (Worker-side) ──
[CustomStep("ExtractMetadata", Order = 2)]
[StepPlacement("ExtractMetadata", Host = StepHost.Worker)]

// ── Step 3: Fan-out — transcode to multiple resolutions ──
[ParallelStepGroup("TranscodeAll", Order = 3, AwaitAll = true)]

[SagaStep("Transcode720p", Group = "TranscodeAll", Order = 3)]
[StepPlacement("Transcode720p", Host = StepHost.Worker)]
[RetryPolicy("Transcode720p", MaxRetries = 5,
    BackoffType = BackoffType.Exponential, DelayMs = 2000)]

[SagaStep("Transcode1080p", Group = "TranscodeAll", Order = 3)]
[StepPlacement("Transcode1080p", Host = StepHost.Worker)]
[RetryPolicy("Transcode1080p", MaxRetries = 5,
    BackoffType = BackoffType.Exponential, DelayMs = 2000)]

[SagaStep("Transcode4K", Group = "TranscodeAll", Order = 3)]
[StepPlacement("Transcode4K", Host = StepHost.Worker)]
[RetryPolicy("Transcode4K", MaxRetries = 5,
    BackoffType = BackoffType.Exponential, DelayMs = 2000)]
[StepCondition("Transcode4K", "Source >= 4K", nameof(IsSource4KOrHigher))]

// ── Step 4: Generate preview thumbnails ──
[CustomStep("GenerateThumbnails", Order = 4)]
[StepPlacement("GenerateThumbnails", Host = StepHost.Worker)]

// ── Step 5: Update the content catalog ──
[CustomStep("UpdateCatalog", Order = 5)]
[StepPlacement("UpdateCatalog", Host = StepHost.Worker)]

// ── Step 6: Notify the caller via webhook ──
[CustomStep("NotifyWebhook", Order = 6)]
[StepPlacement("NotifyWebhook", Host = StepHost.Worker)]
[RetryPolicy("NotifyWebhook", MaxRetries = 1,
    BackoffType = BackoffType.Constant, DelayMs = 3000)]
public partial class VideoTranscodeTask { }

Six logical steps. Three of them run in parallel. Two different retry strategies. One conditional gate. One cancellation endpoint. Every concept from Part II is in play.


TaskRequest

[TaskRequest("VideoTranscode")]
public partial class VideoTranscodeRequest
{
    public IFormFile VideoFile { get; set; } = null!;
    public string Title { get; set; } = string.Empty;
    public string? CallbackUrl { get; set; }
}

VideoFile is an IFormFile — it lives on the API side. The generator replaces it with an S3 key in the VideoTranscodeWorkerMessage:

// Generated — no IFormFile, just the S3 key from UploadVideo
public sealed class VideoTranscodeWorkerMessage
{
    public string TaskId { get; set; } = null!;
    public string VideoFileS3Key { get; set; } = null!;
    public string Title { get; set; } = string.Empty;
    public string? CallbackUrl { get; set; }
}
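The swap happens on the API side when the worker message is enqueued. A sketch of what that generated mapping plausibly looks like (the method shape is an assumption; only the two class definitions above are from the generator):

```csharp
// Sketch — assumed shape of the generated API-side mapping. The IFormFile
// is consumed by the UploadVideo step; only its S3 key crosses the queue.
private static VideoTranscodeWorkerMessage ToWorkerMessage(
    string taskId,
    VideoTranscodeRequest request,
    string uploadedS3Key) // produced by the UploadVideo step
{
    return new VideoTranscodeWorkerMessage
    {
        TaskId = taskId,
        VideoFileS3Key = uploadedS3Key,
        Title = request.Title,
        CallbackUrl = request.CallbackUrl,
    };
}
```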

TaskResponse

[TaskResponse("VideoTranscode")]
public class VideoTranscodeResponse
{
    public Dictionary<string, string> TranscodedKeys { get; set; } = new();
    public List<string> ThumbnailKeys { get; set; } = new();
    public string CatalogEntryId { get; set; } = string.Empty;
}

TranscodedKeys maps resolution to S3 key: { "720p": "transcoded/abc/720p.mp4", "1080p": "transcoded/abc/1080p.mp4" }. Skipped resolutions are simply absent from the dictionary.
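On the consumer side, that absence semantics makes the 4K check a plain dictionary lookup. An illustrative snippet (GetResponseAsync is a hypothetical fetch, not part of the generated API):

```csharp
// Illustrative — a key absent from TranscodedKeys means that rendition was skipped.
VideoTranscodeResponse response = await GetResponseAsync(taskId); // hypothetical
if (response.TranscodedKeys.TryGetValue("4K", out var key4K))
{
    Console.WriteLine($"4K rendition: {key4K}");
}
else
{
    Console.WriteLine("4K skipped — source below 3840x2160.");
}
```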


StepCondition in Detail

The Transcode4K step has a [StepCondition] — it only runs if the source video is 4K or higher. The condition method reads typed step data from a previous step:

public partial class VideoTranscodeTask
{
    private bool IsSource4KOrHigher(SagaContext<VideoTranscodeRequest> context)
    {
        var metadata = context.GetStepData<ExtractMetadataStepData>();
        return metadata.Width >= 3840 && metadata.Height >= 2160;
    }
}

ExtractMetadataStepData is a generated ValueObject populated by the ExtractMetadata step. The condition reads it type-safely — no string keys, no casts.

The Snapshot

When the generated orchestrator evaluates a condition at run time, the result is persisted as a StepConditionSnapshot:

// Generated — persisted to database for audit
public sealed class StepConditionSnapshot
{
    public string StepName { get; set; } = null!;       // "Transcode4K"
    public string Description { get; set; } = null!;    // "Source >= 4K"
    public string MethodName { get; set; } = null!;     // "IsSource4KOrHigher"
    public bool Result { get; set; }                     // false
    public DateTimeOffset EvaluatedAt { get; set; }      // 2026-04-03T14:22:07Z
    public string? ContextSummary { get; set; }          // "Width=1920, Height=1080"
}

The snapshot answers the question that always comes up in production: "Why was this step skipped?" The answer is in the database, with a timestamp and the input values that produced the decision.

If the condition returns false, the step transitions to Skipped — not Failed. Skipped steps are excluded from compensation.
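A plausible shape for the step lifecycle enum implied by that distinction (a sketch — only Completed appears in the generated code shown below; the other member names are assumptions):

```csharp
// Sketch — step lifecycle states implied by the text (names assumed).
public enum StepStatus
{
    Pending,
    Running,
    Completed,   // eligible for compensation if a later step fails
    Failed,      // retries exhausted; triggers compensation of completed steps
    Skipped,     // condition returned false; excluded from compensation
    Compensated,
}
```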


Fan-Out Generated Code

The [ParallelStepGroup("TranscodeAll", AwaitAll = true)] attribute tells the generator to run the group members concurrently. Here is the generated ExecuteParallelGroup_TranscodeAll_Async method inside the orchestrator kernel:

// Generated — VideoTranscodeOrchestratorKernel.g.cs
private async Task ExecuteParallelGroup_TranscodeAll_Async(
    SagaContext<VideoTranscodeRequest> context,
    CancellationToken ct)
{
    var tasks = new List<(string StepName, Task<StepResult> Execution)>();
    var completedSteps = new List<string>();

    // ── Evaluate conditions and launch ──

    // Transcode720p — no condition, always runs
    tasks.Add(("Transcode720p",
        ExecuteWithRetryAsync("Transcode720p",
            () => ExecuteTranscode720pAsync(context,
                context.GetOrCreateStepData<Transcode720pStepData>(), ct),
            _retryPolicy_Transcode720p, context, ct)));

    // Transcode1080p — no condition, always runs
    tasks.Add(("Transcode1080p",
        ExecuteWithRetryAsync("Transcode1080p",
            () => ExecuteTranscode1080pAsync(context,
                context.GetOrCreateStepData<Transcode1080pStepData>(), ct),
            _retryPolicy_Transcode1080p, context, ct)));

    // Transcode4K — conditional
    var condition_Transcode4K = IsSource4KOrHigher(context);
    await SnapshotConditionAsync(context, new StepConditionSnapshot
    {
        StepName = "Transcode4K",
        Description = "Source >= 4K",
        MethodName = "IsSource4KOrHigher",
        Result = condition_Transcode4K,
        EvaluatedAt = DateTimeOffset.UtcNow,
        ContextSummary = BuildConditionSummary_Transcode4K(context),
    });

    if (condition_Transcode4K)
    {
        tasks.Add(("Transcode4K",
            ExecuteWithRetryAsync("Transcode4K",
                () => ExecuteTranscode4KAsync(context,
                    context.GetOrCreateStepData<Transcode4KStepData>(), ct),
                _retryPolicy_Transcode4K, context, ct)));
    }
    else
    {
        await RecordStepSkippedAsync(context, "Transcode4K");
    }

    // ── Await all and handle failures ──

    var results = new Dictionary<string, StepResult>();
    var exceptions = new List<(string StepName, Exception Exception)>();

    foreach (var (stepName, execution) in tasks)
    {
        try
        {
            results[stepName] = await execution;
            if (results[stepName].Status == StepStatus.Completed)
                completedSteps.Add(stepName);
        }
        catch (Exception ex)
        {
            exceptions.Add((stepName, ex));
        }
    }

    // ── If any failed, compensate completed ones ──

    if (exceptions.Count > 0)
    {
        // Compensate in reverse completion order
        completedSteps.Reverse();
        foreach (var stepName in completedSteps)
        {
            await CompensateStepAsync(context, stepName, ct);
        }

        // Skipped steps are NOT compensated — nothing to undo

        throw new ParallelStepGroupException("TranscodeAll", exceptions);
    }
}

Key details:

  1. Condition evaluation happens before launch. The Transcode4K condition is evaluated, snapshotted, and if false, the step is recorded as Skipped and never added to the task list.

  2. Task.WhenAll semantics via individual awaits. Each task is awaited independently so the orchestrator can track which steps completed and which failed. (The generated code uses a sequential foreach over the task list — the tasks are already running concurrently from the moment they are created.)

  3. Compensation targets only completed steps. If Transcode1080p throws after Transcode720p completed, only Transcode720p is compensated. Skipped steps (Transcode4K when source is 1080p) are not touched.

  4. Reverse completion order. Compensation runs in the reverse order of successful completion — matching the saga pattern of undoing the most recent work first.

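The exception thrown at the end of the failure path aggregates every per-step failure, so the caller sees which branches broke, not just the first one. A plausible shape, inferred from the constructor call above (a sketch — the real generated type may differ):

```csharp
// Sketch — matches the call site: new ParallelStepGroupException("TranscodeAll", exceptions)
public sealed class ParallelStepGroupException : Exception
{
    public string GroupName { get; }
    public IReadOnlyList<(string StepName, Exception Exception)> Failures { get; }

    public ParallelStepGroupException(
        string groupName,
        IReadOnlyList<(string StepName, Exception Exception)> failures)
        : base($"Parallel group '{groupName}' failed: " +
               string.Join(", ", failures.Select(f => f.StepName)))
    {
        GroupName = groupName;
        Failures = failures;
    }
}
```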

Partial Compensation

This is the scenario that makes parallel sagas tricky. Consider a 1080p source video:

Step             Condition                      Outcome
────────────────────────────────────────────────────────────────
Transcode720p    (none)                         Completed
Transcode1080p   (none)                         Failed (FFmpeg crash)
Transcode4K      IsSource4KOrHigher = false     Skipped

The compensation sequence:

1. Transcode720p  → CompensateTranscode720pAsync()  → delete 720p output from S3
2. Transcode1080p → (failed, nothing to compensate)
3. Transcode4K    → (skipped, nothing to compensate)

Only the 720p output gets deleted. The 1080p step never produced an output (it failed mid-transcode). The 4K step never ran. No wasted deletes, no missing cleanup.

The generated compensator:

// Generated — dispatches to the developer's override
private async Task CompensateStepAsync(
    SagaContext<VideoTranscodeRequest> context,
    string stepName,
    CancellationToken ct)
{
    switch (stepName)
    {
        case "Transcode720p":
            await CompensateTranscode720pAsync(context,
                context.GetStepData<Transcode720pStepData>(), ct);
            break;
        case "Transcode1080p":
            await CompensateTranscode1080pAsync(context,
                context.GetStepData<Transcode1080pStepData>(), ct);
            break;
        case "Transcode4K":
            await CompensateTranscode4KAsync(context,
                context.GetStepData<Transcode4KStepData>(), ct);
            break;
        // ... other steps
    }

    await RecordStepCompensatedAsync(context, stepName);
}

The orchestrator only calls into this method for steps in the completedSteps list. The developer's compensation code can assume the step data is fully populated — the step ran to completion before compensation was triggered.


Developer Overrides

The generator emits abstract methods for every [CustomStep] and [SagaStep]. The developer provides the business logic in the partial class.

ExtractMetadata — FFprobe

public partial class VideoTranscodeTask
{
    protected override async Task<StepResult> ExecuteExtractMetadataAsync(
        SagaContext<VideoTranscodeRequest> context,
        ExtractMetadataStepData stepData,
        CancellationToken ct)
    {
        var uploadData = context.GetStepData<UploadVideoStepData>();
        var localPath = await _s3.DownloadToTempAsync(
            "raw-videos", uploadData.S3Key, ct);

        var probe = await _ffprobe.AnalyzeAsync(localPath, ct);

        stepData.Width = probe.PrimaryVideoStream!.Width;
        stepData.Height = probe.PrimaryVideoStream!.Height;
        stepData.DurationSeconds = probe.Duration.TotalSeconds;
        stepData.Codec = probe.PrimaryVideoStream.CodecName;
        stepData.FileSizeBytes = new FileInfo(localPath).Length;

        return StepResult.Success();
    }
}

ExtractMetadataStepData is a generated ValueObject. The properties (Width, Height, DurationSeconds, etc.) are inferred from assignments in this method by the source generator. Later steps — including the IsSource4KOrHigher condition — access these values type-safely.
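Given those assignments, the generated ValueObject would plausibly look like this (a sketch — base type, equality members, and serialization attributes omitted):

```csharp
// Sketch — properties inferred from the assignments in ExecuteExtractMetadataAsync.
public sealed partial class ExtractMetadataStepData
{
    public int Width { get; set; }
    public int Height { get; set; }
    public double DurationSeconds { get; set; }
    public string Codec { get; set; } = string.Empty;
    public long FileSizeBytes { get; set; }
}
```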

Transcode720p — FFmpeg

protected override async Task<StepResult> ExecuteTranscode720pAsync(
    SagaContext<VideoTranscodeRequest> context,
    Transcode720pStepData stepData,
    CancellationToken ct)
{
    var uploadData = context.GetStepData<UploadVideoStepData>();
    var localSource = await _s3.DownloadToTempAsync(
        "raw-videos", uploadData.S3Key, ct);

    var outputPath = Path.Combine(Path.GetTempPath(),
        $"{context.TaskId}_720p.mp4");

    await _ffmpeg.TranscodeAsync(localSource, outputPath, new TranscodeOptions
    {
        Width = 1280,
        Height = 720,
        Preset = "medium",
        Crf = 23,
    }, ct);

    var s3Key = $"transcoded/{context.TaskId}/720p.mp4";
    await _s3.UploadAsync("transcoded-videos", s3Key, outputPath, ct);

    stepData.S3Key = s3Key;
    stepData.S3Bucket = "transcoded-videos";
    stepData.FileSizeBytes = new FileInfo(outputPath).Length;

    return StepResult.Success();
}

The 1080p and 4K overrides follow the same pattern with different resolution parameters. The RetryPolicy (5 retries, exponential backoff starting at 2 seconds) wraps each execution automatically — the developer never writes retry logic.

CompensateTranscode720p — S3 Cleanup

protected override async Task CompensateTranscode720pAsync(
    SagaContext<VideoTranscodeRequest> context,
    Transcode720pStepData stepData,
    CancellationToken ct)
{
    if (!string.IsNullOrEmpty(stepData.S3Key))
    {
        await _s3.DeleteAsync(stepData.S3Bucket, stepData.S3Key, ct);
    }
}

The compensation is simple because the step data tells us exactly what was created. No guessing, no pattern-matching on S3 key prefixes. The Transcode720pStepData.S3Key was set during execution — if the step completed, the key is populated.


Differentiated Retry

Not all steps deserve the same retry budget. FFmpeg transcoding is expensive but transient failures happen (disk I/O, memory pressure). Webhook notifications are cheap but the remote server might be down.

The declaration makes this explicit:

Step             MaxRetries   Backoff       Delay     Rationale
──────────────────────────────────────────────────────────────────────────────────
Transcode720p    5            Exponential   2000 ms   Heavy compute, transient failures likely
Transcode1080p   5            Exponential   2000 ms   Same
Transcode4K      5            Exponential   2000 ms   Same
NotifyWebhook    1            Constant      3000 ms   Fast call; if it fails twice it's not transient

With exponential backoff starting at 2000 ms, the five transcode retry delays are 2 s, 4 s, 8 s, 16 s, and 32 s — about a minute of total waiting before the step is declared failed.

The generator emits distinct Polly policies for each:

// Generated — retry policies
private static readonly IAsyncPolicy _retryPolicy_Transcode720p =
    Policy.Handle<Exception>()
        .WaitAndRetryAsync(5, attempt =>
            TimeSpan.FromMilliseconds(2000 * Math.Pow(2, attempt - 1)));

private static readonly IAsyncPolicy _retryPolicy_NotifyWebhook =
    Policy.Handle<Exception>()
        .WaitAndRetryAsync(1, _ => TimeSpan.FromMilliseconds(3000));

Steps without a [RetryPolicy] attribute inherit the task-level MaxRetries = 1 from the [DistributedTask] attribute.
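Under that inheritance rule, a step like UpdateCatalog would get a policy roughly equivalent to the following sketch (the 1000 ms default delay is an assumption — the DSL may use a different task-level default):

```csharp
// Sketch — the task-level default (MaxRetries = 1) applied to steps that
// declare no [RetryPolicy] of their own. Delay value is assumed.
private static readonly IAsyncPolicy _retryPolicy_Default =
    Policy.Handle<Exception>()
        .WaitAndRetryAsync(1, _ => TimeSpan.FromMilliseconds(1000));
```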


The Flow

Diagram

The diagram shows the fan-out at order 3 and the fan-in before order 4. The condition gate on Transcode4K is evaluated after ExtractMetadata populates the resolution data. If the source is 1080p, the 4K branch goes directly to Skipped — no task is created, no resources are consumed, and the condition snapshot records exactly why.


The Full Picture

From the developer's perspective, this pipeline requires:

  • ~40 lines of attribute declarations (the partial class header)
  • ~15 lines for the request and response classes
  • ~60 lines of step overrides (ExtractMetadata, three transcodes, GenerateThumbnails, UpdateCatalog, NotifyWebhook)
  • ~15 lines of compensation overrides (one per transcode step)
  • ~5 lines for the condition method

~135 lines total. The generator produces the orchestrator kernel, parallel group execution, condition evaluation and snapshotting, Polly policies, compensation dispatch, the API controller, the queue consumer, the worker message, typed step data ValueObjects, progress tracking, cancellation handling, DI registration, and audit logging.

Everything the developer writes is business logic — FFprobe calls, FFmpeg parameters, S3 key conventions, catalog API calls. Everything the generator writes is infrastructure — the saga pattern, the retry loop, the compensation sequence, the fan-out/fan-in synchronization.


What's Next

Part V introduces the persistence layer — the DistributedTaskInstance aggregate root, SagaStepRecord entities, StepConditionSnapshot storage, and TaskAuditEntry logging — all modeled with Entity.Dsl and generated by the same source generation pipeline.