Continuum.Runtime.Effect (continuum v0.5.0)

Copy Markdown View Source

The bridge between workflow code and the engine.

Every effect (activity, await_signal, timer, side_effect) is dispatched through run/2. The function consults the journal at the current cursor:

  • If the next event matches the effect being requested, the recorded result is returned immediately. This is replay.

  • If the cursor has reached the end of the journal, this is the live tail. We append a *_scheduled event, schedule the work, then throw {:continuum_suspend, reason} to unwind back to the engine.

  • If the next event does not match the effect being requested, we raise Continuum.ReplayDriftError.

run/2 accepts either an effect tuple or a 0-arity producer function for deterministic primitives (Continuum.now/0, Continuum.uuid4/0, etc.).

Summary

Functions

Suspend until the child referenced by ref terminates; return its result.

Run the compensation of one successful compensated activity.

Run all pending compensations in LIFO order (most-recent first).

Complete this run as {:continued, next_run_id} and start a fresh run on the same workflow with new input. Throws the :continuum_continued_as_new sentinel; the engine acknowledges it and stops without re-entering the workflow.

Dispatch an effect.

Start a child workflow asynchronously and return a %Continuum.ChildRef{}.

Token thrown when a workflow needs to suspend pending external work.

Types

effect()

@type effect() ::
  {:activity, {module(), atom(), list()}, keyword()}
  | {:await_signal, atom(), keyword()}
  | {:timer, pos_integer()}
  | {:side_effect, atom()}

Functions

await_child(ref, arg)

(since 0.3.0)

Suspend until the child referenced by ref terminates; return its result.

Returns the child's result on completion, {:error, error} on child failure, and {:error, :child_cancelled} if the child was cancelled.

compensate(ref_or_ok, arg)

(since 0.3.0)

Run the compensation of one successful compensated activity.

Schedules ref.compensate through the activity worker (Postgres) or runs it inline (in-memory), journaling compensation_scheduled/compensation_completed (or compensation_failed), then removes the activity from the pending compensation set. Returns {:ok, result} or {:error, reason}.

compensate_all(command, opts \\ [])

(since 0.3.0)

Run all pending compensations in LIFO order (most-recent first).

Each entry is scheduled as a deterministic compensation effect with a stable per-item command id derived from the call site, the target activity id, and the LIFO index. Returns :ok.

continue_as_new(input, arg)

(since 0.3.0)

Complete this run as {:continued, next_run_id} and start a fresh run on the same workflow with new input. Throws the :continuum_continued_as_new sentinel; the engine acknowledges it and stops without re-entering the workflow.

run(effect, producer)

@spec run(
  effect(),
  (-> term())
  | pos_integer()
  | {:command, term()}
  | {:command, term(), (-> term())}
) :: term()

Dispatch an effect.

Two arity-2 forms:

  • run({:side_effect, kind}, producer) — for deterministic primitives. The producer (a 0-arity function) is invoked exactly once on first execution; its return value is journaled. On replay, the journaled value is returned without invoking the producer.

  • run(effect, {:command, command_base}) — for workflow DSL effects. The command base is expanded from the AST call site and becomes part of the journaled command identity.

  • run(effect, line) — compatibility form for journaled effects (:activity, :await_signal, :timer). Live execution suspends the workflow process via throw {:continuum_suspend, _}.

start_child(workflow, input, opts, arg)

(since 0.3.0)

Start a child workflow asynchronously and return a %Continuum.ChildRef{}.

The child run id is derived deterministically from the parent run id, the start command id, and any id: option, so a parent at the same cursor never starts two children on replay. The child inherits the parent's trace context.

suspend_token()

Token thrown when a workflow needs to suspend pending external work.