Chimeway.Workflows (chimeway v1.0.0)

Copy Markdown View Source

Persistence helpers for durable workflow definitions and ordered step rows.

Summary

Functions

Appends one workflow_transition row using the supplied repo (so callers can participate in the progression transaction). Required keys: workflow_run_id, to_state, and reason. Optional keys: workflow_step_id, delivery_id, from_state, context, inserted_at.

Returns the workflow run state for the given tenant_id and execution_id (the workflow run's primary key). The returned map includes the authoritative State Spine fields plus the current step key for operator-friendly inspection.

Looks up a workflow_step by step_key inside the same workflow definition. Returns nil if the step does not exist — callers treat this as a noop rather than crashing the progression transaction.

Returns the active workflow_step row for a workflow_run, raising if none.

Returns the canonical workflow_run row by id, raising if not found. Used by the progression service after locking the row inside its transaction.

Returns the structural WorkflowTransition records for a given execution, strictly scoped to the supplied tenant_id.

Locks a workflow run for update inside the given repo and returns it. Returns {:error, :workflow_run_not_found} if the row no longer exists. Must be invoked inside a transaction.

Routes an incoming signal to all waiting workflow runs for the same tenant whose pending_signals list contains the signal's event_name.

Updates a workflow run row with the supplied fields. Used by the progression service to record waiting state and reason/context, advance the current step cursor, or reactivate a previously waiting run.

Functions

active_step_linkage(notification_id)

@spec active_step_linkage(Ecto.UUID.t() | map()) ::
  {:ok,
   %{
     workflow_run_id: Ecto.UUID.t(),
     workflow_step_id: Ecto.UUID.t(),
     channel: String.t()
   }
   | nil}
  | {:error, term()}

append_transition(repo, attrs)

@spec append_transition(Ecto.Repo.t(), map()) ::
  {:ok, Chimeway.Workflows.WorkflowTransition.t()}
  | {:error, Ecto.Changeset.t()}

Appends one workflow_transition row using the supplied repo (so callers can participate in the progression transaction). Required keys: workflow_run_id, to_state, and reason. Optional keys: workflow_step_id, delivery_id, from_state, context, inserted_at.

create_initial_run(repo, notification_id, definition, timestamp, tenant_id)

ensure_definition(repo, notification_key, workflow)

@spec ensure_definition(
  Ecto.Repo.t(),
  String.t(),
  Chimeway.Notifier.workflow_resolution()
) ::
  {:ok, Chimeway.Workflows.WorkflowDefinition.t()} | {:error, term()}

explain(tenant_id, execution_id)

@spec explain(String.t(), Ecto.UUID.t()) ::
  {:ok,
   %{
     id: Ecto.UUID.t(),
     tenant_id: String.t(),
     state: atom(),
     status_reason: String.t() | nil,
     current_step_name: String.t() | nil,
     suspended_until: DateTime.t() | nil,
     pending_signals: [String.t()],
     terminal_reason: String.t() | nil
   }}
  | {:error, :not_found}

Returns the workflow run state for the given tenant_id and execution_id (the workflow run's primary key). The returned map includes the authoritative State Spine fields plus the current step key for operator-friendly inspection.

Returns {:error, :not_found} if the run does not exist or belongs to a different tenant — preventing cross-tenant information disclosure (T-27-05).

fetch_definition(workflow_key, workflow_version)

@spec fetch_definition(String.t(), pos_integer()) ::
  {:ok, Chimeway.Workflows.WorkflowDefinition.t() | nil} | {:error, term()}

fetch_step_by_key(workflow_definition_id, step_key)

@spec fetch_step_by_key(Ecto.UUID.t(), String.t()) ::
  Chimeway.Workflows.WorkflowStep.t() | nil

Looks up a workflow_step by step_key inside the same workflow definition. Returns nil if the step does not exist — callers treat this as a noop rather than crashing the progression transaction.

get_current_step!(workflow_run)

Returns the active workflow_step row for a workflow_run, raising if none.

get_run!(workflow_run_id)

Returns the canonical workflow_run row by id, raising if not found. Used by the progression service after locking the row inside its transaction.

list_traces(tenant_id, execution_id, opts \\ [])

@spec list_traces(String.t(), Ecto.UUID.t(), keyword()) ::
  {:ok, [Chimeway.Workflows.WorkflowTransition.t()]} | {:error, :not_found}

Returns the structural WorkflowTransition records for a given execution, strictly scoped to the supplied tenant_id.

Trace context intentionally contains only structural progression metadata (e.g., event_name, step_key). Raw signal payloads are never written to transition context, making this surface payload-safe by construction (T-27-04).

Returns {:error, :not_found} if the workflow run does not exist or belongs to a different tenant.

Opts:

  • :limit — max number of traces to return (default: all)

lock_run(repo, workflow_run_id)

@spec lock_run(Ecto.Repo.t(), Ecto.UUID.t()) ::
  {:ok, Chimeway.Workflows.WorkflowRun.t()} | {:error, :workflow_run_not_found}

Locks a workflow run for update inside the given repo and returns it. Returns {:error, :workflow_run_not_found} if the row no longer exists. Must be invoked inside a transaction.

persisted_workflow(workflow_definition_id)

@spec persisted_workflow(Ecto.UUID.t() | map()) ::
  {:ok, Chimeway.Notifier.workflow_resolution() | nil} | {:error, term()}

route_signal(signal)

@spec route_signal(Chimeway.Signals.Signal.t()) :: {:ok, map()} | {:error, term()}

Routes an incoming signal to all waiting workflow runs for the same tenant whose pending_signals list contains the signal's event_name.

For each matched run the function:

  1. Transitions the run from :waiting to :active and clears pending_signals.
  2. Appends an immutable WorkflowTransition recording the event_name (but not the raw payload — payload safety is enforced here per the threat model requirement T-27-03).

All mutations per run are wrapped in one Ecto.Multi transaction so the state update and the trace record are always atomically consistent.

Cross-tenant isolation is enforced structurally: the query always filters by tenant_id = ^signal.tenant_id, making it structurally impossible for a signal from one tenant to resume a run belonging to another.

Returns {:ok, results_map} where results_map contains per-run outcomes keyed by {:run_updated, run.id} and {:transition_inserted, run.id}.

update_run(repo, run, attrs)

Updates a workflow run row with the supplied fields. Used by the progression service to record waiting state and reason/context, advance the current step cursor, or reactivate a previously waiting run.

upsert_definition(notification_key, workflow)

@spec upsert_definition(String.t(), Chimeway.Notifier.workflow_resolution()) ::
  {:ok, Chimeway.Workflows.WorkflowDefinition.t()} | {:error, term()}