Durable.DSL.Step (Durable v0.1.0-rc)
View SourceProvides macros for defining workflow steps using a pure pipeline model.
Pipeline Model
Data flows from step to step. Each step receives the previous step's output
and returns {:ok, data} or {:error, reason}.
Future work — each/foreach isolation (L-5)
If/when this DSL gains an iteration primitive (e.g. each/3 to run a step
once per item in a list), design with context isolation per iteration.
Parallel-style shared context across concurrent iterations creates races on
put_context/2 writes; isolated contexts avoid that class of bug. See
docs/bug-reports/2026-04-13-follow-up-audit.md.
Usage
workflow "process_order" do
step :validate, fn order ->
{:ok, %{order_id: order["id"], items: order["items"]}}
end
step :calculate, fn data ->
total = data.items |> Enum.map(& &1["price"]) |> Enum.sum()
{:ok, assign(data, :total, total)}
end
step :charge, [retry: [max_attempts: 3]], fn data ->
{:ok, assign(data, :receipt, PaymentService.charge(data.total))}
end
endOptions
:retry- Retry configuration:max_attempts- Maximum retry attempts (default: 1):backoff- Backoff strategy::exponential,:linear,:constant:base- Base for backoff calculation (default: 2):max_backoff- Maximum backoff in ms (default: 3600000)
:timeout- Step timeout in milliseconds:compensate- Name of compensation function for saga pattern:queue- Override queue for this step
Summary
Functions
Defines a conditional branch within a workflow.
Defines a compensation handler for saga pattern.
Defines a decision step for conditional branching.
Defines a parallel execution block.
Defines a step within a workflow.
Functions
Defines a conditional branch within a workflow.
The :on option takes a function that extracts the value to match.
Only ONE branch executes based on the condition value.
Examples
branch on: fn data -> data.type end do
"physical" ->
step :ship, fn data -> {:ok, assign(data, :shipped, true)} end
"digital" ->
step :download, fn data -> {:ok, assign(data, :download_url, "...")} end
_ ->
step :manual, fn data -> {:ok, assign(data, :needs_review, true)} end
end
Defines a compensation handler for saga pattern.
Compensations receive the current data and return {:ok, data}.
Examples
compensate :cancel_flight, fn data ->
FlightAPI.cancel(data.flight_booking_id)
{:ok, data}
end
Defines a decision step for conditional branching.
Decision steps can return:
{:ok, data}- Continue to the next sequential step{:goto, :step_name, data}- Jump to the named step
Examples
decision :check_amount, fn data ->
if data.total > 1000 do
{:goto, :manual_review, data}
else
{:ok, data}
end
end
Defines a parallel execution block.
All steps inside execute concurrently. Each step receives a copy of the context.
Results are collected into a __results__ map with tagged tuples.
Options
:into- Optional callback to transform results (default: none)Receives
(ctx, results)where results =%{step_name => {:ok, data} | {:error, reason}}Returns
{:ok, ctx}|{:error, reason}|{:goto, step, ctx}
:on_error- Error handling (default::fail_fast):fail_fast- Cancel siblings on first failure:complete_all- Wait for all, collect results
Behavior
Without :into, results go to ctx.__results__ and the next step handles them:
parallel do
step :payment, fn ctx -> {:ok, %{id: 123}} end
step :delivery, fn ctx -> {:error, :not_found} end
end
# Next step receives:
# %{...ctx, __results__: %{payment: {:ok, %{id: 123}}, delivery: {:error, :not_found}}}With :into, you control what the next step receives:
parallel into: fn ctx, results ->
case {results.payment, results.delivery} do
{{:ok, payment}, {:ok, _}} ->
{:ok, Map.put(ctx, :payment_id, payment.id)}
{{:ok, _}, {:error, :not_found}} ->
{:goto, :handle_backorder, ctx}
_ ->
{:error, "Critical failure"}
end
end do
step :payment, fn ctx -> {:ok, %{id: 123}} end
step :delivery, fn ctx -> {:error, :not_found} end
endStep Options
:returns- Key name for this step's result (default: step name)parallel do
step :fetch_order, returns: :order do fn ctx -> {:ok, %{items: [...]}} end endend # Result: %{...ctx, results: %{order: {:ok, %{items: [...]}}}}
Defines a step within a workflow.
The step receives the previous step's output (or workflow input for the first step)
and must return {:ok, data} or {:error, reason}.
Examples
step :validate, fn order ->
{:ok, %{order_id: order["id"], items: order["items"]}}
end
step :charge, [retry: [max_attempts: 3]], fn data ->
{:ok, assign(data, :receipt, PaymentService.charge(data.total))}
end