Listening Strategy
"The client picks. The system delivers. Polling is always available as fallback."
The Problem
A distributed task takes seconds to minutes. The client needs to know what's happening. There are five ways to find out:
- Polling — keep asking "are we there yet?"
- Server-Sent Events (SSE) — open a one-way stream
- WebSocket — open a bidirectional channel
- SignalR — managed WebSocket with reconnection
- Webhook — "call me when it's done"
Every team has its preferred transport. Frontend SPAs love SignalR. Mobile apps prefer SSE. B2B integrations use webhooks. CLI tools poll. Microservices use webhooks. Test suites poll.
Building all five? That's 5x the infrastructure. Unless the system generates them.
The Client Chooses
The listening strategy is a first-class concept in the DSL. Every [TaskRequest] class gets a generated Listening property of type ListeningStrategy, plus a WebhookUrl property for the webhook case:
[TaskRequest("CreateZipFromFiles")]
public partial class CreateZipRequest
{
    public List<IFormFile> Files { get; set; } = new();
    public string? OutputFileName { get; set; }

    // ── Generated by [DistributedTask] ──
    public ListeningStrategy Listening { get; set; } = ListeningStrategy.Polling;
    public string? WebhookUrl { get; set; }
}

public enum ListeningStrategy
{
    Polling,    // GET /api/tasks/{id}/status
    SSE,        // GET /api/tasks/{id}/stream
    WebSocket,  // ws:///api/tasks/{id}/ws
    SignalR,    // /hubs/tasks
    Webhook,    // POST to client-provided URL
}
The client simply sets the property:
// cURL — polling (default)
POST /api/tasks/create-zip
Content-Type: multipart/form-data
// cURL — SSE
POST /api/tasks/create-zip?listening=SSE
// cURL — webhook
POST /api/tasks/create-zip?listening=Webhook&webhookUrl=https://example.com/callback
The Response Tells You Where to Listen
// ── Generated: TaskSubmittedResponse ──
public record TaskSubmittedResponse(
    string TaskId,
    ListeningStrategy Strategy,
    string ListenUrl
);
The ListenUrl is computed from the strategy:
| Strategy | ListenUrl |
|---|---|
| Polling | /api/tasks/create-zip/{taskId}/status |
| SSE | /api/tasks/create-zip/{taskId}/stream |
| WebSocket | ws:///api/tasks/create-zip/{taskId}/ws |
| SignalR | /hubs/tasks (subscribe with taskId) |
| Webhook | N/A (the server POSTs progress events to the client's WebhookUrl) |
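On the consuming side, the response is enough to decide what to do next. The following is a minimal sketch, not generated code: ListenPlan and NextAction are hypothetical names, and it assumes the API serializes enums as strings with camelCase property names, as the JSON samples in this article suggest.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

public enum ListeningStrategy { Polling, SSE, WebSocket, SignalR, Webhook }

public record TaskSubmittedResponse(
    string TaskId,
    ListeningStrategy Strategy,
    string ListenUrl);

// Hypothetical client-side helper: parse the 202 body, then pick a listener.
public static class ListenPlan
{
    // Assumption: camelCase JSON with string-valued enums.
    private static readonly JsonSerializerOptions Options =
        new(JsonSerializerDefaults.Web)
        {
            Converters = { new JsonStringEnumConverter() }
        };

    public static TaskSubmittedResponse Parse(string json) =>
        JsonSerializer.Deserialize<TaskSubmittedResponse>(json, Options)
        ?? throw new JsonException("Empty response body");

    // Maps the strategy echoed by the server to what the client does next.
    public static string NextAction(TaskSubmittedResponse r) => r.Strategy switch
    {
        ListeningStrategy.Polling   => $"poll GET {r.ListenUrl}",
        ListeningStrategy.SSE       => $"open an event stream on {r.ListenUrl}",
        ListeningStrategy.WebSocket => $"open a WebSocket to {r.ListenUrl}",
        ListeningStrategy.SignalR   => $"connect to hub {r.ListenUrl} and invoke WatchTask({r.TaskId})",
        ListeningStrategy.Webhook   => "wait for the server to POST to the registered URL",
        _ => throw new ArgumentOutOfRangeException(nameof(r))
    };
}
```

Because the server echoes the strategy back, a client can branch on the response alone instead of remembering what it asked for.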
One UpdateAsync, Five Transports
The magic is in the ITaskProgressTracker. The orchestrator kernel calls UpdateAsync once per step transition. The tracker fans out to all listeners.
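The once-per-transition contract can be sketched as follows. This is an illustration under assumptions, not the generated kernel: IProgressSink, RecordingSink, and KernelSketch are hypothetical names, the interface is trimmed to the write side of the tracker, and StepStatus lists only the values that appear in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public enum StepStatus { Pending, Running, Completed, Failed }

// Hypothetical: just the write side of the progress tracker.
public interface IProgressSink
{
    Task UpdateAsync(string taskId, string stepName, StepStatus status);
    Task CompleteAsync(string taskId);
    Task FailAsync(string taskId, string error);
}

// Test double that records every call in order.
public sealed class RecordingSink : IProgressSink
{
    public List<string> Log { get; } = new();

    public Task UpdateAsync(string taskId, string stepName, StepStatus status)
    {
        Log.Add($"{stepName}:{status}");
        return Task.CompletedTask;
    }

    public Task CompleteAsync(string taskId)
    {
        Log.Add("completed");
        return Task.CompletedTask;
    }

    public Task FailAsync(string taskId, string error)
    {
        Log.Add($"failed:{error}");
        return Task.CompletedTask;
    }
}

public static class KernelSketch
{
    // Runs steps in order and reports each transition exactly once.
    // Note that nothing here knows which transport the client chose.
    public static async Task RunAsync(
        string taskId,
        IReadOnlyList<(string Name, Func<Task> Body)> steps,
        IProgressSink progress)
    {
        foreach (var (name, body) in steps)
        {
            await progress.UpdateAsync(taskId, name, StepStatus.Running);
            try
            {
                await body();
            }
            catch (Exception ex)
            {
                await progress.UpdateAsync(taskId, name, StepStatus.Failed);
                await progress.FailAsync(taskId, ex.Message);
                return;
            }
            await progress.UpdateAsync(taskId, name, StepStatus.Completed);
        }
        await progress.CompleteAsync(taskId);
    }
}
```

The loop never inspects a ListeningStrategy. That separation is what lets the tracker add or remove transports without touching step code.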
The Abstraction
public interface ITaskProgressTracker
{
    Task InitializeAsync(string taskId, ListeningStrategy strategy, string[] steps);
    Task UpdateAsync(string taskId, string stepName, StepStatus status);
    Task CompleteAsync(string taskId);
    Task FailAsync(string taskId, string error);

    // Polling
    Task<TaskProgress?> GetAsync(string taskId, CancellationToken ct);

    // Streaming (SSE, WebSocket, SignalR)
    IAsyncEnumerable<TaskProgressEvent> StreamAsync(
        string taskId, CancellationToken ct);
}

public record TaskProgressEvent(
    string TaskId,
    string StepName,
    StepStatus Status,
    DateTimeOffset Timestamp
);

public record TaskProgress(
    string TaskId,
    Dictionary<string, StepStatus> Steps,
    DistributedTaskStatus OverallStatus
);
The Redis Implementation
public class RedisTaskProgressTracker : ITaskProgressTracker
{
    private readonly IConnectionMultiplexer _redis;

    public RedisTaskProgressTracker(IConnectionMultiplexer redis)
        => _redis = redis;

    public async Task InitializeAsync(
        string taskId, ListeningStrategy strategy, string[] steps)
    {
        var db = _redis.GetDatabase();
        foreach (var step in steps)
            await db.HashSetAsync($"task:{taskId}", step, StepStatus.Pending.ToString());
        await db.HashSetAsync($"task:{taskId}", "strategy", strategy.ToString());
    }

    public async Task UpdateAsync(
        string taskId, string stepName, StepStatus status)
    {
        var db = _redis.GetDatabase();

        // 1. Persist current state (serves polling GET)
        await db.HashSetAsync($"task:{taskId}", stepName, status.ToString());

        // 2. Publish event (fans out to SSE, WebSocket, SignalR, Webhook)
        var evt = new TaskProgressEvent(taskId, stepName, status, DateTimeOffset.UtcNow);
        await _redis.GetSubscriber().PublishAsync(
            $"task:{taskId}:progress",
            JsonSerializer.Serialize(evt));
    }

    public async Task<TaskProgress?> GetAsync(
        string taskId, CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var hash = await db.HashGetAllAsync($"task:{taskId}");
        if (hash.Length == 0) return null;

        // Metadata fields share the hash with the steps; keep them out of the step map
        var steps = hash
            .Where(h => h.Name != "strategy" && h.Name != "overall" && h.Name != "error")
            .ToDictionary(
                h => h.Name.ToString(),
                h => Enum.Parse<StepStatus>(h.Value!));

        var overall = hash.FirstOrDefault(h => h.Name == "overall");
        return new TaskProgress(taskId, steps,
            overall.Value.HasValue
                ? Enum.Parse<DistributedTaskStatus>(overall.Value!)
                : DistributedTaskStatus.Running);
    }

    public async IAsyncEnumerable<TaskProgressEvent> StreamAsync(
        string taskId, [EnumeratorCancellation] CancellationToken ct)
    {
        var sub = _redis.GetSubscriber();
        var channel = await sub.SubscribeAsync($"task:{taskId}:progress");
        try
        {
            // ChannelMessageQueue is itself an IAsyncEnumerable<ChannelMessage>
            await foreach (var message in channel.WithCancellation(ct))
            {
                var evt = JsonSerializer.Deserialize<TaskProgressEvent>(message.Message!);
                if (evt is null) continue;
                yield return evt;

                // Stop after the terminal marker so callers can close their transport
                if (evt.StepName is "__completed__" or "__failed__") yield break;
            }
        }
        finally
        {
            // Release the Redis subscription when the consumer stops reading
            await channel.UnsubscribeAsync();
        }
    }

    public async Task CompleteAsync(string taskId)
    {
        var db = _redis.GetDatabase();
        await db.HashSetAsync($"task:{taskId}", "overall",
            DistributedTaskStatus.Completed.ToString());
        await _redis.GetSubscriber().PublishAsync(
            $"task:{taskId}:progress",
            JsonSerializer.Serialize(new TaskProgressEvent(
                taskId, "__completed__", StepStatus.Completed, DateTimeOffset.UtcNow)));
    }

    public async Task FailAsync(string taskId, string error)
    {
        var db = _redis.GetDatabase();
        await db.HashSetAsync($"task:{taskId}", "overall",
            DistributedTaskStatus.Failed.ToString());
        // Keep the failure reason next to the state instead of dropping it
        await db.HashSetAsync($"task:{taskId}", "error", error);
        await _redis.GetSubscriber().PublishAsync(
            $"task:{taskId}:progress",
            JsonSerializer.Serialize(new TaskProgressEvent(
                taskId, "__failed__", StepStatus.Failed, DateTimeOffset.UtcNow)));
    }
}
Polling — Always Available
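From the client's side, polling is a loop that stops on a terminal status. The sketch below is hypothetical rather than generated code: PollingClient and its fetchStatus delegate are stand-ins for a GET against the ListenUrl, the backoff policy is an assumption, and DistributedTaskStatus lists only the values shown in this article.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum DistributedTaskStatus { Running, Completed, Failed }

public static class PollingClient
{
    // fetchStatus stands in for "GET {ListenUrl}" and returns null
    // when the task is not known yet (the endpoint answers 404).
    public static async Task<DistributedTaskStatus> WaitForCompletionAsync(
        Func<CancellationToken, Task<DistributedTaskStatus?>> fetchStatus,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        CancellationToken ct = default)
    {
        var delay = initialDelay;
        while (true)
        {
            var status = await fetchStatus(ct);
            if (status is DistributedTaskStatus.Completed or DistributedTaskStatus.Failed)
                return status.Value;

            await Task.Delay(delay, ct);

            // Back off: double the wait, capped at maxDelay.
            delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
        }
    }
}
```

The server side of this loop is the generated status endpoint: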
// ── Generated: CreateZipTaskController.g.cs (excerpt) ──
[HttpGet("{taskId}/status")]
public async Task<ActionResult<TaskProgress>> GetStatusAsync(
    string taskId, CancellationToken ct)
{
    var progress = await _progress.GetAsync(taskId, ct);
    return progress is null ? NotFound() : Ok(progress);
}
Response:
{
  "taskId": "a1b2c3d4",
  "steps": {
    "UploadSourceFiles": "Completed",
    "CreateZipArchive": "Running",
    "UploadZip": "Pending"
  },
  "overallStatus": "Running"
}
SSE — Server-Sent Events
[HttpGet("{taskId}/stream")]
public async Task StreamAsync(string taskId, CancellationToken ct)
{
    Response.ContentType = "text/event-stream";
    Response.Headers.CacheControl = "no-cache";
    Response.Headers.Connection = "keep-alive";

    await foreach (var evt in _progress.StreamAsync(taskId, ct))
    {
        await Response.WriteAsync(
            $"data: {JsonSerializer.Serialize(evt)}\n\n", ct);
        await Response.Body.FlushAsync(ct);
    }
}
Client-side:
const source = new EventSource('/api/tasks/create-zip/a1b2c3d4/stream');
source.onmessage = (e) => {
  const evt = JSON.parse(e.data);
  console.log(`${evt.stepName}: ${evt.status}`);
};
WebSocket — Raw
[HttpGet("{taskId}/ws")]
public async Task WebSocketAsync(string taskId, CancellationToken ct)
{
    if (!HttpContext.WebSockets.IsWebSocketRequest)
    {
        HttpContext.Response.StatusCode = 400;
        return;
    }

    using var ws = await HttpContext.WebSockets.AcceptWebSocketAsync();

    await foreach (var evt in _progress.StreamAsync(taskId, ct))
    {
        var bytes = JsonSerializer.SerializeToUtf8Bytes(evt);
        await ws.SendAsync(bytes, WebSocketMessageType.Text, true, ct);
    }

    await ws.CloseAsync(
        WebSocketCloseStatus.NormalClosure, "Task completed", ct);
}
SignalR — Managed WebSocket
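// ── Generated: CreateZipTaskHub.g.cs ──
public class CreateZipTaskHub : Hub
{
    private readonly ITaskProgressTracker _progress;
    private readonly IHubContext<CreateZipTaskHub> _hubContext;

    public CreateZipTaskHub(
        ITaskProgressTracker progress,
        IHubContext<CreateZipTaskHub> hubContext)
    {
        _progress = progress;
        _hubContext = hubContext;
    }

    public async Task WatchTask(string taskId)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, $"task:{taskId}");

        // The hub instance is disposed once this method returns, so the
        // background loop must not touch Clients or Context. Capture what it
        // needs up front and send through IHubContext instead.
        var aborted = Context.ConnectionAborted;
        var clients = _hubContext.Clients;

        // Note: each WatchTask call starts its own stream, so several watchers
        // of the same task each publish to the shared group.
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var evt in _progress.StreamAsync(taskId, aborted))
                {
                    await clients.Group($"task:{taskId}")
                        .SendAsync("ProgressUpdate", evt, aborted);
                }
            }
            catch (OperationCanceledException) { }
        });
    }

    public async Task UnwatchTask(string taskId)
    {
        await Groups.RemoveFromGroupAsync(
            Context.ConnectionId, $"task:{taskId}");
    }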
}
Client-side:
const connection = new signalR.HubConnectionBuilder()
  .withUrl('/hubs/tasks')
  .build();

connection.on('ProgressUpdate', (evt) => {
  console.log(`${evt.stepName}: ${evt.status}`);
});

await connection.start();
await connection.invoke('WatchTask', 'a1b2c3d4');
Webhook — POST to Client URL
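// ── Generated: CreateZipWebhookNotifier.g.cs ──
public class CreateZipWebhookNotifier : BackgroundService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IDistributedTaskUnitOfWork _uow;
    private readonly ILogger<CreateZipWebhookNotifier> _logger;

    public CreateZipWebhookNotifier(
        IConnectionMultiplexer redis,
        IHttpClientFactory httpClientFactory,
        IDistributedTaskUnitOfWork uow,
        ILogger<CreateZipWebhookNotifier> logger)
    {
        _redis = redis;
        _httpClientFactory = httpClientFactory;
        _uow = uow;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var sub = _redis.GetSubscriber();

        // Pattern subscription: receives progress events for every task
        var channel = await sub.SubscribeAsync(
            new RedisChannel("task:*:progress", RedisChannel.PatternMode.Pattern));

        await foreach (var message in channel.WithCancellation(ct))
        {
            try
            {
                var evt = JsonSerializer.Deserialize<TaskProgressEvent>(
                    message.Message!);
                if (evt is null) continue;

                // Only tasks submitted with the Webhook strategy get a callback
                var task = await _uow.Tasks.FindByIdAsync(
                    Guid.Parse(evt.TaskId));
                if (task?.Listening != ListeningStrategy.Webhook
                    || task.WebhookUrl is null)
                    continue;

                var client = _httpClientFactory.CreateClient("webhook");
                using var response = await client.PostAsJsonAsync(task.WebhookUrl, evt, ct);

                // Treat a non-2xx answer as a failed delivery so it is logged below
                response.EnsureSuccessStatusCode();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Webhook delivery failed");
            }
        }
    }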
}
The webhook payload:
POST https://example.com/callback
Content-Type: application/json

{
  "taskId": "a1b2c3d4",
  "stepName": "CreateZipArchive",
  "status": "Completed",
  "timestamp": "2026-04-03T14:22:07Z"
}
Architecture Summary
Orchestrator kernel
  └── _progress.UpdateAsync(taskId, stepName, status)
        │
        ├── Redis HSET (persists state)
        │     └── serves Polling GET
        │
        └── Redis PUBLISH (fans out to subscribers)
              │
              ├── SSE endpoint
              │     └── StreamAsync → text/event-stream
              │
              ├── WebSocket endpoint
              │     └── StreamAsync → ws.SendAsync
              │
              ├── SignalR hub
              │     └── StreamAsync → Clients.Group.SendAsync
              │
              └── Webhook notifier
                    └── BackgroundService → POST to client URL
All five strategies from a single UpdateAsync call. No conditional logic in the orchestrator. No transport leaking into business code.
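The persist-then-publish shape can be reproduced without Redis. The following in-memory sketch is an assumption-laden illustration, not part of the generated code: InMemoryFanOut is a hypothetical class, it tracks a single task, and System.Threading.Channels stands in for Redis pub/sub.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public enum StepStatus { Pending, Running, Completed, Failed }

public record TaskProgressEvent(
    string TaskId, string StepName, StepStatus Status, DateTimeOffset Timestamp);

// Hypothetical single-task stand-in for the Redis-backed tracker.
public sealed class InMemoryFanOut
{
    private readonly object _gate = new();
    private readonly Dictionary<string, StepStatus> _state = new();
    private readonly List<Channel<TaskProgressEvent>> _subscribers = new();

    // Streaming side: each subscriber gets its own queue of future events.
    public ChannelReader<TaskProgressEvent> Subscribe()
    {
        var channel = Channel.CreateUnbounded<TaskProgressEvent>();
        lock (_gate) _subscribers.Add(channel);
        return channel.Reader;
    }

    // Polling side: a copy of the current state, available at any time.
    public IReadOnlyDictionary<string, StepStatus> Snapshot()
    {
        lock (_gate) return new Dictionary<string, StepStatus>(_state);
    }

    public Task UpdateAsync(string taskId, string stepName, StepStatus status)
    {
        var evt = new TaskProgressEvent(taskId, stepName, status, DateTimeOffset.UtcNow);
        lock (_gate)
        {
            // 1. Persist (the HSET half)
            _state[stepName] = status;

            // 2. Publish to whoever is subscribed right now (the PUBLISH half)
            foreach (var subscriber in _subscribers)
                subscriber.Writer.TryWrite(evt);
        }
        return Task.CompletedTask;
    }
}
```

A subscriber that joins after an update receives nothing for it, which matches Redis pub/sub: messages are not replayed to late subscribers. The persisted snapshot is what lets a late or reconnecting client catch up, and it is one reason polling stays available as the fallback.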
Validation
The controller validates the listening strategy on submission:
[HttpPost]
public async Task<ActionResult<TaskSubmittedResponse>> SubmitAsync(
    [FromForm] CreateZipRequest request, CancellationToken ct)
{
    if (request.Listening == ListeningStrategy.Webhook
        && string.IsNullOrEmpty(request.WebhookUrl))
        return BadRequest("WebhookUrl is required when using Webhook strategy");

    var taskId = await _apiOrchestrator.SubmitAsync(request, ct);
    var listenUrl = ResolveListenUrl(taskId, request.Listening);

    return Accepted(new TaskSubmittedResponse(
        taskId, request.Listening, listenUrl));
}

private string ResolveListenUrl(string taskId, ListeningStrategy strategy)
    => strategy switch
    {
        ListeningStrategy.Polling => $"/api/tasks/create-zip/{taskId}/status",
        ListeningStrategy.SSE => $"/api/tasks/create-zip/{taskId}/stream",
        ListeningStrategy.WebSocket => $"ws:///api/tasks/create-zip/{taskId}/ws",
        ListeningStrategy.SignalR => "/hubs/tasks",
        ListeningStrategy.Webhook => "N/A — server will POST to your URL",
        _ => throw new ArgumentOutOfRangeException(nameof(strategy))
    };
What's Next
Part XI covers testing sagas — how to verify compensation chains, retry behavior, and concurrent worker scenarios using the in-memory infrastructure from Part VII.