Continuum (continuum v0.5.0)

OTP-native durable execution engine for Elixir.

Continuum lets you write a multi-step business process as straight-line Elixir code. The process survives crashes, node restarts, and partitions: the engine journals each effect to Postgres and replays the workflow's history through the same orchestration code on resume.

See Continuum.Workflow for the workflow DSL and Continuum.Activity for activities (the only place side effects are allowed inside a workflow).

Public API

Types

input()

@type input() :: term()

run_id()

@type run_id() :: binary()

workflow_module()

@type workflow_module() :: module()

Functions

await(run_id, timeout \\ 5000, opts \\ [])

@spec await(run_id(), timeout(), keyword()) :: {:ok, map()} | {:error, term()}

Block until the run completes. Test/synchronous use only.

cancel(run_id, opts \\ [])

@spec cancel(
  run_id(),
  keyword()
) :: :ok | {:error, term()}

Cancel a running workflow.

children(opts \\ [])

@spec children(keyword()) :: [Supervisor.child_spec()]

Returns runtime child specs for a named, non-default Continuum instance.

# children/1 returns a list of child specs, so concatenate it rather than nest it.
children =
  [MyApp.Repo] ++
    Continuum.children(name: :billing_continuum, repo: MyApp.Repo)

The default Continuum instance is owned by Continuum.Application, so Continuum.children() returns [] to avoid duplicate process names.

Child-specific options may be passed with :workflow_modules, :heartbeater, :run_supervisor, :activity_supervisor, :recovery, :dispatcher, :activity_dispatcher, :timer_wheel, :signal_router, and :snapshotter. Passing false for a child omits it from the returned list.

get_run(run_id, opts \\ [])

@spec get_run(
  run_id(),
  keyword()
) :: {:ok, map()} | {:error, :not_found | term()}

Load one durable run by id.

in_workflow?()

@spec in_workflow?() :: boolean()

Whether we are currently executing inside a workflow process. Useful in helper modules that branch on context.

now()

(macro)

The current wall-clock time, journaled and replayed deterministically.

patched?(patch_name)

(since 0.3.0) (macro)

Journaled patch marker for in-place, backward-compatible workflow changes.

def run(input) do
  if Continuum.patched?(:add_fraud_check_v2) do
    activity FraudCheck.v2(input)
  else
    activity FraudCheck.v1(input)
  end
end

Inside a workflow, the first call to patched?(name) at a given source line journals a patched event with value: true and returns true. That value is replayed on resume, so the run never changes branch mid-flight. A run that is replaying history recorded before the patch line existed gets false without consuming any event, which keeps in-flight runs on the old path.

Outside a workflow process (test setup, ordinary code) it returns false.

Like now/0 and uuid4/0, this is a macro so it captures __CALLER__ for a stable command identity; modules that call it must require Continuum (use Continuum.Workflow does this for you).
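The three cases described above (first execution, replay of a recorded marker, replay of older history) can be modelled with a toy in-memory journal. This is only a sketch of the described semantics, not Continuum's implementation: the PatchSketch module, the %{history: ..., cursor: ...} state shape, and the event tuples are all hypothetical.

```elixir
defmodule PatchSketch do
  # Toy model: `history` holds recorded events, `cursor` counts how many
  # of them the current replay has consumed. Returns {value, new_state}.
  def patched?(%{history: history, cursor: cursor} = state, name) do
    case Enum.at(history, cursor) do
      # Past the end of recorded history: first execution, journal the marker.
      nil ->
        event = {:patched, name, true}
        {true, %{state | history: history ++ [event], cursor: cursor + 1}}

      # Replaying a run that already recorded this marker: consume and reuse it.
      {:patched, ^name, value} ->
        {value, %{state | cursor: cursor + 1}}

      # Replaying history recorded before the patch line existed:
      # answer false and leave the event for the command that owns it.
      _other_event ->
        {false, state}
    end
  end
end

# A brand-new run takes the new branch and records that decision.
{true, fresh} = PatchSketch.patched?(%{history: [], cursor: 0}, :add_fraud_check_v2)

# Resuming that run replays the recorded decision.
{true, _} = PatchSketch.patched?(%{fresh | cursor: 0}, :add_fraud_check_v2)

# A run whose history predates the patch stays on the old branch, state untouched.
old = %{history: [{:activity_completed, :fraud_check_v1}], cursor: 0}
{false, ^old} = PatchSketch.patched?(old, :add_fraud_check_v2)
```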

query(opts \\ [])

@spec query(keyword()) :: {:ok, map()} | {:error, term()}

Query durable runs with a closed, structured query spec.

See Continuum.Query for supported :where, :order_by, and pagination options. Querying requires a Postgres-backed Continuum instance.

query(instance, opts)

@spec query(
  atom() | Continuum.Runtime.Instance.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Query durable runs for a named Continuum instance.

random()

(macro)

A pseudo-random float in [0, 1), journaled and replayed deterministically.

set_attributes(run_id, attributes, opts \\ [])

@spec set_attributes(run_id(), map(), keyword()) :: :ok | {:error, term()}

Merge JSON-encodable search attributes into a durable run row.

This is external metadata. It is not journaled and workflow code cannot read it during replay.

side_effect(fun)

@spec side_effect((-> term())) :: term()

General-purpose escape hatch for an impure read whose result must be journaled and replayed.

The function is called once on first execution; its return value is journaled and returned on every subsequent replay. Return values must be serializable via :erlang.term_to_binary/1 — pids, refs, ports, and similar local-only terms are rejected.
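As an illustration of the call-once, replay-afterwards behaviour, here is a minimal in-memory sketch. It is not how Continuum stores its journal: SideEffectSketch, its state map, and ensure_durable!/1 are hypothetical names, and the check only covers pids, references, and ports nested in tuples, maps, and proper lists.

```elixir
defmodule SideEffectSketch do
  # Toy journal: a list of recorded values plus a replay cursor.
  def side_effect(%{journal: journal, cursor: cursor} = state, fun)
      when is_function(fun, 0) do
    case Enum.fetch(journal, cursor) do
      # Replay: return the recorded value without calling `fun` again.
      {:ok, recorded} ->
        {recorded, %{state | cursor: cursor + 1}}

      # First execution: call `fun` once, validate, then record the result.
      :error ->
        value = fun.()
        :ok = ensure_durable!(value)
        {value, %{state | journal: journal ++ [value], cursor: cursor + 1}}
    end
  end

  # Reject terms that are only meaningful on the local node.
  defp ensure_durable!(term)
       when is_pid(term) or is_reference(term) or is_port(term),
       do: raise(ArgumentError, "not journalable: #{inspect(term)}")

  defp ensure_durable!(term) when is_tuple(term),
    do: term |> Tuple.to_list() |> ensure_durable!()

  defp ensure_durable!(term) when is_map(term),
    do: term |> Map.to_list() |> ensure_durable!()

  defp ensure_durable!(term) when is_list(term),
    do: Enum.each(term, &ensure_durable!/1)

  defp ensure_durable!(_term), do: :ok
end

fresh = %{journal: [], cursor: 0}
{first, recorded} = SideEffectSketch.side_effect(fresh, fn -> System.unique_integer() end)

# On replay the cursor restarts at 0 and the function is never invoked.
{replayed, _} =
  SideEffectSketch.side_effect(%{recorded | cursor: 0}, fn -> raise "not called" end)

true = first == replayed
```

Validating before recording means a bad return value fails on first execution rather than surfacing later during a resume.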

signal(run_id, name, payload)

@spec signal(run_id(), atom(), term()) :: :ok | {:error, term()}

Deliver a signal to a running workflow.

signal(run_id, name, payload, opts)

@spec signal(run_id(), atom(), term(), keyword()) :: :ok | {:error, term()}

Deliver a signal to a running workflow, selecting a Continuum instance with :instance.

start(workflow_module, input, opts \\ [])

@spec start(workflow_module(), input(), keyword()) ::
  {:ok, run_id()} | {:error, term()}

Start a new workflow run.

Options include:

  • :instance selects a named Continuum instance.
  • :namespace applies soft tenant scoping to list/query paths.
  • :trace_context persists an opaque W3C traceparent binary that observability integrations can use to link resumed run attempts.
  • :attributes sets JSON-encodable search metadata stored on the run row.
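The following sketch strings start/3, signal/3, and await/2 together using only the signatures documented on this page. Everything else is an assumption made for illustration: MyApp.OrderWorkflow, the :payment_received signal name, the shape of the order map, and the OrderFlowSketch wrapper itself. The engine module is passed as an argument so the flow can be exercised against a stub without a running Continuum instance.

```elixir
defmodule OrderFlowSketch do
  # `engine` defaults to Continuum; any module exposing the same
  # start/3, signal/3 and await/2 functions can stand in for it.
  # MyApp.OrderWorkflow is a hypothetical workflow module.
  def run_order(order, engine \\ Continuum) do
    opts = [attributes: %{"customer" => order.customer}]
    payload = %{amount_cents: order.amount_cents}

    # await is documented as test/synchronous use only, so this
    # blocking style suits tests and scripts rather than request paths.
    with {:ok, run_id} <- engine.start(MyApp.OrderWorkflow, order, opts),
         :ok <- engine.signal(run_id, :payment_received, payload),
         {:ok, run} <- engine.await(run_id, 10_000) do
      {:ok, run_id, run}
    end
  end
end
```

Because every step returns either a success value or {:error, term()}, the with expression passes the first failure straight back to the caller.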

today()

(macro)

The current UTC date, journaled and replayed deterministically.

unwrap(other)

(since 0.3.0)

@spec unwrap(term()) :: term()

Recover an activity's raw return value from a compensation handle.

When an activity/2 call uses compensate:, a success is returned as {:ok, %Continuum.ActivityRef{}} rather than a bare term. unwrap/1 peels the ref back to the activity's raw return:

  • unwrap(%Continuum.ActivityRef{raw_result: raw}) → raw
  • unwrap({:ok, %Continuum.ActivityRef{} = ref}) → ref.raw_result
  • unwrap(other) → other (activities without compensate: are unchanged)
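The three cases above map naturally onto three function clauses. The sketch below mirrors them with a stand-in struct so it runs without the library. UnwrapSketch and its nested ActivityRef are hypothetical; only the raw_result field name comes from the text above.

```elixir
defmodule UnwrapSketch do
  defmodule ActivityRef do
    # Stand-in for %Continuum.ActivityRef{}; only the documented field.
    defstruct [:raw_result]
  end

  # A bare ref yields the activity's raw return.
  def unwrap(%ActivityRef{raw_result: raw}), do: raw
  # A ref wrapped in {:ok, _} is peeled the same way.
  def unwrap({:ok, %ActivityRef{} = ref}), do: ref.raw_result
  # Anything else passes through unchanged.
  def unwrap(other), do: other
end

ref = %UnwrapSketch.ActivityRef{raw_result: {:ok, "ch_123"}}

{:ok, "ch_123"} = UnwrapSketch.unwrap(ref)
{:ok, "ch_123"} = UnwrapSketch.unwrap({:ok, ref})
{:error, :declined} = UnwrapSketch.unwrap({:error, :declined})
```

Clause order matters here: the catch-all must come last, otherwise it would shadow the two ref-matching clauses.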

uuid4()

(macro)

A v4 UUID, journaled and replayed deterministically.