Generated Code
"~75 lines of developer code. ~700+ lines of generated infrastructure. The compiler is your staff engineer."
The Amplification
Parts III and IV showed the ZIP and Video examples from the developer's perspective — declare attributes, write business logic, done. This part catalogs every file the generator emits, in one place, so you can see the full scope of what the compiler produces from a single decorated class.
For the CreateZipFromFiles task:
- Developer writes: task declaration (~35 lines), request/response (~12 lines), two overrides (~28 lines) = ~75 lines
- Generator emits: 16 files totaling ~700+ lines of orchestration, API surface, resilience, events, real-time transport, cleanup, and DI wiring
The rest of this article catalogs every one of those files.
The Generation Gap Table
| # | File | Author | Purpose |
|---|---|---|---|
| 1 | CreateZipOrchestratorKernel.g.cs | Generator | Shared saga kernel: step execution with policies, compensation, audit |
| 2 | CreateZipApiOrchestrator.g.cs | Generator | API-side orchestrator: executes StepHost.Api steps, publishes to queue |
| 3 | CreateZipWorkerOrchestrator.g.cs | Generator | Worker-side orchestrator: consumes message, executes StepHost.Worker steps |
| 4 | UploadSourceFilesStepData.g.cs | Generator | Typed step data ValueObject for Step 1 |
| 5 | CreateZipArchiveStepData.g.cs | Generator | Typed step data ValueObject for Step 2 |
| 6 | UploadZipStepData.g.cs | Generator | Typed step data ValueObject for Step 3 |
| 7 | CreateZipWorkerMessage.g.cs | Generator | Queue message: IFormFile replaced by S3 keys |
| 8 | CreateZipConsumer.g.cs | Generator | Queue consumer with distributed lock acquisition |
| 9 | CreateZipTaskController.g.cs | Generator | API controller: POST, GET status, GET stream, GET ws, GET result, DELETE |
| 10 | CreateZipTaskEvents.g.cs | Generator | Domain events: submitted, step completed/failed, task completed/cancelled |
| 11 | CreateZipRetryPolicies.g.cs | Generator | Polly IAsyncPolicy per step from [RetryPolicy] attributes |
| 12 | CreateZipTaskHub.g.cs | Generator | SignalR hub for real-time progress |
| 13 | CreateZipWebhookNotifier.g.cs | Generator | Webhook BackgroundService for push notifications |
| 14 | CreateZipReconciliationServiceBase.g.cs | Generator | Abstract S3 orphan cleanup service |
| 15 | CreateZipArchiveService.g.cs | Generator | DB archival IHostedService |
| 16 | CreateZipServiceExtensions.g.cs | Generator | DI registration: AddCreateZipFromFilesTask(this IServiceCollection) |
| 17 | CreateZipFromFilesTask.cs | Developer | Partial class with ExecuteCreateZipArchiveAsync and MapResponse overrides |
| 18 | CreateZipReconciliationService.cs | Developer | Implements abstract cleanup logic for orphaned S3 objects |
Sixteen generated files. Two developer files. The developer owns the business logic. The generator owns everything else.
1. OrchestratorKernel
CreateZipOrchestratorKernel.g.cs — the abstract base class shared by both API and Worker orchestrators. Part III showed this in full. The key signatures:
// Generated: CreateZipOrchestratorKernel.g.cs
public abstract class CreateZipOrchestratorKernel
{
// ── Step execution with retry, progress tracking, and audit ──
protected async Task<StepResult> ExecuteStepWithPoliciesAsync<TStepData>(
SagaContext<CreateZipRequest> context,
string stepName,
int stepOrder,
Func<SagaContext<CreateZipRequest>, TStepData, CancellationToken, Task<StepResult>> execute,
IAsyncPolicy? retryPolicy,
CancellationToken ct)
where TStepData : new();
// ── Compensation: reverse-order execution of completed steps ──
protected async Task CompensateAsync(
SagaContext<CreateZipRequest> context,
CancellationToken ct);
// ── Audit trail ──
protected async Task AuditAsync(
SagaContext<CreateZipRequest> context,
string stepName,
string action,
string? detail = null);
// ── Per-step compensation (virtual, built-in for FileUploadStep) ──
protected virtual Task CompensateUploadSourceFilesAsync(/*...*/);
protected virtual Task CompensateCreateZipArchiveAsync(/*...*/);
protected virtual Task CompensateUploadZipAsync(/*...*/);
// ── Developer must implement ──
protected abstract Task<StepResult> ExecuteCreateZipArchiveAsync(
SagaContext<CreateZipRequest> context,
CreateZipArchiveStepData stepData,
CancellationToken ct);
protected abstract CreateZipResponse MapResponse(
SagaContext<CreateZipRequest> context);
}

The kernel enforces the saga contract. ExecuteStepWithPoliciesAsync wraps every step with the Polly policy (if any), records success/failure in the SagaStepRecord, updates real-time progress, and writes an audit entry. Compensation runs in reverse order. [FileUploadStep] steps get built-in compensation (delete the uploaded S3 objects). [CustomStep] steps get an empty virtual method.
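The reverse-order compensation walk can be sketched independently of the kernel. This is a minimal model under assumed shapes: `StepRecord` and `CompensationSketch` are hypothetical stand-ins, not the framework's own types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the framework's per-step saga record.
public sealed record StepRecord(string Name, int Order, bool Completed);

public static class CompensationSketch
{
    // Mirrors what CompensateAsync does conceptually: take only the steps
    // that completed, then undo them in reverse declaration order.
    public static List<string> CompensationOrder(IEnumerable<StepRecord> steps)
        => steps.Where(s => s.Completed)
                .OrderByDescending(s => s.Order)
                .Select(s => s.Name)
                .ToList();
}
```

For the ZIP task, if UploadSourceFiles and CreateZipArchive completed but UploadZip failed, this yields CreateZipArchive first, then UploadSourceFiles.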
2. ApiOrchestrator
CreateZipApiOrchestrator.g.cs — executes the StepHost.Api steps, builds the WorkerMessage, and publishes to the queue.
// Generated: CreateZipApiOrchestrator.g.cs
public class CreateZipApiOrchestrator : CreateZipOrchestratorKernel
{
private readonly IQueuePublisher _queue;
private readonly IDistributedTaskInstanceRepository _taskRepo;
public async Task<string> SubmitAsync(
CreateZipRequest request,
CancellationToken ct)
{
var taskId = Guid.NewGuid().ToString();
var context = SagaContext<CreateZipRequest>.Create(taskId, request);
await _taskRepo.CreateAsync(context.TaskInstance, ct);
try
{
// Step 1: UploadSourceFiles (StepHost.Api)
await ExecuteStepWithPoliciesAsync<UploadSourceFilesStepData>(
context, "UploadSourceFiles", 1,
ExecuteUploadSourceFilesAsync, retryPolicy: null, ct);
// Dispatch to Worker
var message = BuildWorkerMessage(context);
await _queue.PublishAsync("file-processing", message, ct);
await AuditAsync(context, "Dispatch", "Published to queue");
return taskId;
}
catch (Exception)
{
await CompensateAsync(context, ct);
throw;
}
}
}

SubmitAsync is what the controller calls. It returns the taskId immediately after dispatching to the queue — the client polls or subscribes for updates.
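The client-side half of that contract is a poll-until-terminal loop over the status endpoint. A minimal sketch with the transport abstracted away — the `fetchStatus` delegate and the status strings stand in for the real GET status call:

```csharp
using System;
using System.Collections.Generic;

public static class PollingSketch
{
    // Assumed terminal statuses, matching DistributedTaskStatus in the article.
    static readonly HashSet<string> Terminal =
        new() { "Completed", "Failed", "Cancelled" };

    // Polls until a terminal status arrives; returns every status observed.
    // fetchStatus stands in for GET api/tasks/create-zip/{taskId}/status.
    public static List<string> PollUntilTerminal(
        Func<string> fetchStatus, int maxPolls = 100)
    {
        var seen = new List<string>();
        for (var i = 0; i < maxPolls; i++)
        {
            var status = fetchStatus();
            seen.Add(status);
            if (Terminal.Contains(status)) break;
        }
        return seen;
    }
}
```

A real client would add a delay between polls (or use the SSE/WebSocket transports instead).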
3. WorkerOrchestrator
CreateZipWorkerOrchestrator.g.cs — consumes the worker message and executes the remaining steps.
// Generated: CreateZipWorkerOrchestrator.g.cs
public class CreateZipWorkerOrchestrator : CreateZipOrchestratorKernel
{
private readonly IAsyncPolicy _createZipArchiveRetryPolicy;
private readonly IDistributedLock _lock;
private readonly IDistributedTaskInstanceRepository _taskRepo;
public async Task ProcessAsync(
CreateZipWorkerMessage message,
CancellationToken ct)
{
await using var lockHandle = await _lock.AcquireAsync(
$"task:{message.TaskId}", TimeSpan.FromSeconds(600), ct);
var context = await RestoreContextAsync(message, ct);
try
{
// Step 2: CreateZipArchive (StepHost.Worker) — with Polly retry
await ExecuteStepWithPoliciesAsync<CreateZipArchiveStepData>(
context, "CreateZipArchive", 2,
ExecuteCreateZipArchiveAsync,
_createZipArchiveRetryPolicy, ct);
// Step 3: UploadZip (StepHost.Worker)
await ExecuteStepWithPoliciesAsync<UploadZipStepData>(
context, "UploadZip", 3,
ExecuteUploadZipAsync, retryPolicy: null, ct);
// Complete
var response = MapResponse(context);
await _taskRepo.CompleteAsync(context.TaskId, response, ct);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
await CompensateAsync(context, ct);
await _taskRepo.UpdateStatusAsync(message.TaskId,
DistributedTaskStatus.Cancelled, ct);
}
catch (Exception)
{
await CompensateAsync(context, ct);
await _taskRepo.UpdateStatusAsync(message.TaskId,
DistributedTaskStatus.Failed, ct);
throw;
}
}
}

The distributed lock prevents two workers from processing the same task. The lock timeout is derived from TimeoutSeconds = 600 on the task declaration. Context restoration deserializes the step data from the worker message so Step 2 can access Step 1's output through context.GetStepData<UploadSourceFilesStepData>().
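That typed lookup can be modeled as a small keyed store. This is a hypothetical stand-in for SagaContext's step-data storage, not the framework's actual implementation:

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-in for the typed step-data store behind GetStepData<T>().
public sealed class StepDataStore
{
    private readonly Dictionary<Type, object> _data = new();

    // RestoreContextAsync would populate this from the worker message.
    public void Set<T>(T value) where T : class => _data[typeof(T)] = value;

    // Mirrors context.GetStepData<UploadSourceFilesStepData>() in Step 2.
    public T Get<T>() where T : class
        => _data.TryGetValue(typeof(T), out var v)
            ? (T)v
            : throw new InvalidOperationException(
                $"No step data of type {typeof(T).Name}");
}
```

Keying by CLR type works because the generator emits exactly one data class per step.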
4-6. Typed Step Data ValueObjects
Three files, one per step. Each is a sealed class with the properties that flow between steps.
// Generated: UploadSourceFilesStepData.g.cs
public sealed class UploadSourceFilesStepData
{
public List<string> UploadedKeys { get; set; } = new();
public string Bucket { get; set; } = "incoming-files";
}
// Generated: CreateZipArchiveStepData.g.cs
public sealed class CreateZipArchiveStepData
{
public string ZipPath { get; set; } = "";
public long ZipSizeBytes { get; set; }
}
// Generated: UploadZipStepData.g.cs
public sealed class UploadZipStepData
{
public string ZipS3Key { get; set; } = "";
public string ZipS3Bucket { get; set; } = "processed-files";
}

[FileUploadStep] produces predictable data shapes — keys and bucket. [CustomStep] data is inferred from the properties the developer writes to in the override method (the generator inspects the partial class at compile time).
7. WorkerMessage
CreateZipWorkerMessage.g.cs — the serializable projection of the request where IFormFile is replaced by S3 keys.
// Generated: CreateZipWorkerMessage.g.cs
public sealed class CreateZipWorkerMessage
{
public string TaskId { get; set; } = "";
// IFormFile → S3 keys (from UploadSourceFiles step)
public List<string> FileS3Keys { get; set; } = new();
public string FileS3Bucket { get; set; } = "";
// Non-file properties carried through
public string? OutputFileName { get; set; }
// Completed API step data
public UploadSourceFilesStepData UploadSourceFilesData { get; set; } = new();
}

The generator knows that Files (the IFormFile property referenced by SourceProperty = "Files") was uploaded by the UploadSourceFiles step to the incoming-files bucket. It replaces the property with FileS3Keys and FileS3Bucket. Non-file properties (OutputFileName) pass through unchanged. Completed step data is embedded so the Worker can restore the saga context without a database round-trip.
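The file-to-key projection is simple to sketch. The `uploads/{taskId}/{fileName}` layout is an assumption consistent with the key shape the generated reconciliation service parses later in this article; treat it as illustrative, not normative:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MessageProjectionSketch
{
    // Projects uploaded file names into the S3 keys the WorkerMessage carries.
    // Assumed key layout: uploads/{taskId}/{fileName}.
    public static List<string> ToS3Keys(string taskId, IEnumerable<string> fileNames)
        => fileNames.Select(name => $"uploads/{taskId}/{name}").ToList();
}
```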
8. Consumer
CreateZipConsumer.g.cs — the queue consumer that dequeues messages and delegates to the Worker orchestrator.
// Generated: CreateZipConsumer.g.cs
public class CreateZipConsumer : IConsumer<CreateZipWorkerMessage>
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IDistributedLock _lock;
private readonly ILogger<CreateZipConsumer> _logger;
public async Task Consume(ConsumeContext<CreateZipWorkerMessage> context)
{
var message = context.Message;
_logger.LogInformation("Consuming task {TaskId}", message.TaskId);
await using var lockHandle = await _lock.TryAcquireAsync(
$"consumer:task:{message.TaskId}",
TimeSpan.FromSeconds(600));
if (lockHandle is null)
{
_logger.LogWarning(
"Task {TaskId} already being processed, skipping", message.TaskId);
return;
}
using var scope = _scopeFactory.CreateScope();
var orchestrator = scope.ServiceProvider
.GetRequiredService<CreateZipWorkerOrchestrator>();
await orchestrator.ProcessAsync(message, context.CancellationToken);
}
}

The consumer uses TryAcquireAsync (non-blocking) rather than AcquireAsync (blocking). If another worker already holds the lock, the message is skipped — the queue will redeliver it if the first worker fails. Each consumption creates a new DI scope so the orchestrator gets fresh scoped services (DbContext, repositories).
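The try-acquire semantics can be shown with an in-memory stand-in for the distributed lock (a sketch only — a real distributed lock also needs TTL expiry and fencing):

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for IDistributedLock, showing try-acquire semantics:
// a second acquirer gets null instead of blocking, so the consumer can skip.
public sealed class InMemoryLock
{
    private readonly HashSet<string> _held = new();

    public IDisposable? TryAcquire(string key)
    {
        lock (_held)
        {
            if (!_held.Add(key)) return null; // already held: skip, don't wait
            return new Releaser(this, key);
        }
    }

    private sealed class Releaser : IDisposable
    {
        private readonly InMemoryLock _owner;
        private readonly string _key;
        public Releaser(InMemoryLock owner, string key)
            { _owner = owner; _key = key; }
        public void Dispose()
            { lock (_owner._held) _owner._held.Remove(_key); }
    }
}
```

Disposing the handle releases the lock, matching the `await using var lockHandle` pattern in the generated consumer.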
9. Controller
CreateZipTaskController.g.cs — six endpoints covering the full lifecycle.
// Generated: CreateZipTaskController.g.cs
[ApiController]
[Route("api/tasks/create-zip")]
public class CreateZipTaskController : ControllerBase
{
private readonly CreateZipApiOrchestrator _orchestrator;
private readonly IDistributedTaskInstanceRepository _taskRepo;
private readonly ITaskProgressTracker _progress;
// ── POST: Submit a new task ──
[HttpPost]
public async Task<ActionResult<TaskSubmittedResponse>> Submit(
[FromForm] CreateZipRequest request,
CancellationToken ct)
{
var taskId = await _orchestrator.SubmitAsync(request, ct);
return Accepted(new TaskSubmittedResponse { TaskId = taskId });
}
// ── GET: Poll for status ──
[HttpGet("{taskId}/status")]
public async Task<ActionResult<TaskStatusResponse>> GetStatus(
string taskId, CancellationToken ct)
{
var instance = await _taskRepo.FindByIdAsync(taskId, ct);
if (instance is null) return NotFound();
return Ok(TaskStatusResponse.From(instance));
}
// ── GET: SSE progress stream ──
[HttpGet("{taskId}/stream")]
public async Task GetStream(string taskId, CancellationToken ct)
{
Response.ContentType = "text/event-stream";
await foreach (var update in _progress.StreamAsync(taskId, ct))
{
await Response.WriteAsync($"data: {update.ToJson()}\n\n", ct);
await Response.Body.FlushAsync(ct);
}
}
// ── GET: WebSocket upgrade ──
[HttpGet("{taskId}/ws")]
public async Task GetWebSocket(string taskId)
{
if (!HttpContext.WebSockets.IsWebSocketRequest)
{
HttpContext.Response.StatusCode = 400;
return;
}
var ws = await HttpContext.WebSockets.AcceptWebSocketAsync();
await _progress.PipeToWebSocketAsync(taskId, ws, HttpContext.RequestAborted);
}
// ── GET: Completed result ──
[HttpGet("{taskId}/result")]
public async Task<ActionResult<CreateZipResponse>> GetResult(
string taskId, CancellationToken ct)
{
var instance = await _taskRepo.FindByIdAsync(taskId, ct);
if (instance is null) return NotFound();
if (instance.Status != DistributedTaskStatus.Completed)
return Conflict(new { message = "Task not yet completed" });
return Ok(instance.DeserializeResponse<CreateZipResponse>());
}
// ── DELETE: Cancel a running task ──
[HttpDelete("{taskId}")]
public async Task<ActionResult> Cancel(
string taskId, CancellationToken ct)
{
var instance = await _taskRepo.FindByIdAsync(taskId, ct);
if (instance is null) return NotFound();
if (instance.Status is DistributedTaskStatus.Completed
or DistributedTaskStatus.Failed
or DistributedTaskStatus.Cancelled)
return Conflict(new { message = "Task already terminal" });
await _taskRepo.UpdateStatusAsync(taskId,
DistributedTaskStatus.Cancelled, ct);
return Accepted();
}
}

The [Cancellable] attribute on the task declaration enables the DELETE endpoint. Without it, the generator omits the cancel method entirely. The SSE and WebSocket endpoints are always generated — the client chooses which transport to use via ListeningStrategy in the request (covered in Part X).
Note the [FromForm] binding on Submit — the generator detects that the request contains IFormFile properties and uses multipart form data instead of JSON.
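The SSE wire format the GetStream endpoint emits is just a `data:` line per update, terminated by a blank line. A self-contained sketch of that framing:

```csharp
using System;
using System.Text.Json;

public static class SseSketch
{
    // Formats one server-sent event the way the generated GetStream endpoint
    // writes it: a "data:" line carrying the JSON payload, then a blank line.
    public static string Frame(object update)
        => $"data: {JsonSerializer.Serialize(update)}\n\n";
}
```

The trailing blank line is what delimits events for `EventSource` clients; omitting it makes the browser buffer forever.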
10. Domain Events
CreateZipTaskEvents.g.cs — typed events published through the domain event dispatcher.
// Generated: CreateZipTaskEvents.g.cs
public sealed record TaskSubmittedEvent(
string TaskId,
string TaskName,
DateTimeOffset Timestamp) : IDomainEvent;
public sealed record StepCompletedEvent(
string TaskId,
string StepName,
int StepOrder,
TimeSpan Duration,
DateTimeOffset Timestamp) : IDomainEvent;
public sealed record StepFailedEvent(
string TaskId,
string StepName,
int StepOrder,
string ErrorMessage,
int AttemptCount,
DateTimeOffset Timestamp) : IDomainEvent;
public sealed record TaskCompletedEvent(
string TaskId,
TimeSpan TotalDuration,
DateTimeOffset Timestamp) : IDomainEvent;
public sealed record TaskCancelledEvent(
string TaskId,
string? CancelledByStep,
DateTimeOffset Timestamp) : IDomainEvent;

Events are raised inside the orchestrator kernel as side effects of step execution. They flow through the domain event dispatcher and can be consumed by handlers in any layer — logging, metrics, notifications, downstream workflow triggers.
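A minimal in-process dispatcher shows the fan-out shape. Everything here is a stand-in — `IDomainEvent` is redeclared locally and the real dispatcher is framework-provided and asynchronous:

```csharp
using System;
using System.Collections.Generic;

public interface IDomainEvent { }

// Hypothetical sample event, shaped like the generated step events.
public sealed record SampleStepCompleted(string Step) : IDomainEvent;

// Minimal synchronous dispatcher: handlers register per event type,
// Publish fans out to every handler for that type.
public sealed class EventDispatcher
{
    private readonly Dictionary<Type, List<Action<IDomainEvent>>> _handlers = new();

    public void Subscribe<T>(Action<T> handler) where T : IDomainEvent
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
            _handlers[typeof(T)] = list = new();
        list.Add(e => handler((T)e));
    }

    public void Publish(IDomainEvent evt)
    {
        if (_handlers.TryGetValue(evt.GetType(), out var list))
            foreach (var h in list) h(evt);
    }
}
```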
11. Retry Policies
CreateZipRetryPolicies.g.cs — Polly IAsyncPolicy instances generated from [RetryPolicy] attributes.
// Generated: CreateZipRetryPolicies.g.cs
public static class CreateZipRetryPolicies
{
// From: [RetryPolicy("CreateZipArchive", MaxRetries = 3,
// BackoffType = BackoffType.Exponential, DelayMs = 500)]
public static IAsyncPolicy CreateZipArchivePolicy { get; } =
Policy
.Handle<Exception>()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: attempt =>
TimeSpan.FromMilliseconds(500 * Math.Pow(2, attempt - 1)),
onRetry: (exception, delay, attempt, context) =>
{
var logger = context.GetLogger();
logger?.LogWarning(exception,
"Retry {Attempt}/3 for CreateZipArchive after {Delay}ms",
attempt, delay.TotalMilliseconds);
});
// Steps without [RetryPolicy] get no policy (null is passed to the kernel)
}

Each [RetryPolicy] attribute produces a static property. The BackoffType enum supports Constant, Linear, and Exponential. The onRetry callback logs each retry attempt with structured data. Steps without a [RetryPolicy] attribute receive null in ExecuteStepWithPoliciesAsync, which means no retry — one failure triggers compensation immediately.
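The exponential schedule in the generated sleepDurationProvider is worth making concrete: with DelayMs = 500, attempts wait 500 ms, 1000 ms, 2000 ms.

```csharp
using System;

public static class BackoffSketch
{
    // Reproduces the generated sleepDurationProvider:
    // delay(attempt) = DelayMs * 2^(attempt - 1) for exponential backoff.
    public static TimeSpan Delay(int delayMs, int attempt)
        => TimeSpan.FromMilliseconds(delayMs * Math.Pow(2, attempt - 1));
}
```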
12. SignalR Hub
CreateZipTaskHub.g.cs — real-time progress delivery for clients that choose ListeningStrategy.SignalR.
// Generated: CreateZipTaskHub.g.cs
[Authorize]
public class CreateZipTaskHub : Hub
{
public async Task Subscribe(string taskId)
{
await Groups.AddToGroupAsync(Context.ConnectionId, $"task:{taskId}");
}
public async Task Unsubscribe(string taskId)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"task:{taskId}");
}
}
// Used by the progress tracker:
// await _hubContext.Clients.Group($"task:{taskId}")
// .SendAsync("StepProgress", update);

The hub is thin — it only manages group membership. The actual progress push happens inside the ITaskProgressTracker implementation, which injects IHubContext<CreateZipTaskHub> and broadcasts to the task's group whenever UpdateAsync is called.
13. Webhook Notifier
CreateZipWebhookNotifier.g.cs — a BackgroundService that delivers progress to external URLs for clients that choose ListeningStrategy.Webhook.
// Generated: CreateZipWebhookNotifier.g.cs
public class CreateZipWebhookNotifier : BackgroundService
{
private readonly ITaskProgressTracker _progress;
private readonly IHttpClientFactory _httpClientFactory;
private readonly IDistributedTaskInstanceRepository _taskRepo;
private readonly ILogger<CreateZipWebhookNotifier> _logger;
protected override async Task ExecuteAsync(CancellationToken ct)
{
await foreach (var update in _progress.SubscribeAllAsync(
"CreateZipFromFiles", ct))
{
var instance = await _taskRepo.FindByIdAsync(update.TaskId, ct);
if (instance?.Listening != ListeningStrategy.Webhook) continue;
if (string.IsNullOrEmpty(instance.WebhookUrl)) continue;
await DeliverWithRetryAsync(instance.WebhookUrl, update, ct);
}
}
private async Task DeliverWithRetryAsync(
string url, TaskProgressUpdate update, CancellationToken ct)
{
var client = _httpClientFactory.CreateClient("webhook");
var payload = JsonSerializer.Serialize(new
{
taskId = update.TaskId,
step = update.StepName,
status = update.Status.ToString(),
timestamp = DateTimeOffset.UtcNow,
});
// 3 attempts with exponential backoff
for (var attempt = 1; attempt <= 3; attempt++)
{
try
{
var response = await client.PostAsync(url,
new StringContent(payload, Encoding.UTF8,
"application/json"), ct);
if (response.IsSuccessStatusCode) return;
}
catch (Exception ex) when (attempt < 3)
{
_logger.LogWarning(ex,
"Webhook delivery attempt {Attempt} failed for {TaskId}",
attempt, update.TaskId);
await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)), ct);
}
}
_logger.LogError(
"Webhook delivery failed after 3 attempts for task {TaskId} to {Url}",
update.TaskId, url);
}
}

The notifier subscribes to all progress updates for the task type and filters to webhook listeners. Delivery retries are independent of the step retry policies — webhook failures never affect task execution.
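The delivery loop's control flow is easy to verify in isolation. A sketch with the HTTP call abstracted to a delegate (`trySend` is a stand-in for the real POST):

```csharp
using System;

public static class WebhookRetrySketch
{
    // Mirrors DeliverWithRetryAsync's control flow: up to maxAttempts tries,
    // success short-circuits. Returns the attempt number that succeeded,
    // or -1 when every attempt fails (the "log and give up" path).
    public static int Deliver(Func<int, bool> trySend, int maxAttempts = 3)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
            if (trySend(attempt)) return attempt;
        return -1;
    }
}
```

The real implementation also sleeps 2^attempt seconds between attempts; the sketch drops the delay to keep the control flow visible.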
14. Reconciliation Service Base
CreateZipReconciliationServiceBase.g.cs — abstract service that detects orphaned S3 objects from failed or abandoned tasks.
// Generated: CreateZipReconciliationServiceBase.g.cs
public abstract class CreateZipReconciliationServiceBase : BackgroundService
{
private readonly IS3Client _s3;
private readonly IDistributedTaskInstanceRepository _taskRepo;
private readonly ILogger _logger;
private readonly TimeSpan _interval = TimeSpan.FromHours(1);
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(_interval, ct);
await ReconcileAsync(ct);
}
}
private async Task ReconcileAsync(CancellationToken ct)
{
// List S3 objects in task buckets
var incomingKeys = await _s3.ListAsync("incoming-files", "uploads/", ct);
var processedKeys = await _s3.ListAsync("processed-files", "zips/", ct);
foreach (var key in incomingKeys.Concat(processedKeys))
{
var taskId = ExtractTaskId(key);
var instance = await _taskRepo.FindByIdAsync(taskId, ct);
if (instance is null || IsOrphaned(instance))
{
if (await ShouldDeleteAsync(key, instance, ct))
{
var bucket = key.StartsWith("uploads/")
? "incoming-files" : "processed-files";
await _s3.DeleteAsync(bucket, key, ct);
_logger.LogInformation("Deleted orphaned S3 object {Key}", key);
}
}
}
}
// Developer implements: custom logic for determining deletion eligibility
protected abstract Task<bool> ShouldDeleteAsync(
string s3Key,
DistributedTaskInstance? instance,
CancellationToken ct);
private static bool IsOrphaned(DistributedTaskInstance instance)
=> (instance.Status is DistributedTaskStatus.Failed
or DistributedTaskStatus.Cancelled
or DistributedTaskStatus.CompensationFailed)
&& instance.CompletedAt < DateTimeOffset.UtcNow.AddHours(-2);
private static string ExtractTaskId(string key)
=> key.Split('/')[1]; // uploads/{taskId}/file.pdf → taskId
}
The base class handles scheduling, S3 enumeration, and orphan detection. The developer subclass implements ShouldDeleteAsync — the decision of whether a specific orphaned object should be deleted. This is a Generation Gap: the generator provides the structure, the developer provides the judgment.
15. Archive Service
CreateZipArchiveService.g.cs — moves completed and failed tasks to archive tables after the configured retention period. This was shown in Part V as part of the Entity.Dsl output. The key signature:
// Generated: CreateZipArchiveService.g.cs
public class CreateZipArchiveService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;

    public CreateZipArchiveService(IServiceScopeFactory scopeFactory)
        => _scopeFactory = scopeFactory;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromHours(6), ct);
            using var scope = _scopeFactory.CreateScope();
            var context = scope.ServiceProvider
                .GetRequiredService<DistributedTaskDbContext>();
            var cutoff = DateTimeOffset.UtcNow.AddDays(-30);
            await context.Database.ExecuteSqlRawAsync(@"
                INSERT INTO saga.DistributedTasks_Archive
                SELECT * FROM saga.DistributedTasks
                WHERE (Status = {0} OR Status = {1})
                  AND CompletedAt < {2};
                -- ... cascade to steps and audit entries ...
                DELETE FROM saga.DistributedTasks
                WHERE (Status = {0} OR Status = {1})
                  AND CompletedAt < {2};",
                new object[]
                {
                    (int)DistributedTaskStatus.Completed,
                    (int)DistributedTaskStatus.Failed,
                    cutoff
                },
                ct);
}
}
}
The retention period (AddDays(-30)) comes from the ArchiveAfterDays property on [DistributedTask].
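For reference, that retention window originates in the developer's task declaration. A sketch — only the ArchiveAfterDays property is confirmed by the text above; any other attribute arguments from earlier parts are omitted:

```csharp
// Sketch: the developer sets the retention window on the task declaration.
// Only ArchiveAfterDays is confirmed by this article; other attribute
// arguments shown in Parts III and IV are omitted here.
[DistributedTask(ArchiveAfterDays = 30)]
public partial class CreateZipFromFilesTask
{
    // ... step declarations and business-logic overrides ...
}
```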
16. Service Extensions
CreateZipServiceExtensions.g.cs — the single DI registration method that wires everything together.
// Generated: CreateZipServiceExtensions.g.cs
public static class CreateZipServiceExtensions
{
public static IServiceCollection AddCreateZipFromFilesTask(
this IServiceCollection services)
{
// Orchestrators
services.AddScoped<CreateZipApiOrchestrator>();
services.AddScoped<CreateZipWorkerOrchestrator>();
// Queue consumer
services.AddScoped<IConsumer<CreateZipWorkerMessage>, CreateZipConsumer>();
// Retry policies
services.AddSingleton(CreateZipRetryPolicies.CreateZipArchivePolicy);
// SignalR hub
services.AddSignalR();
// Webhook notifier
services.AddHostedService<CreateZipWebhookNotifier>();
// Reconciliation (the developer's concrete subclass of the generated base)
services.AddHostedService<CreateZipReconciliationService>();
// Archive service
services.AddHostedService<CreateZipArchiveService>();
// Controller is picked up by ASP.NET Core's controller discovery
// (it inherits from ControllerBase and is registered via AddControllers)
return services;
}
}
One call in Program.cs:
builder.Services.AddCreateZipFromFilesTask();
That single line registers two orchestrators, a queue consumer, retry policies, a SignalR hub, a webhook background service, a reconciliation service, and an archive service. The controller is picked up automatically by ASP.NET Core's controller discovery.
17-18. Developer Files
These are the only files the developer writes by hand.
CreateZipFromFilesTask.cs — the partial class with business logic overrides:
public partial class CreateZipFromFilesTask
{
protected override async Task<StepResult> ExecuteCreateZipArchiveAsync(
SagaContext<CreateZipRequest> context,
CreateZipArchiveStepData stepData,
CancellationToken ct)
{
var uploadData = context.GetStepData<UploadSourceFilesStepData>();
// ... download files, create ZIP, populate stepData ...
return StepResult.Success();
}
protected override CreateZipResponse MapResponse(
SagaContext<CreateZipRequest> context)
{
// ... assemble response from step data ...
}
}
CreateZipReconciliationService.cs — the concrete reconciliation implementation:
public class CreateZipReconciliationService
: CreateZipReconciliationServiceBase
{
protected override Task<bool> ShouldDeleteAsync(
string s3Key,
DistributedTaskInstance? instance,
CancellationToken ct)
{
// Delete if no task record exists or task failed >24h ago
if (instance is null) return Task.FromResult(true);
if (instance.Status == DistributedTaskStatus.CompensationFailed
&& instance.CompletedAt < DateTimeOffset.UtcNow.AddHours(-24))
return Task.FromResult(true);
return Task.FromResult(false);
}
}
The Dependency Graph
Every generated file knows its dependencies. The generator emits them in the correct order:
StepData ValueObjects (4-6) ← no dependencies
WorkerMessage (7) ← depends on StepData
RetryPolicies (11) ← no dependencies
Domain Events (10) ← no dependencies
OrchestratorKernel (1) ← depends on StepData, Events
ApiOrchestrator (2) ← depends on Kernel, WorkerMessage
WorkerOrchestrator (3) ← depends on Kernel, RetryPolicies
Consumer (8) ← depends on WorkerOrchestrator
Controller (9) ← depends on ApiOrchestrator
SignalR Hub (12) ← no dependencies
Webhook Notifier (13) ← no dependencies
ReconciliationServiceBase (14) ← no dependencies
ArchiveService (15) ← no dependencies
ServiceExtensions (16) ← depends on everything above
The DI registration file is always generated last because it references every other type.
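The "correct order" is nothing more than a topological sort of those edges. A minimal sketch with illustrative names — this is not generator code, just the shape of the algorithm:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only — not generator code. Kahn-style topological sort over
// a few of the dependency edges listed above: a file is emitted once all of
// its dependencies have been emitted, so a file that depends on everything
// (ServiceExtensions) always comes out last.
var deps = new Dictionary<string, string[]>
{
    ["StepData"] = Array.Empty<string>(),
    ["WorkerMessage"] = new[] { "StepData" },
    ["Kernel"] = new[] { "StepData" },
    ["ApiOrchestrator"] = new[] { "Kernel", "WorkerMessage" },
    ["ServiceExtensions"] = new[]
        { "StepData", "WorkerMessage", "Kernel", "ApiOrchestrator" },
};

var remaining = deps.Keys.ToHashSet();
var order = new List<string>();
while (remaining.Count > 0)
{
    // Emit every file whose dependencies have all been emitted already.
    var ready = remaining
        .Where(f => deps[f].All(d => !remaining.Contains(d)))
        .ToList();
    if (ready.Count == 0)
        throw new InvalidOperationException("dependency cycle");
    order.AddRange(ready);
    remaining.ExceptWith(ready);
}
Console.WriteLine(string.Join(" → ", order));
```

StepData comes out first (no dependencies), ServiceExtensions last.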
Summary
| Category | Files | Lines |
|---|---|---|
| Saga orchestration (kernel + API + Worker) | 3 | ~260 |
| Typed data (step data + worker message) | 4 | ~40 |
| API surface (controller) | 1 | ~80 |
| Queue (consumer) | 1 | ~25 |
| Domain events | 1 | ~20 |
| Resilience (retry policies) | 1 | ~15 |
| Real-time (SignalR hub + webhook) | 2 | ~60 |
| Ops (reconciliation + archive) | 2 | ~80 |
| DI registration | 1 | ~20 |
| Generator total | 16 | ~700+ |
| Developer total | 2 | ~75 |
The generator is not producing boilerplate for its own sake. Each file encodes a specific concern of distributed systems infrastructure — saga state management, cross-process communication, resilience, real-time delivery, operational cleanup, dependency wiring — that would otherwise be copy-pasted across every task and diverge over time.
What's Next
Part VII shows how the same generated code runs in two modes: InProcess (a BackgroundService with Channel<T>, no queue, no S3 — for development) and Distributed (real queues, real S3 — for production). The abstraction layer that makes this possible: IS3Client, IQueuePublisher, ITaskProgressTracker, IDistributedLock.
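The generated code in this part already hints at the shape of one of those interfaces. Inferred purely from the ListAsync and DeleteAsync calls the reconciliation service makes above — a sketch, not the actual Part VII definition:

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Sketch of the storage abstraction, inferred from the calls used by the
// generated reconciliation service (ListAsync, DeleteAsync). The real
// interface defined in Part VII may differ.
public interface IS3Client
{
    // Enumerate object keys under a prefix within a bucket.
    Task<IReadOnlyList<string>> ListAsync(
        string bucket, string prefix, CancellationToken ct);

    // Delete a single object by key.
    Task DeleteAsync(string bucket, string key, CancellationToken ct);
}
```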