ALF-backed engine for stepwise workflows.
Stepwise workflows are intended for wizards and import pipelines:
- linear(ish) progression
- resumable via checkpoints
- lighter semantics than full FSM (events are typically
:next/:back)
Usage
spec = %{
profile: :stepwise,
initial_state: :step_one,
steps: [:step_one, :step_two, :step_three],
states: %{
step_one: %{step_number: 1, ui: %{key: :step_one}},
step_two: %{step_number: 2, ui: %{key: :step_two}},
step_three: %{step_number: 3, ui: %{key: :step_three}}
}
}
{:ok, runtime} = Engine.init(spec, %{tenant_id: "t1", execution_id: "e1", sync: true})
{:ok, runtime} = Engine.handle_event(runtime, :next, %{name: "Alice"})
projection = Engine.get_state(runtime)
Summary
Functions
Extracts a serializable checkpoint from the runtime for persistence.
Returns the current Mobus.Stepwise.Projection for the runtime.
Processes an event against the current runtime, advancing the workflow.
Initializes a new stepwise workflow runtime from a spec and runtime context.
Restores a runtime from a previously saved checkpoint.
Types
@type runtime() :: %{ :execution_id => String.t(), :tenant_id => String.t(), :spec => map(), :current_state => atom() | String.t(), :pipeline_mod => module(), optional(:context) => map(), optional(:history) => list(), optional(:trace) => list(), optional(:blocked_reasons) => map(), optional(:breakpoint_hits) => list(), optional(:meta) => map(), optional(:errors) => [map()], optional(:projection) => Mobus.Stepwise.Projection.t() }
Functions
Extracts a serializable checkpoint from the runtime for persistence.
Strips the projection (non-serializable) and retains only the core runtime
fields needed to resume the workflow later via restore/3.
Parameters
runtime— current runtime map
Returns
- A plain map suitable for JSON serialization or database storage.
Examples
checkpoint = Engine.checkpoint(runtime)
# => %{execution_id: "e1", current_state: :step_two, context: %{name: "Alice"}, ...}
@spec get_state(runtime()) :: Mobus.Stepwise.Projection.t() | map()
Returns the current Mobus.Stepwise.Projection for the runtime.
If the runtime already contains a computed projection, returns it directly. Otherwise, recomputes the projection by running the pipeline in projection-only mode (no state transition).
Parameters
runtime— current runtime map
Returns
%Mobus.Stepwise.Projection{}— the canonical UI projection struct
Examples
projection = Engine.get_state(runtime)
projection.current_state #=> :step_two
projection.available_events #=> [:back, :next]
@spec handle_event(runtime(), atom() | String.t(), map()) :: {:ok, runtime()} | {:wait, runtime(), map()} | {:error, term(), runtime()}
Processes an event against the current runtime, advancing the workflow.
Sends the event through the ALF pipeline which, in order:
- Merges the payload into
runtime.context - Executes any step action (capability) defined for the current state
- Advances or reverses
current_statebased on the event - Fires entry actions if the state changed
- Records breakpoint hits
- Computes the updated projection
Parameters
runtime— current runtime map (as returned byinit/2or a priorhandle_event/3)event— event atom or string (:next,:back,"next","back", or custom)payload— map of user input to merge into context
Returns
{:ok, runtime}— successful state transition with updated projection{:wait, runtime, wait_cfg}— transition requires async resolution{:error, reason, runtime}— action or pipeline failure
Examples
{:ok, runtime} = Engine.handle_event(runtime, :next, %{name: "Alice"})
{:ok, runtime} = Engine.handle_event(runtime, :back, %{})
@spec init(map(), map()) :: {:ok, runtime()} | {:wait, runtime(), map()} | {:error, {:initial_entry_action_failed, term(), runtime()} | term()}
Initializes a new stepwise workflow runtime from a spec and runtime context.
Normalizes the spec into internal representation (IR), starts the ALF pipeline (if not already running), builds the initial runtime map, fires entry actions for the initial state, and computes the first projection.
If the initial state defines a capability action whose trigger matches :enter,
that action runs during init/2. When that action returns {:error, reason},
the error is propagated to the caller as
{:error, {:initial_entry_action_failed, reason, runtime}} with blocked_reasons
populated on the returned runtime. Previously such errors were silently discarded,
causing workflows whose first step errored to advance as if the step had succeeded.
Parameters
spec— workflow specification map containing:profile,:initial_state,:steps,:states, and optionally:transitions,:breakpoints,:subscriptionsruntime_context— context map requiring:tenant_idand optionally:execution_id,:sync, and:initial_context
Returns
{:ok, runtime}— runtime map with computed projection{:error, reason}— if tenant_id is missing or pipeline fails to start{:error, {:initial_entry_action_failed, reason, runtime}}— if the initial state's entry capability returned{:error, reason}
Examples
spec = %{profile: :stepwise, initial_state: :step_one, steps: [:step_one, :step_two],
states: %{step_one: %{step_number: 1, ui: %{key: :step_one}},
step_two: %{step_number: 2, ui: %{key: :step_two}}}}
{:ok, runtime} = Engine.init(spec, %{tenant_id: "t1", execution_id: "e1", sync: true})
Restores a runtime from a previously saved checkpoint.
Re-normalizes the spec into IR, reconstitutes the runtime from checkpoint
data, starts the pipeline if needed, and recomputes the projection. The
restored runtime is fully functional for subsequent handle_event/3 calls.
Parameters
spec— the original workflow specification mapcheckpoint— map previously returned bycheckpoint/1runtime_context— context map requiring:tenant_idand optionally:sync
Returns
{:ok, runtime}— restored runtime with recomputed projection{:error, reason}— if tenant_id is missing or pipeline fails to start
Examples
checkpoint = Engine.checkpoint(runtime)
{:ok, restored} = Engine.restore(spec, checkpoint, %{tenant_id: "t1", sync: true})