Baton

DAG-based job workflows for Oban: dependency ordering, fan-out/synthesis, result passing, retry idempotency, and per-step LLM cost tracking — built entirely on Oban OSS, no Oban Pro required.

Features

  • Directed acyclic graphs of Oban jobs with named dependencies, validated for cycles before insertion (Kahn's algorithm).
  • Self-gating execution — each job checks its dependencies at runtime and snoozes, proceeds, or cancels accordingly. No external scheduler.
  • Completion-triggered rescheduling so downstream steps start promptly instead of waiting out a snooze timer.
  • Result passing between steps, stored in the engine's own table (never in oban_jobs.meta).
  • Retry idempotency — a retried step that already produced a result returns it without re-running side effects (important for paid LLM calls).
  • Multi-model fan-out — run the same step across several models and synthesize the results.
  • Observability — telemetry for every transition, optional per-step token/cost stats, optional full context-window capture, and live step events over Phoenix.PubSub for building a LiveView dashboard.
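To make the cycle check in the first bullet concrete, here is a minimal sketch of Kahn's algorithm in plain Elixir. It is not Baton's implementation: the module name KahnSketch, the function topo_sort/1, and the map-of-dependencies input shape are assumptions made for illustration. It also assumes every dependency named in a list is itself a key in the map.

```elixir
defmodule KahnSketch do
  @moduledoc "Illustrative cycle check for a step graph (hypothetical, not Baton's code)."

  # `graph` maps each step name to the list of steps it depends on,
  # e.g. %{fetch: [], parse: [:fetch]}.
  def topo_sort(graph) do
    # In-degree = number of dependencies a step is still waiting on.
    in_degree = Map.new(graph, fn {step, deps} -> {step, length(deps)} end)

    # Reverse index: dependency -> steps that wait on it.
    dependents =
      for {step, deps} <- graph, dep <- deps, reduce: %{} do
        acc -> Map.update(acc, dep, [step], &[step | &1])
      end

    # Steps with no dependencies can run first.
    ready = for {step, 0} <- in_degree, do: step
    drain(Enum.sort(ready), in_degree, dependents, [])
  end

  # Queue is empty: if every step was emitted the graph is acyclic;
  # otherwise the leftover steps sit on a cycle.
  defp drain([], in_degree, _dependents, sorted) do
    if length(sorted) == map_size(in_degree) do
      {:ok, Enum.reverse(sorted)}
    else
      {:error, :cycle}
    end
  end

  # Emit one ready step, then decrement the in-degree of its dependents.
  defp drain([step | rest], in_degree, dependents, sorted) do
    {in_degree, newly_ready} =
      dependents
      |> Map.get(step, [])
      |> Enum.reduce({in_degree, []}, fn child, {deg, ready} ->
        deg = Map.update!(deg, child, &(&1 - 1))
        if deg[child] == 0, do: {deg, [child | ready]}, else: {deg, ready}
      end)

    drain(rest ++ Enum.sort(newly_ready), in_degree, dependents, [step | sorted])
  end
end
```

Running the check before any job is inserted means a mis-declared dependency is rejected up front, rather than leaving steps that snooze forever waiting on each other.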

Installation

def deps do
  [
    {:baton, "~> 0.1"},
    {:oban, "~> 2.17"}
  ]
end

Add the schema via a migration:

defmodule MyApp.Repo.Migrations.AddBaton do
  use Ecto.Migration
  # Omit :version to install the latest schema. The migration is idempotent
  # (create_if_not_exists), so to upgrade an existing install you can ship a new
  # migration that simply calls Baton.Migration.up/0 again.
  def up,   do: Baton.Migration.up()
  def down, do: Baton.Migration.down()
end

Configure (the repo is inherited from Oban automatically):

config :baton,
  oban_name: Oban,
  pubsub: MyApp.PubSub,            # only for live events/dashboard
  pricing: MyApp.LLMPricing        # only if tracking cost

config :my_app, Oban,
  plugins: [
    Oban.Plugins.Pruner,
    {Baton.Plugin, interval: :timer.seconds(60)}
  ],
  queues: [default: 20]

Data retention

Baton's tables (workflow_nodes, workflow_step_stats, workflow_debug_logs, workflow_completions) have no foreign key to oban_jobs, so Oban's Pruner does not clean them up — left alone they grow without bound. Enable pruning on Baton.Plugin to delete Baton rows once their backing Oban job has been pruned:

{Baton.Plugin,
  interval: :timer.seconds(60),
  prune: true,                       # off by default
  debug_log_max_age: 24 * 60 * 60}   # optional: cap debug logs at 24h (seconds)

This piggybacks on Oban's Pruner, so there's a single retention policy. For this to be safe, the Pruner's max_age must exceed your longest workflow's runtime, which Baton already requires for correct dependency gating. Set it generously, e.g. {Oban.Plugins.Pruner, max_age: 60 * 60 * 24} (max_age is in seconds, so this keeps jobs for 24 hours).

Usage

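defmodule MyApp.Steps.Fetch do
  use Baton.Worker, queue: :default

  @impl true
  # Oban stores args as JSON, so keys arrive as strings even when the job
  # was built from an atom-keyed map.
  def perform_workflow(%Oban.Job{args: %{"url" => url}}) do
    # fetch/1 is a placeholder for your own HTTP call.
    {:ok, %{body: fetch(url)}}
  end
end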

Baton.new(workflow_name: "ingest")
|> Baton.add(:fetch, MyApp.Steps.Fetch.new(%{url: "https://example.com"}))
|> Baton.add(:parse, MyApp.Steps.Parse.new(%{}), deps: [:fetch])
|> Baton.add(:store, MyApp.Steps.Store.new(%{}), deps: [:parse])
|> Baton.insert!()

See the getting started guide, the building a workflow guide (fan-out/fan-in, pruning, and a live LiveView), and the multi-model guide.

Integrating with Phoenix LiveView

Baton ships no LiveView of its own. Instead, every step transition is broadcast over Phoenix.PubSub, so you render progress however you like. (The same transitions are also emitted as telemetry — see Baton.Telemetry — if you'd rather not use Phoenix at all.)

1. Point Baton at your PubSub

A Phoenix app already starts one in its supervision tree ({Phoenix.PubSub, name: MyApp.PubSub}). Tell Baton to use it:

config :baton, pubsub: MyApp.PubSub

If :pubsub is left unset, broadcasting is a no-op and the engine runs fine without Phoenix — only telemetry is emitted.

2. Topics and message shape

Each transition is published on two topics so views can subscribe at the granularity they need:

  • "workflow:all" — every event from every workflow (index views)
  • "workflow:<workflow_id>" — one workflow's events (detail views)

Don't build these strings by hand — use the helpers in Baton.Events. The message is always:

{:workflow_step_updated, %{
  workflow_id:    "uuid",
  workflow_label: "patent:US11234567B2",  # the :workflow_name you passed to new/1
  step_name:      "assess_quality",
  worker:         "MyApp.Patent.AssessQuality",
  state:          "completed",  # see below
  job_id:         123,
  attempt:        1,
  has_result:     true,
  error:          nil,          # an error string on failure, else nil
  timestamp:      ~U[2026-06-14 18:00:00Z]
}}

state is one of "executing", "snoozed", "completed", "retryable", "discarded", or "cancelled".

When the last step in a workflow settles, a single terminal event is published on the same two topics:

{:workflow_finished, %{
  workflow_id:    "uuid",
  workflow_label: "patent:US11234567B2",
  outcome:        :completed,    # or :failed
  failed_steps:   [],            # step names that were cancelled/discarded
  timestamp:      ~U[2026-06-14 18:00:24Z]
}}

Use it to flip the page to a done state, redirect, or fire a notification without polling. (The same signal is available as [:baton, :workflow, :finished] telemetry if you're not using PubSub.)

Requires Baton.Plugin for crash-case coverage. When a step fails by returning {:error, reason}, the finished event fires immediately. But if a step hard-crashes (raises or exits) or is killed by Oban, the worker never gets a chance to announce the outcome. In that case Baton.Plugin's periodic sweep detects the settled workflow and broadcasts {:workflow_finished, %{outcome: :failed, ...}} as a backstop, typically within one sweep interval. Make sure the plugin is in your Oban plugins: list (see Installation); without it, workflows that die from a hard crash won't emit a terminal event.

3. Subscribe in a LiveView

defmodule MyAppWeb.WorkflowLive do
  use MyAppWeb, :live_view
  alias Baton.Events

  def mount(%{"id" => workflow_id}, _session, socket) do
    if connected?(socket), do: Events.subscribe_workflow(workflow_id)
    {:ok, assign(socket, workflow_id: workflow_id, steps: %{})}
  end

  def handle_info({:workflow_step_updated, %{step_name: name} = event}, socket) do
    {:noreply, update(socket, :steps, &Map.put(&1, name, event))}
  end

  # ... render @steps ...
end

For an index of all running workflows, subscribe with Events.subscribe_all/0 and key your state by event.workflow_id. A complete, copy-paste pair of detail and index LiveViews lives in examples/my_app/live/workflow_live.ex.
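One way to keep index state keyed by workflow_id is to fold each message into a nested map with a pure function, which also makes the logic testable without Phoenix. The sketch below is an assumption-laden illustration: the module WorkflowIndexState, its apply_event/2 function, and the state shape are hypothetical and not part of Baton. Only the message tuples and their keys come from the shapes documented above.

```elixir
defmodule WorkflowIndexState do
  # Hypothetical helper, not part of Baton. State shape:
  #   %{workflow_id => %{label: String.t(), steps: %{step_name => event}, outcome: nil | atom}}

  # Record the latest event for a step, creating the workflow entry on first sight.
  def apply_event(state, {:workflow_step_updated, %{workflow_id: id, step_name: name} = event}) do
    state
    |> Map.put_new(id, %{label: event.workflow_label, steps: %{}, outcome: nil})
    |> put_in([id, :steps, name], event)
  end

  # Record the terminal outcome for the whole workflow.
  def apply_event(state, {:workflow_finished, %{workflow_id: id} = event}) do
    state
    |> Map.put_new(id, %{label: event.workflow_label, steps: %{}, outcome: nil})
    |> put_in([id, :outcome], event.outcome)
  end

  # Leave the state untouched for any other message.
  def apply_event(state, _other), do: state
end
```

In a LiveView, a handle_info/2 clause could then delegate to it, for example with update(socket, :workflows, &WorkflowIndexState.apply_event(&1, msg)), where :workflows is whatever assign name you choose.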

Seeding initial state

PubSub only delivers events that occur after mount, so a fresh page load (or a step that completed before the user opened the view) won't be reflected by events alone. Seed @steps from the database on mount using Baton.Query, then let incoming events keep it current — and handle {:workflow_finished, _} to react when the whole workflow is done.

How it compares to Oban Pro Workflow

Baton covers DAG ordering, fan-out/fan-in, dynamic workflows, result passing, and dependency-failure cascading. Beyond those, it adds cycle detection, retry idempotency, multi-model fan-out, and LLM cost tracking. The main mechanical difference is how downstream steps start: Baton uses snooze-based gating plus an opportunistic reschedule, where Pro uses event-driven completion. Correctness does not depend on the reschedule: if it is missed, a snoozed step simply re-checks its dependencies when its snooze expires.
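The snooze-based gate can be pictured as a small decision function over the states of a step's dependencies. This is a sketch under stated assumptions, not Baton's actual logic: GateSketch and decide/2 are hypothetical names, the 5-second default is arbitrary, and treating "cancelled" and "discarded" as the failure states is inferred from the state list documented above. The {:snooze, seconds} and {:cancel, reason} tuples are standard Oban worker return values.

```elixir
defmodule GateSketch do
  # Hypothetical gate, for illustration only.
  # Dependency states that mean the dependency will never complete.
  @failed ~w(cancelled discarded)

  # `dep_states` is a list of state strings, one per dependency.
  def decide(dep_states, snooze_seconds \\ 5) do
    cond do
      # A dependency failed for good: cascade the failure.
      Enum.any?(dep_states, &(&1 in @failed)) -> {:cancel, :dependency_failed}
      # Everything upstream is done (or there are no dependencies): run now.
      Enum.all?(dep_states, &(&1 == "completed")) -> :proceed
      # Something is still running or retrying: check again later.
      true -> {:snooze, snooze_seconds}
    end
  end
end
```

Because the decision is re-evaluated every time the job wakes up, a reschedule triggered by an upstream completion only shortens the wait; it never changes the outcome.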

License

MIT — see LICENSE.