Continuum.Workflow (continuum v0.5.0)

Copy Markdown View Source

Defines a durable workflow.

defmodule MyApp.OrderFlow do
  use Continuum.Workflow, version: 1, retention: {:days, 30}

  def run(%{order_id: id, items: items}) do
    {:ok, validated} = activity Validation.check(items)
    {:ok, _charge}   = activity Payments.charge(id, validated.total),
                                retry: [max_attempts: 5, backoff: :exponential]

    case await signal(:fraud_review, timeout: hours(24)) do
      {:ok, :approved} -> activity Fulfillment.ship(id)
      {:ok, :rejected} -> {:error, :rejected}
      :timeout         -> activity Fulfillment.ship(id)
    end
  end
end

Determinism

Every public and private function in the module is scanned at compile time by Continuum.AstCheck. Calls that are known to be non-deterministic (DateTime.utc_now/0, :rand.uniform/0, IO.puts/1, ETS access, …) are compile errors with a remediation hint.

See Continuum.AstCheck.forbidden_calls/0 for the denylist and the :trusted_modules config knob for extending the allowlist.

Versioning

The module's AST is hashed at compile time and exposed through __continuum_workflow__/0. Each Postgres run stores that hash on start so drift can be surfaced. As of v0.3, callers may pass workflow: LogicalModule to use Continuum.Workflow to register a concrete module as a hash-specific entrypoint for a logical workflow.

Summary

Functions

Macro: schedule an activity. The result is journaled on first execution and replayed on resume.

Macro: wait for an external signal, optionally with a timeout.

Macro: suspend until a previously start_child-ed child terminates.

Macro: run the compensation of one successful compensated activity.

Macro: run all pending compensations in LIFO order (most-recent first).

Macro: tail-call continuation — complete this run and start a fresh one on the same workflow with new input.

Returns a duration in milliseconds.

Macro: start a child workflow asynchronously, returning a %Continuum.ChildRef{}.

Macro: durable timer.

Functions

activity(call, opts \\ [])

(macro)

Macro: schedule an activity. The result is journaled on first execution and replayed on resume.

activity Payments.charge(order_id, amount)
activity Payments.charge(order_id, amount), retry: [max_attempts: 5]

await(arg)

(macro)

Macro: wait for an external signal, optionally with a timeout.

await signal(:approved)
await signal(:approved, timeout: hours(24))

Or wait for a child workflow. The shorthand accepts exactly child Mod.run(input); use start_child/3 for other setup shapes.

await child MyApp.AuditFlow.run(%{batch_id: id})

await_child(ref)

(since 0.3.0) (macro)

Macro: suspend until a previously start_child-ed child terminates.

Returns the child's result ({:ok, _}/{:error, _} term), the error on child failure, or {:error, :child_cancelled} if the child was cancelled.

compensate(ref)

(since 0.3.0) (macro)

Macro: run the compensation of one successful compensated activity.

{:ok, charge} = activity Payments.charge(id, total), compensate: {Payments, :refund, [id]}
# ...
compensate(charge)

Takes the %Continuum.ActivityRef{} (or {:ok, ref}) returned by a compensated activity/2 call, schedules its compensation MFA through the activity worker, and removes it from the pending compensation set so a later compensate_all/0 cannot run it twice. Returns {:ok, result} or, if the compensation fails terminally, {:error, reason} — the run continues either way.

compensate_all()

(since 0.3.0) (macro)

Macro: run all pending compensations in LIFO order (most-recent first).

rescue
  e ->
    compensate_all()
    reraise e, __STACKTRACE__

Each successful compensated activity that has not already been compensated by compensate/1 is rolled back, newest first. Returns :ok.

compensate_all(opts)

(since 0.4.0) (macro)

continue_as_new(input)

(since 0.3.0) (macro)

Macro: tail-call continuation — complete this run and start a fresh one on the same workflow with new input.

def run(%{cycles_done: n} = state) do
  activity Billing.charge(state.customer_id)
  timer(days(30))

  if n >= 11 do
    {:ok, :year_complete}
  else
    continue_as_new(%{state | cycles_done: n + 1})
  end
end

The current run is marked completed with result: {:continued, next_run_id}; a new run starts with the given input, sharing the chain's correlation_id. Use it to keep history bounded for long-running / cron-style workflows.

days(n)

(macro)

hours(n)

(macro)

minutes(n)

(macro)

seconds(n)

(macro)

Returns a duration in milliseconds.

start_child(workflow, input, opts \\ [])

(since 0.3.0) (macro)

Macro: start a child workflow asynchronously, returning a %Continuum.ChildRef{}.

ref = start_child MyApp.OrderFlow, %{order_id: id}, id: "order-#{id}"
# ... do other work ...
result = await_child(ref)

opts accepts id: to tie the child's deterministic run id to a key under this parent.

timer(duration)

(macro)

Macro: durable timer.

timer(hours(24))