
Observability

"If you can't trace a task from the API submit through the queue into the worker and back, you don't have observability — you have hope."


OpenTelemetry Integration

Every saga step is an OpenTelemetry span. The source generator wraps each step execution inside an Activity created from a generated ActivitySource, so you get distributed tracing for free — no manual instrumentation.

Generated ActivitySource

// ── Generated: CreateZipFromFilesActivitySource.g.cs ──
public static partial class CreateZipFromFilesActivitySource
{
    public static readonly ActivitySource Source =
        new("DistributedTask.CreateZipFromFiles", "1.0.0");
}
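In unit tests or a local console harness you can observe these spans without any exporter by registering an `ActivityListener` against the generated source name. This is standard `System.Diagnostics` API; only the source name comes from the generated code — a minimal sketch:

```csharp
using System.Diagnostics;

var listener = new ActivityListener
{
    // Subscribe only to the generated source
    ShouldListenTo = source => source.Name == "DistributedTask.CreateZipFromFiles",
    // Record everything, including tags and events
    Sample = (ref ActivityCreationOptions<ActivityContext> _) =>
        ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = activity =>
        Console.WriteLine($"{activity.OperationName} took {activity.Duration.TotalMilliseconds:F0}ms"),
};
ActivitySource.AddActivityListener(listener);
```

Without at least one listener (or a configured OpenTelemetry SDK), `StartActivity` returns `null` — which is why the generated code null-checks every activity.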

Step Span Wrapping

The orchestrator kernel wraps every step execution with a child span:

// ── Generated: CreateZipFromFilesOrchestrator.g.cs (excerpt) ──
private async Task<StepResult> ExecuteStepWithTracing(
    SagaStepRecord stepRecord,
    Func<SagaStepContext, Task<StepResult>> executeFunc,
    SagaStepContext context)
{
    using var activity = CreateZipFromFilesActivitySource.Source.StartActivity(
        $"DistributedTask.CreateZipFromFiles.{stepRecord.StepName}",
        ActivityKind.Internal,
        parentContext: _parentActivity?.Context ?? default);

    if (activity is not null)
    {
        activity.SetTag("task.id", context.TaskId.ToString());
        activity.SetTag("task.name", "CreateZipFromFiles");
        activity.SetTag("step.name", stepRecord.StepName);
        activity.SetTag("step.order", stepRecord.Order);
        activity.SetTag("step.attempt", stepRecord.AttemptCount);
        activity.SetTag("step.status", "Running");

        activity.AddEvent(new ActivityEvent("StepStarted", tags: new()
        {
            { "step.name", stepRecord.StepName },
            { "step.attempt", stepRecord.AttemptCount },
        }));
    }

    var stopwatch = Stopwatch.StartNew();
    StepResult result;

    try
    {
        result = await executeFunc(context);
        stopwatch.Stop();

        activity?.SetTag("step.status", "Completed");
        activity?.SetTag("step.duration_ms", stopwatch.ElapsedMilliseconds);
        activity?.AddEvent(new ActivityEvent("StepCompleted", tags: new()
        {
            { "step.duration_ms", stopwatch.ElapsedMilliseconds },
        }));
        activity?.SetStatus(ActivityStatusCode.Ok);
    }
    catch (Exception ex)
    {
        stopwatch.Stop();

        activity?.SetTag("step.status", "Failed");
        activity?.SetTag("step.error", ex.GetType().Name);
        activity?.SetTag("step.duration_ms", stopwatch.ElapsedMilliseconds);
        activity?.AddEvent(new ActivityEvent("StepFailed", tags: new()
        {
            { "error.type", ex.GetType().FullName! },
            { "error.message", ex.Message },
        }));
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);

        throw;
    }

    return result;
}

Span Hierarchy

The parent span covers the entire task execution. Each step is a child span. Parallel steps produce sibling spans under the same parent:

DistributedTask.CreateZipFromFiles                          [============================]
  ├── DistributedTask.CreateZipFromFiles.UploadSourceFiles  [========]
  ├── DistributedTask.CreateZipFromFiles.CreateZipArchive       [============]
  └── DistributedTask.CreateZipFromFiles.UploadResult                  [======]

For tasks with [ParallelStepGroup], the sibling spans overlap:

DistributedTask.TranscodeVideo                              [============================]
  ├── DistributedTask.TranscodeVideo.DownloadSource         [====]
  ├── DistributedTask.TranscodeVideo.Transcode720p              [==========]
  ├── DistributedTask.TranscodeVideo.Transcode1080p             [============]
  ├── DistributedTask.TranscodeVideo.Transcode4K                [==============]
  └── DistributedTask.TranscodeVideo.UploadResults                           [====]
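The overlap falls out naturally if each parallel step starts its own child activity inside its own task and the group is awaited with `Task.WhenAll`. A hedged, self-contained sketch of that shape (illustrative only — not the actual generated orchestrator code; the `Task.Delay` stands in for real step work):

```csharp
using System.Diagnostics;
using System.Linq;

// Illustrative sketch: sibling spans for a [ParallelStepGroup].
var source = new ActivitySource("DistributedTask.TranscodeVideo");
using var parent = source.StartActivity("DistributedTask.TranscodeVideo");

var parallelSteps = new[] { "Transcode720p", "Transcode1080p", "Transcode4K" };

await Task.WhenAll(parallelSteps.Select(async stepName =>
{
    // Each step gets its own child Activity; all three share the same
    // parent, so they render as overlapping siblings in the waterfall.
    using var activity = source.StartActivity(
        $"DistributedTask.TranscodeVideo.{stepName}",
        ActivityKind.Internal,
        parentContext: parent?.Context ?? default);

    await Task.Delay(10); // stand-in for the real step work
}));
```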

Compensation Spans

When compensation runs, the generator creates separate spans marked with ActivityKind.Internal and an explicit Compensating event:

using var compensationActivity = CreateZipFromFilesActivitySource.Source.StartActivity(
    $"DistributedTask.CreateZipFromFiles.{stepRecord.StepName}.Compensate",
    ActivityKind.Internal);

compensationActivity?.AddEvent(new ActivityEvent("Compensating"));
compensationActivity?.SetTag("step.name", stepRecord.StepName);
compensationActivity?.SetTag("step.status", "Compensating");

Trace Context Propagation

The API and the Worker run in different processes. Without trace context propagation, you get two disconnected traces. The generator solves this by injecting trace context into queue message headers.

Publisher Side — Injecting Context

When the API-side orchestrator publishes a message to the queue, it injects the current Activity.Context into the message headers using the W3C Trace Context standard:

// ── Generated: CreateZipFromFilesQueuePublisher.g.cs (excerpt) ──
public async Task PublishAsync(CreateZipWorkerMessage message, CancellationToken ct)
{
    var headers = new Dictionary<string, string>();

    // Inject W3C trace context into message headers.
    // Note: the trace-flags field is fixed here to 01 (sampled).
    if (Activity.Current is { } activity)
    {
        headers["traceparent"] = $"00-{activity.TraceId}-{activity.SpanId}-01";

        if (activity.TraceStateString is not null)
            headers["tracestate"] = activity.TraceStateString;
    }

    var envelope = new QueueEnvelope<CreateZipWorkerMessage>
    {
        Body = message,
        Headers = headers,
        PublishedAt = DateTimeOffset.UtcNow,
    };

    await _queueClient.PublishAsync(
        queue: "create-zip-from-files",
        envelope: envelope,
        ct: ct);
}
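The resulting headers follow the W3C Trace Context format: a 2-hex-digit version, a 32-hex-digit trace-id, a 16-hex-digit parent-id (the publishing span), and 2 hex digits of trace-flags (01 = sampled). Using the example values from the W3C specification itself:

```
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
tracestate:  congo=t61rcWkgMzE
```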

Consumer Side — Restoring Context

When the Worker picks up the message, it extracts the trace context and starts a new span parented to the API-side span, so the entire flow appears as one trace:

// ── Generated: CreateZipFromFilesQueueConsumer.g.cs (excerpt) ──
public async Task HandleAsync(QueueEnvelope<CreateZipWorkerMessage> envelope, CancellationToken ct)
{
    // Extract W3C trace context from message headers.
    // TryParse degrades gracefully: a malformed header simply starts
    // a new root trace instead of throwing.
    ActivityContext parentContext = default;
    if (envelope.Headers.TryGetValue("traceparent", out var traceparent))
    {
        ActivityContext.TryParse(traceparent,
            envelope.Headers.GetValueOrDefault("tracestate"), out parentContext);
    }

    // Start a new activity parented to the API-side span
    using var activity = CreateZipFromFilesActivitySource.Source.StartActivity(
        "DistributedTask.CreateZipFromFiles.WorkerProcess",
        ActivityKind.Consumer,
        parentContext: parentContext);

    activity?.SetTag("task.id", envelope.Body.TaskId.ToString());
    activity?.SetTag("messaging.system", "rabbitmq");
    activity?.SetTag("messaging.destination", "create-zip-from-files");
    activity?.SetTag("messaging.operation", "process");

    await _orchestrator.ResumeAsync(envelope.Body, ct);
}

The Result

A single trace now spans the entire distributed flow:

DistributedTask.CreateZipFromFiles (API)                         [============================]
  └── DistributedTask.CreateZipFromFiles.WorkerProcess (Worker)     [=======================]
        ├── DistributedTask.CreateZipFromFiles.UploadSourceFiles    [======]
        ├── DistributedTask.CreateZipFromFiles.CreateZipArchive            [==========]
        └── DistributedTask.CreateZipFromFiles.UploadResult                           [=====]

The W3C Trace Context propagation means any OpenTelemetry-compatible backend — Jaeger, Zipkin, Azure Monitor, Datadog, Honeycomb — shows the full distributed trace as a single waterfall.
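Exporting the waterfall to one of those backends is one extra call on the tracing builder. A sketch of the host-level wiring, assuming the OpenTelemetry.Exporter.OpenTelemetryProtocol and OpenTelemetry.Instrumentation.AspNetCore packages are referenced (endpoint and service name are illustrative):

```csharp
// Program.cs of the API host (sketch, not generated code)
builder.Services.AddOpenTelemetry()
    .ConfigureResource(r => r.AddService("task-api"))
    .WithTracing(tracing => tracing
        .AddSource("DistributedTask.CreateZipFromFiles")  // the generated source
        .AddAspNetCoreInstrumentation()                   // inbound HTTP spans
        .AddOtlpExporter(o => o.Endpoint = new Uri("http://localhost:4317")));
```

The Worker host needs the same `AddSource` and exporter wiring so both halves of the trace land in the same backend.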


Structured Logging

Every step transition generates a structured log entry with semantic properties. The generator emits ILogger<T> calls using [LoggerMessage] source-generated high-performance logging.

Generated Log Definitions

// ── Generated: CreateZipFromFilesLogMessages.g.cs ──
public static partial class CreateZipFromFilesLogMessages
{
    [LoggerMessage(
        EventId = 10001,
        Level = LogLevel.Information,
        Message = "Task {TaskName}/{TaskId} step {StepName} started (attempt {AttemptCount}/{MaxAttempts})")]
    public static partial void StepStarted(
        ILogger logger,
        string taskName,
        Guid taskId,
        string stepName,
        int stepOrder,
        int attemptCount,
        int maxAttempts);

    [LoggerMessage(
        EventId = 10002,
        Level = LogLevel.Information,
        Message = "Task {TaskName}/{TaskId} step {StepName} completed in {DurationMs}ms")]
    public static partial void StepCompleted(
        ILogger logger,
        string taskName,
        Guid taskId,
        string stepName,
        int stepOrder,
        long durationMs,
        string status);

    [LoggerMessage(
        EventId = 10003,
        Level = LogLevel.Error,
        Message = "Task {TaskName}/{TaskId} step {StepName} failed: {ErrorType} (attempt {AttemptCount}/{MaxAttempts})")]
    public static partial void StepFailed(
        ILogger logger,
        Exception exception,
        string taskName,
        Guid taskId,
        string stepName,
        int stepOrder,
        string errorType,
        int attemptCount,
        int maxAttempts);

    [LoggerMessage(
        EventId = 10004,
        Level = LogLevel.Warning,
        Message = "Task {TaskName}/{TaskId} step {StepName} compensating")]
    public static partial void StepCompensating(
        ILogger logger,
        string taskName,
        Guid taskId,
        string stepName,
        int stepOrder);

    [LoggerMessage(
        EventId = 10005,
        Level = LogLevel.Information,
        Message = "Task {TaskName}/{TaskId} completed in {DurationMs}ms ({StepCount} steps)")]
    public static partial void TaskCompleted(
        ILogger logger,
        string taskName,
        Guid taskId,
        long durationMs,
        int stepCount);

    [LoggerMessage(
        EventId = 10006,
        Level = LogLevel.Error,
        Message = "Task {TaskName}/{TaskId} failed after {StepCount} steps")]
    public static partial void TaskFailed(
        ILogger logger,
        Exception exception,
        string taskName,
        Guid taskId,
        int stepCount);

    [LoggerMessage(
        EventId = 10007,
        Level = LogLevel.Information,
        Message = "Task {TaskName}/{TaskId} step {StepName} compensated in {DurationMs}ms")]
    public static partial void StepCompensated(
        ILogger logger,
        string taskName,
        Guid taskId,
        string stepName,
        long durationMs);
}

Usage in the Orchestrator

// ── Generated: CreateZipFromFilesOrchestrator.g.cs (excerpt) ──
private async Task<StepResult> ExecuteStepCore(SagaStepRecord stepRecord, SagaStepContext context)
{
    CreateZipFromFilesLogMessages.StepStarted(
        _logger,
        taskName: "CreateZipFromFiles",
        taskId: context.TaskId,
        stepName: stepRecord.StepName,
        stepOrder: stepRecord.Order,
        attemptCount: stepRecord.AttemptCount,
        maxAttempts: 3);

    var stopwatch = Stopwatch.StartNew();

    try
    {
        var result = await ExecuteStepWithTracing(stepRecord, GetStepExecutor(stepRecord), context);
        stopwatch.Stop();

        CreateZipFromFilesLogMessages.StepCompleted(
            _logger,
            taskName: "CreateZipFromFiles",
            taskId: context.TaskId,
            stepName: stepRecord.StepName,
            stepOrder: stepRecord.Order,
            durationMs: stopwatch.ElapsedMilliseconds,
            status: "Completed");

        return result;
    }
    catch (Exception ex)
    {
        stopwatch.Stop();

        CreateZipFromFilesLogMessages.StepFailed(
            _logger,
            exception: ex,
            taskName: "CreateZipFromFiles",
            taskId: context.TaskId,
            stepName: stepRecord.StepName,
            stepOrder: stepRecord.Order,
            errorType: ex.GetType().Name,
            attemptCount: stepRecord.AttemptCount,
            maxAttempts: 3);

        throw;
    }
}

What You See in the Console

[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadSourceFiles started (attempt 1/3)
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadSourceFiles completed in 1234ms
[INF] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive started (attempt 1/3)
[ERR] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive failed: IOException (attempt 1/3)
[INF] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive started (attempt 2/3)
[INF] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive completed in 2456ms
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadResult started (attempt 1/3)
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadResult completed in 890ms
[INF] Task CreateZipFromFiles/a1b2c3d4 completed in 4580ms (3 steps)

When compensation runs:

[ERR] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive failed: IOException (attempt 3/3)
[WRN] Task CreateZipFromFiles/a1b2c3d4 step UploadSourceFiles compensating
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadSourceFiles compensated in 340ms
[ERR] Task CreateZipFromFiles/a1b2c3d4 failed after 2 steps

Every log line carries structured properties (TaskId, TaskName, StepName, StepOrder, AttemptCount, DurationMs, Status, ErrorType) — queryable in Seq, Loki, Elasticsearch, or any structured logging sink.
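Because these are properties rather than text to grep, common questions become one-line filters. Illustrative Seq-style queries over the property names above (syntax varies by backend):

```
# steps that needed a retry
AttemptCount > 1

# slow steps for this task
TaskName = 'CreateZipFromFiles' and DurationMs > 2000

# a specific failure class
ErrorType = 'IOException'
```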


Metrics

The source generator creates a dedicated Meter per task with counters and histograms that follow OpenTelemetry semantic conventions.

Generated Meter

// ── Generated: CreateZipFromFilesMetrics.g.cs ──
public sealed class CreateZipFromFilesMetrics : IDisposable
{
    private readonly Meter _meter;

    // Counters
    private readonly Counter<long> _tasksSubmitted;
    private readonly Counter<long> _tasksCompleted;
    private readonly Counter<long> _tasksFailed;
    private readonly Counter<long> _compensationCount;

    // Histograms
    private readonly Histogram<double> _stepDuration;
    private readonly Histogram<int> _stepAttempts;

    public CreateZipFromFilesMetrics(IMeterFactory meterFactory)
    {
        _meter = meterFactory.Create("DistributedTask.CreateZipFromFiles");

        _tasksSubmitted = _meter.CreateCounter<long>(
            "distributed_task.submitted",
            unit: "{task}",
            description: "Number of distributed tasks submitted");

        _tasksCompleted = _meter.CreateCounter<long>(
            "distributed_task.completed",
            unit: "{task}",
            description: "Number of distributed tasks completed successfully");

        _tasksFailed = _meter.CreateCounter<long>(
            "distributed_task.failed",
            unit: "{task}",
            description: "Number of distributed tasks that failed permanently");

        _compensationCount = _meter.CreateCounter<long>(
            "distributed_task.compensation.count",
            unit: "{compensation}",
            description: "Number of compensation sequences triggered");

        _stepDuration = _meter.CreateHistogram<double>(
            "distributed_task.step.duration",
            unit: "ms",
            description: "Step execution duration in milliseconds");

        _stepAttempts = _meter.CreateHistogram<int>(
            "distributed_task.step.attempts",
            unit: "{attempt}",
            description: "Number of retry attempts per step execution");
    }

    public void RecordTaskSubmitted(string taskName)
        => _tasksSubmitted.Add(1, new KeyValuePair<string, object?>("task.name", taskName));

    public void RecordTaskCompleted(string taskName)
        => _tasksCompleted.Add(1, new KeyValuePair<string, object?>("task.name", taskName));

    public void RecordTaskFailed(string taskName)
        => _tasksFailed.Add(1, new KeyValuePair<string, object?>("task.name", taskName));

    public void RecordStepDuration(string taskName, string stepName, double durationMs)
        => _stepDuration.Record(durationMs,
            new KeyValuePair<string, object?>("task.name", taskName),
            new KeyValuePair<string, object?>("step.name", stepName));

    public void RecordStepAttempts(string taskName, string stepName, int attempts)
        => _stepAttempts.Record(attempts,
            new KeyValuePair<string, object?>("task.name", taskName),
            new KeyValuePair<string, object?>("step.name", stepName));

    public void RecordCompensation(string taskName, string stepName)
        => _compensationCount.Add(1,
            new KeyValuePair<string, object?>("task.name", taskName),
            new KeyValuePair<string, object?>("step.name", stepName));

    public void Dispose() => _meter.Dispose();
}

Recording in the Orchestrator

// ── Generated: CreateZipFromFilesOrchestrator.g.cs (excerpt) ──

// On task submission
_metrics.RecordTaskSubmitted("CreateZipFromFiles");

// After step completes
_metrics.RecordStepDuration("CreateZipFromFiles", stepRecord.StepName, stopwatch.ElapsedMilliseconds);
_metrics.RecordStepAttempts("CreateZipFromFiles", stepRecord.StepName, stepRecord.AttemptCount);

// On task completion
_metrics.RecordTaskCompleted("CreateZipFromFiles");

// On permanent failure
_metrics.RecordTaskFailed("CreateZipFromFiles");

// On compensation
_metrics.RecordCompensation("CreateZipFromFiles", stepRecord.StepName);
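One tuning note: the SDK's default histogram buckets are not always a good fit for multi-second steps. The step-duration histogram can be given explicit boundaries with a metrics view — a sketch (the boundary values are assumptions to tune per workload):

```csharp
services.AddOpenTelemetry()
    .WithMetrics(metrics => metrics
        .AddMeter("DistributedTask.CreateZipFromFiles")
        .AddView(
            instrumentName: "distributed_task.step.duration",
            new ExplicitBucketHistogramConfiguration
            {
                // Milliseconds; illustrative boundaries for multi-second steps
                Boundaries = new double[] { 100, 500, 1000, 5000, 15000, 60000 },
            }));
```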

Prometheus / Grafana

With the OpenTelemetry Prometheus exporter, these metrics are scraped as standard Prometheus metrics:

# HELP distributed_task_submitted_total Number of distributed tasks submitted
distributed_task_submitted_total{task_name="CreateZipFromFiles"} 142

# HELP distributed_task_step_duration_ms Step execution duration in milliseconds
distributed_task_step_duration_ms_bucket{task_name="CreateZipFromFiles",step_name="UploadSourceFiles",le="500"} 89
distributed_task_step_duration_ms_bucket{task_name="CreateZipFromFiles",step_name="UploadSourceFiles",le="1000"} 128
distributed_task_step_duration_ms_bucket{task_name="CreateZipFromFiles",step_name="UploadSourceFiles",le="5000"} 141

# HELP distributed_task_compensation_count_total Number of compensation sequences triggered
distributed_task_compensation_count_total{task_name="CreateZipFromFiles",step_name="CreateZipArchive"} 3
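Typical queries over these series, as hedged PromQL sketches using the metric names above (exact name suffixes depend on the exporter version):

```
# P95 step duration per step over the last 5 minutes
histogram_quantile(0.95,
  sum by (step_name, le) (rate(distributed_task_step_duration_ms_bucket[5m])))

# Failure ratio per task
rate(distributed_task_failed_total[5m])
  / rate(distributed_task_submitted_total[5m])

# Compensations triggered in the last hour
increase(distributed_task_compensation_count_total[1h])
```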

The metrics instance is registered as a singleton in the generated DI extension, so all orchestrator instances share the same counters.


Health Checks

The source generator produces IHealthCheck implementations for every infrastructure dependency the task touches. If the task uses S3, a queue, and Redis, three health checks are generated.

Generated Health Checks

// ── Generated: CreateZipFromFilesHealthChecks.g.cs ──

public class CreateZipFromFilesQueueHealthCheck : IHealthCheck
{
    private readonly IQueueClient _queueClient;

    public CreateZipFromFilesQueueHealthCheck(IQueueClient queueClient)
        => _queueClient = queueClient;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        try
        {
            await _queueClient.PingAsync("create-zip-from-files", ct);
            return HealthCheckResult.Healthy("Queue 'create-zip-from-files' is reachable");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy(
                "Queue 'create-zip-from-files' is unreachable",
                exception: ex);
        }
    }
}

public class CreateZipFromFilesS3HealthCheck : IHealthCheck
{
    private readonly IS3Client _s3Client;

    public CreateZipFromFilesS3HealthCheck(IS3Client s3Client)
        => _s3Client = s3Client;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        try
        {
            await _s3Client.ListBucketsAsync(ct);
            return HealthCheckResult.Healthy("S3 is reachable");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("S3 is unreachable", exception: ex);
        }
    }
}

public class CreateZipFromFilesRedisHealthCheck : IHealthCheck
{
    private readonly IConnectionMultiplexer _redis;

    public CreateZipFromFilesRedisHealthCheck(IConnectionMultiplexer redis)
        => _redis = redis;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        try
        {
            var db = _redis.GetDatabase();
            await db.PingAsync();
            return HealthCheckResult.Healthy("Redis is reachable");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("Redis is unreachable", exception: ex);
        }
    }
}

public class CreateZipFromFilesStuckTasksHealthCheck : IHealthCheck
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly TimeSpan _threshold = TimeSpan.FromSeconds(300); // from TimeoutSeconds

    public CreateZipFromFilesStuckTasksHealthCheck(IServiceScopeFactory scopeFactory)
        => _scopeFactory = scopeFactory;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        using var scope = _scopeFactory.CreateScope();
        var uow = scope.ServiceProvider.GetRequiredService<IDistributedTaskUnitOfWork>();

        var cutoff = DateTimeOffset.UtcNow - _threshold;
        var stuckTasks = await uow.Tasks.Query
            .Where(t => t.TaskName == "CreateZipFromFiles"
                && t.Status == DistributedTaskStatus.Running
                && t.StartedAt < cutoff)
            .CountAsync(ct);

        if (stuckTasks > 0)
        {
            return HealthCheckResult.Degraded(
                $"{stuckTasks} task(s) stuck in Running state for > {_threshold.TotalSeconds}s",
                data: new Dictionary<string, object>
                {
                    ["stuck_count"] = stuckTasks,
                    ["threshold_seconds"] = _threshold.TotalSeconds,
                });
        }

        return HealthCheckResult.Healthy("No stuck tasks");
    }
}

Registration in the DI Extension

All health checks, metrics, and the ActivitySource are wired up in the same generated AddCreateZipFromFilesTask() method:

// ── Generated: CreateZipFromFilesServiceCollectionExtensions.g.cs (excerpt) ──
public static IServiceCollection AddCreateZipFromFilesTask(
    this IServiceCollection services,
    IConfiguration configuration)
{
    // ... orchestrator, queue, S3, etc. (from Part VI) ...

    // ── Observability ──

    // Metrics
    services.AddSingleton<CreateZipFromFilesMetrics>();

    // OpenTelemetry tracing + metrics
    services.AddOpenTelemetry()
        .WithTracing(tracing => tracing
            .AddSource("DistributedTask.CreateZipFromFiles"))
        .WithMetrics(metrics => metrics
            .AddMeter("DistributedTask.CreateZipFromFiles"));

    // Health checks
    services.AddHealthChecks()
        .AddCheck<CreateZipFromFilesQueueHealthCheck>(
            "create-zip-queue",
            tags: new[] { "distributed-task", "queue" })
        .AddCheck<CreateZipFromFilesS3HealthCheck>(
            "create-zip-s3",
            tags: new[] { "distributed-task", "s3" })
        .AddCheck<CreateZipFromFilesRedisHealthCheck>(
            "create-zip-redis",
            tags: new[] { "distributed-task", "redis" })
        .AddCheck<CreateZipFromFilesStuckTasksHealthCheck>(
            "create-zip-stuck-tasks",
            tags: new[] { "distributed-task", "stuck" });

    return services;
}

Query them at /health:

{
  "status": "Degraded",
  "entries": {
    "create-zip-queue": { "status": "Healthy", "description": "Queue 'create-zip-from-files' is reachable" },
    "create-zip-s3": { "status": "Healthy", "description": "S3 is reachable" },
    "create-zip-redis": { "status": "Healthy", "description": "Redis is reachable" },
    "create-zip-stuck-tasks": {
      "status": "Degraded",
      "description": "2 task(s) stuck in Running state for > 300s",
      "data": { "stuck_count": 2, "threshold_seconds": 300 }
    }
  }
}
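Note that out of the box, `MapHealthChecks` returns only the aggregate status as plain text; a JSON payload like the one above needs a custom `ResponseWriter`. A minimal sketch (the payload shape mirrors the example above and is not a built-in format):

```csharp
app.MapHealthChecks("/health", new HealthCheckOptions
{
    // Only the checks registered by the generated extension
    Predicate = r => r.Tags.Contains("distributed-task"),
    ResponseWriter = async (context, report) =>
    {
        context.Response.ContentType = "application/json";
        await context.Response.WriteAsJsonAsync(new
        {
            status = report.Status.ToString(),
            entries = report.Entries.ToDictionary(
                e => e.Key,
                e => new
                {
                    status = e.Value.Status.ToString(),
                    description = e.Value.Description,
                    data = e.Value.Data,
                }),
        });
    },
});
```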

Summary

Concern          What's Generated                                                        Lines
Tracing          ActivitySource, span wrapping per step, compensation spans              ~80
Propagation      traceparent/tracestate injection in publisher, extraction in consumer   ~30
Logging          [LoggerMessage] definitions, structured calls in orchestrator           ~60
Metrics          Meter with 4 counters + 2 histograms, recording in orchestrator         ~70
Health checks    Queue, S3, Redis, stuck tasks                                           ~90
DI registration  AddOpenTelemetry, AddHealthChecks, singleton metrics                    ~20
Total                                                                                    ~350

Zero lines of observability code written by the developer. The DSL attributes ([DistributedTask], [SagaStep], [FileUploadStep]) already carry enough information — task name, step name, queue, timeout — for the generator to emit the full observability stack.


What's Next

Part X covers the Listening Strategy — how the client picks its preferred notification channel (Polling, SSE, WebSocket, SignalR, or Webhook) at submit time, and how the generator produces all five transports powered by a single Redis pub/sub backbone.