
Resilience

"Distributed systems don't fail gracefully by default. You have to engineer every failure mode — retry, deduplication, locking, cancellation, compensation failure, orphan cleanup — and then generate the code so nobody forgets one."


Retry with Polly

Every step in a saga can fail transiently. The network drops, the database times out, the downstream service returns 503. Retrying is not optional — it is infrastructure. But retry policies vary per step: an S3 upload might tolerate five retries with exponential backoff, while a payment capture should retry exactly once with no delay.

The [RetryPolicy] attribute declares retry behavior at the step level:

[SagaStep(1)]
[RetryPolicy(MaxRetries = 5, BackoffType = BackoffType.Exponential,
             BaseDelayMs = 200, OnRetryExhausted = RetryExhaustedAction.Compensate)]
public async Task<UploadResult> UploadToS3Async(
    CreateZipRequest request, CancellationToken ct)
{
    // ...
}

[SagaStep(2)]
[RetryPolicy(MaxRetries = 1, BackoffType = BackoffType.Constant,
             BaseDelayMs = 0, OnRetryExhausted = RetryExhaustedAction.Fail)]
public async Task<PaymentResult> CapturePaymentAsync(
    UploadResult previous, CancellationToken ct)
{
    // ...
}

The source generator reads these attributes and produces an IAsyncPolicy for each step. The generated code varies by BackoffType:

// ── Generated: CreateZipSaga.Policies.g.cs ──

public static class CreateZipSagaPolicies
{
    // BackoffType.Exponential → 200ms, 400ms, 800ms, 1600ms, 3200ms
    public static IAsyncPolicy UploadToS3Policy { get; } =
        Policy.Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: attempt =>
                    TimeSpan.FromMilliseconds(200 * Math.Pow(2, attempt - 1)),
                onRetry: (exception, delay, attempt, context) =>
                {
                    var logger = context.GetLogger();
                    logger.LogWarning(exception,
                        "Step UploadToS3 retry {Attempt}/5 after {Delay}ms",
                        attempt, delay.TotalMilliseconds);
                });

    // BackoffType.Constant → 0ms (one retry, no delay)
    public static IAsyncPolicy CapturePaymentPolicy { get; } =
        Policy.Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 1,
                sleepDurationProvider: _ => TimeSpan.Zero,
                onRetry: (exception, delay, attempt, context) =>
                {
                    var logger = context.GetLogger();
                    logger.LogWarning(exception,
                        "Step CapturePayment retry {Attempt}/1 after {Delay}ms",
                        attempt, delay.TotalMilliseconds);
                });
}

For BackoffType.Linear, the generated sleep duration is BaseDelayMs * attempt — with BaseDelayMs = 200, that yields 200ms, 400ms, 600ms, 800ms, and so on.
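All three backoff shapes reduce to a one-line delay function. A minimal sketch of the arithmetic the generated sleepDurationProvider performs — the Backoff helper here is illustrative, not part of the generated code:

```csharp
using System;

public enum BackoffType { Constant, Linear, Exponential }

public static class Backoff
{
    // Delay before the given 1-based retry attempt, mirroring the
    // generated sleepDurationProvider expressions shown above.
    public static TimeSpan Delay(BackoffType type, int baseDelayMs, int attempt) =>
        type switch
        {
            BackoffType.Constant    => TimeSpan.FromMilliseconds(baseDelayMs),
            BackoffType.Linear      => TimeSpan.FromMilliseconds(baseDelayMs * attempt),
            BackoffType.Exponential => TimeSpan.FromMilliseconds(
                baseDelayMs * Math.Pow(2, attempt - 1)),
            _ => throw new ArgumentOutOfRangeException(nameof(type))
        };
}
```

With BaseDelayMs = 200, Constant gives 200ms every time, Linear gives 200, 400, 600, 800ms, and Exponential gives 200, 400, 800, 1600ms for attempts 1 through 4.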

Each retry increments SagaStepRecord.AttemptCount, so the data model always reflects how many times a step was attempted. When retries are exhausted, the OnRetryExhausted action determines what happens:

  • Compensate: trigger compensation of all completed steps in reverse order (default)
  • Fail: mark the task as Failed immediately, no compensation
  • DeadLetter: publish the failed message to a dead-letter queue for manual inspection
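Inside the orchestrator, handling retry exhaustion amounts to a dispatch on this enum. A hedged sketch with stub handlers standing in for the real orchestrator calls — all names besides the enum values are illustrative:

```csharp
using System;
using System.Threading.Tasks;

public enum RetryExhaustedAction { Compensate, Fail, DeadLetter }

public static class RetryExhaustedDispatch
{
    // In the real generated code these stubs would call the compensation
    // loop, the task repository, and the message bus respectively.
    public static Task HandleAsync(RetryExhaustedAction action) => action switch
    {
        RetryExhaustedAction.Compensate => CompensateCompletedStepsAsync(),
        RetryExhaustedAction.Fail       => MarkTaskFailedAsync(),
        RetryExhaustedAction.DeadLetter => PublishToDeadLetterAsync(),
        _ => throw new ArgumentOutOfRangeException(nameof(action))
    };

    private static Task CompensateCompletedStepsAsync() => Task.CompletedTask;
    private static Task MarkTaskFailedAsync() => Task.CompletedTask;
    private static Task PublishToDeadLetterAsync() => Task.CompletedTask;
}
```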

Circuit Breaker

For steps calling external services that might be down entirely, a circuit breaker prevents hammering a dead endpoint:

[SagaStep(3)]
[RetryPolicy(MaxRetries = 3, BackoffType = BackoffType.Exponential, BaseDelayMs = 500)]
[CircuitBreaker(nameof(NotifyWebhookAsync), FailureThreshold = 5, DurationSeconds = 30)]
public async Task NotifyWebhookAsync(
    ProcessResult result, CancellationToken ct)
{
    // ...
}

The generator wraps the retry policy inside a circuit breaker:

// ── Generated: CreateZipSagaPolicies (circuit breaker addition) ──

public static IAsyncPolicy NotifyWebhookPolicy { get; } =
    Policy.WrapAsync(
        // Outer: circuit breaker
        Policy.Handle<Exception>()
            .CircuitBreakerAsync(
                exceptionsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromSeconds(30),
                onBreak: (ex, duration) =>
                    Log.Warning("Circuit open for NotifyWebhook: {Duration}s", duration.TotalSeconds),
                onReset: () =>
                    Log.Information("Circuit closed for NotifyWebhook")),
        // Inner: retry
        Policy.Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: attempt =>
                    TimeSpan.FromMilliseconds(500 * Math.Pow(2, attempt - 1))));

The circuit breaker state is shared across all instances of the same saga step within the process. When the circuit is open, the step fails immediately without consuming retries.


Idempotency

The Problem

Message queues guarantee at-least-once delivery. The broker delivers a message, the consumer processes it, the ACK is lost in transit — the broker redelivers. The step executes twice. If that step charges a credit card or uploads a file, the user pays twice or gets duplicate data.

The Solution

Every step execution is guarded by an idempotency key:

{TaskId}:{StepName}:{AttemptNumber}

For example: d4f2a1b3-...:UploadToS3:1. The key is claimed atomically before the step runs; if the claim already exists, the execution is a duplicate and is skipped.

The abstraction:

public interface IIdempotencyStore
{
    /// <summary>
    /// Attempts to claim the key. Returns true if this is the first claim.
    /// Returns false if the key already exists (duplicate execution).
    /// </summary>
    Task<bool> TryClaimAsync(string key, TimeSpan ttl, CancellationToken ct);
}

The Redis implementation uses SETNX with a TTL:

public class RedisIdempotencyStore : IIdempotencyStore
{
    private readonly IDatabase _redis;

    public async Task<bool> TryClaimAsync(string key, TimeSpan ttl, CancellationToken ct)
    {
        // SETNX is atomic — only one caller wins
        return await _redis.StringSetAsync(
            $"idempotency:{key}",
            DateTimeOffset.UtcNow.ToString("O"),
            ttl,
            When.NotExists);
    }
}

The generator wraps every step execution with the idempotency check:

// ── Generated: CreateZipSagaOrchestrator.g.cs (step execution) ──

private async Task ExecuteStepAsync<TResult>(
    DistributedTaskInstance task,
    string stepName,
    int attemptNumber,
    Func<CancellationToken, Task<TResult>> execute,
    IAsyncPolicy policy,
    CancellationToken ct)
{
    var idempotencyKey = $"{task.Id}:{stepName}:{attemptNumber}";

    if (!await _idempotencyStore.TryClaimAsync(idempotencyKey, TimeSpan.FromHours(24), ct))
    {
        _logger.LogInformation(
            "Step {Step} attempt {Attempt} already executed (idempotency hit), skipping",
            stepName, attemptNumber);
        return;
    }

    var stepRecord = task.Steps.First(s => s.StepName == stepName);
    stepRecord.AttemptCount = attemptNumber;
    stepRecord.Status = StepStatus.Running;
    stepRecord.StartedAt = DateTimeOffset.UtcNow;

    try
    {
        await policy.ExecuteAsync(async (ctx) =>
        {
            var result = await execute(ct);
            stepRecord.StepData = JsonSerializer.Serialize(result);
            stepRecord.Status = StepStatus.Completed;
            stepRecord.CompletedAt = DateTimeOffset.UtcNow;
        }, new Context());
    }
    catch (Exception ex)
    {
        stepRecord.Status = StepStatus.Failed;
        stepRecord.ErrorMessage = ex.Message;
        throw;
    }
}

The TTL on the idempotency key is configurable. Twenty-four hours is the default — long enough that redelivery after a crash is caught, short enough that the key store does not grow unbounded.
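For unit tests and local development, the same interface admits a purely in-memory implementation. A sketch with the same claim-once semantics as the Redis version — the InMemoryIdempotencyStore class is illustrative, not part of the generated code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public interface IIdempotencyStore
{
    Task<bool> TryClaimAsync(string key, TimeSpan ttl, CancellationToken ct);
}

// In-memory store with SETNX-like claim-once semantics.
// Expired keys are reclaimed lazily on the next claim attempt.
public sealed class InMemoryIdempotencyStore : IIdempotencyStore
{
    private readonly ConcurrentDictionary<string, DateTimeOffset> _claims = new();

    public Task<bool> TryClaimAsync(string key, TimeSpan ttl, CancellationToken ct)
    {
        var now = DateTimeOffset.UtcNow;
        var expiry = now.Add(ttl);

        while (true)
        {
            if (_claims.TryAdd(key, expiry))
                return Task.FromResult(true);       // first claim wins

            if (_claims.TryGetValue(key, out var existing))
            {
                if (existing > now)
                    return Task.FromResult(false);  // live claim → duplicate

                // Expired: try to take over the slot atomically.
                if (_claims.TryUpdate(key, expiry, existing))
                    return Task.FromResult(true);
            }
            // Lost a race with another caller; retry the loop.
        }
    }
}
```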


Distributed Locking

The Problem

Multiple workers consume from the same queue. A message becomes visible to two workers simultaneously — or a redelivered message arrives while the first processing attempt is still running. Two workers execute the same step in parallel: data corruption, double charges, race conditions.

The Solution

Before processing a task, the worker acquires a distributed lock. The lock key is the task ID. Only one worker can hold the lock at a time.

The abstraction:

public interface IDistributedLock
{
    /// <summary>
    /// Attempts to acquire a lock. Returns an IAsyncDisposable that releases
    /// the lock on disposal. Returns null if the lock is already held.
    /// </summary>
    Task<IAsyncDisposable?> TryAcquireAsync(
        string resource,
        TimeSpan expiry,
        CancellationToken ct);
}

The Redis implementation uses RedLock — the standard distributed locking algorithm for Redis:

public class RedisDistributedLock : IDistributedLock
{
    private readonly IDistributedLockFactory _lockFactory;

    public async Task<IAsyncDisposable?> TryAcquireAsync(
        string resource, TimeSpan expiry, CancellationToken ct)
    {
        var redLock = await _lockFactory.CreateLockAsync(
            resource: $"saga-lock:{resource}",
            expiryTime: expiry,
            waitTime: TimeSpan.Zero,   // don't wait — fail immediately
            retryTime: TimeSpan.Zero);

        if (!redLock.IsAcquired)
        {
            return null;  // another worker holds the lock
        }

        return redLock;  // IRedLock implements IAsyncDisposable
    }
}

The generated consumer code acquires the lock before processing:

// ── Generated: CreateZipSagaConsumer.g.cs ──

public async Task HandleAsync(SagaMessage message, CancellationToken ct)
{
    await using var lockHandle = await _distributedLock.TryAcquireAsync(
        resource: message.TaskId.ToString(),
        expiry: TimeSpan.FromMinutes(5),
        ct);

    if (lockHandle is null)
    {
        _logger.LogInformation(
            "Task {TaskId} is locked by another worker, requeueing",
            message.TaskId);

        // NACK without requeue delay — the broker will redeliver later
        throw new TaskLockedException(message.TaskId);
    }

    // Lock acquired — safe to proceed
    var task = await _repository.GetByIdAsync(message.TaskId, ct)
        ?? throw new TaskNotFoundException(message.TaskId);

    await _orchestrator.ExecuteNextStepAsync(task, ct);
    await _unitOfWork.SaveChangesAsync(ct);
}

Key design decisions:

  • Lock expiry: configurable per saga (default 5 minutes). If a worker crashes while holding the lock, the key expires and another worker can pick up the task. This is stale-lock recovery by timeout.
  • No wait: waitTime: TimeSpan.Zero means the lock acquisition fails immediately if the lock is held. The message is NACKed and redelivered later. No thread sits blocked waiting.
  • Lock scope: the lock covers the entire task, not individual steps. This prevents partial concurrent execution.

When lock acquisition fails, the message is rejected without acknowledgement. The broker redelivers it after its visibility timeout. The worker that holds the lock will finish processing, release the lock, and the redelivered message either finds the task already completed (no-op via idempotency) or acquires the lock and continues.


Cancellation

Some tasks take minutes. The user changes their mind, the request is no longer needed, the admin wants to abort a runaway process. Cancellation must be first-class.

The [Cancellable] attribute on a saga definition generates a DELETE endpoint and the cancellation machinery:

[DistributedTask("CreateZip")]
[Cancellable]
public partial class CreateZipSaga
{
    // ...
}

Generated Cancel Endpoint

// ── Generated: CreateZipSagaEndpoints.g.cs (cancel addition) ──

app.MapDelete("/api/distributed-tasks/create-zip/{taskId:guid}", async (
    Guid taskId,
    ICreateZipSagaOrchestrator orchestrator,
    CancellationToken ct) =>
{
    var result = await orchestrator.CancelAsync(taskId, ct);
    return result switch
    {
        CancelResult.Cancelled => Results.Ok(new { Status = "Cancelled" }),
        CancelResult.NotFound => Results.NotFound(),
        CancelResult.AlreadyCompleted => Results.Conflict(
            new { Error = "Task already completed, cannot cancel" }),
        CancelResult.AlreadyCancelled => Results.Ok(
            new { Status = "Already cancelled" }),
        _ => Results.StatusCode(500)
    };
});

Generated Cancel Flow

The orchestrator's CancelAsync method does the following:

// ── Generated: CreateZipSagaOrchestrator.g.cs (cancel flow) ──

public async Task<CancelResult> CancelAsync(Guid taskId, CancellationToken ct)
{
    var task = await _repository.GetByIdAsync(taskId, ct);
    if (task is null) return CancelResult.NotFound;
    if (task.Status == DistributedTaskStatus.Completed) return CancelResult.AlreadyCompleted;
    if (task.Status == DistributedTaskStatus.Cancelled) return CancelResult.AlreadyCancelled;

    // 1. Signal cancellation to the running step
    _cancellationRegistry.Cancel(taskId);

    // 2. Record cancellation
    task.CancelledAt = DateTimeOffset.UtcNow;
    task.Status = DistributedTaskStatus.Cancelled;
    task.AuditLog.Add(new TaskAuditEntry
    {
        Action = "Cancelled",
        Timestamp = DateTimeOffset.UtcNow,
        Details = "Cancellation requested by user"
    });

    // 3. Compensate completed steps in reverse order
    var completedSteps = task.Steps
        .Where(s => s.Status == StepStatus.Completed)
        .OrderByDescending(s => s.Order)
        .ToList();

    foreach (var step in completedSteps)
    {
        await CompensateStepAsync(task, step, ct);
    }

    await _unitOfWork.SaveChangesAsync(ct);
    return CancelResult.Cancelled;
}

The _cancellationRegistry is a ConcurrentDictionary<Guid, CancellationTokenSource>. When a step starts, the orchestrator registers a CancellationTokenSource for the task ID. The CancelAsync method calls Cancel() on that source, which triggers the CancellationToken passed into the currently running step. The step can check ct.IsCancellationRequested or let the token propagate through async calls.
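A minimal sketch of such a registry — this is an assumed shape consistent with the description above, not the generated code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// One CancellationTokenSource per running task, keyed by task ID.
public sealed class CancellationRegistry
{
    private readonly ConcurrentDictionary<Guid, CancellationTokenSource> _sources = new();

    // Called when a step starts: register a source and hand its token to the step.
    public CancellationToken Register(Guid taskId) =>
        _sources.GetOrAdd(taskId, _ => new CancellationTokenSource()).Token;

    // Called by CancelAsync: trip the token of the running step, if any.
    public void Cancel(Guid taskId)
    {
        if (_sources.TryRemove(taskId, out var cts))
        {
            cts.Cancel();
            cts.Dispose();
        }
    }

    // Called when a task finishes normally: drop the source without cancelling.
    public void Complete(Guid taskId)
    {
        if (_sources.TryRemove(taskId, out var cts))
            cts.Dispose();
    }
}
```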

After the running step exits (either by completing or throwing OperationCanceledException), all previously completed steps are compensated in reverse order — the same compensation logic used for failures.


Compensation Failure

Compensation is just code. It can fail. The S3 DeleteObjectAsync call might time out. The database rollback might deadlock. The downstream service that needs to be notified of the reversal might be down.

When compensation itself fails, the system follows a five-step escalation:

Step 1: Retry Compensation

Compensation gets its own retry policy. By default, it reuses the step's [RetryPolicy], but a separate [CompensationRetryPolicy] can override:

[SagaStep(1)]
[RetryPolicy(MaxRetries = 5, BackoffType = BackoffType.Exponential, BaseDelayMs = 200)]
[CompensationRetryPolicy(MaxRetries = 3, BackoffType = BackoffType.Constant, BaseDelayMs = 1000)]
public async Task<UploadResult> UploadToS3Async(
    CreateZipRequest request, CancellationToken ct)
{ /* ... */ }

[Compensate(nameof(UploadToS3Async))]
public async Task CompensateUploadAsync(
    UploadResult result, CancellationToken ct)
{
    await _s3Client.DeleteObjectAsync(result.Bucket, result.Key, ct);
}

The generated compensation retry code:

// ── Generated: CreateZipSagaOrchestrator.g.cs (compensation retry) ──

private async Task CompensateStepAsync(
    DistributedTaskInstance task,
    SagaStepRecord stepRecord,
    CancellationToken ct)
{
    stepRecord.Status = StepStatus.Compensating;
    var compensationPolicy = GetCompensationPolicy(stepRecord.StepName);

    try
    {
        await compensationPolicy.ExecuteAsync(async (ctx) =>
        {
            await InvokeCompensationAsync(stepRecord.StepName, stepRecord.StepData, ct);
            stepRecord.Status = StepStatus.Compensated;
        }, new Context());
    }
    catch (Exception ex)
    {
        // Step 2: Retry exhausted — record failure
        stepRecord.Status = StepStatus.CompensationFailed;

        // Step 3: Record error details for debugging
        stepRecord.CompensationError = ex.Message;
        stepRecord.TraceId = Activity.Current?.TraceId.ToString();

        _logger.LogCritical(ex,
            "Compensation FAILED for step {Step} on task {TaskId}. " +
            "TraceId: {TraceId}. Manual intervention required.",
            stepRecord.StepName, task.Id, stepRecord.TraceId);

        // Step 4: Escalate task status
        task.Status = DistributedTaskStatus.CompensationFailed;

        // Step 5: Audit entry with full context
        task.AuditLog.Add(new TaskAuditEntry
        {
            Action = "CompensationFailed",
            Timestamp = DateTimeOffset.UtcNow,
            Details = JsonSerializer.Serialize(new
            {
                StepName = stepRecord.StepName,
                Error = ex.Message,
                TraceId = stepRecord.TraceId,
                AttemptCount = stepRecord.AttemptCount,
                StackTrace = ex.StackTrace
            })
        });
    }
}

The CompensationFailed status on the task is a terminal state. The system does not retry further. It does not silently swallow the failure. It records everything — the error message, the trace ID for correlating with distributed tracing, the full stack trace in the audit log — and stops.

An operator must inspect the audit log, understand what happened, manually fix the inconsistency (delete the orphaned S3 object, reverse the charge, etc.), and then mark the task as resolved. This is the correct behavior: when automated recovery fails, the worst thing you can do is keep retrying in a loop or pretend it never happened.
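Resolution itself can be as simple as a guarded status transition. A hypothetical sketch — the resolve endpoint, the resolvedBy parameter, and the ManuallyResolved status are assumptions for illustration, not part of the generated API:

```csharp
// Hypothetical admin endpoint: an operator confirms the inconsistency has
// been fixed by hand and moves the task out of its terminal failure state.
app.MapPost("/api/distributed-tasks/create-zip/{taskId:guid}/resolve", async (
    Guid taskId,
    string resolvedBy,
    IDistributedTaskInstanceRepository repository,
    IUnitOfWork unitOfWork,
    CancellationToken ct) =>
{
    var task = await repository.GetByIdAsync(taskId, ct);
    if (task is null) return Results.NotFound();
    if (task.Status != DistributedTaskStatus.CompensationFailed)
        return Results.Conflict(new { Error = "Only CompensationFailed tasks can be resolved" });

    task.Status = DistributedTaskStatus.ManuallyResolved;  // assumed status value
    task.AuditLog.Add(new TaskAuditEntry
    {
        Action = "ManuallyResolved",
        Timestamp = DateTimeOffset.UtcNow,
        Details = $"Resolved by {resolvedBy} after manual cleanup"
    });

    await unitOfWork.SaveChangesAsync(ct);
    return Results.Ok(new { Status = "Resolved" });
});
```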


Orphan Cleanup

The Problem

Consider the following sequence:

  1. Step 1 uploads a zip file to S3. Success.
  2. The process crashes before the SagaStepRecord is updated to Completed.
  3. On restart, the saga retries step 1 — uploading a second copy.
  4. The first upload is now an orphan: it exists in S3 but no task record references it.

Or the inverse: compensation tries to delete the S3 object but fails. The task is marked CompensationFailed. The S3 object remains.

Over time, orphaned objects accumulate. Storage costs grow. Nobody notices until the bill arrives.

The Solution

The generator produces an abstract IHostedService base class for reconciliation. This follows the Generation Gap pattern — the generator provides the infrastructure, the developer implements the cleanup strategy:

// ── Generated: CreateZipReconciliationServiceBase.g.cs ──

public abstract class CreateZipReconciliationServiceBase : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    protected readonly ILogger _logger;
    protected readonly string _bucketName;

    /// <summary>
    /// How often the reconciliation runs. Default: every 6 hours.
    /// </summary>
    protected virtual TimeSpan Interval => TimeSpan.FromHours(6);

    /// <summary>
    /// The S3 prefix where this saga stores its objects.
    /// </summary>
    protected virtual string S3Prefix => "create-zip/";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await using var scope = _scopeFactory.CreateAsyncScope();
                var s3Client = scope.ServiceProvider.GetRequiredService<IAmazonS3>();
                var repository = scope.ServiceProvider
                    .GetRequiredService<IDistributedTaskInstanceRepository>();

                // 1. List all S3 objects under the saga's prefix
                var s3Objects = await ListAllObjectsAsync(s3Client, S3Prefix, stoppingToken);

                // 2. Get all active/recent task IDs that reference S3 keys
                var activeKeys = await GetActiveS3KeysAsync(repository, stoppingToken);

                // 3. Find orphans: S3 objects not referenced by any active task
                var orphans = s3Objects
                    .Where(obj => !activeKeys.Contains(obj.Key))
                    .ToList();

                if (orphans.Count > 0)
                {
                    _logger.LogWarning(
                        "Found {OrphanCount} orphaned S3 objects under prefix {Prefix}",
                        orphans.Count, S3Prefix);

                    // 4. Developer decides what to do with orphans
                    await CleanupOrphansAsync(orphans, s3Client, stoppingToken);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Reconciliation cycle failed");
            }

            await Task.Delay(Interval, stoppingToken);
        }
    }

    /// <summary>
    /// Implement this to decide what to do with orphaned S3 objects.
    /// Options: delete immediately, move to archive bucket, log for review.
    /// </summary>
    protected abstract Task CleanupOrphansAsync(
        List<S3Object> orphans,
        IAmazonS3 s3Client,
        CancellationToken ct);

    private async Task<HashSet<string>> GetActiveS3KeysAsync(
        IDistributedTaskInstanceRepository repository,
        CancellationToken ct)
    {
        // Query all non-archived tasks, extract S3 keys from step data
        var tasks = await repository.GetActiveTasksAsync("CreateZip", ct);
        var keys = new HashSet<string>();

        foreach (var task in tasks)
        {
            foreach (var step in task.Steps)
            {
                if (step.StepData is not null)
                {
                    var data = JsonSerializer.Deserialize<JsonElement>(step.StepData);
                    if (data.TryGetProperty("Key", out var keyProp))
                    {
                        keys.Add(keyProp.GetString()!);
                    }
                }
            }
        }

        return keys;
    }

    private async Task<List<S3Object>> ListAllObjectsAsync(
        IAmazonS3 s3Client, string prefix, CancellationToken ct)
    {
        var objects = new List<S3Object>();
        string? continuationToken = null;

        do
        {
            var response = await s3Client.ListObjectsV2Async(new ListObjectsV2Request
            {
                BucketName = _bucketName,
                Prefix = prefix,
                ContinuationToken = continuationToken
            }, ct);

            objects.AddRange(response.S3Objects);
            continuationToken = response.IsTruncated ? response.NextContinuationToken : null;
        }
        while (continuationToken is not null);

        return objects;
    }
}

The developer implements the abstract method:

// ── Handwritten: CreateZipReconciliationService.cs ──

public class CreateZipReconciliationService : CreateZipReconciliationServiceBase
{
    protected override TimeSpan Interval => TimeSpan.FromHours(12);

    protected override async Task CleanupOrphansAsync(
        List<S3Object> orphans,
        IAmazonS3 s3Client,
        CancellationToken ct)
    {
        foreach (var orphan in orphans)
        {
            // Only delete objects older than 48 hours
            // (recent orphans might just be from in-flight tasks)
            if (orphan.LastModified < DateTimeOffset.UtcNow.AddHours(-48))
            {
                _logger.LogInformation(
                    "Deleting orphaned S3 object: {Key} (last modified: {Modified})",
                    orphan.Key, orphan.LastModified);

                await s3Client.DeleteObjectAsync(_bucketName, orphan.Key, ct);
            }
            else
            {
                _logger.LogInformation(
                    "Skipping recent orphan: {Key} (last modified: {Modified})",
                    orphan.Key, orphan.LastModified);
            }
        }
    }
}

This is the Generation Gap pattern: the generated base class handles the scanning, listing, and cross-referencing. The developer controls the cleanup policy. Some teams delete orphans immediately. Others archive to a cold storage bucket. Others just log and alert, leaving deletion to a human. The generator does not make that decision — it gives you the infrastructure to make it yourself.

Registration is one line:

services.AddHostedService<CreateZipReconciliationService>();

How It All Fits Together

None of these mechanisms exist in isolation. A single step execution passes through all of them:

  1. Message arrives from the queue
  2. Distributed lock acquired on the task ID (or message is requeued)
  3. Idempotency check on {TaskId}:{StepName}:{AttemptNumber} (or step is skipped)
  4. Step executes inside a Polly policy (retry with backoff, optional circuit breaker)
  5. On success: step record updated, lock released, next message published
  6. On retry exhaustion: compensation triggered, each compensation step retried with its own policy
  7. On compensation failure: terminal state recorded, operator alerted
  8. In the background: reconciliation service cleans up any orphaned resources

All generated. The developer writes the step logic and the compensation logic. The DSL generates the retry policies, the idempotency guards, the lock acquisition, the cancellation wiring, the compensation retry loop, and the reconciliation base class.


What's Next

Part IX covers real-time progress — SignalR hubs, Redis pub/sub, and how the client knows what is happening inside the saga while it runs.