SquidMesh.StepRunStore (squid_mesh v0.1.0-alpha.2)

Durable store for per-step workflow execution state.

Step runs are used to detect stale or duplicate deliveries and to persist step input, output, and failure details separately from the parent run.

Summary

Functions

begin_step(repo, run_id, step, input)

Marks a step as ready for execution if it has not already completed or been claimed by another delivery of the same workflow step.

complete_step(repo, step_run_id, output)

Marks a step run as completed and persists its output.

completed_outputs(repo, run_id)

Lists the completed step outputs for one workflow run in completion order.

completed_steps(repo, run_id)

Lists the completed step identifiers for one workflow run.

fail_step(repo, step_run_id, error)

Marks a step run as failed and persists the last error.

get_step_run(repo, run_id, step)

Fetches the persisted step run for one workflow run and step identifier.

schedule_step(repo, run_id, step, input)

Persists that a step has been scheduled but not yet claimed by a worker.

step_statuses(repo, run_id)

Lists the persisted step status for each declared step in a workflow run.

Types

begin_result()

@type begin_result() ::
  {:ok, SquidMesh.Persistence.StepRun.t(), :execute | :skip}
  | {:error, Ecto.Changeset.t()}

schedule_result()

@type schedule_result() ::
  {:ok, SquidMesh.Persistence.StepRun.t(), :schedule | :skip}
  | {:error, Ecto.Changeset.t()}

step_error()

@type step_error() :: map()

step_identifier()

@type step_identifier() :: atom() | String.t()

step_input()

@type step_input() :: map()

step_output()

@type step_output() :: map()

step_status()

@type step_status() :: :pending | :running | :completed | :failed

Functions

begin_step(repo, run_id, step, input)

@spec begin_step(module(), Ecto.UUID.t(), step_identifier(), step_input()) ::
  begin_result()

Marks a step as ready for execution if it has not already completed or been claimed by another delivery of the same workflow step.
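
The following is a minimal sketch of how a worker might branch on `begin_result()`, not the library's own runner. `StepRunnerSketch`, `run_step/6`, the `work` callback, and the injectable `store` argument are hypothetical names introduced for illustration. The sketch also assumes the returned step run exposes an `:id` field that can be passed to `complete_step/3` and `fail_step/3`, and that `:skip` means another delivery already completed or claimed the step.

```elixir
defmodule StepRunnerSketch do
  # Hypothetical helper, not part of squid_mesh.
  # `store` defaults to SquidMesh.StepRunStore and is injectable so the
  # control flow can be exercised without a database.
  def run_step(repo, run_id, step, input, work, store \\ SquidMesh.StepRunStore) do
    case store.begin_step(repo, run_id, step, input) do
      {:ok, step_run, :execute} ->
        # This delivery owns the step: run the work, then record the outcome.
        # Assumption: the step run has an `:id` field.
        case work.(input) do
          {:ok, output} ->
            store.complete_step(repo, step_run.id, output)

          {:error, reason} ->
            store.fail_step(repo, step_run.id, %{"reason" => inspect(reason)})
        end

      {:ok, _step_run, :skip} ->
        # Assumption: the step already completed or was claimed elsewhere.
        :skipped

      {:error, changeset} ->
        {:error, changeset}
    end
  end
end
```

Treating `:skip` as a successful no-op is a design choice in this sketch: it lets a redelivered job finish quietly instead of repeating side effects.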

complete_step(repo, step_run_id, output)

@spec complete_step(module(), Ecto.UUID.t(), step_output()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, Ecto.Changeset.t() | :not_found}

Marks a step run as completed and persists its output.

completed_outputs(repo, run_id)

@spec completed_outputs(module(), Ecto.UUID.t()) :: [step_output()]

Lists the completed step outputs for one workflow run in completion order.
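
Because the outputs come back in completion order, one plausible use is folding them into a single context map for a later step. This is only a sketch: `OutputContextSketch` and `merge_outputs/1` are hypothetical names, and whether squid_mesh itself combines outputs this way is not stated here.

```elixir
defmodule OutputContextSketch do
  # Hypothetical helper, not part of squid_mesh.
  # Folds a list of step outputs (as returned by completed_outputs/2)
  # into one map. Later outputs override earlier ones on key conflicts.
  def merge_outputs(outputs) when is_list(outputs) do
    Enum.reduce(outputs, %{}, fn output, acc -> Map.merge(acc, output) end)
  end
end

# Possible usage, assuming `repo` and `run_id` are already bound:
#
#   context =
#     repo
#     |> SquidMesh.StepRunStore.completed_outputs(run_id)
#     |> OutputContextSketch.merge_outputs()
```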

completed_steps(repo, run_id)

@spec completed_steps(module(), Ecto.UUID.t()) :: [String.t()]

Lists the completed step identifiers for one workflow run.
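
Note that the spec returns `[String.t()]` while `step_identifier()` also allows atoms. A membership check therefore needs to normalize the identifier first. The sketch below assumes atom identifiers are persisted as their plain string form (for example `:fetch` as `"fetch"`), which is not confirmed here. `CompletedCheckSketch` and `completed?/2` are hypothetical names.

```elixir
defmodule CompletedCheckSketch do
  # Hypothetical helper, not part of squid_mesh.
  # `completed` is the list returned by completed_steps/2.
  # Assumption: atom identifiers are stored as their string form.
  def completed?(completed, step) when is_atom(step) or is_binary(step) do
    to_string(step) in completed
  end
end
```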

fail_step(repo, step_run_id, error)

@spec fail_step(module(), Ecto.UUID.t(), step_error()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, Ecto.Changeset.t() | :not_found}

Marks a step run as failed and persists the last error.
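
Since `step_error()` is a plain `map()`, exceptions and other failure terms have to be converted before they are passed to `fail_step/3`. One possible conversion is sketched below. `StepErrorSketch`, `to_step_error/1`, and the `"type"`/`"message"` keys are hypothetical; the library does not prescribe a shape for this map.

```elixir
defmodule StepErrorSketch do
  # Hypothetical helper, not part of squid_mesh.
  # Builds a map with string keys and string values so it stays simple
  # to persist regardless of the underlying column type.

  # Exceptions: keep the module name and the human-readable message.
  def to_step_error(%{__exception__: true} = exception) do
    %{
      "type" => inspect(exception.__struct__),
      "message" => Exception.message(exception)
    }
  end

  # Any other term (atoms, tuples, strings): store its inspected form.
  def to_step_error(reason) do
    %{"type" => "error", "message" => inspect(reason)}
  end
end

# Possible usage, assuming `repo` and `step_run_id` are already bound:
#
#   error = StepErrorSketch.to_step_error(%RuntimeError{message: "boom"})
#   SquidMesh.StepRunStore.fail_step(repo, step_run_id, error)
```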

get_step_run(repo, run_id, step)

@spec get_step_run(module(), Ecto.UUID.t(), step_identifier()) ::
  SquidMesh.Persistence.StepRun.t() | nil

Fetches the persisted step run for one workflow run and step identifier.

schedule_step(repo, run_id, step, input)

@spec schedule_step(module(), Ecto.UUID.t(), step_identifier(), step_input()) ::
  schedule_result()

Persists that a step has been scheduled but not yet claimed by a worker.
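
A caller would typically enqueue work only when `schedule_result()` carries `:schedule`. The sketch below shows that branching under stated assumptions: `SchedulerSketch`, `schedule_once/6`, the `enqueue` callback, and the injectable `store` argument are hypothetical, and `:skip` is read here as "nothing new to enqueue", which is an interpretation rather than documented behavior.

```elixir
defmodule SchedulerSketch do
  # Hypothetical helper, not part of squid_mesh.
  # `enqueue` is a caller-supplied function that hands the step run to a
  # job queue; `store` defaults to SquidMesh.StepRunStore.
  def schedule_once(repo, run_id, step, input, enqueue, store \\ SquidMesh.StepRunStore) do
    case store.schedule_step(repo, run_id, step, input) do
      {:ok, step_run, :schedule} ->
        # First time this step is recorded as scheduled: enqueue it.
        enqueue.(step_run)
        {:ok, :scheduled}

      {:ok, _step_run, :skip} ->
        # Assumption: a step run already exists, so do not enqueue again.
        {:ok, :skipped}

      {:error, changeset} ->
        {:error, changeset}
    end
  end
end
```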

step_statuses(repo, run_id)

@spec step_statuses(module(), Ecto.UUID.t()) :: %{
  optional(String.t()) => step_status()
}

Lists the persisted step status for each declared step in a workflow run.
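
The returned map of step identifier to `step_status()` lends itself to simple reporting. As an illustration only, the sketch below counts steps per status and collects the failed ones. `StatusSummarySketch` and `summarize/1` are hypothetical names, not part of squid_mesh.

```elixir
defmodule StatusSummarySketch do
  # Hypothetical helper, not part of squid_mesh.
  # Takes the map returned by step_statuses/2 and derives a small report.
  def summarize(statuses) when is_map(statuses) do
    failed =
      statuses
      |> Enum.filter(fn {_step, status} -> status == :failed end)
      |> Enum.map(fn {step, _status} -> step end)
      |> Enum.sort()

    %{
      # Number of steps in each status, e.g. how many are still :pending.
      counts: statuses |> Map.values() |> Enum.frequencies(),
      # Sorted identifiers of failed steps.
      failed: failed,
      # True only when there is at least one step and all are :completed.
      all_completed?:
        map_size(statuses) > 0 and
          Enum.all?(statuses, fn {_step, status} -> status == :completed end)
    }
  end
end

# Possible usage, assuming `repo` and `run_id` are already bound:
#
#   repo
#   |> SquidMesh.StepRunStore.step_statuses(run_id)
#   |> StatusSummarySketch.summarize()
```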