SquidMesh.StepRunStore (squid_mesh v0.1.0-alpha.7)

Copy Markdown View Source

Durable store for per-step workflow execution state.

Step runs are used to detect stale or duplicate deliveries and to persist step input, output, and failure details separately from the parent run.

Summary

Functions

Marks a step as ready for execution if it has not already completed or been claimed by another delivery of the same workflow step.

Marks a paused manual step as completed, persists its output, and records the durable manual action metadata.

Marks a step run as completed and persists its output.

Lists the completed step outputs for one workflow run in completion order.

Lists completed step runs for one workflow run in compensation order.

Lists the completed step identifiers for one workflow run.

Marks a step run as failed and persists the last error.

Fetches the persisted step run for one workflow run and step identifier.

Persists approval resume metadata for a running approval step without completing it.

Persists pause-resume metadata for a running pause step without completing it.

Persists that a step has been scheduled but not yet claimed by a worker.

Lists the persisted step status for each declared step in a workflow run.

Updates the persisted recovery metadata for one step run.

Types

approval_targets()

@type approval_targets() :: %{ok: pause_target(), error: pause_target()}

begin_result()

@type begin_result() :: {:ok, SquidMesh.Persistence.StepRun.t(), :execute | :skip}

failure_recovery()

@type failure_recovery() :: %{
  strategy: :compensation | :undo,
  target: step_identifier()
}

manual_event()

@type manual_event() :: map()

pause_target()

@type pause_target() :: :complete | atom()

recovery_attrs()

@type recovery_attrs() :: map()

recovery_policy()

@type recovery_policy() :: map() | nil

schedule_result()

@type schedule_result() ::
  {:ok, SquidMesh.Persistence.StepRun.t(), :schedule | :skip}
  | {:error, Ecto.Changeset.t()}

stale_error()

@type stale_error() :: {:stale_step_run, String.t()}

step_error()

@type step_error() :: map()

step_identifier()

@type step_identifier() :: atom() | String.t()

step_input()

@type step_input() :: map()

step_output()

@type step_output() :: map()

step_schedule_input()

@type step_schedule_input() ::
  {step_identifier(), step_input()}
  | {step_identifier(), step_input(), recovery_policy()}

step_status()

@type step_status() :: :pending | :running | :completed | :failed

Functions

begin_step(repo, run_id, step, input)

@spec begin_step(module(), Ecto.UUID.t(), step_identifier(), step_input()) ::
  begin_result()

Marks a step as ready for execution if it has not already completed or been claimed by another delivery of the same workflow step.

complete_manual_step(repo, step_run_id, output, manual)

@spec complete_manual_step(module(), Ecto.UUID.t(), step_output(), manual_event()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Marks a paused manual step as completed, persists its output, and records the durable manual action metadata.

complete_step(repo, step_run_id, output)

@spec complete_step(module(), Ecto.UUID.t(), step_output()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Marks a step run as completed and persists its output.

completed_outputs(repo, run_id)

@spec completed_outputs(module(), Ecto.UUID.t()) :: [step_output()]

Lists the completed step outputs for one workflow run in completion order.

completed_step_runs_for_compensation(repo, run_id)

@spec completed_step_runs_for_compensation(module(), Ecto.UUID.t()) :: [
  SquidMesh.Persistence.StepRun.t()
]

Lists completed step runs for one workflow run in compensation order.

Saga rollback must undo the most recently completed reversible effect first. The ordering uses the completed forward attempt timestamp so compensation metadata updates do not change rollback order on redelivery.

completed_steps(repo, run_id)

@spec completed_steps(module(), Ecto.UUID.t()) :: [String.t()]

Lists the completed step identifiers for one workflow run.

fail_step(repo, step_run_id, error)

@spec fail_step(module(), Ecto.UUID.t(), step_error()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Marks a step run as failed and persists the last error.

get_step_run(repo, run_id, step)

@spec get_step_run(module(), Ecto.UUID.t(), step_identifier()) ::
  SquidMesh.Persistence.StepRun.t() | nil

Fetches the persisted step run for one workflow run and step identifier.

persist_approval_resume(repo, step_run_id, map, output_key)

@spec persist_approval_resume(
  module(),
  Ecto.UUID.t(),
  approval_targets(),
  atom() | nil
) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Persists approval resume metadata for a running approval step without completing it.

persist_pause_resume(repo, step_run_id, output, target)

@spec persist_pause_resume(module(), Ecto.UUID.t(), step_output(), pause_target()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Persists pause-resume metadata for a running pause step without completing it.

schedule_step(repo, run_id, step, input)

@spec schedule_step(module(), Ecto.UUID.t(), step_identifier(), step_input()) ::
  schedule_result()

Persists that a step has been scheduled but not yet claimed by a worker.

step_statuses(repo, run_id)

@spec step_statuses(module(), Ecto.UUID.t()) :: %{
  optional(String.t()) => step_status()
}

Lists the persisted step status for each declared step in a workflow run.

update_recovery(repo, step_run_id, recovery)

@spec update_recovery(module(), Ecto.UUID.t(), recovery_attrs()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()} | {:error, :not_found}

Updates the persisted recovery metadata for one step run.

The compensation runtime uses this to mark callbacks as running, completed, or failed without changing the original forward step status.