Hourglass.Workflow.State (hourglass v0.1.0)

Copy Markdown View Source

Per-run state threaded through Hourglass.Workflow.Evaluator.evaluate/3.

Across activations:

  • next_global_seq — monotonic SDK seq counter; persists across activations (Temporal requires monotonic seqs per workflow lifetime).
  • pending_resolvers%{global_seq => command_id} registered when a command was scheduled. The next activation's resolve_activity{seq} uses this to map back to the command_id.
  • command_id_to_seq — inverse of pending_resolvers. Lets re-execution look up "have I seen this command_id before? what seq did it get?"
  • resolved_results%{global_seq => decoded_value} populated from prior resolve_activity jobs delivered on this or earlier activations.
  • input — the decoded initialize_workflow first argument; set once.
  • workflow_module — the workflow module bound to this run (e.g. MyApp.Workflows.IngestSource). Set by Hourglass.Workflow.Evaluator.evaluate/3 on the first activation and persisted across cache round-trips so subsequent activations (e.g. resolve_activity deliveries that don't carry initialize_workflow) can be routed to the same module by the worker's workflow type resolver.

Per-activation (rebuilt each call):

  • commands — newest-first list of {command_id, command_term} issued during this activation's re-execution. Sorted + assigned seqs at the end of evaluate/3.
  • resultnil | {:completed, term} | {:failed, term} | :evict determines the completion shape.

  • child_count — counter that replaces the expected_children MapSet. The pure-function evaluator runs async/1 inline, so it has no child pids to track; the counter only exists for diagnostics.

Summary

Types

A workflow command_id: a tuple {child_index_path, local_seq} deterministic per workflow re-run. See Hourglass.Workflow.CommandAccumulator.

t()

Functions

Type alias used by callers — the proto module the evaluator emits.

Build the initial state for a fresh workflow (no prior activations).

After a flush, register the newly-assigned {seq => command_id} resolvers and bump next_global_seq. Also extends command_id_to_seq (the inverse index used by re-execution to recognise already-scheduled commands).

Types

command_entry()

@type command_entry() :: {command_id(), command_term()}

command_id()

@type command_id() :: {[non_neg_integer()], non_neg_integer()}

A workflow command_id: a tuple {child_index_path, local_seq} deterministic per workflow re-run. See Hourglass.Workflow.CommandAccumulator.

command_term()

@type command_term() ::
  {:execute_activity, %{module: module(), args: term(), options: keyword()}}
  | {:start_timer, %{duration_ms: non_neg_integer()}}
  | {:uuid, map()}
  | {:random, %{max: pos_integer()}}

result()

@type result() ::
  nil | {:completed, term()} | {:failed, term()} | :evict | :continue_as_new

t()

@type t() :: %Hourglass.Workflow.State{
  cancel_requested: boolean(),
  child_count: non_neg_integer(),
  command_id_to_seq: %{optional(command_id()) => pos_integer()},
  commands: [command_entry()],
  input: term(),
  next_global_seq: pos_integer(),
  pending_resolvers: %{optional(pos_integer()) => command_id()},
  resolved_results: %{optional(pos_integer()) => term()},
  result: result(),
  run_id: String.t() | nil,
  signals: %{optional(String.t()) => [term()]},
  task_queue: String.t(),
  workflow_module: module() | nil
}

Functions

command_proto_module()

@spec command_proto_module() :: module()

Type alias used by callers — the proto module the evaluator emits.

new(run_id, task_queue)

@spec new(String.t(), String.t()) :: t()

Build the initial state for a fresh workflow (no prior activations).

register_assignments(state, assignments)

@spec register_assignments(t(), [{pos_integer(), command_id(), command_term()}]) ::
  t()

After a flush, register the newly-assigned {seq => command_id} resolvers and bump next_global_seq. Also extends command_id_to_seq (the inverse index used by re-execution to recognise already-scheduled commands).

assignments is [{seq, command_id, _command_term}] in seq order.