Skip to main content
Welcome. This site supports keyboard navigation and screen readers. Press ? at any time for keyboard shortcuts. Press [ to focus the sidebar, ] to focus the content. High-contrast themes are available via the toolbar.
serard@dev00:~/cv

Listening Strategy

"The client picks. The system delivers. Polling is always available as fallback."


The Problem

A distributed task takes seconds to minutes. The client needs to know what's happening. There are five ways to find out:

  1. Polling — keep asking "are we there yet?"
  2. Server-Sent Events (SSE) — open a one-way stream
  3. WebSocket — open a bidirectional channel
  4. SignalR — managed WebSocket with reconnection
  5. Webhook — "call me when it's done"

Every team has its preferred transport. Frontend SPAs love SignalR. Mobile apps prefer SSE. B2B integrations use webhooks. CLI tools poll. Microservices use webhooks. Test suites poll.

Building all five? That's 5x the infrastructure. Unless the system generates them.


The Client Chooses

The listening strategy is a first-class concept in the DSL. Every [TaskRequest] gets a generated ListeningStrategy property:

[TaskRequest("CreateZipFromFiles")]
public partial class CreateZipRequest
{
    public List<IFormFile> Files { get; set; } = new();
    public string? OutputFileName { get; set; }

    // ── Generated by [DistributedTask] ──
    public ListeningStrategy Listening { get; set; } = ListeningStrategy.Polling;
    public string? WebhookUrl { get; set; }
}

public enum ListeningStrategy
{
    Polling,     // GET /api/tasks/{id}/status
    SSE,         // GET /api/tasks/{id}/stream
    WebSocket,   // ws:///api/tasks/{id}/ws
    SignalR,     // /hubs/tasks
    Webhook,     // POST to client-provided URL
}

The client simply sets the property:

// cURL — polling (default)
POST /api/tasks/create-zip
Content-Type: multipart/form-data

// cURL — SSE
POST /api/tasks/create-zip?listening=SSE

// cURL — webhook
POST /api/tasks/create-zip?listening=Webhook&webhookUrl=https://example.com/callback

The Response Tells You Where to Listen

// ── Generated: TaskSubmittedResponse ──
public record TaskSubmittedResponse(
    string TaskId,
    ListeningStrategy Strategy,
    string ListenUrl
);

The ListenUrl is computed from the strategy:

Strategy ListenUrl
Polling /api/tasks/create-zip/{taskId}/status
SSE /api/tasks/create-zip/{taskId}/stream
WebSocket ws:///api/tasks/create-zip/{taskId}/ws
SignalR /hubs/tasks (subscribe with taskId)
Webhook Echoes back the client's URL

One UpdateAsync, Five Transports

The magic is in the ITaskProgressTracker. The orchestrator kernel calls UpdateAsync once per step transition. The tracker fans out to all listeners.

The Abstraction

public interface ITaskProgressTracker
{
    Task InitializeAsync(string taskId, ListeningStrategy strategy, string[] steps);
    Task UpdateAsync(string taskId, string stepName, StepStatus status);
    Task CompleteAsync(string taskId);
    Task FailAsync(string taskId, string error);

    // Polling
    Task<TaskProgress?> GetAsync(string taskId, CancellationToken ct);

    // Streaming (SSE, WebSocket, SignalR)
    IAsyncEnumerable<TaskProgressEvent> StreamAsync(
        string taskId, CancellationToken ct);
}

public record TaskProgressEvent(
    string TaskId,
    string StepName,
    StepStatus Status,
    DateTimeOffset Timestamp
);

public record TaskProgress(
    string TaskId,
    Dictionary<string, StepStatus> Steps,
    DistributedTaskStatus OverallStatus
);

The Redis Implementation

public class RedisTaskProgressTracker : ITaskProgressTracker
{
    private readonly IConnectionMultiplexer _redis;

    public async Task InitializeAsync(
        string taskId, ListeningStrategy strategy, string[] steps)
    {
        var db = _redis.GetDatabase();
        foreach (var step in steps)
            await db.HashSetAsync($"task:{taskId}", step, StepStatus.Pending.ToString());
        await db.HashSetAsync($"task:{taskId}", "strategy", strategy.ToString());
    }

    public async Task UpdateAsync(
        string taskId, string stepName, StepStatus status)
    {
        var db = _redis.GetDatabase();

        // 1. Persist current state (serves polling GET)
        await db.HashSetAsync($"task:{taskId}", stepName, status.ToString());

        // 2. Publish event (fans out to SSE, WebSocket, SignalR, Webhook)
        var evt = new TaskProgressEvent(taskId, stepName, status, DateTimeOffset.UtcNow);
        await _redis.GetSubscriber().PublishAsync(
            $"task:{taskId}:progress",
            JsonSerializer.Serialize(evt));
    }

