Durable.DSL.Step (Durable v0.1.0-rc)

Provides macros for defining workflow steps using a pure pipeline model.

Pipeline Model

Data flows from step to step. Each step receives the previous step's output and returns {:ok, data} or {:error, reason}.
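
The pipeline contract can be illustrated without the DSL. The following is a minimal sketch in plain Elixir, assuming a hypothetical PipelineSketch.run/2 helper (not part of Durable) that threads data through named functions and stops at the first {:error, reason}. Tagging the error with the step name is a choice made for this sketch only.

```elixir
defmodule PipelineSketch do
  # Hypothetical helper, not a Durable API: feeds each step's output into
  # the next step and halts on the first {:error, reason}.
  def run(steps, input) do
    Enum.reduce_while(steps, {:ok, input}, fn {name, fun}, {:ok, data} ->
      case fun.(data) do
        {:ok, next} -> {:cont, {:ok, next}}
        {:error, reason} -> {:halt, {:error, {name, reason}}}
      end
    end)
  end
end

steps = [
  validate: fn order -> {:ok, %{items: order["items"]}} end,
  calculate: fn data ->
    total = data.items |> Enum.map(& &1["price"]) |> Enum.sum()
    {:ok, Map.put(data, :total, total)}
  end
]

order = %{"items" => [%{"price" => 10}, %{"price" => 32}]}
{:ok, %{total: 42}} = PipelineSketch.run(steps, order)
```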

Future work — each/foreach isolation (L-5)

If/when this DSL gains an iteration primitive (e.g. each/3 to run a step once per item in a list), design with context isolation per iteration. Parallel-style shared context across concurrent iterations creates races on put_context/2 writes; isolated contexts avoid that class of bug. See docs/bug-reports/2026-04-13-follow-up-audit.md.
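
As a rough illustration of what per-iteration isolation could look like, the sketch below runs a function once per item in separate tasks, handing each one its own context value. EachSketch.each/3 is hypothetical and is not a proposed signature for the DSL. It relies only on Task.async_stream/3 from the standard library.

```elixir
defmodule EachSketch do
  # Hypothetical illustration, not a Durable API. Each iteration runs in its
  # own task and receives the context as an immutable value, so a write in
  # one iteration is never visible to another.
  def each(items, ctx, fun) do
    items
    |> Task.async_stream(fn item -> fun.(item, ctx) end)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

results =
  EachSketch.each([1, 2, 3], %{currency: "USD"}, fn item, iter_ctx ->
    # This write lands in this iteration's context only.
    {:ok, Map.put(iter_ctx, :item, item)}
  end)

IO.inspect(results)
```

Merging the per-iteration results back into a single context is then an explicit step rather than a side effect of concurrent writes.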

Usage

workflow "process_order" do
  step :validate, fn order ->
    {:ok, %{order_id: order["id"], items: order["items"]}}
  end

  step :calculate, fn data ->
    total = data.items |> Enum.map(& &1["price"]) |> Enum.sum()
    {:ok, assign(data, :total, total)}
  end

  step :charge, [retry: [max_attempts: 3]], fn data ->
    {:ok, assign(data, :receipt, PaymentService.charge(data.total))}
  end
end

Options

  • :retry - Retry configuration
    • :max_attempts - Maximum retry attempts (default: 1)
    • :backoff - Backoff strategy: :exponential, :linear, :constant
    • :base - Base for backoff calculation (default: 2)
    • :max_backoff - Maximum backoff in ms (default: 3600000)
  • :timeout - Step timeout in milliseconds
  • :compensate - Name of compensation function for saga pattern
  • :queue - Override queue for this step
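
The option list does not spell out how :backoff, :base, and :max_backoff combine. The sketch below shows one plausible reading, under stated assumptions: a 1000 ms unit, attempts numbered from 1, and the formulas in the comments. None of this is taken from Durable's source, and BackoffSketch is a hypothetical module.

```elixir
defmodule BackoffSketch do
  # Assumed formulas, NOT taken from Durable. Delays are in milliseconds.
  @unit_ms 1_000

  def delay(strategy, attempt, opts \\ []) do
    base = Keyword.get(opts, :base, 2)
    max_backoff = Keyword.get(opts, :max_backoff, 3_600_000)

    raw =
      case strategy do
        # base^attempt units: 2s, 4s, 8s, ...
        :exponential -> Integer.pow(base, attempt) * @unit_ms
        # base * attempt units: 2s, 4s, 6s, ...
        :linear -> base * attempt * @unit_ms
        # always base units
        :constant -> base * @unit_ms
      end

    # The cap applies to every strategy.
    min(raw, max_backoff)
  end
end

IO.inspect(BackoffSketch.delay(:exponential, 3))
IO.inspect(BackoffSketch.delay(:exponential, 20))
```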

Summary

Functions

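branch(opts, list)

Defines a conditional branch within a workflow.

compensate(name, opts_or_fn, maybe_fn \\ nil)

Defines a compensation handler for saga pattern.

decision(name, opts_or_fn, maybe_fn \\ nil)

Defines a decision step for conditional branching.

parallel(opts \\ [], list)

Defines a parallel execution block.

step(name, opts_or_fn, maybe_fn \\ nil)

Defines a step within a workflow.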

Functions

branch(opts, list)

(macro)

Defines a conditional branch within a workflow.

The :on option takes a function that extracts the value to match. Only one branch executes: the one whose pattern matches the extracted value.

Examples

branch on: fn data -> data.type end do
  "physical" ->
    step :ship, fn data -> {:ok, assign(data, :shipped, true)} end
  "digital" ->
    step :download, fn data -> {:ok, assign(data, :download_url, "...")} end
  _ ->
    step :manual, fn data -> {:ok, assign(data, :needs_review, true)} end
end
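
Conceptually, the branch above behaves like an ordinary case over the extracted value. The sketch below is a plain-Elixir stand-in, assuming a hypothetical BranchSketch module and using Map.put/3 in place of assign. It shows the selection logic only, not how Durable schedules the chosen step.

```elixir
defmodule BranchSketch do
  # Hypothetical stand-in for the branch example: `on` extracts the value
  # and exactly one clause runs.
  def run(data) do
    on = fn d -> d.type end

    case on.(data) do
      "physical" -> {:ok, Map.put(data, :shipped, true)}
      "digital" -> {:ok, Map.put(data, :download_url, "...")}
      _ -> {:ok, Map.put(data, :needs_review, true)}
    end
  end
end

IO.inspect(BranchSketch.run(%{type: "physical"}))
```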

compensate(name, opts_or_fn, maybe_fn \\ nil)

(macro)

Defines a compensation handler for saga pattern.

Compensations receive the current data and return {:ok, data}.

Examples

compensate :cancel_flight, fn data ->
  FlightAPI.cancel(data.flight_booking_id)
  {:ok, data}
end
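
For readers new to sagas, the following sketch shows the general pattern in plain Elixir: when a step fails, the compensations of the steps that already completed run in reverse order. SagaSketch and its tuple format are hypothetical. The reverse ordering is the usual saga convention and is assumed here, not confirmed for Durable.

```elixir
defmodule SagaSketch do
  # Hypothetical runner, not Durable's implementation. Each step is a
  # {name, run_fun, compensate_fun_or_nil} tuple.
  def run(steps, input), do: do_run(steps, input, [])

  defp do_run([], data, _done), do: {:ok, data}

  defp do_run([{_name, run, comp} | rest], data, done) do
    case run.(data) do
      {:ok, next} -> do_run(rest, next, [comp | done])
      {:error, reason} -> {:error, reason, rollback(done, data)}
    end
  end

  # `done` is newest-first, so compensations run in reverse completion order.
  defp rollback(done, data) do
    done
    |> Enum.reject(&is_nil/1)
    |> Enum.reduce(data, fn comp, acc ->
      {:ok, next} = comp.(acc)
      next
    end)
  end
end

steps = [
  {:book_flight, fn d -> {:ok, Map.put(d, :flight, "F1")} end,
   fn d -> {:ok, Map.delete(d, :flight)} end},
  {:book_hotel, fn _d -> {:error, :no_rooms} end, nil}
]

# The hotel step fails, so the flight booking is compensated.
IO.inspect(SagaSketch.run(steps, %{}))
```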

decision(name, opts_or_fn, maybe_fn \\ nil)

(macro)

Defines a decision step for conditional branching.

Decision steps can return:

  • {:ok, data} - Continue to the next sequential step
  • {:goto, :step_name, data} - Jump to the named step

Examples

decision :check_amount, fn data ->
  if data.total > 1000 do
    {:goto, :manual_review, data}
  else
    {:ok, data}
  end
end
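
The effect of {:goto, :step_name, data} can be sketched with a small runner over an ordered list of steps. GotoSketch is hypothetical, and it assumes that a jump resumes at the named step and skips everything in between. It does not guard against unknown targets or backward jumps that loop forever.

```elixir
defmodule GotoSketch do
  # Hypothetical runner: `steps` is an ordered keyword list of name: fun.
  def run(steps, input), do: do_run(steps, steps, input)

  defp do_run([], _all, data), do: {:ok, data}

  defp do_run([{_name, fun} | rest], all, data) do
    case fun.(data) do
      {:ok, next} ->
        do_run(rest, all, next)

      {:goto, target, next} ->
        # Resume at the named step, skipping the steps before it.
        remaining = Enum.drop_while(all, fn {name, _} -> name != target end)
        do_run(remaining, all, next)

      {:error, reason} ->
        {:error, reason}
    end
  end
end

steps = [
  check_amount: fn d ->
    if d.total > 1000, do: {:goto, :manual_review, d}, else: {:ok, d}
  end,
  auto_approve: fn d -> {:ok, Map.put(d, :approved, true)} end,
  manual_review: fn d -> {:ok, Map.put(d, :needs_review, true)} end
]

# A large total jumps straight to :manual_review and skips :auto_approve.
IO.inspect(GotoSketch.run(steps, %{total: 5000}))
```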

parallel(opts \\ [], list)

(macro)

Defines a parallel execution block.

All steps inside execute concurrently. Each step receives a copy of the context. Results are collected into a __results__ map with tagged tuples.

Options

  • :into - Optional callback to transform results (default: none)
    • Receives (ctx, results) where results = %{step_name => {:ok, data} | {:error, reason}}
    • Returns {:ok, ctx} | {:error, reason} | {:goto, step, ctx}

  • :on_error - Error handling (default: :fail_fast)
    • :fail_fast - Cancel siblings on first failure
    • :complete_all - Wait for all, collect results

Behavior

Without :into, results go to ctx.__results__ and the next step handles them:

parallel do
  step :payment, fn _ctx -> {:ok, %{id: 123}} end
  step :delivery, fn _ctx -> {:error, :not_found} end
end
# Next step receives:
# %{...ctx, __results__: %{payment: {:ok, %{id: 123}}, delivery: {:error, :not_found}}}
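
The collection of tagged results can be approximated with standard-library tasks. The sketch below assumes a hypothetical ParallelSketch.run/2 and mirrors the :complete_all behavior, because it waits for every task and keeps failures as data. Cancelling siblings for :fail_fast is not modeled.

```elixir
defmodule ParallelSketch do
  # Hypothetical stand-in for a parallel block without :into. Every step
  # runs in its own task against the same input ctx.
  def run(steps, ctx) do
    results =
      steps
      |> Enum.map(fn {name, fun} -> {name, Task.async(fn -> fun.(ctx) end)} end)
      |> Map.new(fn {name, task} -> {name, Task.await(task)} end)

    # Tagged tuples are stored as-is under :__results__.
    {:ok, Map.put(ctx, :__results__, results)}
  end
end

steps = [
  payment: fn _ctx -> {:ok, %{id: 123}} end,
  delivery: fn _ctx -> {:error, :not_found} end
]

{:ok, out} = ParallelSketch.run(steps, %{order_id: 1})
IO.inspect(out.__results__)
```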

With :into, you control what the next step receives:

parallel into: fn ctx, results ->
  case {results.payment, results.delivery} do
    {{:ok, payment}, {:ok, _}} ->
      {:ok, Map.put(ctx, :payment_id, payment.id)}
    {{:ok, _}, {:error, :not_found}} ->
      {:goto, :handle_backorder, ctx}
    _ ->
      {:error, "Critical failure"}
  end
end do
  step :payment, fn _ctx -> {:ok, %{id: 123}} end
  step :delivery, fn _ctx -> {:error, :not_found} end
end

Step Options

  • :returns - Key name for this step's result (default: step name)

    parallel do
      step :fetch_order, returns: :order do
        fn ctx -> {:ok, %{items: [...]}} end
      end
    end
    # Result: %{...ctx, __results__: %{order: {:ok, %{items: [...]}}}}

step(name, opts_or_fn, maybe_fn \\ nil)

(macro)

Defines a step within a workflow.

The step receives the previous step's output (or workflow input for the first step) and must return {:ok, data} or {:error, reason}.

Examples

step :validate, fn order ->
  {:ok, %{order_id: order["id"], items: order["items"]}}
end

step :charge, [retry: [max_attempts: 3]], fn data ->
  {:ok, assign(data, :receipt, PaymentService.charge(data.total))}
end
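
To make the retry option concrete, the sketch below wraps a step function in a simple retry loop. It assumes that :max_attempts counts total attempts including the first, which is consistent with the documented default of 1 but not confirmed. RetrySketch is hypothetical and omits backoff delays.

```elixir
defmodule RetrySketch do
  # Hypothetical retry loop, not Durable's implementation. Re-runs `fun`
  # with the same input until it succeeds or attempts are exhausted.
  def run(fun, data, max_attempts, attempt \\ 1) do
    case fun.(data) do
      {:ok, _} = ok -> ok
      {:error, _} = err when attempt >= max_attempts -> err
      {:error, _} -> run(fun, data, max_attempts, attempt + 1)
    end
  end
end

# A step that fails twice before succeeding; the Agent counts the calls.
{:ok, calls} = Agent.start_link(fn -> 0 end)

flaky = fn data ->
  n = Agent.get_and_update(calls, fn n -> {n + 1, n + 1} end)
  if n < 3, do: {:error, :timeout}, else: {:ok, Map.put(data, :attempts, n)}
end

IO.inspect(RetrySketch.run(flaky, %{total: 10}, 3))
```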