Baton.Nodes (Baton v0.1.0)


All database access for workflow_nodes.

Centralizing node queries here keeps the engine modules (Check, Results, Reschedule) free of Ecto and gives the library a single, testable data layer. Every function resolves its repo at runtime via Baton.Config.repo/0, except insert_all/2, which takes the repo as an explicit argument.

Results are stored wrapped as %{"value" => term} so that non-map results (lists, scalars, strings) round-trip through the jsonb column. Callers always receive the unwrapped value.
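The wrapping described above can be sketched in a few lines. The module and function names below are hypothetical and are not part of Baton; they only illustrate the shape of the stored value. Because the column is jsonb, values presumably still pass through JSON encoding, so an atom would come back as a string.

```elixir
defmodule ResultWrapSketch do
  # Hypothetical helpers for illustration; the real wrapping is internal to Baton.Nodes.

  # Wrap any term in a single-key map so lists, scalars and strings
  # can be stored in a column that expects a JSON object.
  def wrap(value), do: %{"value" => value}

  # A nil column means nothing was stored yet.
  def unwrap(nil), do: {:error, :no_result}

  # Anything stored comes back as the bare value the caller passed in.
  def unwrap(%{"value" => value}), do: {:ok, value}
end

stored = ResultWrapSketch.wrap([1, 2, 3])
{:ok, [1, 2, 3]} = ResultWrapSketch.unwrap(stored)
```

Note that a stored nil result would be wrapped as %{"value" => nil} under this scheme, which keeps it distinguishable from a column that was never written.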

Summary

Functions

completed_step_names(workflow_id)

Step names in a workflow whose Oban job is currently completed.

dep_states(workflow_id, dep_names)

Dependency states for the named steps in a workflow.

for_job(oban_job_id)

Load the node for a given Oban job id, or nil.

insert_all(repo, rows)

Bulk-insert node rows within an existing transaction.

own_result(oban_job_id)

The result this job stored on a previous attempt, for the idempotency guard.

result_for(workflow_id, step_name)

The stored result of a specific step in a workflow.

results_for(workflow_id, step_names)

Results for all of the given steps that have stored one, keyed by step name. Steps with no result yet are omitted.

step_states(workflow_id)

The current Oban state of every step in a workflow.

store_result(oban_job_id, result)

Store (wrap) a step result on its node. Returns :ok or {:error, reason}.

waiting_dependents(workflow_id, completed_step)

Nodes that list completed_step as a dependency and whose job is currently scheduled (i.e. snoozing). Returns a list of %{id, deps} maps for the completion-triggered reschedule to evaluate. Uses a native array membership test on deps.

Functions

completed_step_names(workflow_id)

@spec completed_step_names(String.t()) :: [String.t()]

Step names in a workflow whose Oban job is currently completed.

dep_states(workflow_id, dep_names)

@spec dep_states(String.t(), [String.t()]) :: [map()]

Dependency states for the named steps in a workflow.

LEFT JOINs to oban_jobs, so a dep whose job has been pruned comes back with state: nil (detected as pruned by the caller) rather than vanishing.

Returns a list of %{name, state, attempted_at}.
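As a rough illustration of how a caller might interpret these rows, the sketch below groups dependency names by what their state implies. DepCheckSketch and its three buckets are assumptions made for this example; the real Check module may classify states differently (for instance, it may treat discarded or cancelled jobs separately).

```elixir
defmodule DepCheckSketch do
  # Hypothetical caller-side helper; not part of Baton.
  # Groups dependency names by what their Oban state implies.
  def classify(dep_rows) do
    Enum.group_by(dep_rows, &bucket/1, & &1.name)
  end

  # state: nil means the LEFT JOIN found no oban_jobs row, i.e. the job was pruned.
  defp bucket(%{state: nil}), do: :pruned
  defp bucket(%{state: "completed"}), do: :completed
  # Every other state is lumped together here (an assumption of this sketch).
  defp bucket(%{state: _other}), do: :pending
end

# Sample rows in the documented %{name, state, attempted_at} shape.
rows = [
  %{name: "fetch", state: "completed", attempted_at: ~U[2024-01-01 00:00:00Z]},
  %{name: "parse", state: nil, attempted_at: nil},
  %{name: "load", state: "executing", attempted_at: ~U[2024-01-01 00:01:00Z]}
]

grouped = DepCheckSketch.classify(rows)
```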

for_job(oban_job_id)

@spec for_job(integer()) :: Baton.Node.t() | nil

Load the node for a given Oban job id, or nil.

insert_all(repo, rows)

@spec insert_all(module(), [map()]) :: {non_neg_integer(), nil}

Bulk-insert node rows within an existing transaction.

Takes the repo explicitly because this runs inside the same transaction as the Oban job insert (see Baton.insert/2), where the repo is already resolved.

own_result(oban_job_id)

@spec own_result(integer()) :: {:ok, term()} | {:error, :no_result}

The result this job stored on a previous attempt, for the idempotency guard.

Returns {:ok, value} or {:error, :no_result}. A missing node and a null result both yield {:error, :no_result}, since in either case nothing is cached.
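One way an idempotency guard could be built on this return shape is sketched below. IdempotencySketch is hypothetical; lookup and store are injected functions standing in for own_result/1 and store_result/2 so the example runs without a database. How Baton's engine actually wires the guard is not shown in this page.

```elixir
defmodule IdempotencySketch do
  # Hypothetical guard. `lookup` stands in for Baton.Nodes.own_result/1,
  # `store` for Baton.Nodes.store_result/2, and `work` is the step body.
  def run_once(job_id, lookup, store, work) do
    case lookup.(job_id) do
      # A previous attempt already stored a result: reuse it, skip the work.
      {:ok, cached} ->
        {:ok, cached}

      # Nothing cached (missing node or null result): do the work, then store it.
      {:error, :no_result} ->
        result = work.()

        # If storing fails, the {:error, reason} from `store` is returned as-is.
        with :ok <- store.(job_id, result), do: {:ok, result}
    end
  end
end
```

On a retry after a crash that happened post-store, the first clause short-circuits and the step body is not executed a second time.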

result_for(workflow_id, step_name)

@spec result_for(String.t(), String.t()) ::
  {:ok, term()} | {:error, :not_found | :no_result}

The stored result of a specific step in a workflow.

Distinguishes a missing step (:not_found) from a step that exists but hasn't stored a result yet (:no_result).
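The two error atoms let a caller react differently to each case. The sketch below maps a result_for/2-shaped tuple onto a worker-style return value; the module name, the 30-second snooze, and the policy itself are all assumptions chosen for illustration, not Baton's documented behaviour.

```elixir
defmodule UpstreamInputSketch do
  # Hypothetical policy for illustration only.
  @snooze_seconds 30

  # The upstream step has stored a result: pass it through.
  def to_worker_return({:ok, value}), do: {:ok, value}

  # The step exists but has not produced a result yet: waiting may help.
  def to_worker_return({:error, :no_result}), do: {:snooze, @snooze_seconds}

  # The step name is not in the workflow at all: waiting will never help.
  def to_worker_return({:error, :not_found}), do: {:cancel, :unknown_step}
end
```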

results_for(workflow_id, step_names)

@spec results_for(String.t(), [String.t()]) :: %{required(String.t()) => term()}

Results for all of the given steps that have stored one, keyed by step name. Steps with no result yet are omitted.
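Because absent steps are simply left out of the map, a caller can find them by comparing the requested names against the returned keys. The helper below is a hypothetical sketch operating on a map of the documented shape. It checks key presence rather than the value, on the assumption that a step may legitimately have stored nil as its result.

```elixir
defmodule MissingResultsSketch do
  # Hypothetical helper; not part of Baton.
  # Returns the requested step names that have no entry in `results`,
  # preserving the order in which they were requested.
  def missing(requested, results) do
    Enum.reject(requested, &Map.has_key?(results, &1))
  end
end

# A map in the shape results_for/2 is documented to return.
results = %{"fetch" => [1, 2, 3], "parse" => nil}
still_waiting = MissingResultsSketch.missing(["fetch", "parse", "load"], results)
```

Here only "load" is reported as missing, since "parse" has an entry even though its value is nil.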

step_states(workflow_id)

@spec step_states(String.t()) :: [%{name: String.t(), state: String.t() | nil}]

The current Oban state of every step in a workflow.

LEFT JOINs to oban_jobs, so a step whose job has been pruned comes back with state: nil. Returns a list of %{name, state}. Used by Baton.Completion to decide whether a workflow has fully settled.
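A settled check over these rows could look like the sketch below. It assumes that Oban's terminal states (completed, discarded, cancelled) and pruned jobs (state: nil) all count as settled; whether Baton.Completion applies exactly this rule is not stated here, and SettledSketch is a hypothetical name.

```elixir
defmodule SettledSketch do
  # Assumption: these Oban states are final, so a job in one of them will not run again.
  @terminal ["completed", "discarded", "cancelled"]

  # True when no step can still make progress.
  # A nil state (pruned job) is treated as settled in this sketch.
  def settled?(step_rows) do
    Enum.all?(step_rows, fn %{state: state} ->
      is_nil(state) or state in @terminal
    end)
  end
end

rows = [
  %{name: "fetch", state: "completed"},
  %{name: "parse", state: nil},
  %{name: "load", state: "scheduled"}
]

SettledSketch.settled?(rows)
```

With the sample rows the check is false because "load" is still scheduled. An empty list counts as settled under Enum.all?/2, which a real implementation might want to handle explicitly.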

store_result(oban_job_id, result)

@spec store_result(integer(), term()) :: :ok | {:error, term()}

Store (wrap) a step result on its node. Returns :ok or {:error, reason}.

waiting_dependents(workflow_id, completed_step)

@spec waiting_dependents(String.t(), String.t()) :: [
  %{id: integer(), deps: [String.t()]}
]

Nodes that list completed_step as a dependency and whose job is currently scheduled (i.e. snoozing). Returns a list of %{id, deps} maps for the completion-triggered reschedule to evaluate. Uses a native array membership test on deps.
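The evaluation step could be sketched as follows: given the waiting nodes and the names of completed steps (for example from completed_step_names/1), pick the nodes whose dependencies are all complete. RescheduleSketch is hypothetical, and the actual Reschedule module may apply additional conditions.

```elixir
defmodule RescheduleSketch do
  # Hypothetical helper; not part of Baton.
  # Returns the ids of waiting nodes whose every dependency is completed.
  def ready_ids(waiting, completed_names) do
    done = MapSet.new(completed_names)

    for %{id: id, deps: deps} <- waiting,
        Enum.all?(deps, &MapSet.member?(done, &1)),
        do: id
  end
end

# Rows in the documented %{id, deps} shape.
waiting = [
  %{id: 10, deps: ["fetch"]},
  %{id: 11, deps: ["fetch", "parse"]}
]

ready = RescheduleSketch.ready_ids(waiting, ["fetch"])
```

In this sample only node 10 is ready, because node 11 still waits on "parse".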