    public async Task<TaskProgress?> GetAsync(
        string taskId, CancellationToken ct)
    {
        var db = _redis.GetDatabase();
        var hash = await db.HashGetAllAsync($"task:{taskId}");
        if (hash.Length == 0) return null;

        var steps = hash
            .Where(h => h.Name != "strategy" && h.Name != "overall")
            .ToDictionary(
                h => h.Name.ToString(),
                h => Enum.Parse<StepStatus>(h.Value!));

        var overall = hash.FirstOrDefault(h => h.Name == "overall");
        return new TaskProgress(taskId, steps,
            overall.Value.HasValue
                ? Enum.Parse<DistributedTaskStatus>(overall.Value!)
                : DistributedTaskStatus.Running);
    }

    public async IAsyncEnumerable<TaskProgressEvent> StreamAsync(
        string taskId, [EnumeratorCancellation] CancellationToken ct)
    {
        var sub = _redis.GetSubscriber();
        var channel = await sub.SubscribeAsync($"task:{taskId}:progress");

        await foreach (var message in channel.ReadAllAsync(ct))
        {
            var evt = JsonSerializer.Deserialize<TaskProgressEvent>(message.Message!);
            if (evt is not null) yield return evt;
        }
    }

    public async Task CompleteAsync(string taskId)
    {
        var db = _redis.GetDatabase();
        await db.HashSetAsync($"task:{taskId}", "overall",
            DistributedTaskStatus.Completed.ToString());
        await _redis.GetSubscriber().PublishAsync(
            $"task:{taskId}:progress",
            JsonSerializer.Serialize(new TaskProgressEvent(
                taskId, "__completed__", StepStatus.Completed, DateTimeOffset.UtcNow)));
    }

    public async Task FailAsync(string taskId, string error)
    {
        var db = _redis.GetDatabase();
        await db.HashSetAsync($"task:{taskId}", "overall",
            DistributedTaskStatus.Failed.ToString());
        await _redis.GetSubscriber().PublishAsync(
            $"task:{taskId}:progress",
            JsonSerializer.Serialize(new TaskProgressEvent(
                taskId, "__failed__", StepStatus.Failed, DateTimeOffset.UtcNow)));
    }
}

Polling — Always Available

// ── Generated: CreateZipTaskController.g.cs (excerpt) ──

[HttpGet("{taskId}/status")]
public async Task<ActionResult<TaskProgress>> GetStatusAsync(
    string taskId, CancellationToken ct)
{
    var progress = await _progress.GetAsync(taskId, ct);
    return progress is null ? NotFound() : Ok(progress);
}

Response:

{
  "taskId": "a1b2c3d4",
  "steps": {
    "UploadSourceFiles": "Completed",
    "CreateZipArchive": "Running",
    "UploadZip": "Pending"
  },
  "overallStatus": "Running"
}

SSE — Server-Sent Events

[HttpGet("{taskId}/stream")]
public async Task StreamAsync(string taskId, CancellationToken ct)
{
    Response.ContentType = "text/event-stream";
    Response.Headers.CacheControl = "no-cache";
    Response.Headers.Connection = "keep-alive";

    await foreach (var evt in _progress.StreamAsync(taskId, ct))
    {
        await Response.WriteAsync(
            $"data: {JsonSerializer.Serialize(evt)}\n\n", ct);
        await Response.Body.FlushAsync(ct);
    }
}

Client-side:

const source = new EventSource('/api/tasks/create-zip/a1b2c3d4/stream');
source.onmessage = (e) => {
    const evt = JSON.parse(e.data);
    console.log(`${evt.stepName}: ${evt.status}`);
};

WebSocket — Raw

[HttpGet("{taskId}/ws")]
public async Task WebSocketAsync(string taskId, CancellationToken ct)
{
    if (!HttpContext.WebSockets.IsWebSocketRequest)
    {
        HttpContext.Response.StatusCode = 400;
        return;
    }

    using var ws = await HttpContext.WebSockets.AcceptWebSocketAsync();
    await foreach (var evt in _progress.StreamAsync(taskId, ct))
    {
        var bytes = JsonSerializer.SerializeToUtf8Bytes(evt);
        await ws.SendAsync(bytes, WebSocketMessageType.Text, true, ct);
    }
    await ws.CloseAsync(
        WebSocketCloseStatus.NormalClosure, "Task completed", ct);
}

SignalR — Managed WebSocket

