Durable store for per-step workflow execution state.
Step runs are used to detect stale or duplicate deliveries and to persist step input, output, and failure details separately from the parent run.
Summary
Functions
begin_step/4: Marks a step as ready for execution if it has not already completed or been claimed by another delivery of the same workflow step.
complete_step/3: Marks a step run as completed and persists its output.
completed_outputs/2: Lists the completed step outputs for one workflow run in completion order.
completed_steps/2: Lists the completed step identifiers for one workflow run.
fail_step/3: Marks a step run as failed and persists the last error.
get_step_run/3: Fetches the persisted step run for one workflow run and step identifier.
schedule_step/4: Persists that a step has been scheduled but not yet claimed by a worker.
step_statuses/2: Lists the persisted step status for each declared step in a workflow run.
Types
@type begin_result() :: {:ok, SquidMesh.Persistence.StepRun.t(), :execute | :skip} | {:error, Ecto.Changeset.t()}
@type schedule_result() :: {:ok, SquidMesh.Persistence.StepRun.t(), :schedule | :skip} | {:error, Ecto.Changeset.t()}
@type step_error() :: map()
@type step_input() :: map()
@type step_output() :: map()
@type step_status() :: :pending | :running | :completed | :failed
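The tagged tuples in `begin_result()` lend themselves to pattern matching at the call site. The following is a minimal sketch of how a caller might branch on each shape. `BeginResultSketch`, `handle/2`, and the `run_step` callback are hypothetical names used for illustration only, and a plain map stands in for a `SquidMesh.Persistence.StepRun` struct.

```elixir
defmodule BeginResultSketch do
  # Hypothetical caller-side helper, not part of the library.
  # `run_step` is a one-argument function supplied by the caller.

  # The step was claimed by this delivery: run it.
  def handle({:ok, step_run, :execute}, run_step), do: {:executed, run_step.(step_run)}

  # Another delivery already claimed or completed the step: do nothing.
  def handle({:ok, _step_run, :skip}, _run_step), do: :skipped

  # Persistence failed: surface the changeset to the caller.
  def handle({:error, changeset}, _run_step), do: {:persistence_error, changeset}
end

# A plain map stands in for a SquidMesh.Persistence.StepRun struct here.
BeginResultSketch.handle(
  {:ok, %{input: %{n: 1}}, :execute},
  fn step_run -> step_run.input.n + 1 end
)
```

The same three-clause shape would apply to `schedule_result()`, with `:schedule` in place of `:execute`.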
Functions
@spec begin_step(module(), Ecto.UUID.t(), step_identifier(), step_input()) :: begin_result()
Marks a step as ready for execution if it has not already completed or been claimed by another delivery of the same workflow step.
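To illustrate the duplicate-delivery check described above, here is an in-memory sketch, not the library's implementation. It assumes that a step already in the `:running` or `:completed` state is skipped and that any other state (including `:failed`) may be claimed. That rule, the `StepClaimSketch` module, and the map-based store are all assumptions made for the example. The real function persists through a database and takes a module and a run id as its first arguments.

```elixir
defmodule StepClaimSketch do
  # Hypothetical in-memory stand-in for the step runs of one workflow run.
  # Keys are step identifiers; values are plain maps, not StepRun structs.

  # Returns {result, updated_store}, where `result` mirrors the documented
  # begin_result() success shape: {:ok, step_run, :execute | :skip}.
  def begin_step(store, step, input) do
    case Map.get(store, step) do
      %{status: status} = existing when status in [:running, :completed] ->
        # Duplicate or stale delivery: leave the stored record untouched.
        {{:ok, existing, :skip}, store}

      _missing_pending_or_failed ->
        # Assumption: anything not running or completed may be claimed.
        step_run = %{step: step, status: :running, input: input}
        {{:ok, step_run, :execute}, Map.put(store, step, step_run)}
    end
  end
end

# The first delivery claims the step; a second delivery of the same step is skipped.
{{:ok, _step_run, :execute}, store} =
  StepClaimSketch.begin_step(%{}, "charge_card", %{amount: 100})

{{:ok, _step_run, :skip}, _store} =
  StepClaimSketch.begin_step(store, "charge_card", %{amount: 100})
```

In a database-backed store the check and the write would need to happen atomically (for example, inside a transaction or via a unique constraint) so that two concurrent deliveries cannot both receive `:execute`.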
@spec complete_step(module(), Ecto.UUID.t(), step_output()) :: {:ok, SquidMesh.Persistence.StepRun.t()} | {:error, Ecto.Changeset.t() | :not_found}
Marks a step run as completed and persists its output.
@spec completed_outputs(module(), Ecto.UUID.t()) :: [step_output()]
Lists the completed step outputs for one workflow run in completion order.
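As a rough illustration of "completion order", the sketch below filters a list of step runs down to the completed ones and sorts them by completion time. The `completed_at` and `output` field names are assumptions, as is the `CompletedOutputsSketch` module. The documented function queries persisted records rather than an in-memory list.

```elixir
defmodule CompletedOutputsSketch do
  # Hypothetical in-memory version; `completed_at` and `output` are assumed field names.
  def completed_outputs(step_runs) do
    step_runs
    |> Enum.filter(&(&1.status == :completed))
    # Passing the DateTime module makes the sort use DateTime.compare/2.
    |> Enum.sort_by(& &1.completed_at, DateTime)
    |> Enum.map(& &1.output)
  end
end

step_runs = [
  %{status: :completed, completed_at: ~U[2024-01-01 00:00:02Z], output: %{b: 2}},
  %{status: :running, completed_at: nil, output: nil},
  %{status: :completed, completed_at: ~U[2024-01-01 00:00:01Z], output: %{a: 1}}
]

CompletedOutputsSketch.completed_outputs(step_runs)
```

With the sample data above, the running step is excluded and the output completed first (`%{a: 1}`) is listed before `%{b: 2}`.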
@spec completed_steps(module(), Ecto.UUID.t()) :: [String.t()]
Lists the completed step identifiers for one workflow run.
@spec fail_step(module(), Ecto.UUID.t(), step_error()) :: {:ok, SquidMesh.Persistence.StepRun.t()} | {:error, Ecto.Changeset.t() | :not_found}
Marks a step run as failed and persists the last error.
@spec get_step_run(module(), Ecto.UUID.t(), step_identifier()) :: SquidMesh.Persistence.StepRun.t() | nil
Fetches the persisted step run for one workflow run and step identifier.
@spec schedule_step(module(), Ecto.UUID.t(), step_identifier(), step_input()) :: schedule_result()
Persists that a step has been scheduled but not yet claimed by a worker.
@spec step_statuses(module(), Ecto.UUID.t()) :: %{optional(String.t()) => step_status()}
Lists the persisted step status for each declared step in a workflow run.