Build and insert DAG-based Oban job workflows.
Workflows link Oban jobs together with named dependencies, guaranteeing execution order regardless of scheduling or retries. Jobs are inserted atomically — all at once, in a single transaction alongside their workflow metadata — and each job gates itself at runtime by checking whether its declared dependencies have completed.
Basic Usage
alias Baton
Baton.new()
|> Baton.add(:a, FetchToken.new(%{api_key: "xyz"}))
|> Baton.add(:b, ProcessA.new(%{url: "a.com"}), deps: [:a])
|> Baton.add(:c, ProcessB.new(%{url: "b.com"}), deps: [:a])
|> Baton.add(:d, Finalize.new(%{}), deps: [:b, :c])
|> Baton.insert!()

This produces a diamond-shaped DAG: B and C run in parallel after A,
and D waits for both.
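The runtime gating behind that diamond can be pictured with a small, self-contained sketch. It does not use Baton; the module name DiamondSketch and its ready/1 function are hypothetical, and the dependency map simply mirrors the :a to :d steps above.

```elixir
defmodule DiamondSketch do
  # Hypothetical illustration, not part of Baton.
  # Step name => names of the steps it depends on (the diamond from above).
  @deps %{a: [], b: [:a], c: [:a], d: [:b, :c]}

  # Returns the steps that have not completed yet and whose
  # dependencies have all completed.
  def ready(completed) do
    done = MapSet.new(completed)

    @deps
    |> Enum.filter(fn {name, deps} ->
      not MapSet.member?(done, name) and Enum.all?(deps, &MapSet.member?(done, &1))
    end)
    |> Enum.map(fn {name, _deps} -> name end)
    |> Enum.sort()
  end
end

DiamondSketch.ready([])            # [:a]
DiamondSketch.ready([:a])          # [:b, :c]
DiamondSketch.ready([:a, :b, :c])  # [:d]
```

Once :a is done, :b and :c become ready together, and :d stays blocked until both have finished.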
Where state lives
Each step has two rows:
- an oban_jobs row, owned by Oban; its meta carries only immutable identifiers (workflow_id, workflow_name, workflow_label) that are written once and never mutated, so the engine never races Oban's own meta writes
- a workflow_nodes row, owned by this engine; it holds the dependencies, the ignore flags, and the step's result
Both are inserted in one transaction, so a workflow is all-or-nothing.
Repo / Oban instance
The repo is resolved from Oban's configuration — you don't pass it. If you
run a non-default Oban instance, pass oban: MyApp.Oban to insert/2 (or set
config :baton, :oban_name, MyApp.Oban). For backwards compatibility,
insert/2 still accepts a repo module as its second argument and ignores it.
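As a configuration sketch, the application-wide setting mentioned above would sit in a config file like this. MyApp.Oban is the example instance name from the text; the file path is an assumption about a conventional Mix project layout.

```elixir
# config/config.exs (assumed location in a standard Mix project)
import Config

# Point Baton at a non-default Oban instance for every workflow.
config :baton, :oban_name, MyApp.Oban
```

The per-call alternative passes the same name as an option, for example Baton.insert(workflow, oban: MyApp.Oban), where workflow is whatever Baton.new/1 and Baton.add/4 produced.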
Options for new/1
- :workflow_id - override the auto-generated UUIDv4 ID
- :workflow_name - human-readable label for the whole workflow
- :ignore_cancelled - apply globally to all steps (default: false)
- :ignore_discarded - apply globally to all steps (default: false)
- :debug - enable context-window capture for all steps (default: false)
Options for add/4
- :deps - list of step names this job depends on (atoms or strings)
- :ignore_cancelled - override the workflow-level default for this step
- :ignore_discarded - override the workflow-level default for this step
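The override relationship between new/1 and add/4 options can be sketched as a simple lookup with a fallback. This is an illustration only: OptionSketch and resolve/2 are hypothetical names, and Baton's internal resolution may differ.

```elixir
defmodule OptionSketch do
  # Hypothetical illustration, not Baton's code: a per-step option wins when
  # present, otherwise the workflow-level value (default false) applies.
  @flags [:ignore_cancelled, :ignore_discarded]

  def resolve(workflow_opts, step_opts) do
    Map.new(@flags, fn flag ->
      default = Keyword.get(workflow_opts, flag, false)
      {flag, Keyword.get(step_opts, flag, default)}
    end)
  end
end

# The step inherits the workflow-level ignore_cancelled: true.
OptionSketch.resolve([ignore_cancelled: true], deps: [:a])

# The step explicitly opts out again.
OptionSketch.resolve([ignore_cancelled: true], ignore_cancelled: false)
```

Using Keyword.get/3 with the workflow value as the default keeps an explicit false on a step distinguishable from an option that was never given.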
Types
@type entry() :: %{
  name: String.t(),
  changeset: Ecto.Changeset.t(),
  deps: [String.t()],
  ignore_cancelled: boolean(),
  ignore_discarded: boolean()
}
Functions
@spec add(t(), step_name(), Ecto.Changeset.t(), keyword()) :: t()
Add a named step to the workflow.
Raises ArgumentError immediately on a duplicate step name. Full cycle
detection runs at insert/2 time.
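The eager duplicate check can be illustrated without Baton. In this sketch the workflow is reduced to a list of {name, deps} tuples; AddSketch is a hypothetical module, and normalizing names to strings is an assumption based on the String.t() fields of entry().

```elixir
defmodule AddSketch do
  # Hypothetical stand-in for a workflow: a list of {name, deps} entries.
  # Names are normalized to strings (an assumption, see the lead-in).
  def add(entries, name, deps \\ []) do
    name = to_string(name)

    # Fail fast on a duplicate, before anything is inserted.
    if Enum.any?(entries, fn {existing, _deps} -> existing == name end) do
      raise ArgumentError, "duplicate step name: #{inspect(name)}"
    end

    entries ++ [{name, Enum.map(deps, &to_string/1)}]
  end
end

entries =
  []
  |> AddSketch.add(:a)
  |> AddSketch.add(:b, [:a])

# Adding :a again raises ArgumentError, because :a and "a" normalize
# to the same name:
# AddSketch.add(entries, "a")
```

Cycles are not checked here, matching the description above that full cycle detection is deferred to insert time.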
Validate the DAG, then insert all jobs and their workflow nodes in one transaction.
Returns {:ok, jobs} or {:error, {message, reason}}.
Options
- :oban - Oban instance name (default: Baton.Config.oban_name/0)
For backwards compatibility, passing a repo module as the second argument is accepted and ignored — the repo now comes from Oban.
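A caller would typically branch on the two documented return shapes. The helper below is a hypothetical sketch (InsertResultSketch is not part of Baton); it only assumes the {:ok, jobs} and {:error, {message, reason}} tuples described above and uses inspect/1 because the exact types of message and reason are not specified here.

```elixir
defmodule InsertResultSketch do
  # Hypothetical helper: turns the documented insert/2 results into a log line.
  def describe({:ok, jobs}) when is_list(jobs) do
    "inserted #{length(jobs)} jobs"
  end

  def describe({:error, {message, reason}}) do
    "workflow rejected: #{inspect(message)} (#{inspect(reason)})"
  end
end

# Literal tuples standing in for real results:
InsertResultSketch.describe({:ok, [:job_a, :job_b]})
InsertResultSketch.describe({:error, {"cycle detected", [:a, :b]}})
```

In application code the argument would come from a call such as Baton.insert(workflow, oban: MyApp.Oban).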
@spec insert!(t(), keyword() | module()) :: [Oban.Job.t()]
Like insert/2 but returns the jobs directly or raises on failure.
Initialize a new workflow.
Step names in the order they were added.
Validate the workflow DAG without inserting.
Returns {:ok, topological_order} or {:error, {message, reason}}.
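To show what a topological order and a validation failure look like, here is a self-contained sketch of a layer-by-layer topological sort (a variant of Kahn's algorithm). It is not Baton's implementation: TopoSketch, its input shape (a map from step name to dependency names), and the error messages are all assumptions made for illustration.

```elixir
defmodule TopoSketch do
  # Hypothetical illustration, not Baton's implementation.
  # `deps` maps each step name to the list of step names it depends on.
  def validate(deps) do
    # Collect {step, dependency} pairs that point at an undeclared step.
    missing =
      for {name, ds} <- deps, d <- ds, not Map.has_key?(deps, d), do: {name, d}

    if missing == [] do
      sort(deps, [])
    else
      {:error, {"unknown dependency", missing}}
    end
  end

  # `order` is accumulated in reverse; flip it once everything is emitted.
  defp sort(remaining, order) when map_size(remaining) == 0 do
    {:ok, Enum.reverse(order)}
  end

  # Repeatedly emit every step whose dependencies have all been emitted.
  defp sort(remaining, order) do
    ready =
      remaining
      |> Enum.filter(fn {_name, ds} -> Enum.all?(ds, &(&1 in order)) end)
      |> Enum.map(&elem(&1, 0))
      |> Enum.sort()

    case ready do
      # Nothing can make progress: the remaining steps form a cycle.
      [] -> {:error, {"cycle detected", Enum.sort(Map.keys(remaining))}}
      _ -> sort(Map.drop(remaining, ready), Enum.reverse(ready) ++ order)
    end
  end
end

TopoSketch.validate(%{a: [], b: [:a], c: [:a], d: [:b, :c]})
# {:ok, [:a, :b, :c, :d]}

TopoSketch.validate(%{a: [:b], b: [:a]})
# {:error, {"cycle detected", [:a, :b]}}
```

Sorting each ready layer makes the output deterministic in this sketch; a real engine may order independent steps differently.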