Deployment
"The system is designed so that stopping a worker mid-flight corrupts nothing. Messages stay in the queue. Locks expire. The next worker picks up where this one left off."
Graceful Shutdown
A worker is a .NET IHostedService. When the container runtime sends SIGTERM, the host calls StopAsync. The worker must:
- Stop accepting new messages from the queue
- Finish the current step (not the entire saga — just the step in progress)
- Release any distributed locks
- Let the remaining messages stay in the queue for the next worker
```csharp
public sealed class DistributedTaskWorker : BackgroundService
{
    private readonly IQueueConsumer _consumer;
    private readonly ISagaStepExecutor _executor;
    private readonly IDistributedLockManager _lockManager;
    private readonly ILogger<DistributedTaskWorker> _logger;

    private volatile bool _isShuttingDown;

    public DistributedTaskWorker(
        IQueueConsumer consumer,
        ISagaStepExecutor executor,
        IDistributedLockManager lockManager,
        ILogger<DistributedTaskWorker> logger)
    {
        _consumer = consumer;
        _executor = executor;
        _lockManager = lockManager;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // stoppingToken is triggered when StopAsync is called
        stoppingToken.Register(() => _isShuttingDown = true);

        await foreach (var message in _consumer.ConsumeAsync(stoppingToken))
        {
            if (_isShuttingDown)
            {
                // Don't ack — message returns to queue
                _logger.LogInformation(
                    "Shutdown in progress. Releasing message {MessageId} " +
                    "back to queue.",
                    message.Id);
                await message.NackAsync();
                break;
            }

            await ProcessMessageAsync(message, stoppingToken);
        }
    }

    private async Task ProcessMessageAsync(
        QueueMessage message,
        CancellationToken ct)
    {
        var lockKey = $"task:{message.TaskId}:step:{message.StepName}";

        await using var @lock = await _lockManager.AcquireAsync(lockKey, ct);
        if (@lock is null)
        {
            // Another worker has this step — nack and move on
            await message.NackAsync();
            return;
        }

        try
        {
            await _executor.ExecuteStepAsync(message, ct);
            await message.AckAsync();
        }
        catch (OperationCanceledException) when (_isShuttingDown)
        {
            // Shutdown requested during step execution.
            // The step may or may not have completed.
            // Idempotency keys ensure re-execution is safe.
            _logger.LogWarning(
                "Step {StepName} for task {TaskId} interrupted by shutdown. " +
                "Message will be redelivered.",
                message.StepName, message.TaskId);
            await message.NackAsync();
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Worker shutdown initiated.");
        _isShuttingDown = true;

        // Wait for the current step to finish, up to the cancellation token
        await base.StopAsync(cancellationToken);
        _logger.LogInformation("Worker shutdown complete.");
    }
}
```

Key points:
- _isShuttingDown flag — checked before accepting new messages
- NackAsync — returns the message to the queue without acknowledgment
- Idempotency keys — if the step was partially executed before shutdown, re-execution is safe because each step checks its idempotency key before doing work
- Lock expiry — even if the worker crashes without releasing the lock, Redis TTL ensures the lock expires and another worker can acquire it
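The idempotency guard named above can be sketched as follows. The IIdempotencyStore interface and the in-memory store are invented for illustration — the real store would be database-backed, and a production implementation would mark the key transactionally with the step's own writes rather than before them:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Illustrative idempotency store; a real one lives in the database.
public interface IIdempotencyStore
{
    // Returns true the first time a key is seen, false on every retry
    Task<bool> TryMarkExecutedAsync(string key);
}

public sealed class InMemoryIdempotencyStore : IIdempotencyStore
{
    private readonly ConcurrentDictionary<string, bool> _seen = new();
    public Task<bool> TryMarkExecutedAsync(string key) =>
        Task.FromResult(_seen.TryAdd(key, true));
}

public static class StepGuard
{
    // Runs the side effect only if this (taskId, stepName) pair hasn't run yet
    public static async Task ExecuteOnceAsync(
        IIdempotencyStore store, string taskId, string stepName, Func<Task> sideEffect)
    {
        var key = $"{taskId}:{stepName}";
        if (!await store.TryMarkExecutedAsync(key))
            return; // redelivered message: the step already ran, skip silently
        await sideEffect();
    }
}
```

A step-4 message redelivered after a mid-shutdown nack re-enters ExecuteOnceAsync, finds the key already marked, and skips the side effect.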
Rolling Updates
A rolling update replaces worker containers one at a time. At any moment, old and new code coexist. This raises a question: what happens to in-progress sagas?
The answer: nothing special. The architecture handles it naturally.
Why No Migration Is Needed
- Graceful stop: the old worker finishes its current step, nacks remaining messages, and exits
- Queue redelivery: unacknowledged messages go back to the queue
- New worker picks up: the new container starts consuming from the same queue
- Saga state is in the database: the new worker reads the current step from the DistributedTaskInstance aggregate root and continues
There is no in-memory saga state to migrate. The saga's position is persisted in the database. The worker is stateless — it reads the current state, executes one step, writes the new state, and publishes the next message.
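The read-execute-write-publish cycle can be shown with a toy in-memory model. Everything here — the step names, the Runtime type, the dictionary standing in for the database and the Queue standing in for the broker — is invented for illustration; the point is only that the saga's position lives outside the worker:

```csharp
using System;
using System.Collections.Generic;

// A queue message naming the task and the step to run
sealed record StepMessage(string TaskId, string StepName);

sealed class SagaInstance
{
    private static readonly string[] Steps = { "reserve", "charge", "ship" };
    public int Position { get; private set; }   // persisted step index
    public void MarkStepCompleted() => Position++;
    public bool TryGetNextStep(out string step)
    {
        step = Position < Steps.Length ? Steps[Position] : "";
        return Position < Steps.Length;
    }
}

sealed class Runtime
{
    public Dictionary<string, SagaInstance> Db { get; } = new(); // "database"
    public Queue<StepMessage> Queue { get; } = new();            // "broker"

    // One stateless cycle: any worker replica could run any of these calls,
    // because the only state it touches is read from and written to Db.
    public void RunOneStep(StepMessage message)
    {
        var instance = Db[message.TaskId];          // 1. read persisted state
        Console.WriteLine($"executing {message.StepName}"); // 2. execute step
        instance.MarkStepCompleted();               // 3. write new state
        if (instance.TryGetNextStep(out var next))  // 4. publish next message
            Queue.Enqueue(new StepMessage(message.TaskId, next));
    }
}
```

Because each cycle starts by reading the persisted position, a rolling update can hand the queue to a new worker between any two cycles.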
```
Old Worker                    Queue                  New Worker
    |                           |                         |
    |-- executing step 3 ------>|                         |
    |                           |                         |
    |   SIGTERM received        |                         |
    |-- finish step 3 --------->|                         |
    |-- nack step 4 message --->|                         |
    |-- exit                    |                         |
    |                           |-- step 4 message ------>|
    |                           |                         |-- execute step 4
    |                           |                         |-- publish step 5
```

Kubernetes Configuration
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: task-worker
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 1
  template:
    metadata:
      labels:
        app: task-worker
    spec:
      terminationGracePeriodSeconds: 120
      containers:
        - name: worker
          image: myapp/worker:v2
          resources:
            requests:
              cpu: "500m"
              memory: "256Mi"
            limits:
              cpu: "1000m"
              memory: "512Mi"
```

Set terminationGracePeriodSeconds higher than the longest step timeout. If a step can take up to 60 seconds, set the grace period to 120 seconds. Kubernetes will wait that long before sending SIGKILL.
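The pod's grace period only helps if the .NET host itself is willing to wait that long: the generic host cancels StopAsync after its own HostOptions.ShutdownTimeout (30 seconds by default in recent .NET versions), regardless of what Kubernetes allows. A minimal sketch of raising it to just under the grace period:

```csharp
// Align the .NET host's shutdown timeout with the pod's grace period.
// Without this, StopAsync is cancelled after HostOptions.ShutdownTimeout
// even though Kubernetes would have waited longer before SIGKILL.
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

builder.Services.Configure<HostOptions>(options =>
{
    // Slightly below terminationGracePeriodSeconds (120 s here), leaving
    // headroom for the process to exit before Kubernetes sends SIGKILL.
    options.ShutdownTimeout = TimeSpan.FromSeconds(110);
});
```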
Horizontal Scaling
Multiple workers consume from the same queue. The queue handles load balancing via the competing consumers pattern — each message is delivered to exactly one consumer.
The Distributed Lock Guarantee
Without locking, two workers could process the same task step if the message is redelivered (network glitch, slow ack). The distributed lock ensures one executor per task:
```
Worker A                       Redis                      Worker B
    |                            |                            |
    |-- LOCK task:123:step:3 --->|                            |
    |<-- OK (acquired) ----------|                            |
    |                            |                            |
    |   executing step 3         |<-- LOCK task:123:step:3 ---|
    |                            |--- FAIL (already held) --->|
    |                            |                            |-- nack, try later
    |                            |                            |
    |-- UNLOCK ----------------->|                            |
    |-- publish step 4 msg ----->|                            |
```

The lock is scoped to {taskId}:{stepName}. Different tasks run concurrently on different workers. Different steps of the same task run sequentially (by design — the next step message is only published after the current step completes).
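The acquire/fail/expire semantics map onto Redis primitives roughly as follows. This is a sketch of the standard SET NX PX pattern using StackExchange.Redis, not the library's actual IDistributedLockManager; the release script checks a per-holder token so a worker can never delete a lock that expired and was re-acquired by another worker:

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

// Sketch of a Redis-backed lock: atomic create-if-absent with a TTL,
// plus a token-checked release. Illustrative, not the real implementation.
public sealed class RedisLockSketch
{
    private readonly IDatabase _db;
    public RedisLockSketch(IDatabase db) => _db = db;

    // Returns a holder token on success, null if another worker holds the lock
    public async Task<string?> AcquireAsync(string key, TimeSpan ttl)
    {
        var token = Guid.NewGuid().ToString("N");
        // SET key token NX PX ttl: succeeds only if the key does not exist
        var acquired = await _db.StringSetAsync(key, token, ttl, When.NotExists);
        return acquired ? token : null;
    }

    // Delete only if we still hold the lock. A plain DEL could remove a
    // lock that expired under us and was re-acquired by another worker.
    private const string ReleaseScript = @"
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        else
            return 0
        end";

    public Task ReleaseAsync(string key, string token) =>
        _db.ScriptEvaluateAsync(ReleaseScript,
            new RedisKey[] { key }, new RedisValue[] { token });
}
```

The TTL in AcquireAsync is what makes crashed workers harmless: if the holder dies, the key expires on its own and the next worker's SET NX succeeds.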
Scaling Configuration
```csharp
builder.Services.AddDistributedTaskWorker(options =>
{
    // How many messages to process concurrently per worker instance
    options.MaxConcurrency = 10;

    // Prefetch count — how many messages to pull from the queue at once
    options.PrefetchCount = 20;

    // Distributed lock TTL — must be > longest step duration
    options.LockTimeoutSeconds = 120;

    // Lock renewal interval — extend the lock while the step is running
    options.LockRenewalIntervalSeconds = 30;
});
```

With MaxConcurrency = 10 and 3 worker replicas, the system processes up to 30 task steps concurrently. The queue and the distributed lock handle all coordination.
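MaxConcurrency amounts to a semaphore around message processing. A self-contained sketch of how a consumer loop might bound in-flight work — the names and structure are illustrative, not the generated implementation:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch: bounding in-flight work with a SemaphoreSlim, the usual way a
// MaxConcurrency-style option is enforced inside a consumer loop.
public static class BoundedConsumer
{
    // Processes messageCount fake messages, never more than maxConcurrency
    // at once; returns the observed high-water mark of in-flight work.
    public static async Task<int> RunAsync(int messageCount, int maxConcurrency)
    {
        var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        int inFlight = 0, peak = 0;

        var tasks = Enumerable.Range(0, messageCount).Select(async _ =>
        {
            await gate.WaitAsync();              // blocks once the limit is hit
            try
            {
                var now = Interlocked.Increment(ref inFlight);
                InterlockedMax(ref peak, now);   // record high-water mark
                await Task.Delay(10);            // stand-in for step execution
                Interlocked.Decrement(ref inFlight);
            }
            finally { gate.Release(); }
        });

        await Task.WhenAll(tasks);
        return peak;
    }

    // Lock-free "target = max(target, value)"
    private static void InterlockedMax(ref int target, int value)
    {
        int current;
        while (value > (current = Volatile.Read(ref target)))
            Interlocked.CompareExchange(ref target, value, current);
    }
}
```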
Container Deployment
Two containers: the API (receives task submissions) and the Worker (executes saga steps).
Dockerfile
```dockerfile
# ── Build stage ──
FROM mcr.microsoft.com/dotnet/sdk:9.0 AS build
WORKDIR /src
COPY . .
RUN dotnet publish src/MyApp.Api -c Release -o /app/api
RUN dotnet publish src/MyApp.Worker -c Release -o /app/worker

# ── API container ──
FROM mcr.microsoft.com/dotnet/aspnet:9.0 AS api
WORKDIR /app
COPY --from=build /app/api .
EXPOSE 8080
ENTRYPOINT ["dotnet", "MyApp.Api.dll"]

# ── Worker container ──
FROM mcr.microsoft.com/dotnet/aspnet:9.0 AS worker
WORKDIR /app
COPY --from=build /app/worker .
ENTRYPOINT ["dotnet", "MyApp.Worker.dll"]
```

The API container exposes HTTP endpoints (generated controllers). The Worker container runs as a headless service — no ports, no ingress, just queue consumption.
Docker Compose (Development)
```yaml
services:
  api:
    build:
      context: .
      target: api
    ports:
      - "8080:8080"
    environment:
      - ConnectionStrings__Queue=amqp://rabbitmq:5672
      - ConnectionStrings__Redis=redis:6379
      - ConnectionStrings__Database=Host=postgres;Database=tasks
  worker:
    build:
      context: .
      target: worker
    deploy:
      replicas: 2
    environment:
      - ConnectionStrings__Queue=amqp://rabbitmq:5672
      - ConnectionStrings__Redis=redis:6379
      - ConnectionStrings__Database=Host=postgres;Database=tasks
  rabbitmq:
    image: rabbitmq:3-management
  redis:
    image: redis:7-alpine
  postgres:
    image: postgres:16-alpine
    environment:
      - POSTGRES_DB=tasks
      - POSTGRES_PASSWORD=dev
```

Kubernetes Resources
For production, the key resources are:
Horizontal Pod Autoscaler (HPA)
Scale workers based on queue depth:
```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: task-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: task-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: rabbitmq_queue_messages_ready
          selector:
            matchLabels:
              queue: distributed-tasks
        target:
          type: AverageValue
          averageValue: "50"
```

When the queue depth exceeds 50 messages per worker, Kubernetes adds replicas. When it drops, replicas are removed. Note that External metrics are not available out of the box: a metrics adapter (such as the Prometheus adapter or KEDA) must expose rabbitmq_queue_messages_ready through the Kubernetes external metrics API.
Pod Disruption Budget (PDB)
Ensure at least one worker is always running during voluntary disruptions (node maintenance, cluster upgrades):
```yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: task-worker-pdb
spec:
  minAvailable: 1
  selector:
    matchLabels:
      app: task-worker
```

Rolling Deployment Sequence
At every point in the sequence, at least one worker is consuming messages. No downtime. No lost messages. No corrupted saga state.
The Checklist
Before deploying a DistributedTask system to production:
| Concern | Configuration |
|---|---|
| Grace period | terminationGracePeriodSeconds > longest step timeout |
| Lock TTL | LockTimeoutSeconds > longest step duration |
| Lock renewal | LockRenewalIntervalSeconds < lock TTL / 3 |
| Idempotency | Every step checks its idempotency key before executing |
| Queue durability | Messages are persistent (survive broker restart) |
| Health checks | Worker reports unhealthy when _isShuttingDown is true |
| Replicas | Minimum 2 workers (PDB ensures at least 1 during updates) |
| HPA | Scale on queue depth, not CPU |
| Monitoring | Alert on queue depth > threshold for > N minutes |
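The health-check row can be satisfied with a small IHealthCheck that flips to Unhealthy once shutdown begins. A sketch, assuming the worker exposes its shutdown flag through some interface — the IShutdownState abstraction here is invented for illustration:

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

// Illustrative: some way to observe the worker's _isShuttingDown flag
public interface IShutdownState
{
    bool IsShuttingDown { get; }
}

// Reports Unhealthy once shutdown begins, so the orchestrator stops
// treating a draining instance as a live consumer
public sealed class WorkerHealthCheck : IHealthCheck
{
    private readonly IShutdownState _state;
    public WorkerHealthCheck(IShutdownState state) => _state = state;

    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var result = _state.IsShuttingDown
            ? HealthCheckResult.Unhealthy("Worker is draining.")
            : HealthCheckResult.Healthy();
        return Task.FromResult(result);
    }
}
```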
What's Next
Part XV compares DistributedTask.Dsl against MassTransit sagas, Temporal, Durable Functions, Hangfire, and Camunda/Zeebe — and quantifies the amplification ratio: ~55 lines of developer code producing ~700+ lines of generated infrastructure.