This guide walks through a complete Baton workflow end to end: a fan-out / fan-in (diamond) DAG, the pruning setup that keeps its data from accumulating, and a LiveView that renders progress live over PubSub.
The example is a small "research brief" pipeline:
          ┌──────────────────┐
          │  fetch_sources   │  (root)
          └────────┬─────────┘
      ┌────────────┼────────────┐          fan-out: 3 steps run in parallel
      ▼            ▼            ▼
┌──────────┐ ┌───────────┐ ┌──────────────┐
│summarize │ │ keywords  │ │ credibility  │
└────┬─────┘ └─────┬─────┘ └──────┬───────┘
     └─────────────┼──────────────┘        fan-in: waits for all three
                   ▼
          ┌──────────────────┐
          │  compile_brief   │
          └──────────────────┘

fetch_sources fans out to three independent analyses that run
concurrently; compile_brief fans them in, running only once all three have
finished, and reads each of their results.
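
The diamond shape itself can be sketched in plain Elixir with Task, independent of Baton. This is only an illustration of fan-out / fan-in: the DiamondSketch module and its stand-in analysis functions are hypothetical, and Baton runs each step as its own Oban job rather than as tasks inside one process.

```elixir
defmodule DiamondSketch do
  # Root step: a stand-in for fetch_sources.
  def fetch_sources(topic), do: ["#{topic} doc A", "#{topic} doc B"]

  # Hypothetical stand-ins for the three independent analyses.
  def summarize(docs), do: Enum.join(docs, "; ")
  def keywords(docs), do: docs |> Enum.flat_map(&String.split/1) |> Enum.uniq()
  def credibility(docs), do: length(docs) / 10

  def run(topic) do
    docs = fetch_sources(topic)

    # Fan-out: start the three analyses concurrently.
    tasks = [
      Task.async(fn -> summarize(docs) end),
      Task.async(fn -> keywords(docs) end),
      Task.async(fn -> credibility(docs) end)
    ]

    # Fan-in: block until all three have finished, then combine the results.
    [summary, keywords, score] = Task.await_many(tasks)
    %{summary: summary, keywords: keywords, credibility: score}
  end
end

brief = DiamondSketch.run("elixir")
```

Task.await_many/1 returns results in the order the tasks were listed, which is why the pattern match on three elements is safe here.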
1. Setup
Install the schema and register the plugin (see also the getting started guide):
defmodule MyApp.Repo.Migrations.AddBaton do
  use Ecto.Migration

  def up, do: Baton.Migration.up()
  def down, do: Baton.Migration.down()
end

# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 20],
  plugins: [
    {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},
    {Baton.Plugin, interval: :timer.seconds(60)}
  ]

2. Define the steps
Each step is a Baton.Worker. A worker implements perform_workflow/1 and
returns {:ok, result}; the result is stored on the step's node and made
available to downstream steps. Downstream steps read their dependencies'
results with Baton.Results.get_result/2 (or get_all_results/1).
defmodule MyApp.Research.FetchSources do
  use Baton.Worker, queue: :default

  @impl true
  def perform_workflow(%Oban.Job{args: %{"topic" => topic}}) do
    # In a real pipeline this would hit an API; here we just return some text.
    {:ok, %{"documents" => MyApp.Search.fetch(topic)}}
  end
end

The three fan-out steps each depend on fetch_sources and read its result:
defmodule MyApp.Research.Summarize do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"summary" => MyApp.LLM.summarize(docs)}}
  end
end

defmodule MyApp.Research.Keywords do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"keywords" => MyApp.LLM.keywords(docs)}}
  end
end

defmodule MyApp.Research.Credibility do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"score" => MyApp.LLM.credibility(docs)}}
  end
end

The fan-in step depends on all three and reads each result. With
get_all_results/1 you get every dependency's result keyed by step name:
defmodule MyApp.Research.CompileBrief do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    %{
      "summarize" => %{"summary" => summary},
      "keywords" => %{"keywords" => keywords},
      "credibility" => %{"score" => score}
    } = Results.get_all_results(job)

    brief = %{
      summary: summary,
      keywords: keywords,
      credibility: score
    }

    {:ok, %{"brief" => brief}}
  end
end

3. Build and insert the workflow
deps: is what wires the DAG. The fan-out is three steps sharing one
dependency; the fan-in is one step depending on all three. Baton validates the
graph (no cycles, all deps exist) before inserting anything.
alias MyApp.Research.{FetchSources, Summarize, Keywords, Credibility, CompileBrief}

{:ok, jobs} =
  Baton.new(workflow_name: "research:#{topic}")
  |> Baton.add(:fetch_sources, FetchSources.new(%{topic: topic}))
  # fan-out: all three depend only on fetch_sources, so they run in parallel
  |> Baton.add(:summarize, Summarize.new(%{}), deps: [:fetch_sources])
  |> Baton.add(:keywords, Keywords.new(%{}), deps: [:fetch_sources])
  |> Baton.add(:credibility, Credibility.new(%{}), deps: [:fetch_sources])
  # fan-in: runs only after all three finish
  |> Baton.add(:compile_brief, CompileBrief.new(%{}),
    deps: [:summarize, :keywords, :credibility])
  |> Baton.insert()

That's it. Baton handles ordering, parallelism, and result passing. Each step gates itself at runtime: the fan-in step snoozes until its three dependencies have completed, then runs once.
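
The two graph checks mentioned in this section (every dependency exists, no cycles) can be illustrated with a small standalone function. This is a sketch of the general technique, Kahn's topological sort, and not Baton's actual implementation; the GraphCheck module and its return values are hypothetical.

```elixir
defmodule GraphCheck do
  # `steps` maps each step name to the list of step names it depends on.
  def validate(steps) when is_map(steps) do
    missing =
      for {name, deps} <- steps, dep <- deps, not Map.has_key?(steps, dep), do: {name, dep}

    if missing == [] do
      order(steps, [])
    else
      {:error, {:missing_deps, Enum.sort(missing)}}
    end
  end

  # Kahn's algorithm: repeatedly remove steps whose dependencies are all resolved.
  defp order(steps, done) when map_size(steps) == 0, do: {:ok, Enum.reverse(done)}

  defp order(steps, done) do
    ready = for {name, deps} <- steps, Enum.all?(deps, &(&1 in done)), do: name

    case Enum.sort(ready) do
      # Nothing is runnable but steps remain, so the remainder contains a cycle.
      [] -> {:error, {:cycle, steps |> Map.keys() |> Enum.sort()}}
      ready -> order(Map.drop(steps, ready), Enum.reverse(ready) ++ done)
    end
  end
end

diamond = %{
  fetch_sources: [],
  summarize: [:fetch_sources],
  keywords: [:fetch_sources],
  credibility: [:fetch_sources],
  compile_brief: [:summarize, :keywords, :credibility]
}

GraphCheck.validate(diamond)
# => {:ok, [:fetch_sources, :credibility, :keywords, :summarize, :compile_brief]}
```

The three middle steps come out in alphabetical order only because the sketch sorts each ready batch to make its output deterministic; in the real workflow they have no ordering among themselves.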
Tip
To run the same worker across several models and synthesize the outputs,
use Baton.MultiModel.fan_out/4 instead of adding the parallel steps by hand
— see the multi-model guide.
4. Set up pruning
Baton's tables (workflow_nodes, workflow_step_stats, workflow_debug_logs,
workflow_completions) have no foreign key to oban_jobs, so Oban's Pruner
won't clean them up. Turn on pruning in the plugin so Baton rows are removed
once their backing Oban job has been pruned:
plugins: [
  {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},   # 24h; see note below
  {Baton.Plugin,
   interval: :timer.seconds(60),
   prune: true,                          # off by default
   debug_log_max_age: 60 * 60 * 24}      # optional: cap debug logs at 24h (seconds)
]

- prune: true deletes orphaned Baton rows each sweep (orphaned = their Oban job is gone). This piggybacks on Oban's Pruner, so you have a single retention policy.
- debug_log_max_age gives workflow_debug_logs, the largest rows, a separate, shorter age cap.
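
What "orphaned" means can be shown with plain data. The sketch below is not Baton's pruning query: the OrphanSketch module and the job_id field name are assumptions made for the example.

```elixir
defmodule OrphanSketch do
  # `rows` are Baton-side records that each point at an Oban job id
  # (the :job_id key is an assumed name). `live_job_ids` are the ids
  # still present in oban_jobs.
  def orphaned(rows, live_job_ids) do
    live = MapSet.new(live_job_ids)
    Enum.reject(rows, &MapSet.member?(live, &1.job_id))
  end
end

rows = [
  %{step: "fetch_sources", job_id: 1},
  %{step: "summarize", job_id: 2},
  %{step: "compile_brief", job_id: 5}
]

# Suppose Oban's Pruner has already deleted jobs 1 and 2; only job 5 remains.
orphans = OrphanSketch.orphaned(rows, [5])
```

Here the rows for fetch_sources and summarize are the ones a sweep would remove, because their jobs no longer exist.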
The Pruner must outlast your workflows
Set the Pruner's max_age longer than your slowest workflow's total
runtime. Baton resolves a dependency's result from its Oban job; if Oban prunes
a completed step while the workflow is still running, downstream steps will
treat that dependency as failed. A day is generous for most workflows.
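
One way to make this rule explicit is to derive the Pruner's max_age from an estimate of your slowest workflow. This is a minimal sketch: the RetentionSketch module and the safety factor of 2 are hypothetical choices, not Baton recommendations.

```elixir
defmodule RetentionSketch do
  @one_day 60 * 60 * 24

  # Returns a max_age in seconds: at least one day, and at least
  # `factor` times the slowest workflow's total runtime.
  def pruner_max_age(slowest_workflow_seconds, factor \\ 2) do
    max(@one_day, slowest_workflow_seconds * factor)
  end
end

# A workflow that can take up to 18 hours needs more than the 24h used in this guide.
RetentionSketch.pruner_max_age(18 * 60 * 60)
# => 129600 (36 hours)
```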
5. Render progress in a LiveView
Baton broadcasts every step transition over Phoenix.PubSub, plus a single
terminal event when the whole workflow settles. Point Baton at your app's
PubSub server:
config :baton, pubsub: MyApp.PubSub

Each transition is published on two topics, "workflow:all" and
"workflow:<workflow_id>", as {:workflow_step_updated, payload}. When the
last step settles, a one-shot {:workflow_finished, payload} is published on
the same topics. Subscribe with Baton.Events:
defmodule MyAppWeb.ResearchLive do
  use MyAppWeb, :live_view

  alias Baton.Events

  @impl true
  def mount(%{"id" => workflow_id}, _session, socket) do
    if connected?(socket), do: Events.subscribe_workflow(workflow_id)

    {:ok,
     socket
     |> assign(:workflow_id, workflow_id)
     |> assign(:steps, %{})   # step_name => latest event
     |> assign(:outcome, nil)}
  end

  # A step changed state (executing / completed / retryable / discarded / ...).
  @impl true
  def handle_info({:workflow_step_updated, %{step_name: name} = event}, socket) do
    {:noreply, update(socket, :steps, &Map.put(&1, name, event))}
  end

  # Fired once, when the whole workflow is done.
  def handle_info({:workflow_finished, %{outcome: outcome, failed_steps: failed}}, socket) do
    {:noreply,
     socket
     |> assign(:outcome, outcome)   # :completed | :failed
     |> assign(:failed_steps, failed)}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <h1>Research brief <small>{@workflow_id}</small></h1>
    <p :if={@outcome}>Workflow {@outcome}</p>
    <ul>
      <li :for={{name, ev} <- Enum.sort_by(@steps, &elem(&1, 0))}>
        <strong>{name}</strong>: {ev.state}
        <span :if={ev.error}>— {ev.error}</span>
      </li>
    </ul>
    """
  end
end

The {:workflow_step_updated, _} payload carries step_name, state,
worker, attempt, has_result, and error; state is one of "executing",
"snoozed", "completed", "retryable", "discarded", or "cancelled".
Seed initial state on mount
PubSub only delivers events that occur after mount. To reflect a workflow
that's already running (or finished) when the page loads, seed @steps from
Baton.Query.get_workflow_jobs/1 in mount, then let incoming events keep it
current.
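
The seed-then-update pattern can be tried outside LiveView as two pure functions. The sketch assumes each seeded record is a map with :step_name, :state, and :error keys and that step names are strings; both are hypothetical, so check what Baton.Query.get_workflow_jobs/1 actually returns before adapting it. StepsSketch is an illustrative helper, not part of Baton.

```elixir
defmodule StepsSketch do
  # Seed: build the step_name => record map from records known at page load.
  def seed(records) do
    Map.new(records, fn %{step_name: name} = record -> {name, record} end)
  end

  # Update: mirror the handle_info/2 clause, keeping the latest event per step.
  def apply_event({:workflow_step_updated, %{step_name: name} = event}, steps) do
    Map.put(steps, name, event)
  end
end

# Hypothetical records for a workflow that was already running at page load.
seeded =
  StepsSketch.seed([
    %{step_name: "fetch_sources", state: "completed", error: nil},
    %{step_name: "summarize", state: "executing", error: nil}
  ])

# Events that arrive after mount overwrite or extend the seeded entries.
events = [
  {:workflow_step_updated, %{step_name: "summarize", state: "completed", error: nil}},
  {:workflow_step_updated, %{step_name: "keywords", state: "executing", error: nil}}
]

steps = Enum.reduce(events, seeded, &StepsSketch.apply_event/2)
```

Because both paths write into the same map shape, the render function does not need to know whether an entry came from the initial query or from a later event.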
The crash-case terminal event relies on Baton.Plugin being in your Oban
plugins: list — see Baton.Plugin and the README's
Integrating with Phoenix LiveView section.
What you built
- A diamond DAG with one fan-out point and one fan-in point, with results flowing across every edge — no manual coordination.
- Pruning that keeps Baton's tables bounded, tied to Oban's own retention.
- A LiveView that updates in real time and reacts to workflow completion.
For LLM-specific concerns (cost tracking, idempotent retries, context capture,
multi-model fan-out) see Baton.LLMWorker, Baton.Stats, Baton.Debug, and
the multi-model guide.