Why a Second Example
Part III walked through a linear saga — ordered steps, one at a time, straightforward compensation. Real-world pipelines are rarely that simple.
A video transcoding pipeline hits every advanced feature in the DSL at once:
- Fan-out — multiple transcode jobs running concurrently
- Conditional execution — skip 4K if the source is 1080p
- Differentiated retry — transcode steps get 5 retries with exponential backoff, webhook notification gets 1
- Partial compensation — if one transcode fails, delete outputs from the ones that succeeded, but leave skipped steps alone
- Cancellation — a one-hour job needs a kill switch
- Mixed placement — upload runs on the API, everything else on the Worker
All of this from attributes on a single partial class.
The Declaration
[DistributedTask("VideoTranscode",
Queue = "video-processing",
MaxRetries = 1,
TimeoutSeconds = 3600)]
[Cancellable]
// ── Step 1: Upload the source video (API-side) ──
[FileUploadStep("UploadVideo", Order = 1,
Bucket = "raw-videos", SourceProperty = "VideoFile")]
[StepPlacement("UploadVideo", Host = StepHost.Api)]
// ── Step 2: Extract metadata with FFprobe (Worker-side) ──
[CustomStep("ExtractMetadata", Order = 2)]
[StepPlacement("ExtractMetadata", Host = StepHost.Worker)]
// ── Step 3: Fan-out — transcode to multiple resolutions ──
[ParallelStepGroup("TranscodeAll", Order = 3, AwaitAll = true)]
[SagaStep("Transcode720p", Group = "TranscodeAll", Order = 3)]
[StepPlacement("Transcode720p", Host = StepHost.Worker)]
[RetryPolicy("Transcode720p", MaxRetries = 5,
BackoffType = BackoffType.Exponential, DelayMs = 2000)]
[SagaStep("Transcode1080p", Group = "TranscodeAll", Order = 3)]
[StepPlacement("Transcode1080p", Host = StepHost.Worker)]
[RetryPolicy("Transcode1080p", MaxRetries = 5,
BackoffType = BackoffType.Exponential, DelayMs = 2000)]
[SagaStep("Transcode4K", Group = "TranscodeAll", Order = 3)]
[StepPlacement("Transcode4K", Host = StepHost.Worker)]
[RetryPolicy("Transcode4K", MaxRetries = 5,
BackoffType = BackoffType.Exponential, DelayMs = 2000)]
[StepCondition("Transcode4K", "Source >= 4K", nameof(IsSource4KOrHigher))]
// ── Step 4: Generate preview thumbnails ──
[CustomStep("GenerateThumbnails", Order = 4)]
[StepPlacement("GenerateThumbnails", Host = StepHost.Worker)]
// ── Step 5: Update the content catalog ──
[CustomStep("UpdateCatalog", Order = 5)]
[StepPlacement("UpdateCatalog", Host = StepHost.Worker)]
// ── Step 6: Notify the caller via webhook ──
[CustomStep("NotifyWebhook", Order = 6)]
[StepPlacement("NotifyWebhook", Host = StepHost.Worker)]
[RetryPolicy("NotifyWebhook", MaxRetries = 1,
BackoffType = BackoffType.Constant, DelayMs = 3000)]
public partial class VideoTranscodeTask { }
Six logical steps. Three of them run in parallel. Two different retry strategies. One conditional gate. One cancellation endpoint. Every concept from Part II is in play.
TaskRequest
[TaskRequest("VideoTranscode")]
public partial class VideoTranscodeRequest
{
public IFormFile VideoFile { get; set; } = null!;
public string Title { get; set; } = string.Empty;
public string? CallbackUrl { get; set; }
}
VideoFile is an IFormFile — it lives on the API side. The generator replaces it with an S3 key in the VideoTranscodeWorkerMessage:
// Generated — no IFormFile, just the S3 key from UploadVideo
public sealed class VideoTranscodeWorkerMessage
{
public string TaskId { get; set; } = null!;
public string VideoFileS3Key { get; set; } = null!;
public string Title { get; set; } = string.Empty;
public string? CallbackUrl { get; set; }
}
TaskResponse
[TaskResponse("VideoTranscode")]
public class VideoTranscodeResponse
{
public Dictionary<string, string> TranscodedKeys { get; set; } = new();
public List<string> ThumbnailKeys { get; set; } = new();
public string CatalogEntryId { get; set; } = string.Empty;
}
TranscodedKeys maps resolution to S3 key: { "720p": "transcoded/abc/720p.mp4", "1080p": "transcoded/abc/1080p.mp4" }. Skipped resolutions are simply absent from the dictionary.
StepCondition in Detail
The Transcode4K step has a [StepCondition] — it only runs if the source video is 4K or higher. The condition method reads typed step data from a previous step:
public partial class VideoTranscodeTask
{
private bool IsSource4KOrHigher(SagaContext<VideoTranscodeRequest> context)
{
var metadata = context.GetStepData<ExtractMetadataStepData>();
return metadata.Width >= 3840 && metadata.Height >= 2160;
}
}
ExtractMetadataStepData is a generated ValueObject populated by the ExtractMetadata step. The condition reads it type-safely — no string keys, no casts.
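The threshold itself is easy to sanity-check in isolation. A minimal sketch, with a hypothetical record standing in for the generated ExtractMetadataStepData:

```csharp
// Hypothetical stand-in for the generated ExtractMetadataStepData ValueObject.
public sealed record MetadataStandIn(int Width, int Height);

public static class ConditionSketch
{
    // Mirrors IsSource4KOrHigher: UHD sources are 3840x2160 or larger.
    public static bool IsSource4KOrHigher(MetadataStandIn m) =>
        m.Width >= 3840 && m.Height >= 2160;
}
```

For a 1920x1080 source the check returns false, which is why a 1080p upload skips the 4K transcode entirely.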
The Snapshot
When the generator evaluates a condition, the result is persisted as a StepConditionSnapshot:
// Generated — persisted to database for audit
public sealed class StepConditionSnapshot
{
public string StepName { get; set; } = null!; // "Transcode4K"
public string Description { get; set; } = null!; // "Source >= 4K"
public string MethodName { get; set; } = null!; // "IsSource4KOrHigher"
public bool Result { get; set; } // false
public DateTimeOffset EvaluatedAt { get; set; } // 2026-04-03T14:22:07Z
public string? ContextSummary { get; set; } // "Width=1920, Height=1080"
}
The snapshot answers the question that always comes up in production: "Why was this step skipped?" The answer is in the database, with a timestamp and the input values that produced the decision.
If the condition returns false, the step transitions to Skipped — not Failed. Skipped steps are excluded from compensation.
Fan-Out Generated Code
The [ParallelStepGroup("TranscodeAll", AwaitAll = true)] attribute tells the generator to run the group members concurrently. Here is the generated ExecuteParallelGroupAsync method inside the orchestrator kernel:
// Generated — VideoTranscodeOrchestratorKernel.g.cs
private async Task ExecuteParallelGroup_TranscodeAll_Async(
SagaContext<VideoTranscodeRequest> context,
CancellationToken ct)
{
var tasks = new List<(string StepName, Task<StepResult> Execution)>();
var completedSteps = new List<string>();
// ── Evaluate conditions and launch ──
// Transcode720p — no condition, always runs
tasks.Add(("Transcode720p",
ExecuteWithRetryAsync("Transcode720p",
() => ExecuteTranscode720pAsync(context,
context.GetOrCreateStepData<Transcode720pStepData>(), ct),
_retryPolicy_Transcode720p, context, ct)));
// Transcode1080p — no condition, always runs
tasks.Add(("Transcode1080p",
ExecuteWithRetryAsync("Transcode1080p",
() => ExecuteTranscode1080pAsync(context,
context.GetOrCreateStepData<Transcode1080pStepData>(), ct),
_retryPolicy_Transcode1080p, context, ct)));
// Transcode4K — conditional
var condition_Transcode4K = IsSource4KOrHigher(context);
await SnapshotConditionAsync(context, new StepConditionSnapshot
{
StepName = "Transcode4K",
Description = "Source >= 4K",
MethodName = "IsSource4KOrHigher",
Result = condition_Transcode4K,
EvaluatedAt = DateTimeOffset.UtcNow,
ContextSummary = BuildConditionSummary_Transcode4K(context),
});
if (condition_Transcode4K)
{
tasks.Add(("Transcode4K",
ExecuteWithRetryAsync("Transcode4K",
() => ExecuteTranscode4KAsync(context,
context.GetOrCreateStepData<Transcode4KStepData>(), ct),
_retryPolicy_Transcode4K, context, ct)));
}
else
{
await RecordStepSkippedAsync(context, "Transcode4K");
}
// ── Await all and handle failures ──
var results = new Dictionary<string, StepResult>();
var exceptions = new List<(string StepName, Exception Exception)>();
foreach (var (stepName, execution) in tasks)
{
try
{
results[stepName] = await execution;
if (results[stepName].Status == StepStatus.Completed)
completedSteps.Add(stepName);
}
catch (Exception ex)
{
exceptions.Add((stepName, ex));
}
}
// ── If any failed, compensate completed ones ──
if (exceptions.Count > 0)
{
// Compensate in reverse completion order
completedSteps.Reverse();
foreach (var stepName in completedSteps)
{
await CompensateStepAsync(context, stepName, ct);
}
// Skipped steps are NOT compensated — nothing to undo
throw new ParallelStepGroupException("TranscodeAll", exceptions);
}
}
Key details:
- Condition evaluation happens before launch. The Transcode4K condition is evaluated, snapshotted, and if false, the step is recorded as Skipped and never added to the task list.
- Task.WhenAll semantics via individual awaits. Each task is awaited independently so the orchestrator can track which steps completed and which failed. (The generated code uses a sequential foreach over the task list — the tasks are already running concurrently from the moment they are created.)
- Compensation targets only completed steps. If Transcode1080p throws after Transcode720p completed, only Transcode720p is compensated. Skipped steps (Transcode4K when the source is 1080p) are not touched.
- Reverse completion order. Compensation runs in the reverse order of successful completion — matching the saga pattern of undoing the most recent work first.
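The "already running" point deserves a standalone demonstration: .NET tasks are hot, so sequentially awaiting a list of started tasks does not serialize their execution. A small sketch, unrelated to the generated kernel:

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    // Awaiting started tasks one by one still overlaps their execution:
    // a Task runs from the moment it is created, not when it is awaited.
    public static async Task<long> SequentialAwaitElapsedMsAsync()
    {
        var sw = Stopwatch.StartNew();
        var tasks = new List<Task> { Task.Delay(300), Task.Delay(300) };
        foreach (var t in tasks)
            await t; // sequential awaits, concurrent execution
        return sw.ElapsedMilliseconds; // roughly 300 ms total, not 600
    }
}
```

The same reasoning applies to the three transcode tasks: they start concurrently when added to the list, and the foreach merely collects their outcomes.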
Partial Compensation
This is the scenario that makes parallel sagas tricky. Consider a 1080p source video:
| Step | Condition | Outcome |
|---|---|---|
| Transcode720p | none | Completed |
| Transcode1080p | none | Failed (FFmpeg crash) |
| Transcode4K | IsSource4KOrHigher = false | Skipped |
The compensation sequence:
1. Transcode720p → CompensateTranscode720pAsync() → delete 720p output from S3
2. Transcode1080p → (failed, nothing to compensate)
3. Transcode4K → (skipped, nothing to compensate)
Only the 720p output gets deleted. The 1080p step never produced an output (it failed mid-transcode). The 4K step never ran. No wasted deletes, no missing cleanup.
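The selection rule behind this scenario can be stated as a pure function: compensate only steps that completed, most recent first. A minimal sketch with hypothetical types — the real kernel tracks this through its completedSteps list:

```csharp
using System.Collections.Generic;
using System.Linq;

public enum Outcome { Completed, Failed, Skipped }

public static class CompensationPlan
{
    // Given steps in completion order, return the ones to undo,
    // in reverse completion order. Failed and Skipped steps are excluded.
    public static List<string> StepsToCompensate(
        IReadOnlyList<(string Name, Outcome Outcome)> steps) =>
        steps.Where(s => s.Outcome == Outcome.Completed)
             .Select(s => s.Name)
             .Reverse()
             .ToList();
}
```

For the 1080p-source scenario above, only Transcode720p is returned.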
The generated compensator:
// Generated — dispatches to the developer's override
private async Task CompensateStepAsync(
SagaContext<VideoTranscodeRequest> context,
string stepName,
CancellationToken ct)
{
switch (stepName)
{
case "Transcode720p":
await CompensateTranscode720pAsync(context,
context.GetStepData<Transcode720pStepData>(), ct);
break;
case "Transcode1080p":
await CompensateTranscode1080pAsync(context,
context.GetStepData<Transcode1080pStepData>(), ct);
break;
case "Transcode4K":
await CompensateTranscode4KAsync(context,
context.GetStepData<Transcode4KStepData>(), ct);
break;
// ... other steps
}
await RecordStepCompensatedAsync(context, stepName);
}
The orchestrator only calls into this method for steps in the completedSteps list. The developer's compensation code can assume the step data is fully populated — the step ran to completion before compensation was triggered.
Developer Overrides
The generator emits abstract methods for every [CustomStep] and [SagaStep]. The developer provides the business logic in the partial class.
ExtractMetadata — FFprobe
public partial class VideoTranscodeTask
{
protected override async Task<StepResult> ExecuteExtractMetadataAsync(
SagaContext<VideoTranscodeRequest> context,
ExtractMetadataStepData stepData,
CancellationToken ct)
{
var uploadData = context.GetStepData<UploadVideoStepData>();
var localPath = await _s3.DownloadToTempAsync(
"raw-videos", uploadData.S3Key, ct);
var probe = await _ffprobe.AnalyzeAsync(localPath, ct);
stepData.Width = probe.PrimaryVideoStream!.Width;
stepData.Height = probe.PrimaryVideoStream!.Height;
stepData.DurationSeconds = probe.Duration.TotalSeconds;
stepData.Codec = probe.PrimaryVideoStream.CodecName;
stepData.FileSizeBytes = new FileInfo(localPath).Length;
return StepResult.Success();
}
}
ExtractMetadataStepData is a generated ValueObject. The properties (Width, Height, DurationSeconds, etc.) are inferred from assignments in this method by the source generator. Later steps — including the IsSource4KOrHigher condition — access these values type-safely.
Transcode720p — FFmpeg
protected override async Task<StepResult> ExecuteTranscode720pAsync(
SagaContext<VideoTranscodeRequest> context,
Transcode720pStepData stepData,
CancellationToken ct)
{
var uploadData = context.GetStepData<UploadVideoStepData>();
var localSource = await _s3.DownloadToTempAsync(
"raw-videos", uploadData.S3Key, ct);
var outputPath = Path.Combine(Path.GetTempPath(),
$"{context.TaskId}_720p.mp4");
await _ffmpeg.TranscodeAsync(localSource, outputPath, new TranscodeOptions
{
Width = 1280,
Height = 720,
Preset = "medium",
Crf = 23,
}, ct);
var s3Key = $"transcoded/{context.TaskId}/720p.mp4";
await _s3.UploadAsync("transcoded-videos", s3Key, outputPath, ct);
stepData.S3Key = s3Key;
stepData.S3Bucket = "transcoded-videos";
stepData.FileSizeBytes = new FileInfo(outputPath).Length;
return StepResult.Success();
}
The 1080p and 4K overrides follow the same pattern with different resolution parameters. The RetryPolicy (5 retries, exponential backoff starting at 2 seconds) wraps each execution automatically — the developer never writes retry logic.
CompensateTranscode720p — S3 Cleanup
protected override async Task CompensateTranscode720pAsync(
SagaContext<VideoTranscodeRequest> context,
Transcode720pStepData stepData,
CancellationToken ct)
{
if (!string.IsNullOrEmpty(stepData.S3Key))
{
await _s3.DeleteAsync(stepData.S3Bucket, stepData.S3Key, ct);
}
}
The compensation is simple because the step data tells us exactly what was created. No guessing, no pattern-matching on S3 key prefixes. The Transcode720pStepData.S3Key was set during execution — if the step completed, the key is populated.
Differentiated Retry
Not all steps deserve the same retry budget. FFmpeg transcoding is expensive but transient failures happen (disk I/O, memory pressure). Webhook notifications are cheap but the remote server might be down.
The declaration makes this explicit:
| Step | MaxRetries | Backoff | Delay | Rationale |
|---|---|---|---|---|
| Transcode720p | 5 | Exponential | 2000ms | Heavy compute, transient failures likely |
| Transcode1080p | 5 | Exponential | 2000ms | Same |
| Transcode4K | 5 | Exponential | 2000ms | Same |
| NotifyWebhook | 1 | Constant | 3000ms | Fast call, if it fails twice it's not transient |
The generator emits distinct Polly policies for each:
// Generated — retry policies
private static readonly IAsyncPolicy _retryPolicy_Transcode720p =
Policy.Handle<Exception>()
.WaitAndRetryAsync(5, attempt =>
TimeSpan.FromMilliseconds(2000 * Math.Pow(2, attempt - 1)));
private static readonly IAsyncPolicy _retryPolicy_NotifyWebhook =
Policy.Handle<Exception>()
.WaitAndRetryAsync(1, _ => TimeSpan.FromMilliseconds(3000));
Steps without a [RetryPolicy] attribute inherit the task-level MaxRetries = 1 from the [DistributedTask] attribute.
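Concretely, the exponential policy's delay schedule is baseDelayMs * 2^(attempt - 1). A small sketch of the arithmetic (not part of the generated code):

```csharp
using System;

public static class BackoffSchedule
{
    // Delay before retry attempt N under the exponential policy above:
    // a 2000 ms base doubles each attempt -> 2s, 4s, 8s, 16s, 32s.
    public static TimeSpan DelayForAttempt(int attempt, int baseDelayMs = 2000) =>
        TimeSpan.FromMilliseconds(baseDelayMs * Math.Pow(2, attempt - 1));
}
```

Across five attempts a transcode step therefore waits about 62 seconds in total before being declared failed, while the webhook policy waits a flat 3 seconds exactly once.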
The Flow
The diagram shows the fan-out at order 3 and the fan-in before order 4. The condition gate on Transcode4K is evaluated after ExtractMetadata populates the resolution data. If the source is 1080p, the 4K branch goes directly to Skipped — no task is created, no resources are consumed, and the condition snapshot records exactly why.
The Full Picture
From the developer's perspective, this pipeline requires:
- ~40 lines of attribute declarations (the partial class header)
- ~15 lines for the request and response classes
- ~60 lines of step overrides (ExtractMetadata, three transcodes, GenerateThumbnails, UpdateCatalog, NotifyWebhook)
- ~15 lines of compensation overrides (one per transcode step)
- ~5 lines for the condition method
~135 lines total. The generator produces the orchestrator kernel, parallel group execution, condition evaluation and snapshotting, Polly policies, compensation dispatch, the API controller, the queue consumer, the worker message, typed step data ValueObjects, progress tracking, cancellation handling, DI registration, and audit logging.
Everything the developer writes is business logic — FFprobe calls, FFmpeg parameters, S3 key conventions, catalog API calls. Everything the generator writes is infrastructure — the saga pattern, the retry loop, the compensation sequence, the fan-out/fan-in synchronization.
What's Next
Part V introduces the persistence layer — the DistributedTaskInstance aggregate root, SagaStepRecord entities, StepConditionSnapshot storage, and TaskAuditEntry logging — all modeled with Entity.Dsl and generated by the same source generation pipeline.