
The DSL

"Every attribute is a [MetaConcept]. Every constraint is a C# static method. Every step has typed data."


M3 Grounding

Every attribute in the DistributedTask DSL follows the CMF convention established by the M3 meta-metamodel:

  • [MetaConcept] — declares the attribute as a modeling concept
  • [MetaProperty] — typed configuration slots
  • [MetaReference] — directed associations between concepts
  • [MetaConstraint] — validation rules as C# static methods returning ConstraintResult
  • [MetaInherits] — metamodel-level inheritance

This means the DistributedTask DSL participates in the MetamodelRegistry, benefits from the same design-time validation, and follows the same source generation pipeline as DDD, Entity.Dsl, Content, Admin, Pages, and Workflow.


DistributedTask — The Root

The entry point. One class, one task, all steps in one place.

[MetaConcept(typeof(DistributedTaskConcept))]
[MetaConstraint("MustHaveAtLeastOneStep", nameof(MustHaveAtLeastOneStepConstraint),
    Message = "A distributed task must have at least one saga step")]
[MetaConstraint("MustHaveRequest", nameof(MustHaveRequestConstraint),
    Message = "A distributed task must have a corresponding [TaskRequest] class")]
[MetaConstraint("MustHaveResponse", nameof(MustHaveResponseConstraint),
    Message = "A distributed task must have a corresponding [TaskResponse] class")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class DistributedTaskAttribute : Attribute
{
    [MetaProperty("Name", "string", Required = true)]
    public string Name { get; }

    [MetaProperty("Queue", "string")]
    public string Queue { get; set; } = "default";

    [MetaProperty("MaxRetries", "int")]
    public int MaxRetries { get; set; } = 3;

    [MetaProperty("TimeoutSeconds", "int")]
    public int TimeoutSeconds { get; set; } = 300;

    [MetaProperty("Mode", "TaskMode")]
    public TaskMode Mode { get; set; } = TaskMode.Distributed;

    [MetaProperty("Serialization", "SerializationFormat")]
    public SerializationFormat Serialization { get; set; } = SerializationFormat.Json;

    [MetaProperty("ArchiveAfterDays", "int")]
    public int ArchiveAfterDays { get; set; } = 30;

    public DistributedTaskAttribute(string name) => Name = name;

    // ── Constraints as C# static methods ──

    public static ConstraintResult MustHaveAtLeastOneStepConstraint(
        ConceptValidationContext ctx)
    {
        var hasSteps = ctx.ClassAttributes.Any(a =>
            a is SagaStepAttribute or FileUploadStepAttribute
            or FileDownloadStepAttribute or CustomStepAttribute);
        return hasSteps
            ? ConstraintResult.Satisfied()
            : ConstraintResult.Failed("No saga steps found on this class");
    }

    public static ConstraintResult MustHaveRequestConstraint(
        ConceptValidationContext ctx)
    {
        var taskName = ctx.ClassAttributes.OfType<DistributedTaskAttribute>().First().Name;
        var hasRequest = ctx.Compilation.GetTypesByAttribute<TaskRequestAttribute>()
            .Any(r => r.TaskName == taskName);
        return hasRequest
            ? ConstraintResult.Satisfied()
            : ConstraintResult.Failed($"No [TaskRequest(\"{taskName}\")] class found");
    }

    public static ConstraintResult MustHaveResponseConstraint(
        ConceptValidationContext ctx)
    {
        var taskName = ctx.ClassAttributes.OfType<DistributedTaskAttribute>().First().Name;
        var hasResponse = ctx.Compilation.GetTypesByAttribute<TaskResponseAttribute>()
            .Any(r => r.TaskName == taskName);
        return hasResponse
            ? ConstraintResult.Satisfied()
            : ConstraintResult.Failed($"No [TaskResponse(\"{taskName}\")] class found");
    }
}

public enum TaskMode { InProcess, Distributed }
public enum SerializationFormat { Json, MessagePack, Protobuf }

TaskRequest / TaskResponse — Typed Payloads

[MetaConcept(typeof(TaskRequestConcept))]
[MetaReference("DistributedTask", "DistributedTask", Multiplicity = "1")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class TaskRequestAttribute : Attribute
{
    [MetaProperty("TaskName", "string", Required = true)]
    public string TaskName { get; }

    public TaskRequestAttribute(string taskName) => TaskName = taskName;
}

[MetaConcept(typeof(TaskResponseConcept))]
[MetaReference("DistributedTask", "DistributedTask", Multiplicity = "1")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class TaskResponseAttribute : Attribute
{
    [MetaProperty("TaskName", "string", Required = true)]
    public string TaskName { get; }

    public TaskResponseAttribute(string taskName) => TaskName = taskName;
}
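
As a usage sketch, the three attributes pair up through the task name. The attribute classes below are trimmed stand-ins (no Meta* annotations) so the snippet compiles on its own. CreateZipTask, CreateZipResponse, and the "zip-jobs" queue are hypothetical names. The reflection lookup only imitates at runtime what MustHaveRequestConstraint and MustHaveResponseConstraint check at design time.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Runtime imitation of the MustHaveRequest / MustHaveResponse checks.
var taskName = typeof(CreateZipTask).GetCustomAttribute<DistributedTaskAttribute>()!.Name;
var allTypes = typeof(CreateZipTask).Assembly.GetTypes();
bool hasRequest = allTypes.Any(t => t.GetCustomAttribute<TaskRequestAttribute>()?.TaskName == taskName);
bool hasResponse = allTypes.Any(t => t.GetCustomAttribute<TaskResponseAttribute>()?.TaskName == taskName);
Console.WriteLine($"{taskName}: request={hasRequest}, response={hasResponse}");

// Trimmed stand-ins for the DSL attributes (assumption: Meta* annotations omitted).
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class DistributedTaskAttribute : Attribute
{
    public string Name { get; }
    public string Queue { get; set; } = "default";
    public int MaxRetries { get; set; } = 3;
    public DistributedTaskAttribute(string name) => Name = name;
}

[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class TaskRequestAttribute : Attribute
{
    public string TaskName { get; }
    public TaskRequestAttribute(string taskName) => TaskName = taskName;
}

[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class TaskResponseAttribute : Attribute
{
    public string TaskName { get; }
    public TaskResponseAttribute(string taskName) => TaskName = taskName;
}

// Hypothetical task declaration: the name "CreateZip" ties the three classes together.
[DistributedTask("CreateZip", Queue = "zip-jobs", MaxRetries = 5)]
public partial class CreateZipTask { }

[TaskRequest("CreateZip")]
public partial class CreateZipRequest { }

[TaskResponse("CreateZip")]
public partial class CreateZipResponse { }
```

Matching by string name keeps the request and response classes free of any reference to the task class, at the cost of relying on the constraints to catch a misspelled name.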

The source generator reads the TaskRequest class and generates a *WorkerMessage — a serializable version of the request without IFormFile properties, replaced by S3 keys from the upload steps. The developer never has to define the worker message manually.
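
To make the request-to-message transformation concrete, here is a rough sketch of what such a pair could look like. Everything in it is an assumption for illustration: IUploadedFile stands in for ASP.NET Core's IFormFile so the snippet compiles alone, and the property names ArchiveName, Files, and FilesS3Keys are hypothetical. The actual generated shape is not shown in this part.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// The message carries only serializable data, so it survives a JSON round trip.
var message = new CreateZipWorkerMessage
{
    TaskId = Guid.NewGuid(),
    ArchiveName = "photos.zip",
    FilesS3Keys = { "uploads/a.png", "uploads/b.png" },
};
string json = JsonSerializer.Serialize(message);
var restored = JsonSerializer.Deserialize<CreateZipWorkerMessage>(json)!;
Console.WriteLine($"{restored.ArchiveName}: {restored.FilesS3Keys.Count} key(s)");

// Hypothetical stand-in for IFormFile (assumption, not the ASP.NET Core type).
public interface IUploadedFile
{
    string FileName { get; }
}

// What the developer writes: the request holds live file handles.
public partial class CreateZipRequest
{
    public string ArchiveName { get; set; } = "";
    public List<IUploadedFile> Files { get; set; } = new();
}

// What a generator might emit: file properties replaced by S3 keys.
public sealed class CreateZipWorkerMessage
{
    public Guid TaskId { get; set; }
    public string ArchiveName { get; set; } = "";
    public List<string> FilesS3Keys { get; set; } = new();
}
```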

Every TaskRequest also gets a generated Listening property of type ListeningStrategy, plus an optional WebhookUrl (see Part X):

// Generated property injected into the partial class
public partial class CreateZipRequest
{
    public ListeningStrategy Listening { get; set; } = ListeningStrategy.Polling;
    public string? WebhookUrl { get; set; }
}

SagaStep — The Base

[MetaConcept(typeof(SagaStepConcept))]
[MetaReference("DistributedTask", "DistributedTask", Multiplicity = "1")]
[MetaConstraint("OrderMustBePositive", nameof(OrderMustBePositiveConstraint),
    Message = "Step order must be a positive integer")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class SagaStepAttribute : Attribute
{
    [MetaProperty("Name", "string", Required = true)]
    public string Name { get; }

    [MetaProperty("Order", "int", Required = true)]
    public int Order { get; set; }

    [MetaProperty("Group", "string")]
    public string? Group { get; set; }  // for ParallelStepGroup membership

    public SagaStepAttribute(string name) => Name = name;

    public static ConstraintResult OrderMustBePositiveConstraint(
        ConceptValidationContext ctx)
    {
        var order = ctx.ClassAttributes.OfType<SagaStepAttribute>()
            .Select(s => s.Order);
        return order.All(o => o > 0)
            ? ConstraintResult.Satisfied()
            : ConstraintResult.Failed("Step orders must be positive integers");
    }
}

Built-in Step Types (MetaInherits)

[MetaConcept(typeof(FileUploadStepConcept))]
[MetaInherits(typeof(SagaStepConcept))]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class FileUploadStepAttribute : Attribute
{
    [MetaProperty("Name", "string", Required = true)]
    public string Name { get; }

    [MetaProperty("Order", "int", Required = true)]
    public int Order { get; set; }

    [MetaProperty("Bucket", "string", Required = true)]
    public string Bucket { get; set; } = string.Empty;

    [MetaProperty("KeyPrefix", "string")]
    public string? KeyPrefix { get; set; }  // supports {TaskId} placeholder

    [MetaProperty("SourceProperty", "string", Required = true)]
    public string SourceProperty { get; set; } = string.Empty;  // property name on request

    public FileUploadStepAttribute(string name) => Name = name;
}

[MetaConcept(typeof(FileDownloadStepConcept))]
[MetaInherits(typeof(SagaStepConcept))]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class FileDownloadStepAttribute : Attribute
{
    [MetaProperty("Name", "string", Required = true)]
    public string Name { get; }

    [MetaProperty("Order", "int", Required = true)]
    public int Order { get; set; }

    [MetaProperty("Bucket", "string", Required = true)]
    public string Bucket { get; set; } = string.Empty;

    [MetaProperty("Keys", "string")]
    public string? Keys { get; set; }  // property name holding S3 keys from a previous step

    public FileDownloadStepAttribute(string name) => Name = name;
}

[MetaConcept(typeof(CustomStepConcept))]
[MetaInherits(typeof(SagaStepConcept))]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class CustomStepAttribute : Attribute
{
    [MetaProperty("Name", "string", Required = true)]
    public string Name { get; }

    [MetaProperty("Order", "int", Required = true)]
    public int Order { get; set; }

    public CustomStepAttribute(string name) => Name = name;
}

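The hierarchy, as declared by the [MetaInherits] attributes above:

SagaStepConcept
  ├── FileUploadStepConcept
  ├── FileDownloadStepConcept
  └── CustomStepConcept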

Built-in steps (FileUploadStep, FileDownloadStep) generate complete execute and compensate implementations. Custom steps generate abstract methods — the developer must provide the implementation.
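
The abstract-method contract for custom steps might look roughly like the sketch below. The base class name CreateZipTaskBase, the method names, and the CreateArchive step are all hypothetical. The real generated code appears in Part III and may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var task = new CreateZipTask();
await task.ExecuteCreateArchiveAsync(CancellationToken.None);
await task.CompensateCreateArchiveAsync(CancellationToken.None);
Console.WriteLine(string.Join(",", task.Log));

// Hypothetical generator output for [CustomStep("CreateArchive")]:
// no body is generated, so the compiler forces the developer to supply one.
public abstract class CreateZipTaskBase
{
    public abstract Task ExecuteCreateArchiveAsync(CancellationToken ct);
    public abstract Task CompensateCreateArchiveAsync(CancellationToken ct);
}

// Developer-written part: the custom logic and its undo action.
public sealed class CreateZipTask : CreateZipTaskBase
{
    public List<string> Log { get; } = new();

    public override Task ExecuteCreateArchiveAsync(CancellationToken ct)
    {
        Log.Add("execute");
        return Task.CompletedTask;
    }

    public override Task CompensateCreateArchiveAsync(CancellationToken ct)
    {
        Log.Add("compensate");
        return Task.CompletedTask;
    }
}
```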


Step Placement — The API/Worker Split

The saga class is shared between the API and the Worker — all steps are visible in one place. But each step declares where it runs:

[MetaConcept(typeof(StepPlacementConcept))]
[MetaReference("SagaStep", "SagaStep", Multiplicity = "1")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class StepPlacementAttribute : Attribute
{
    [MetaProperty("StepName", "string", Required = true)]
    public string StepName { get; }

    [MetaProperty("Host", "StepHost", Required = true)]
    public StepHost Host { get; set; }

    public StepPlacementAttribute(string stepName) => StepName = stepName;
}

public enum StepHost
{
    Api,     // Runs in the API process (before queue publish)
    Worker,  // Runs in the worker process (after queue consume)
}

The source generator uses [StepPlacement] to split the saga into two orchestrators:

  • *ApiOrchestrator — executes StepHost.Api steps, then publishes a message containing the typed step data from completed API steps
  • *WorkerOrchestrator — consumes the message, restores context, and executes StepHost.Worker steps

Both inherit from a shared *OrchestratorKernel containing the common logic: retry, compensation, progress tracking, and audit.
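
The split itself can be pictured as a partition of the ordered step list by host. This is a minimal sketch under assumptions: StepInfo and Splitter are hypothetical helper types, and the step names other than UploadSourceFiles are invented for the example. The real generator works on Roslyn symbols rather than runtime objects.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var steps = new List<StepInfo>
{
    new("CreateZipArchive", 2, StepHost.Worker),
    new("UploadSourceFiles", 1, StepHost.Api),
    new("UploadZip", 3, StepHost.Worker),
};
var (api, worker) = Splitter.Split(steps);
Console.WriteLine($"API: {string.Join(", ", api)}");
Console.WriteLine($"Worker: {string.Join(", ", worker)}");

public enum StepHost { Api, Worker }

// Hypothetical flattened view of a step plus its [StepPlacement].
public sealed record StepInfo(string Name, int Order, StepHost Host);

public static class Splitter
{
    // Returns the step names each orchestrator would run, in Order sequence.
    public static (List<string> Api, List<string> Worker) Split(IEnumerable<StepInfo> steps)
    {
        var ordered = steps.OrderBy(s => s.Order).ToList();
        return (
            ordered.Where(s => s.Host == StepHost.Api).Select(s => s.Name).ToList(),
            ordered.Where(s => s.Host == StepHost.Worker).Select(s => s.Name).ToList());
    }
}
```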


Parallel Steps — Fan-out/Fan-in

[MetaConcept(typeof(ParallelStepGroupConcept))]
[MetaConstraint("MustHaveMultipleSteps", nameof(MustHaveMultipleStepsConstraint),
    Message = "A parallel step group must contain at least two steps")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class ParallelStepGroupAttribute : Attribute
{
    [MetaProperty("Name", "string", Required = true)]
    public string Name { get; }

    [MetaProperty("Order", "int", Required = true)]
    public int Order { get; set; }

    [MetaProperty("AwaitAll", "bool")]
    public bool AwaitAll { get; set; } = true;  // false = proceed after first success

    public ParallelStepGroupAttribute(string name) => Name = name;

    public static ConstraintResult MustHaveMultipleStepsConstraint(
        ConceptValidationContext ctx)
    {
        var groupNames = ctx.ClassAttributes.OfType<ParallelStepGroupAttribute>()
            .Select(g => g.Name);
        foreach (var name in groupNames)
        {
            var memberCount = ctx.ClassAttributes.OfType<SagaStepAttribute>()
                .Count(s => s.Group == name);
            if (memberCount < 2)
                return ConstraintResult.Failed(
                    $"Group '{name}' has {memberCount} step(s), needs at least 2");
        }
        return ConstraintResult.Satisfied();
    }
}

Usage:

[ParallelStepGroup("TranscodeAll", Order = 3, AwaitAll = true)]
[SagaStep("Transcode720p", Group = "TranscodeAll", Order = 3)]
[SagaStep("Transcode1080p", Group = "TranscodeAll", Order = 3)]
[SagaStep("Transcode4K", Group = "TranscodeAll", Order = 3)]

Steps within a group share the same Order value. With AwaitAll = true (the default), the generator executes them concurrently with Task.WhenAll and compensates the completed parallel steps if any of them fails.
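
The fan-out/fan-in behaviour for the AwaitAll case can be sketched as follows. ParallelGroup.RunAsync is a hypothetical helper, not generated code. Instead of calling real compensation handlers it simply returns the names of the members that finished and would therefore need compensating.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var toCompensate = await ParallelGroup.RunAsync(new Dictionary<string, Func<Task>>
{
    ["Transcode720p"] = () => Task.CompletedTask,
    ["Transcode1080p"] = () => Task.CompletedTask,
    ["Transcode4K"] = () => Task.FromException(new InvalidOperationException("encoder crashed")),
});
Console.WriteLine($"Steps to compensate: {toCompensate.Count}");

public static class ParallelGroup
{
    // Runs every member concurrently. Returns an empty collection when all
    // succeed, otherwise the names of the members that completed successfully.
    public static async Task<IReadOnlyCollection<string>> RunAsync(
        IReadOnlyDictionary<string, Func<Task>> members)
    {
        var completed = new ConcurrentBag<string>();
        var running = members.Select(async member =>
        {
            await member.Value();
            completed.Add(member.Key); // only reached when the step succeeded
        }).ToList();

        try
        {
            await Task.WhenAll(running);
            return Array.Empty<string>();
        }
        catch
        {
            // Task.WhenAll finishes only after every member has finished,
            // so 'completed' is final at this point.
            return completed.ToArray();
        }
    }
}
```

Because Task.WhenAll waits for all members before surfacing a failure, no step is still running when compensation starts.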


Step Conditions — Conditional Execution

[MetaConcept(typeof(StepConditionConcept))]
[MetaReference("SagaStep", "SagaStep", Multiplicity = "1")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class StepConditionAttribute : Attribute
{
    [MetaProperty("StepName", "string", Required = true)]
    public string StepName { get; }

    [MetaProperty("Description", "string", Required = true)]
    public string Description { get; }

    [MetaProperty("MethodName", "string", Required = true)]
    public string MethodName { get; }  // nameof(IsSource4KOrHigher)

    public StepConditionAttribute(string stepName, string description, string methodName)
    {
        StepName = stepName;
        Description = description;
        MethodName = methodName;
    }
}

Usage:

[SagaStep("Transcode4K", Group = "TranscodeAll", Order = 3)]
[StepCondition("Transcode4K", "Source resolution >= 4K", nameof(IsSource4KOrHigher))]
public partial class VideoTranscodeTask
{
    private bool IsSource4KOrHigher(SagaContext<VideoTranscodeRequest> context)
    {
        var metadata = context.GetStepData<ExtractMetadataStepData>();
        return metadata.Width >= 3840 && metadata.Height >= 2160;
    }
}

When a condition evaluates, the result is snapshotted as a StepConditionSnapshot in the database — capturing the condition name, expression, evaluated value, and timestamp. This provides an audit trail for debugging and compliance: "Why was 4K skipped? Because the source was 1920x1080 at 2026-04-03T14:22:07Z."
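
A snapshot of this kind could be modelled as an immutable record. The sketch below is an assumption about its shape: the property names ConditionName, Expression, EvaluatedValue, and EvaluatedAt mirror the four captured items listed above, and ConditionSnapshots.Capture is a hypothetical helper.

```csharp
using System;

int width = 1920, height = 1080;
var snapshot = ConditionSnapshots.Capture(
    "IsSource4KOrHigher",
    "Source resolution >= 4K",
    width >= 3840 && height >= 2160,
    new DateTimeOffset(2026, 4, 3, 14, 22, 7, TimeSpan.Zero));
Console.WriteLine(snapshot.EvaluatedValue ? "step runs" : "step skipped");

// Hypothetical shape of the persisted audit row.
public sealed record StepConditionSnapshot(
    string ConditionName,
    string Expression,
    bool EvaluatedValue,
    DateTimeOffset EvaluatedAt);

public static class ConditionSnapshots
{
    // Freezes the outcome of one evaluation so it can be stored and queried later.
    public static StepConditionSnapshot Capture(
        string conditionName, string expression, bool value, DateTimeOffset evaluatedAt)
        => new(conditionName, expression, value, evaluatedAt);
}
```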


Retry Policy — Per-Step Polly Integration

[MetaConcept(typeof(RetryPolicyConcept))]
[MetaReference("SagaStep", "SagaStep", Multiplicity = "1")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public sealed class RetryPolicyAttribute : Attribute
{
    [MetaProperty("StepName", "string", Required = true)]
    public string StepName { get; }

    [MetaProperty("MaxRetries", "int")]
    public int MaxRetries { get; set; } = 3;

    [MetaProperty("BackoffType", "BackoffType")]
    public BackoffType BackoffType { get; set; } = BackoffType.Exponential;

    [MetaProperty("DelayMs", "int")]
    public int DelayMs { get; set; } = 500;

    [MetaProperty("OnRetryExhausted", "RetryExhaustedAction")]
    public RetryExhaustedAction OnRetryExhausted { get; set; }
        = RetryExhaustedAction.Compensate;

    public RetryPolicyAttribute(string stepName) => StepName = stepName;
}

public enum BackoffType { Constant, Linear, Exponential }
public enum RetryExhaustedAction { Compensate, Fail, DeadLetter }

The generator emits IAsyncPolicy instances (Polly) that wrap each step execution. Each attempt is recorded in SagaStepRecord.AttemptCount.
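
To show how BackoffType and DelayMs could combine, here is a plain C# sketch of the delay calculation. It deliberately does not use Polly. The formulas (linear as DelayMs times the attempt number, exponential as DelayMs doubling per attempt) are assumptions for illustration, and the generated policies may compute delays differently.

```csharp
using System;

for (int attempt = 1; attempt <= 3; attempt++)
{
    var delay = Backoff.DelayFor(BackoffType.Exponential, 500, attempt);
    Console.WriteLine($"attempt {attempt}: wait {delay.TotalMilliseconds} ms");
}

public enum BackoffType { Constant, Linear, Exponential }

public static class Backoff
{
    // attempt is 1-based: the delay before the first retry uses attempt = 1.
    public static TimeSpan DelayFor(BackoffType type, int delayMs, int attempt)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        double ms = type switch
        {
            BackoffType.Constant => delayMs,
            BackoffType.Linear => delayMs * (double)attempt,
            BackoffType.Exponential => delayMs * Math.Pow(2, attempt - 1),
            _ => throw new ArgumentOutOfRangeException(nameof(type)),
        };
        return TimeSpan.FromMilliseconds(ms);
    }
}
```

With the defaults from RetryPolicyAttribute (MaxRetries = 3, DelayMs = 500, Exponential), this assumed formula yields waits of 500, 1000, and 2000 ms.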


Cancellable — Task Cancellation

[MetaConcept(typeof(CancellableConcept))]
[MetaReference("DistributedTask", "DistributedTask", Multiplicity = "1")]
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class CancellableAttribute : Attribute { }

When present, the generator emits:

  • A DELETE /api/tasks/{taskId} endpoint on the controller
  • CancellationToken propagation into running steps
  • Compensation of already-completed steps on cancellation
  • CancelledAt timestamp and audit entry
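
The interplay of token propagation and compensation can be sketched with a small runner. CancellableRunner and the step names are hypothetical, and compensation is reduced to a log entry. The point is only the control flow: check the token between steps, then unwind the completed steps in reverse order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var log = new List<string>();
var steps = new (string Name, Func<CancellationToken, Task> Execute)[]
{
    ("Upload", _ => Task.CompletedTask),
    ("Zip", _ => { cts.Cancel(); return Task.CompletedTask; }), // simulates a DELETE arriving mid-run
    ("Notify", _ => Task.CompletedTask),
};
await CancellableRunner.RunAsync(steps, log, cts.Token);
Console.WriteLine(string.Join(" > ", log));

public static class CancellableRunner
{
    public static async Task RunAsync(
        IReadOnlyList<(string Name, Func<CancellationToken, Task> Execute)> steps,
        List<string> log,
        CancellationToken ct)
    {
        var completed = new Stack<string>();
        try
        {
            foreach (var step in steps)
            {
                ct.ThrowIfCancellationRequested(); // stop before starting the next step
                await step.Execute(ct);            // the token also flows into the step
                completed.Push(step.Name);
                log.Add($"run:{step.Name}");
            }
        }
        catch (OperationCanceledException)
        {
            // Undo in reverse order of completion (the stack pops last-in first).
            while (completed.Count > 0) log.Add($"compensate:{completed.Pop()}");
            log.Add("cancelled");
        }
    }
}
```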

Typed Step Data — ValueObjects, Not JSON Bags

Every step gets a generated ValueObject for its inter-step data. Instead of:

// Fragile: string key, no compile-time checking
context.StepData.Set("UploadedKeys", keys);
var keys = context.StepData.Get<List<string>>("UploadedKeys");  // typo = runtime error

The developer writes:

// Type-safe: compiler catches errors
stepData.UploadedKeys = keys;
var keys = context.GetStepData<UploadSourceFilesStepData>().UploadedKeys;

The source generator analyzes the properties written in each step's override method and generates the corresponding ValueObject. The data is serialized (JSON/MessagePack/Protobuf) in the database but the developer API is fully typed.
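
One way to picture the typed API is a context that stores step data keyed by its type. This is a simplified sketch under assumptions: the SagaContext here is non-generic and in-memory, SetStepData is a hypothetical method, and serialization to the database is left out.

```csharp
using System;
using System.Collections.Generic;

var context = new SagaContext();
context.SetStepData(new UploadSourceFilesStepData { UploadedKeys = { "tasks/42/a.png" } });
var keys = context.GetStepData<UploadSourceFilesStepData>().UploadedKeys;
Console.WriteLine(keys[0]);

// Stand-in for a generated step data object (property name taken from the text above).
public sealed class UploadSourceFilesStepData
{
    public List<string> UploadedKeys { get; init; } = new();
}

public sealed class SagaContext
{
    // The type itself is the key, so there is no string to mistype.
    private readonly Dictionary<Type, object> _data = new();

    public void SetStepData<T>(T data) where T : class => _data[typeof(T)] = data;

    public T GetStepData<T>() where T : class =>
        _data.TryGetValue(typeof(T), out var value)
            ? (T)value
            : throw new InvalidOperationException(
                $"No step data of type {typeof(T).Name} has been recorded");
}
```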

See Part III for the concrete example.


Listening Strategy — Client-Chosen Notifications

public enum ListeningStrategy
{
    Polling,     // GET /api/tasks/{id}/status
    SSE,         // GET /api/tasks/{id}/stream
    WebSocket,   // ws://{host}/api/tasks/{id}/ws
    SignalR,     // /hubs/tasks
    Webhook,     // POST to client-provided URL
}

The client picks its preferred notification mode in the submit request. The system generates all five transports, all powered by a single ITaskProgressTracker.UpdateAsync() call. Polling is always available as fallback.
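
The "one call, many transports" idea can be sketched as a tracker that stores the latest state for polling and forwards each update to the push transports. IProgressTransport, RecordingTransport, and this TaskProgressTracker class are hypothetical. Only the UpdateAsync name comes from the text, and its real signature is covered in Part X.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var sse = new RecordingTransport();
var webhook = new RecordingTransport();
var tracker = new TaskProgressTracker(new IProgressTransport[] { sse, webhook });
await tracker.UpdateAsync(Guid.NewGuid(), 40);
Console.WriteLine($"pushed to {sse.Received.Count + webhook.Received.Count} transport(s), polling sees {tracker.LastPercent}%");

// Hypothetical abstraction over SSE, WebSocket, SignalR, and Webhook delivery.
public interface IProgressTransport
{
    Task PublishAsync(Guid taskId, int percent);
}

// Test double that just remembers what it was asked to publish.
public sealed class RecordingTransport : IProgressTransport
{
    public List<int> Received { get; } = new();

    public Task PublishAsync(Guid taskId, int percent)
    {
        Received.Add(percent);
        return Task.CompletedTask;
    }
}

public sealed class TaskProgressTracker
{
    private readonly IReadOnlyList<IProgressTransport> _transports;

    public TaskProgressTracker(IReadOnlyList<IProgressTransport> transports) => _transports = transports;

    // Latest stored value: this is what a polling endpoint would read.
    public int LastPercent { get; private set; }

    public async Task UpdateAsync(Guid taskId, int percent)
    {
        LastPercent = percent;
        foreach (var transport in _transports)
            await transport.PublishAsync(taskId, percent);
    }
}
```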

Full details in Part X.


Compile-Time Diagnostics

The source generator validates the attribute surface and emits Roslyn diagnostics:

ID      Severity  Rule
DST001  Error     DistributedTask has zero SagaSteps
DST002  Error     No [TaskRequest] class references this task
DST003  Error     No [TaskResponse] class references this task
DST004  Error     Step Order values are not unique (outside parallel groups)
DST005  Error     FileUploadStep.SourceProperty does not exist on request class
DST006  Error     FileDownloadStep.Keys references non-existent step data
DST007  Warning   CustomStep has no override for Execute method
DST008  Error     Step Order values have gaps (must be contiguous 1..N)
DST009  Warning   No compensation override for CustomStep
DST010  Error     Queue name is empty
DST011  Error     ParallelStepGroup has fewer than 2 member steps
DST012  Error     StepPlacement references unknown step name
DST013  Warning   StepCondition.MethodName method not found on class
DST014  Error     RetryPolicy references unknown step name

These appear as IDE squiggles and build errors, catching configuration mistakes before any code runs.
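
The two ordering rules (DST004 and DST008) interact with parallel groups, so a sketch of the check may help. StepDecl and OrderRules are hypothetical runtime types. The real analyzer inspects Roslyn symbols and reports diagnostics rather than returning strings. The sketch assumes all members of a group share one Order value, as described in the Parallel Steps section.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

var errors = OrderRules.Check(new[]
{
    new StepDecl("Upload", 1),
    new StepDecl("ExtractMetadata", 2),
    new StepDecl("Transcode720p", 3, "TranscodeAll"),
    new StepDecl("Transcode1080p", 3, "TranscodeAll"),
});
Console.WriteLine(errors.Count == 0 ? "ordering OK" : string.Join(", ", errors));

// Hypothetical flattened view of a step attribute.
public sealed record StepDecl(string Name, int Order, string? Group = null);

public static class OrderRules
{
    public static List<string> Check(IEnumerable<StepDecl> steps)
    {
        var errors = new List<string>();

        // Collapse each parallel group into one slot; every ungrouped step is its own slot.
        var slots = steps
            .GroupBy(s => s.Group is null ? "step:" + s.Name : "group:" + s.Group)
            .Select(g => g.First().Order)
            .OrderBy(o => o)
            .ToList();
        var distinct = slots.Distinct().ToList();

        // DST004: two slots must not claim the same Order.
        if (distinct.Count != slots.Count) errors.Add("DST004");

        // DST008: the distinct Order values must be exactly 1..N.
        if (!distinct.SequenceEqual(Enumerable.Range(1, distinct.Count))) errors.Add("DST008");

        return errors;
    }
}
```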


What's Next

Part III walks through the complete ZIP creation example — from attribute declaration through generated orchestrators, typed step data, and the developer's custom logic.