Observability
"If you can't trace a task from the API submit through the queue into the worker and back, you don't have observability — you have hope."
OpenTelemetry Integration
Every saga step is an OpenTelemetry span. The source generator wraps each step execution inside an Activity created from a generated ActivitySource, so you get distributed tracing for free — no manual instrumentation.
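One detail is worth seeing in isolation: `ActivitySource.StartActivity` returns null unless a listener has opted in to that source, which is why the generated code guards every call with `activity is not null` or `activity?`. The sketch below is a minimal illustration using only `System.Diagnostics`. The source name `Sketch.DistributedTask` and the `StepTracingSketch` class are hypothetical and not part of the generated output; in a real application the OpenTelemetry SDK registers the listener when you add the source to the tracer provider.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class StepTracingSketch
{
    // Hypothetical source name, for illustration only.
    private static readonly ActivitySource Source = new("Sketch.DistributedTask", "1.0.0");

    // With no listener registered, StartActivity returns null and costs almost nothing.
    public static bool StartsActivityWithoutListener()
    {
        using var activity = Source.StartActivity("Step.NoListener");
        return activity is not null;
    }

    // Registers a listener for the source, runs one "step", and returns the spans that were stopped.
    public static List<string> RunStepWithListener(string stepName)
    {
        var stopped = new List<string>();
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Sketch.DistributedTask",
            Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
            ActivityStopped = activity => stopped.Add($"{activity.OperationName}:{activity.GetTagItem("step.name")}"),
        };
        ActivitySource.AddActivityListener(listener);

        // Disposing the activity stops it and triggers ActivityStopped.
        using (var activity = Source.StartActivity($"Step.{stepName}"))
        {
            activity?.SetTag("step.name", stepName);
        }

        return stopped;
    }
}
```

Calling `StepTracingSketch.RunStepWithListener("UploadSourceFiles")` yields one stopped span carrying the `step.name` tag, while `StartsActivityWithoutListener()` reports that no activity was created.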
Generated ActivitySource
// ── Generated: CreateZipFromFilesActivitySource.g.cs ──
public static partial class CreateZipFromFilesActivitySource
{
public static readonly ActivitySource Source =
new("DistributedTask.CreateZipFromFiles", "1.0.0");
}
Step Span Wrapping
The orchestrator kernel wraps every step execution with a child span:
// ── Generated: CreateZipFromFilesOrchestrator.g.cs (excerpt) ──
private async Task<StepResult> ExecuteStepWithTracing(
SagaStepRecord stepRecord,
Func<SagaStepContext, Task<StepResult>> executeFunc,
SagaStepContext context)
{
using var activity = CreateZipFromFilesActivitySource.Source.StartActivity(
$"DistributedTask.CreateZipFromFiles.{stepRecord.StepName}",
ActivityKind.Internal,
parentContext: _parentActivity?.Context ?? default);
if (activity is not null)
{
activity.SetTag("task.id", context.TaskId.ToString());
activity.SetTag("task.name", "CreateZipFromFiles");
activity.SetTag("step.name", stepRecord.StepName);
activity.SetTag("step.order", stepRecord.Order);
activity.SetTag("step.attempt", stepRecord.AttemptCount);
activity.SetTag("step.status", "Running");
activity.AddEvent(new ActivityEvent("StepStarted", tags: new()
{
{ "step.name", stepRecord.StepName },
{ "step.attempt", stepRecord.AttemptCount },
}));
}
var stopwatch = Stopwatch.StartNew();
StepResult result;
try
{
result = await executeFunc(context);
stopwatch.Stop();
activity?.SetTag("step.status", "Completed");
activity?.SetTag("step.duration_ms", stopwatch.ElapsedMilliseconds);
activity?.AddEvent(new ActivityEvent("StepCompleted", tags: new()
{
{ "step.duration_ms", stopwatch.ElapsedMilliseconds },
}));
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
stopwatch.Stop();
activity?.SetTag("step.status", "Failed");
activity?.SetTag("step.error", ex.GetType().Name);
activity?.SetTag("step.duration_ms", stopwatch.ElapsedMilliseconds);
activity?.AddEvent(new ActivityEvent("StepFailed", tags: new()
{
{ "error.type", ex.GetType().FullName! },
{ "error.message", ex.Message },
}));
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
return result;
}
Span Hierarchy
The parent span covers the entire task execution. Each step is a child span. Parallel steps produce sibling spans under the same parent:
DistributedTask.CreateZipFromFiles [============================]
├── DistributedTask.CreateZipFromFiles.UploadSourceFiles [========]
├── DistributedTask.CreateZipFromFiles.CreateZipArchive [============]
└── DistributedTask.CreateZipFromFiles.UploadResult [======]
For tasks with [ParallelStepGroup], the sibling spans overlap:
DistributedTask.TranscodeVideo [============================]
├── DistributedTask.TranscodeVideo.DownloadSource [====]
├── DistributedTask.TranscodeVideo.Transcode720p [==========]
├── DistributedTask.TranscodeVideo.Transcode1080p [============]
├── DistributedTask.TranscodeVideo.Transcode4K [==============]
└── DistributedTask.TranscodeVideo.UploadResults [====]
Compensation Spans
When compensation runs, the generator creates separate spans marked with ActivityKind.Internal and an explicit Compensating event:
using var compensationActivity = CreateZipFromFilesActivitySource.Source.StartActivity(
$"DistributedTask.CreateZipFromFiles.{stepRecord.StepName}.Compensate",
ActivityKind.Internal);
compensationActivity?.AddEvent(new ActivityEvent("Compensating"));
compensationActivity?.SetTag("step.name", stepRecord.StepName);
compensationActivity?.SetTag("step.status", "Compensating");
Trace Context Propagation
The API and the Worker run in different processes. Without trace context propagation, you get two disconnected traces. The generator solves this by injecting trace context into queue message headers.
Publisher Side — Injecting Context
When the API-side orchestrator publishes a message to the queue, it injects the current Activity.Context into the message headers using the W3C Trace Context standard:
// ── Generated: CreateZipFromFilesQueuePublisher.g.cs (excerpt) ──
public async Task PublishAsync(CreateZipWorkerMessage message, CancellationToken ct)
{
var headers = new Dictionary<string, string>();
// Inject W3C trace context into message headers
if (Activity.Current is { } activity)
{
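// Propagate the real sampling decision instead of always claiming "sampled"
headers["traceparent"] = $"00-{activity.TraceId}-{activity.SpanId}-{(activity.Recorded ? "01" : "00")}";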
if (activity.TraceStateString is not null)
headers["tracestate"] = activity.TraceStateString;
}
var envelope = new QueueEnvelope<CreateZipWorkerMessage>
{
Body = message,
Headers = headers,
PublishedAt = DateTimeOffset.UtcNow,
};
await _queueClient.PublishAsync(
queue: "create-zip-from-files",
envelope: envelope,
ct: ct);
}
Consumer Side — Restoring Context
When the Worker picks up the message, it extracts the trace context and starts its span as a child of the API-side span, so the entire flow appears as one trace:
// ── Generated: CreateZipFromFilesQueueConsumer.g.cs (excerpt) ──
public async Task HandleAsync(QueueEnvelope<CreateZipWorkerMessage> envelope, CancellationToken ct)
{
// Extract W3C trace context from message headers.
// TryParse ignores a malformed traceparent instead of throwing and failing the message.
ActivityContext parentContext = default;
if (envelope.Headers.TryGetValue("traceparent", out var traceparent))
{
ActivityContext.TryParse(traceparent,
envelope.Headers.GetValueOrDefault("tracestate"),
out parentContext);
}
// Start a consumer activity parented to the API-side span
using var activity = CreateZipFromFilesActivitySource.Source.StartActivity(
"DistributedTask.CreateZipFromFiles.WorkerProcess",
ActivityKind.Consumer,
parentContext: parentContext);
activity?.SetTag("task.id", envelope.Body.TaskId.ToString());
activity?.SetTag("messaging.system", "rabbitmq");
activity?.SetTag("messaging.destination", "create-zip-from-files");
activity?.SetTag("messaging.operation", "process");
await _orchestrator.ResumeAsync(envelope.Body, ct);
}
The Result
A single trace now spans the entire distributed flow:
The W3C Trace Context propagation means any OpenTelemetry-compatible backend — Jaeger, Zipkin, Azure Monitor, Datadog, Honeycomb — shows the full distributed trace as a single waterfall.
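To make the header round trip concrete, here is a minimal sketch of injecting and extracting a `traceparent` value using only `System.Diagnostics`. The `TracePropagationSketch` class and its method names are hypothetical and exist only for this illustration; the header layout (`version-traceid-spanid-flags`) comes from the W3C Trace Context standard, and `ActivityContext.TryParse` is the framework call that reads it back.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class TracePropagationSketch
{
    // Formats a W3C traceparent value: version-traceid-spanid-flags.
    public static string ToTraceParent(ActivityContext context)
    {
        var flags = (context.TraceFlags & ActivityTraceFlags.Recorded) != 0 ? "01" : "00";
        return $"00-{context.TraceId.ToHexString()}-{context.SpanId.ToHexString()}-{flags}";
    }

    // Publisher side: copy the context into the message headers.
    public static void Inject(ActivityContext context, IDictionary<string, string> headers)
    {
        headers["traceparent"] = ToTraceParent(context);
        if (!string.IsNullOrEmpty(context.TraceState))
            headers["tracestate"] = context.TraceState;
    }

    // Consumer side: returns false when the header is missing or malformed.
    public static bool TryExtract(IReadOnlyDictionary<string, string> headers, out ActivityContext context)
    {
        context = default;
        if (!headers.TryGetValue("traceparent", out var traceParent))
            return false;

        headers.TryGetValue("tracestate", out var traceState);
        return ActivityContext.TryParse(traceParent, traceState, out context);
    }
}
```

The extracted context carries the same trace ID and span ID as the one that was injected, which is what lets a backend stitch the API and Worker spans into one waterfall.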
Structured Logging
Every step transition generates a structured log entry with semantic properties. The generator emits ILogger<T> calls using [LoggerMessage] source-generated high-performance logging.
Generated Log Definitions
// ── Generated: CreateZipFromFilesLogMessages.g.cs ──
public static partial class CreateZipFromFilesLogMessages
{
[LoggerMessage(
EventId = 10001,
Level = LogLevel.Information,
Message = "Task {TaskName}/{TaskId} step {StepName} started (attempt {AttemptCount}/{MaxAttempts})")]
public static partial void StepStarted(
ILogger logger,
string taskName,
Guid taskId,
string stepName,
int stepOrder,
int attemptCount,
int maxAttempts);
[LoggerMessage(
EventId = 10002,
Level = LogLevel.Information,
Message = "Task {TaskName}/{TaskId} step {StepName} completed in {DurationMs}ms")]
public static partial void StepCompleted(
ILogger logger,
string taskName,
Guid taskId,
string stepName,
int stepOrder,
long durationMs,
string status);
[LoggerMessage(
EventId = 10003,
Level = LogLevel.Error,
Message = "Task {TaskName}/{TaskId} step {StepName} failed: {ErrorType} (attempt {AttemptCount}/{MaxAttempts})")]
public static partial void StepFailed(
ILogger logger,
Exception exception,
string taskName,
Guid taskId,
string stepName,
int stepOrder,
string errorType,
int attemptCount,
int maxAttempts);
[LoggerMessage(
EventId = 10004,
Level = LogLevel.Warning,
Message = "Task {TaskName}/{TaskId} step {StepName} compensating")]
public static partial void StepCompensating(
ILogger logger,
string taskName,
Guid taskId,
string stepName,
int stepOrder);
[LoggerMessage(
EventId = 10005,
Level = LogLevel.Information,
Message = "Task {TaskName}/{TaskId} completed in {DurationMs}ms ({StepCount} steps)")]
public static partial void TaskCompleted(
ILogger logger,
string taskName,
Guid taskId,
long durationMs,
int stepCount);
[LoggerMessage(
EventId = 10006,
Level = LogLevel.Error,
Message = "Task {TaskName}/{TaskId} failed after {StepCount} steps")]
public static partial void TaskFailed(
ILogger logger,
Exception exception,
string taskName,
Guid taskId,
int stepCount);
}
Usage in the Orchestrator
// ── Generated: CreateZipFromFilesOrchestrator.g.cs (excerpt) ──
private async Task<StepResult> ExecuteStepCore(SagaStepRecord stepRecord, SagaStepContext context)
{
CreateZipFromFilesLogMessages.StepStarted(
_logger,
taskName: "CreateZipFromFiles",
taskId: context.TaskId,
stepName: stepRecord.StepName,
stepOrder: stepRecord.Order,
attemptCount: stepRecord.AttemptCount,
maxAttempts: 3);
var stopwatch = Stopwatch.StartNew();
try
{
var result = await ExecuteStepWithTracing(stepRecord, GetStepExecutor(stepRecord), context);
stopwatch.Stop();
CreateZipFromFilesLogMessages.StepCompleted(
_logger,
taskName: "CreateZipFromFiles",
taskId: context.TaskId,
stepName: stepRecord.StepName,
stepOrder: stepRecord.Order,
durationMs: stopwatch.ElapsedMilliseconds,
status: "Completed");
return result;
}
catch (Exception ex)
{
stopwatch.Stop();
CreateZipFromFilesLogMessages.StepFailed(
_logger,
exception: ex,
taskName: "CreateZipFromFiles",
taskId: context.TaskId,
stepName: stepRecord.StepName,
stepOrder: stepRecord.Order,
errorType: ex.GetType().Name,
attemptCount: stepRecord.AttemptCount,
maxAttempts: 3);
throw;
}
}
What You See in the Console
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadSourceFiles started (attempt 1/3)
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadSourceFiles completed in 1234ms
[INF] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive started (attempt 1/3)
[ERR] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive failed: IOException (attempt 1/3)
[INF] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive started (attempt 2/3)
[INF] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive completed in 2456ms
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadResult started (attempt 1/3)
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadResult completed in 890ms
[INF] Task CreateZipFromFiles/a1b2c3d4 completed in 4580ms (3 steps)
When compensation runs:
[ERR] Task CreateZipFromFiles/a1b2c3d4 step CreateZipArchive failed: IOException (attempt 3/3)
[WRN] Task CreateZipFromFiles/a1b2c3d4 step UploadSourceFiles compensating
[INF] Task CreateZipFromFiles/a1b2c3d4 step UploadSourceFiles compensated in 340ms
[ERR] Task CreateZipFromFiles/a1b2c3d4 failed after 2 steps
Every log line carries structured properties (TaskId, TaskName, StepName, StepOrder, AttemptCount, DurationMs, Status, ErrorType) — queryable in Seq, Loki, Elasticsearch, or any structured logging sink.
Metrics
The source generator creates a dedicated Meter per task with counters and histograms that follow OpenTelemetry semantic conventions.
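If you want to check what a Meter emits without standing up an exporter, `MeterListener` from `System.Diagnostics.Metrics` can subscribe to it in-process. The sketch below is a hedged illustration, not part of the generated code: the `MeterListenerSketch` class and the `Sketch.DistributedTask` meter name used in the usage note are hypothetical, and it only captures `long` measurements such as the counters described in this section.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class MeterListenerSketch
{
    // Creates a meter, lets the caller record on it, and returns the summed long measurements per instrument.
    public static Dictionary<string, long> CaptureCounters(string meterName, Action<Meter> exercise)
    {
        var totals = new Dictionary<string, long>();
        using var meter = new Meter(meterName);
        using var listener = new MeterListener();

        // Opt in only to instruments that belong to the meter under test.
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == meterName)
                l.EnableMeasurementEvents(instrument);
        };

        // Invoked synchronously on every Add() for long-valued instruments.
        listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
        {
            totals[instrument.Name] = totals.GetValueOrDefault(instrument.Name) + value;
        });
        listener.Start();

        exercise(meter);
        return totals;
    }
}
```

For example, creating a `distributed_task.submitted` counter inside `exercise` on a meter named `Sketch.DistributedTask` and calling `Add(1)` twice produces a total of 2 under that instrument name, which makes this pattern handy in unit tests.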
Generated Meter
// ── Generated: CreateZipFromFilesMetrics.g.cs ──
public sealed class CreateZipFromFilesMetrics : IDisposable
{
private readonly Meter _meter;
// Counters
private readonly Counter<long> _tasksSubmitted;
private readonly Counter<long> _tasksCompleted;
private readonly Counter<long> _tasksFailed;
private readonly Counter<long> _compensationCount;
// Histograms
private readonly Histogram<double> _stepDuration;
private readonly Histogram<int> _stepAttempts;
public CreateZipFromFilesMetrics(IMeterFactory meterFactory)
{
_meter = meterFactory.Create("DistributedTask.CreateZipFromFiles");
_tasksSubmitted = _meter.CreateCounter<long>(
"distributed_task.submitted",
unit: "{task}",
description: "Number of distributed tasks submitted");
_tasksCompleted = _meter.CreateCounter<long>(
"distributed_task.completed",
unit: "{task}",
description: "Number of distributed tasks completed successfully");
_tasksFailed = _meter.CreateCounter<long>(
"distributed_task.failed",
unit: "{task}",
description: "Number of distributed tasks that failed permanently");
_compensationCount = _meter.CreateCounter<long>(
"distributed_task.compensation.count",
unit: "{compensation}",
description: "Number of compensation sequences triggered");
_stepDuration = _meter.CreateHistogram<double>(
"distributed_task.step.duration",
unit: "ms",
description: "Step execution duration in milliseconds");
_stepAttempts = _meter.CreateHistogram<int>(
"distributed_task.step.attempts",
unit: "{attempt}",
description: "Number of retry attempts per step execution");
}
public void RecordTaskSubmitted(string taskName)
=> _tasksSubmitted.Add(1, new KeyValuePair<string, object?>("task.name", taskName));
public void RecordTaskCompleted(string taskName)
=> _tasksCompleted.Add(1, new KeyValuePair<string, object?>("task.name", taskName));
public void RecordTaskFailed(string taskName)
=> _tasksFailed.Add(1, new KeyValuePair<string, object?>("task.name", taskName));
public void RecordStepDuration(string taskName, string stepName, double durationMs)
=> _stepDuration.Record(durationMs,
new KeyValuePair<string, object?>("task.name", taskName),
new KeyValuePair<string, object?>("step.name", stepName));
public void RecordStepAttempts(string taskName, string stepName, int attempts)
=> _stepAttempts.Record(attempts,
new KeyValuePair<string, object?>("task.name", taskName),
new KeyValuePair<string, object?>("step.name", stepName));
public void RecordCompensation(string taskName, string stepName)
=> _compensationCount.Add(1,
new KeyValuePair<string, object?>("task.name", taskName),
new KeyValuePair<string, object?>("step.name", stepName));
public void Dispose() => _meter.Dispose();
}
Recording in the Orchestrator
// ── Generated: CreateZipFromFilesOrchestrator.g.cs (excerpt) ──
// On task submission
_metrics.RecordTaskSubmitted("CreateZipFromFiles");
// After step completes
_metrics.RecordStepDuration("CreateZipFromFiles", stepRecord.StepName, stopwatch.ElapsedMilliseconds);
_metrics.RecordStepAttempts("CreateZipFromFiles", stepRecord.StepName, stepRecord.AttemptCount);
// On task completion
_metrics.RecordTaskCompleted("CreateZipFromFiles");
// On permanent failure
_metrics.RecordTaskFailed("CreateZipFromFiles");
// On compensation
_metrics.RecordCompensation("CreateZipFromFiles", stepRecord.StepName);
Prometheus / Grafana
With the OpenTelemetry Prometheus exporter, these metrics are scraped as standard Prometheus metrics:
# HELP distributed_task_submitted_total Number of distributed tasks submitted
distributed_task_submitted_total{task_name="CreateZipFromFiles"} 142
# HELP distributed_task_step_duration_ms Step execution duration in milliseconds
distributed_task_step_duration_ms_bucket{task_name="CreateZipFromFiles",step_name="UploadSourceFiles",le="500"} 89
distributed_task_step_duration_ms_bucket{task_name="CreateZipFromFiles",step_name="UploadSourceFiles",le="1000"} 128
distributed_task_step_duration_ms_bucket{task_name="CreateZipFromFiles",step_name="UploadSourceFiles",le="5000"} 141
# HELP distributed_task_compensation_count_total Number of compensation sequences triggered
distributed_task_compensation_count_total{task_name="CreateZipFromFiles",step_name="CreateZipArchive"} 3
The metrics instance is registered as a singleton in the generated DI extension, so all orchestrator instances share the same counters.
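When reading the `_bucket` lines in the scrape output, keep in mind that Prometheus histogram buckets are cumulative: each `le` value counts every observation less than or equal to that bound, so the counts never decrease as `le` grows. The following sketch shows that arithmetic on a handful of made-up durations. The `HistogramBucketSketch` class is hypothetical and the bounds are simply the three shown in the sample output, not a statement about the exporter's default boundaries.

```csharp
using System;
using System.Collections.Generic;

public static class HistogramBucketSketch
{
    // Returns, for each upper bound, how many durations are less than or equal to it.
    public static long[] CumulativeCounts(IEnumerable<double> durationsMs, double[] upperBounds)
    {
        var counts = new long[upperBounds.Length];
        foreach (var duration in durationsMs)
        {
            // A single observation increments every bucket whose bound it fits under.
            for (var i = 0; i < upperBounds.Length; i++)
            {
                if (duration <= upperBounds[i])
                    counts[i]++;
            }
        }
        return counts;
    }
}
```

With durations of 120, 480, 750, 1234, 4999 and 6000 ms and bounds of 500, 1000 and 5000, the cumulative counts are 2, 3 and 5; the 6000 ms observation falls only into the implicit `+Inf` bucket.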
Health Checks
The source generator produces IHealthCheck implementations for every infrastructure dependency the task touches. If the task uses S3, a queue, and Redis, three health checks are generated.
Generated Health Checks
// ── Generated: CreateZipFromFilesHealthChecks.g.cs ──
public class CreateZipFromFilesQueueHealthCheck : IHealthCheck
{
private readonly IQueueClient _queueClient;
public CreateZipFromFilesQueueHealthCheck(IQueueClient queueClient)
=> _queueClient = queueClient;
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct = default)
{
try
{
await _queueClient.PingAsync("create-zip-from-files", ct);
return HealthCheckResult.Healthy("Queue 'create-zip-from-files' is reachable");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy(
"Queue 'create-zip-from-files' is unreachable",
exception: ex);
}
}
}
public class CreateZipFromFilesS3HealthCheck : IHealthCheck
{
private readonly IS3Client _s3Client;
public CreateZipFromFilesS3HealthCheck(IS3Client s3Client)
=> _s3Client = s3Client;
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct = default)
{
try
{
await _s3Client.ListBucketsAsync(ct);
return HealthCheckResult.Healthy("S3 is reachable");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy("S3 is unreachable", exception: ex);
}
}
}
public class CreateZipFromFilesRedisHealthCheck : IHealthCheck
{
private readonly IConnectionMultiplexer _redis;
public CreateZipFromFilesRedisHealthCheck(IConnectionMultiplexer redis)
=> _redis = redis;
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct = default)
{
try
{
var db = _redis.GetDatabase();
await db.PingAsync();
return HealthCheckResult.Healthy("Redis is reachable");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy("Redis is unreachable", exception: ex);
}
}
}
public class CreateZipFromFilesStuckTasksHealthCheck : IHealthCheck
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly TimeSpan _threshold = TimeSpan.FromSeconds(300); // from TimeoutSeconds
public CreateZipFromFilesStuckTasksHealthCheck(IServiceScopeFactory scopeFactory)
=> _scopeFactory = scopeFactory;
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct = default)
{
using var scope = _scopeFactory.CreateScope();
var uow = scope.ServiceProvider.GetRequiredService<IDistributedTaskUnitOfWork>();
var cutoff = DateTimeOffset.UtcNow - _threshold;
var stuckTasks = await uow.Tasks.Query
.Where(t => t.TaskName == "CreateZipFromFiles"
&& t.Status == DistributedTaskStatus.Running
&& t.StartedAt < cutoff)
.CountAsync(ct);
if (stuckTasks > 0)
{
return HealthCheckResult.Degraded(
$"{stuckTasks} task(s) stuck in Running state for > {_threshold.TotalSeconds}s",
data: new Dictionary<string, object>
{
["stuck_count"] = stuckTasks,
["threshold_seconds"] = _threshold.TotalSeconds,
});
}
return HealthCheckResult.Healthy("No stuck tasks");
}
}
Registration in the DI Extension
All health checks, metrics, and the ActivitySource are wired up in the same generated AddCreateZipFromFilesTask() method:
// ── Generated: CreateZipFromFilesServiceCollectionExtensions.g.cs (excerpt) ──
public static IServiceCollection AddCreateZipFromFilesTask(
this IServiceCollection services,
IConfiguration configuration)
{
// ... orchestrator, queue, S3, etc. (from Part VI) ...
// ── Observability ──
// Metrics
services.AddSingleton<CreateZipFromFilesMetrics>();
// OpenTelemetry tracing
services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource("DistributedTask.CreateZipFromFiles"));
// OpenTelemetry metrics
services.AddOpenTelemetry()
.WithMetrics(metrics => metrics
.AddMeter("DistributedTask.CreateZipFromFiles"));
// Health checks
services.AddHealthChecks()
.AddCheck<CreateZipFromFilesQueueHealthCheck>(
"create-zip-queue",
tags: new[] { "distributed-task", "queue" })
.AddCheck<CreateZipFromFilesS3HealthCheck>(
"create-zip-s3",
tags: new[] { "distributed-task", "s3" })
.AddCheck<CreateZipFromFilesRedisHealthCheck>(
"create-zip-redis",
tags: new[] { "distributed-task", "redis" })
.AddCheck<CreateZipFromFilesStuckTasksHealthCheck>(
"create-zip-stuck-tasks",
tags: new[] { "distributed-task", "stuck" });
return services;
}
Query them at /health:
{
"status": "Degraded",
"entries": {
"create-zip-queue": { "status": "Healthy", "description": "Queue 'create-zip-from-files' is reachable" },
"create-zip-s3": { "status": "Healthy", "description": "S3 is reachable" },
"create-zip-redis": { "status": "Healthy", "description": "Redis is reachable" },
"create-zip-stuck-tasks": {
"status": "Degraded",
"description": "2 task(s) stuck in Running state for > 300s",
"data": { "stuck_count": 2, "threshold_seconds": 300 }
}
}
}
Summary
| Concern | What's Generated | Lines |
|---|---|---|
| Tracing | ActivitySource, span wrapping per step, compensation spans | ~80 |
| Propagation | traceparent/tracestate injection in publisher, extraction in consumer | ~30 |
| Logging | [LoggerMessage] definitions, structured calls in orchestrator | ~60 |
| Metrics | Meter with 4 counters + 2 histograms, recording in orchestrator | ~70 |
| Health checks | Queue, S3, Redis, stuck tasks | ~90 |
| DI registration | AddOpenTelemetry, AddHealthChecks, singleton metrics | ~20 |
| Total | | ~350 |
Zero lines of observability code written by the developer. The DSL attributes ([DistributedTask], [SagaStep], [FileUploadStep]) already carry enough information — task name, step name, queue, timeout — for the generator to emit the full observability stack.
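What remains for the developer is host wiring rather than observability code. The source does not show the host's Program.cs, so the following is only a rough sketch, assuming an ASP.NET Core minimal-API host built with the web SDK and its implicit usings. The WriteJsonAsync helper is hypothetical and not part of the generated output. It is one way to get a JSON body shaped like the /health sample, because the default health check endpoint writes only the plain-text overall status.

```csharp
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

var builder = WebApplication.CreateBuilder(args);

// Generated extension method: registers metrics, tracing, and health checks.
builder.Services.AddCreateZipFromFilesTask(builder.Configuration);

var app = builder.Build();

app.MapHealthChecks("/health", new HealthCheckOptions
{
    // Run only the checks that carry the generator's "distributed-task" tag.
    Predicate = registration => registration.Tags.Contains("distributed-task"),
    // Replace the default plain-text writer with a JSON writer.
    ResponseWriter = WriteJsonAsync,
});

app.Run();

// Hypothetical helper (an assumption, not generated code): serializes the
// overall status plus one entry per registered check.
static Task WriteJsonAsync(HttpContext httpContext, HealthReport report)
{
    var payload = new
    {
        status = report.Status.ToString(),
        entries = report.Entries.ToDictionary(
            entry => entry.Key,
            entry => new
            {
                status = entry.Value.Status.ToString(),
                description = entry.Value.Description,
                data = entry.Value.Data,
            }),
    };

    // WriteAsJsonAsync sets the JSON content type and serializes the payload.
    return httpContext.Response.WriteAsJsonAsync(payload);
}
```

The Predicate filter keeps unrelated checks in the same host out of this endpoint. Unlike the sample response, this writer also emits an empty data object for checks that attach no data.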
What's Next
Part X covers the Listening Strategy — how the client picks its preferred notification channel (Polling, SSE, WebSocket, SignalR, or Webhook) at submit time, and how the generator produces all five transports powered by a single Redis pub/sub backbone.