Baton (Baton v0.1.0)

Copy Markdown View Source

Build and insert DAG-based Oban job workflows.

Workflows link Oban jobs together with named dependencies, guaranteeing execution order regardless of scheduling or retries. Jobs are inserted atomically — all at once, in a single transaction alongside their workflow metadata — and each job gates itself at runtime by checking whether its declared dependencies have completed.

Basic Usage

alias Baton

Baton.new()
|> Baton.add(:a, FetchToken.new(%{api_key: "xyz"}))
|> Baton.add(:b, ProcessA.new(%{url: "a.com"}), deps: [:a])
|> Baton.add(:c, ProcessB.new(%{url: "b.com"}), deps: [:a])
|> Baton.add(:d, Finalize.new(%{}), deps: [:b, :c])
|> Baton.insert!()

This produces a diamond-shaped DAG: B and C run in parallel after A, and D waits for both.

Where state lives

Each step has two rows:

  • an oban_jobs row — owned by Oban; its meta carries only immutable identifiers (workflow_id, workflow_name, workflow_label) written once and never mutated, so the engine never races Oban's own meta writes
  • a workflow_nodes row — owned by this engine; holds dependencies, the ignore-flags, and the step's result

Both are inserted in one transaction, so a workflow is all-or-nothing.

Repo / Oban instance

The repo is resolved from Oban's configuration — you don't pass it. If you run a non-default Oban instance, pass oban: MyApp.Oban to insert/2 (or set config :baton, :oban_name, MyApp.Oban). For backwards compatibility, insert/2 still accepts a repo module as its second argument and ignores it.

Options for new/1

  • :workflow_id — override the auto-generated UUIDv4 ID
  • :workflow_name — human-readable label for the whole workflow
  • :ignore_cancelled — apply globally to all steps (default: false)
  • :ignore_discarded — apply globally to all steps (default: false)
  • :debug — enable context-window capture for all steps (default: false)

Options for add/4

  • :deps — list of step names this job depends on (atoms or strings)
  • :ignore_cancelled — override the workflow-level default for this step
  • :ignore_discarded — override the workflow-level default for this step

Summary

Functions

Add a named step to the workflow.

Validate the DAG, then insert all jobs and their workflow nodes in one transaction.

Like insert/2 but returns the jobs directly or raises on failure.

Initialize a new workflow.

Step names in the order they were added.

Validate the workflow DAG without inserting.

Types

entry()

@type entry() :: %{
  name: String.t(),
  changeset: Ecto.Changeset.t(),
  deps: [String.t()],
  ignore_cancelled: boolean(),
  ignore_discarded: boolean()
}

step_name()

@type step_name() :: atom() | String.t()

t()

@type t() :: %Baton{
  debug: boolean(),
  global_ignore_cancelled: boolean(),
  global_ignore_discarded: boolean(),
  jobs: [entry()],
  model_map: %{optional(atom() | String.t()) => String.t()},
  workflow_id: String.t(),
  workflow_name: String.t() | nil
}

Functions

add(workflow, name, job_changeset, opts \\ [])

@spec add(t(), step_name(), Ecto.Changeset.t(), keyword()) :: t()

Add a named step to the workflow.

Raises ArgumentError immediately on a duplicate step name. Full cycle detection runs at insert/2 time.

insert(workflow, opts_or_repo \\ [])

@spec insert(t(), keyword() | module()) ::
  {:ok, [Oban.Job.t()]} | {:error, {String.t(), term()}}

Validate the DAG, then insert all jobs and their workflow nodes in one transaction.

Returns {:ok, jobs} or {:error, {message, reason}}.

Options

For backwards compatibility, passing a repo module as the second argument is accepted and ignored — the repo now comes from Oban.

insert!(workflow, opts_or_repo \\ [])

@spec insert!(t(), keyword() | module()) :: [Oban.Job.t()]

Like insert/2 but returns the jobs directly or raises on failure.

new(opts \\ [])

@spec new(keyword()) :: t()

Initialize a new workflow.

step_names(baton)

@spec step_names(t()) :: [String.t()]

Step names in the order they were added.

validate(workflow)

@spec validate(t()) :: {:ok, [String.t()]} | {:error, {String.t(), term()}}

Validate the workflow DAG without inserting.

Returns {:ok, topological_order} or {:error, {message, reason}}.