Toward BPM
"A distributed task is a unit of work. Chain them with conditions and shared state, and you have a process."
What We Already Have
Before adding anything new, consider what the DistributedTask DSL already provides:
| BPM Characteristic | DistributedTask Implementation |
|---|---|
| State tracked and persisted | DistributedTaskInstance is an AggregateRoot — EF Core, full audit |
| Condition snapshots | StepConditionSnapshot records variable values at evaluation time |
| Audit trail | TaskAuditEntry — who triggered, what changed, when, trace ID |
| Typed data between steps | Step data ValueObjects — no JSON bags, no Dictionary<string, object> |
| Compensation | Saga pattern — every step has an undo |
| Retry and resilience | Polly policies per step, idempotency keys, distributed locks |
| Observability | OpenTelemetry spans, structured logging, health checks |
This is not a toy. It is a production-grade task executor with persistence, auditing, and resilience. The question is: can we compose multiple tasks into a process?
Task Chaining with [DependsOn]
A single distributed task is a saga — a linear or parallel sequence of steps. But real processes are graphs of tasks. Order processing is not one task; it is validation, then payment, then fulfillment, then notification — each a separate saga with its own retry policy, compensation chain, and data model.
The [DependsOn] attribute declares edges in a Directed Acyclic Graph (DAG) of tasks:
[DistributedTask("ValidateInventory")]
public partial class ValidateInventorySaga
{
    [SagaStep("CheckStock", Order = 1)]
    public partial class CheckStock { }
    [SagaStep("ReserveItems", Order = 2)]
    public partial class ReserveItems { }
}
[DistributedTask("ChargePayment")]
[DependsOn("ValidateInventory", Condition = "Response.AllInStock == true")]
public partial class ChargePaymentSaga
{
    [SagaStep("AuthorizeCard", Order = 1)]
    public partial class AuthorizeCard { }
    [SagaStep("CapturePayment", Order = 2)]
    public partial class CapturePayment { }
}
[DistributedTask("FulfillOrder")]
[DependsOn("ValidateInventory", Condition = "Response.AllInStock == true")]
[DependsOn("ChargePayment", Condition = "Response.ChargeSucceeded == true")]
public partial class FulfillOrderSaga
{
    [SagaStep("PickItems", Order = 1)]
    public partial class PickItems { }
    [SagaStep("ShipPackage", Order = 2)]
    public partial class ShipPackage { }
}
[DistributedTask("SendConfirmation")]
[DependsOn("FulfillOrder")]
public partial class SendConfirmationSaga
{
    [SagaStep("GenerateInvoice", Order = 1)]
    public partial class GenerateInvoice { }
    [SagaStep("SendEmail", Order = 2)]
    public partial class SendEmail { }
}
Four distributed tasks. Four dependency edges. Three conditions. The generator produces a DAG executor that orchestrates the entire process.
The DAG
FulfillOrder waits for both ValidateInventory and ChargePayment. SendConfirmation waits for FulfillOrder. The generator resolves the topological order and evaluates conditions at each edge.
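The layering can be illustrated with a small sketch. The article does not show how the generator resolves the order, so this is only one plausible approach (peeling off tasks whose dependencies are already placed); `DagLayers`, `Compute`, and `OrderProcess` are hypothetical names, not part of the DSL.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DagLayers
{
    // Groups tasks into layers. A task becomes "ready" once every task it
    // depends on has been placed in an earlier layer.
    public static List<List<string>> Compute(
        IReadOnlyDictionary<string, string[]> dependsOn)
    {
        var remaining = dependsOn.ToDictionary(
            kv => kv.Key, kv => new HashSet<string>(kv.Value));
        var layers = new List<List<string>>();

        while (remaining.Count > 0)
        {
            var ready = remaining
                .Where(kv => kv.Value.Count == 0)
                .Select(kv => kv.Key)
                .OrderBy(name => name, StringComparer.Ordinal)
                .ToList();

            // Nothing is ready but tasks remain: the graph is not a valid DAG.
            if (ready.Count == 0)
                throw new InvalidOperationException(
                    "Cycle or unknown dependency in the [DependsOn] graph.");

            foreach (var name in ready) remaining.Remove(name);
            foreach (var deps in remaining.Values) deps.ExceptWith(ready);
            layers.Add(ready);
        }
        return layers;
    }

    // The four sagas from this article, keyed by task name (illustrative data).
    public static readonly Dictionary<string, string[]> OrderProcess = new()
    {
        ["ValidateInventory"] = Array.Empty<string>(),
        ["ChargePayment"] = new[] { "ValidateInventory" },
        ["FulfillOrder"] = new[] { "ValidateInventory", "ChargePayment" },
        ["SendConfirmation"] = new[] { "FulfillOrder" },
    };
}
```

Calling `DagLayers.Compute(DagLayers.OrderProcess)` places each of the four tasks in its own layer, in the order ValidateInventory, ChargePayment, FulfillOrder, SendConfirmation, because every task here depends on the one before it.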
Shared Process Variables
Chained tasks need shared state. The IProcessContext provides typed, scoped variables that flow across tasks within the same process:
public interface IProcessContext
{
    /// Set a variable visible to all downstream tasks
    void Set<T>(string key, T value);
    /// Get a variable set by an upstream task
    T Get<T>(string key);
    /// Try to get a variable (returns false if not set)
    bool TryGet<T>(string key, out T value);
    /// The process instance ID (shared across all tasks in the chain)
    Guid ProcessId { get; }
    /// All variables as a read-only snapshot (for audit)
    IReadOnlyDictionary<string, object> Snapshot();
}
Each task receives the context through dependency injection:
public partial class ValidateInventorySaga
{
    [SagaStep("CheckStock", Order = 1)]
    public partial class CheckStock
    {
        public async Task ExecuteAsync(
            CheckStockData data,
            IProcessContext process,
            CancellationToken ct)
        {
            var result = await _inventoryService.CheckAsync(data.Items, ct);
            // Set variables for downstream tasks
            process.Set("AllInStock", result.AllAvailable);
            process.Set("ReservedItems", result.ReservedItems);
            process.Set("InventoryCheckTime", DateTimeOffset.UtcNow);
        }
    }
}
Downstream tasks read the variables:
public partial class FulfillOrderSaga
{
    [SagaStep("PickItems", Order = 1)]
    public partial class PickItems
    {
        public async Task ExecuteAsync(
            PickItemsData data,
            IProcessContext process,
            CancellationToken ct)
        {
            var reservedItems = process.Get<List<ReservedItem>>("ReservedItems");
            // Pick the reserved items from warehouse
        }
    }
}
The process context is persisted alongside the task instances. Every Set call is recorded in the audit trail with the variable name, type, serialized value, and timestamp.
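To make the audit behaviour concrete, here is a minimal in-memory sketch of a context that records each Set call. It is not the persisted implementation the article refers to: `InMemoryProcessContext` and `VariableAuditEntry` are hypothetical names, JSON via System.Text.Json is an assumed serialization format, and the class mirrors the IProcessContext members without implementing the interface so that the snippet stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical audit record: variable name, type, serialized value, timestamp.
public sealed record VariableAuditEntry(
    string Key, string TypeName, string SerializedValue, DateTimeOffset SetAt);

// In-memory stand-in mirroring the IProcessContext members shown above.
public sealed class InMemoryProcessContext
{
    private readonly Dictionary<string, object> _values = new();
    private readonly List<VariableAuditEntry> _audit = new();

    public InMemoryProcessContext(Guid processId) => ProcessId = processId;

    public Guid ProcessId { get; }

    // Every Set call appends one entry here, in call order.
    public IReadOnlyList<VariableAuditEntry> Audit => _audit;

    public void Set<T>(string key, T value)
    {
        _values[key] = value ?? throw new ArgumentNullException(nameof(value));
        _audit.Add(new VariableAuditEntry(
            key, typeof(T).Name, JsonSerializer.Serialize(value), DateTimeOffset.UtcNow));
    }

    public T Get<T>(string key) =>
        TryGet<T>(key, out var value)
            ? value
            : throw new KeyNotFoundException(
                $"Process variable '{key}' is not set or is not a {typeof(T).Name}.");

    public bool TryGet<T>(string key, out T value)
    {
        // A key stored under a different type is treated as "not set" for T.
        if (_values.TryGetValue(key, out var raw) && raw is T typed)
        {
            value = typed;
            return true;
        }
        value = default!;
        return false;
    }

    // Copies the dictionary so later Set calls do not alter the snapshot.
    public IReadOnlyDictionary<string, object> Snapshot() =>
        new Dictionary<string, object>(_values);
}
```

In this sketch a type mismatch on Get fails loudly rather than returning a default, which seems closer to the "no JSON bags" intent of the DSL; a real store would also need to persist the entries transactionally with the task instance.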
Condition Evaluation
The Condition property on [DependsOn] is an expression evaluated against the upstream task's response:
[DependsOn("ValidateInventory", Condition = "Response.AllInStock == true")]
The generator parses this at compile time and emits a strongly-typed condition evaluator:
// ── Generated: ChargePaymentSaga.DependencyConditions.g.cs ──
public static class ChargePaymentDependencyConditions
{
    public static bool EvaluateValidateInventory(
        ValidateInventoryResponse response)
    {
        return response.AllInStock == true;
    }
}
No expression trees at runtime. No string parsing. The condition is compiled C# code. If the condition references a property that does not exist on the response type, the compiler catches it.
A ConditionSnapshot is recorded when the condition is evaluated:
public sealed record ConditionSnapshot(
    string DependencyTaskName,
    string Expression,
    bool Result,
    IReadOnlyDictionary<string, object> VariableValues,
    DateTimeOffset EvaluatedAt);
This is the audit trail. Six months from now, when someone asks "why did fulfillment start even though inventory was low?", the snapshot answers: AllInStock was true at 2026-04-03T14:22:31Z, because the stock check ran before a concurrent order consumed the last items.
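As an illustration of how an evaluation and its snapshot might be produced together, the sketch below wraps the compiled predicate and captures the value it read. The article does not show this part of the generated code, so `ConditionRecorder` is a hypothetical helper, the one-property `ValidateInventoryResponse` is an assumed shape, and passing the clock value in as a parameter is a choice made here for testability.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the upstream response; only AllInStock appears in the article.
public sealed record ValidateInventoryResponse(bool AllInStock);

public sealed record ConditionSnapshot(
    string DependencyTaskName,
    string Expression,
    bool Result,
    IReadOnlyDictionary<string, object> VariableValues,
    DateTimeOffset EvaluatedAt);

public static class ConditionRecorder
{
    // Evaluates the condition and returns a snapshot of what it saw.
    public static ConditionSnapshot EvaluateValidateInventory(
        ValidateInventoryResponse response, DateTimeOffset evaluatedAt)
    {
        // Same expression as the generated evaluator shown earlier.
        bool result = response.AllInStock == true;

        return new ConditionSnapshot(
            DependencyTaskName: "ValidateInventory",
            Expression: "Response.AllInStock == true",
            Result: result,
            VariableValues: new Dictionary<string, object>
            {
                // Record the input value, not just the boolean outcome.
                ["Response.AllInStock"] = response.AllInStock,
            },
            EvaluatedAt: evaluatedAt);
    }
}
```

Storing the input values next to the result is what lets the snapshot explain a decision later: the expression text says what was asked, and VariableValues says what the data looked like at that moment.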
The Generated DAG Executor
// ── Generated: ProcessOrder.DagExecutor.g.cs ──
public sealed class ProcessOrderDagExecutor : IDagExecutor
{
    private readonly IDistributedTaskSubmitter _submitter;
    private readonly IProcessContextStore _contextStore;
    private readonly ILogger<ProcessOrderDagExecutor> _logger;
    public async Task ExecuteAsync(
        ProcessOrderRequest request,
        CancellationToken ct)
    {
        var processId = Guid.NewGuid();
        var context = await _contextStore.CreateAsync(processId, ct);
        // Layer 0: no dependencies
        var validateResult = await _submitter
            .SubmitAndWaitAsync<ValidateInventoryResponse>(
                new ValidateInventoryRequest { Items = request.Items },
                processId, ct);
        // Evaluate conditions for Layer 1
        if (!ChargePaymentDependencyConditions
            .EvaluateValidateInventory(validateResult))
        {
            _logger.LogInformation(
                "Process {ProcessId}: inventory check failed, aborting",
                processId);
            return;
        }
        // Layer 1: depends on ValidateInventory
        var chargeResult = await _submitter
            .SubmitAndWaitAsync<ChargePaymentResponse>(
                new ChargePaymentRequest { Amount = request.TotalAmount },
                processId, ct);
        // Evaluate conditions for Layer 2
        if (!FulfillOrderDependencyConditions
            .EvaluateChargePayment(chargeResult))
        {
            _logger.LogWarning(
                "Process {ProcessId}: payment failed, triggering compensation",
                processId);
            // Compensate ValidateInventory (release reserved items)
            return;
        }
        // Layer 2: depends on ValidateInventory + ChargePayment
        var fulfillResult = await _submitter
            .SubmitAndWaitAsync<FulfillOrderResponse>(
                new FulfillOrderRequest(),
                processId, ct);
        // Layer 3: depends on FulfillOrder
        await _submitter.SubmitAsync(
            new SendConfirmationRequest { OrderId = request.OrderId },
            processId, ct);
    }
}
The executor respects the topological order. Tasks at the same layer with no mutual dependencies can run in parallel (the generator detects this from the DAG structure). Condition failures short-circuit the process and trigger upstream compensation.
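The order-processing example has one task per layer, so the parallel case and the compensation path never show up in the listing. A rough sketch of both, under assumptions: `ProcessTask` and `LayeredRunner` are hypothetical types invented for this illustration, a task reports success as a plain bool, and compensation runs in reverse completion order. The generator's real output may differ on all three points.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical task descriptor: a forward action and its undo.
public sealed record ProcessTask(
    string Name,
    Func<CancellationToken, Task<bool>> Run,
    Func<CancellationToken, Task> Compensate);

public static class LayeredRunner
{
    // Runs layers in order; tasks inside one layer run concurrently.
    // Returns true only when every task in every layer reported success.
    public static async Task<bool> RunAsync(
        IEnumerable<IReadOnlyList<ProcessTask>> layers, CancellationToken ct)
    {
        var completed = new Stack<ProcessTask>();

        foreach (var layer in layers)
        {
            // Task.WhenAll returns results in the same order as the inputs.
            bool[] results = await Task.WhenAll(layer.Select(t => t.Run(ct)));

            for (int i = 0; i < layer.Count; i++)
                if (results[i]) completed.Push(layer[i]);

            if (results.All(ok => ok)) continue;

            // Short-circuit: undo everything that completed, most recent first.
            while (completed.Count > 0)
                await completed.Pop().Compensate(ct);
            return false;
        }
        return true;
    }
}
```

Later layers are never started once a layer fails, which matches the short-circuit behaviour described above. The sketch does not handle a task that throws instead of returning false; a production executor would need to treat exceptions as failures and still run compensation.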
What This Is
- A task chaining mechanism with typed conditions and shared state
- A DAG executor that respects dependency order and evaluates conditions
- An audit-first system where every condition evaluation is recorded
- A compile-time checked process — invalid conditions, missing dependencies, and type mismatches are caught by the compiler
What This Is NOT
This is not a full BPMN engine. There is no visual designer. There are no swimlanes. There are no inclusive gateways or complex merge semantics. There is no DMN decision table integration.
It is not Camunda. It is not Zeebe. It is not Temporal (though it shares the "durable execution" philosophy).
It is a developer-facing DSL for machine-driven processes. The conditions are expressions on data, not human decisions. The "actors" are services, not people. The "forms" are API requests, not UI screens.
Human workflow — approvals, reviews, escalations — belongs to the Workflow DSL (Part XII). Machine workflow — validation, payment, fulfillment, notification — belongs here.
The two compose. That is the point.
The Future: A Full BPM DSL
The DistributedTask DSL handles machine processes. The Workflow DSL handles human processes. A future BPM DSL will unify both:
- Process definitions that mix human stages and machine tasks
- Gateways — exclusive, parallel, inclusive — as first-class attributes
- Subprocess nesting — a process step that is itself a process
- Timer events — escalation after N hours, SLA tracking
- Error boundary events — catch-and-redirect patterns
That is a future article. The building blocks are already here.
What's Next
Part XIV covers deployment — graceful worker shutdown, rolling updates without saga corruption, and horizontal scaling with distributed locks.