// ── Generated: CreateZipTaskHub.g.cs ──
public class CreateZipTaskHub : Hub
{
    private readonly ITaskProgressTracker _progress;

    public CreateZipTaskHub(ITaskProgressTracker progress)
        => _progress = progress;

    public async Task WatchTask(string taskId)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, $"task:{taskId}");

        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var evt in _progress.StreamAsync(
                    taskId, Context.ConnectionAborted))
                {
                    await Clients.Group($"task:{taskId}")
                        .SendAsync("ProgressUpdate", evt,
                            Context.ConnectionAborted);
                }
            }
            catch (OperationCanceledException) { }
        });
    }

    public async Task UnwatchTask(string taskId)
    {
        await Groups.RemoveFromGroupAsync(
            Context.ConnectionId, $"task:{taskId}");
    }
}

Client-side:

const connection = new signalR.HubConnectionBuilder()
    .withUrl('/hubs/tasks')
    .build();

connection.on('ProgressUpdate', (evt) => {
    console.log(`${evt.stepName}: ${evt.status}`);
});

await connection.start();
await connection.invoke('WatchTask', 'a1b2c3d4');

Webhook — POST to Client URL

// ── Generated: CreateZipWebhookNotifier.g.cs ──
public class CreateZipWebhookNotifier : BackgroundService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IDistributedTaskUnitOfWork _uow;
    private readonly ILogger<CreateZipWebhookNotifier> _logger;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var sub = _redis.GetSubscriber();
        var channel = await sub.SubscribeAsync("task:*:progress");

        await foreach (var message in channel.ReadAllAsync(ct))
        {
            try
            {
                var evt = JsonSerializer.Deserialize<TaskProgressEvent>(
                    message.Message!);
                if (evt is null) continue;

                var task = await _uow.Tasks.FindByIdAsync(
                    Guid.Parse(evt.TaskId));
                if (task?.Listening != ListeningStrategy.Webhook
                    || task.WebhookUrl is null)
                    continue;

                var client = _httpClientFactory.CreateClient("webhook");
                await client.PostAsJsonAsync(task.WebhookUrl, evt, ct);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Webhook delivery failed");
            }
        }
    }
}

The webhook payload:

POST https://example.com/callback
Content-Type: application/json

{
  "taskId": "a1b2c3d4",
  "stepName": "CreateZipArchive",
  "status": "Completed",
  "timestamp": "2026-04-03T14:22:07Z"
}

Architecture Summary

Orchestrator kernel
    └── _progress.UpdateAsync(taskId, stepName, status)
            │
            ├── Redis HSET (persists state)
            │       └── serves Polling GET
            │
            └── Redis PUBLISH (fans out to subscribers)
                    │
                    ├── SSE endpoint
                    │   └── StreamAsync → text/event-stream
                    │
                    ├── WebSocket endpoint
                    │   └── StreamAsync → ws.SendAsync
                    │
                    ├── SignalR hub
                    │   └── StreamAsync → Clients.Group.SendAsync
                    │
                    └── Webhook notifier
                        └── BackgroundService → POST to client URL

All five strategies from a single UpdateAsync call. No conditional logic in the orchestrator. No transport leaking into business code.


Validation

The controller validates the listening strategy on submission:

[HttpPost]
public async Task<ActionResult<TaskSubmittedResponse>> SubmitAsync(
    [FromForm] CreateZipRequest request, CancellationToken ct)
{
    if (request.Listening == ListeningStrategy.Webhook
        && string.IsNullOrEmpty(request.WebhookUrl))
        return BadRequest("WebhookUrl is required when using Webhook strategy");

    var taskId = await _apiOrchestrator.SubmitAsync(request, ct);
    var listenUrl = ResolveListenUrl(taskId, request.Listening);

    return Accepted(new TaskSubmittedResponse(
        taskId, request.Listening, listenUrl));
}

private string ResolveListenUrl(string taskId, ListeningStrategy strategy)
    => strategy switch
    {
        ListeningStrategy.Polling   => $"/api/tasks/create-zip/{taskId}/status",
        ListeningStrategy.SSE       => $"/api/tasks/create-zip/{taskId}/stream",
        ListeningStrategy.WebSocket => $"ws:///api/tasks/create-zip/{taskId}/ws",
        ListeningStrategy.SignalR   => "/hubs/tasks",
        ListeningStrategy.Webhook   => "N/A — server will POST to your URL",
        _ => throw new ArgumentOutOfRangeException(nameof(strategy))
    };

What's Next

Part XI covers testing sagas — how to verify compensation chains, retry behavior, and concurrent worker scenarios using the in-memory infrastructure from Part VII.