Hourglass.Workflow (hourglass v0.1.0)

Copy Markdown View Source

Workflow-side API. use Hourglass.Workflow declares the module is a workflow.

Execution model

Workflows execute via the pure-function Hourglass.Workflow.Evaluator. Each activation runs the workflow body inline on an ephemeral evaluator Task (spawned by Hourglass.WorkflowEvaluator.DynamicSupervisor) with state threaded through arguments. The Workflow API primitives consult a process-dict-keyed accumulator (Hourglass.Workflow.CommandAccumulator) and either return a resolved value or throw a sentinel that the evaluator catches.

No GenServer. No :persistent_term. No send + receive suspension.

Determinism comes from re-execution + sorted command_id ordering.

Command identification

Each command issued from workflow code is tagged with a command_id: a tuple {child_index_path, local_seq} where:

  • child_index_path is the lexicographic path identifying the issuing scope. The main workflow body has path []. Direct async children get [0], [1], ... in the order async/1 is called. Nested asyncs extend the path (e.g. [1, 0] is the first child of the second top-level child).
  • local_seq is a monotonic counter local to the issuing scope, incremented on each command.

The evaluator accumulates commands by command_id, sorts by it at flush time (lexicographic [] < [0] < [0,0] < [1] < ...), and assigns monotonic global seqs in that deterministic order before sending to the SDK. This removes BEAM-scheduling non-determinism from the externally observable command sequence.

Summary

Functions

Spawn an asynchronous workflow scope.

Await the result of an async/1 scope. Returns the value the function returned, or raises / rethrows if the scope failed.

Join a list of async/1 scopes, returning their values in order.

Block (suspend) the workflow until a signal named name has arrived, then return its decoded payload.

Returns true if a cancel_workflow activation job has been delivered to this workflow run, false otherwise.

Terminal: stops the workflow body and emits a ContinueAsNewWorkflowExecution command carrying the serialised input. The Temporal server starts a new run of the same workflow type with fresh history, passing input as the initial argument.

Per-activation workflow context.

Functions

async(fun)

@spec async((-> any())) :: {:hourglass_temporal_async, term()}

Spawn an asynchronous workflow scope.

async/1 runs fun.() inline within a child-path scope, swallowing any suspend sentinel so sibling asyncs can also contribute commands. The returned tag is opaque — pass it back to await/1.

The externally observable command_id sequence is governed by child_index_path ++ [child_index].

await(arg)

@spec await({:hourglass_temporal_async, term()}) :: term()

Await the result of an async/1 scope. Returns the value the function returned, or raises / rethrows if the scope failed.

Awaiting a scope whose body suspended (because it issued a command without a resolved result) re-throws the suspend sentinel — so the parent stops cleanly and the activation ships its accumulated commands.

await_all(scopes)

@spec await_all([{:hourglass_temporal_async, term()}]) :: [term()]

Join a list of async/1 scopes, returning their values in order.

Equivalent to mapping await/1 over the list. If any scope has not yet resolved (i.e. it suspended waiting for a command result), await/1 re-throws the suspend sentinel and the activation ships its accumulated commands — correct fan-out behaviour.

await_signal(name, opts \\ [])

@spec await_signal(
  String.t() | atom(),
  keyword()
) :: term() | {:ok, term()} | :timeout

Block (suspend) the workflow until a signal named name has arrived, then return its decoded payload.

Signals are inbound signal_workflow activation jobs. The Nth call to await_signal(name) in a re-execution consumes signals[name][N] (0-based). The call counter resets at the start of every activation (via CommandAccumulator.init/0) so each re-execution of the workflow body deterministically re-consumes the same buffered signals.

When called with timeout: duration, races the signal against a durable timer. Returns {:ok, payload} if the signal arrives first, or :timeout if the timer fires first. The timer command_id is allocated unconditionally every activation to preserve deterministic seq assignment.

Known limitation: if a completion is not acknowledged by the server and the identical activation is redelivered after the signal has already been persisted to state.signals, the signal would be appended a second time (signals lack a seq-based idempotency key). Normal incremental and full-replay paths are correct.

cancelled?()

@spec cancelled?() :: boolean()

Returns true if a cancel_workflow activation job has been delivered to this workflow run, false otherwise.

Non-blocking cooperative cancellation: the workflow checks cancelled?/0 at safe points and decides what to do (e.g. skip remaining work and complete gracefully). The flag is set during job ingestion and persists across activations once set — once a cancel arrives it stays true.

continue_as_new(input)

@spec continue_as_new(term()) :: no_return()

Terminal: stops the workflow body and emits a ContinueAsNewWorkflowExecution command carrying the serialised input. The Temporal server starts a new run of the same workflow type with fresh history, passing input as the initial argument.

Raises if called outside a workflow evaluator.

execute_activity(module, input, opts \\ [])

@spec execute_activity(module(), term(), keyword()) ::
  {:ok, term()} | {:error, term()}

execute_activity!(module, input, opts \\ [])

@spec execute_activity!(module(), term(), keyword()) :: term()

info()

@spec info() :: Hourglass.Workflow.Info.t()

Per-activation workflow context.

Returns a %Info{} carrying the workflow's run_id + task_queue. Reads from the evaluator state stamped on the process dict by Hourglass.Workflow.Evaluator.run_body/2. Raises if no evaluator is active on the calling process — info/0 is only meaningful inside a workflow body during evaluation.

random(max)

@spec random(pos_integer()) :: non_neg_integer()

sleep(duration)

@spec sleep(non_neg_integer() | {atom(), non_neg_integer()}) :: :ok

uuid()

@spec uuid() :: String.t()