Building a Workflow

This guide walks through a complete Baton workflow end to end: a fan-out / fan-in (diamond) DAG, the pruning setup that keeps its data from accumulating, and a LiveView that renders progress live over PubSub.

The example is a small "research brief" pipeline:

            
                 fetch_sources                  (root)
                       |
        fan-out: 3 steps run in parallel
         /             |             \
   summarize       keywords      credibility
         \             |             /
          fan-in: waits for all three
                       |
                 compile_brief

fetch_sources fans out to three independent analyses that run concurrently; compile_brief fans them in, running only once all three have finished, and reads each of their results.

1. Setup

Install the schema and register the plugin (see also the getting started guide):

defmodule MyApp.Repo.Migrations.AddBaton do
  use Ecto.Migration
  def up, do: Baton.Migration.up()
  def down, do: Baton.Migration.down()
end

# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 20],
  plugins: [
    {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},
    {Baton.Plugin, interval: :timer.seconds(60)}
  ]

2. Define the steps

Each step is a Baton.Worker. A worker implements perform_workflow/1 and returns {:ok, result}; the result is stored on the step's node and made available to downstream steps. Downstream steps read their dependencies' results with Baton.Results.get_result/2 (or get_all_results/1).

defmodule MyApp.Research.FetchSources do
  use Baton.Worker, queue: :default

  @impl true
  def perform_workflow(%Oban.Job{args: %{"topic" => topic}}) do
    # In a real pipeline this would hit an API; here we just return some text.
    {:ok, %{"documents" => MyApp.Search.fetch(topic)}}
  end
end

The three fan-out steps each depend on fetch_sources and read its result:

defmodule MyApp.Research.Summarize do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"summary" => MyApp.LLM.summarize(docs)}}
  end
end

defmodule MyApp.Research.Keywords do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"keywords" => MyApp.LLM.keywords(docs)}}
  end
end

defmodule MyApp.Research.Credibility do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"score" => MyApp.LLM.credibility(docs)}}
  end
end

The fan-in step depends on all three and reads each result. With get_all_results/1 you get every dependency's result keyed by step name:

defmodule MyApp.Research.CompileBrief do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    %{
      "summarize" => %{"summary" => summary},
      "keywords" => %{"keywords" => keywords},
      "credibility" => %{"score" => score}
    } = Results.get_all_results(job)

    brief = %{
      summary: summary,
      keywords: keywords,
      credibility: score
    }

    {:ok, %{"brief" => brief}}
  end
end

3. Build and insert the workflow

The deps: option is what wires the DAG. The fan-out is three steps sharing one dependency; the fan-in is one step depending on all three. Baton validates the graph (no cycles, every dependency refers to a step that exists) before inserting anything.

alias MyApp.Research.{FetchSources, Summarize, Keywords, Credibility, CompileBrief}

{:ok, jobs} =
  Baton.new(workflow_name: "research:#{topic}")
  |> Baton.add(:fetch_sources, FetchSources.new(%{topic: topic}))
  # fan-out — all three depend only on fetch_sources, so they run in parallel
  |> Baton.add(:summarize,   Summarize.new(%{}),   deps: [:fetch_sources])
  |> Baton.add(:keywords,    Keywords.new(%{}),    deps: [:fetch_sources])
  |> Baton.add(:credibility, Credibility.new(%{}), deps: [:fetch_sources])
  # fan-in — runs only after all three finish
  |> Baton.add(:compile_brief, CompileBrief.new(%{}),
       deps: [:summarize, :keywords, :credibility])
  |> Baton.insert()

That's it — Baton handles ordering, parallelism, and result passing. Each step gates itself at runtime: the fan-in step snoozes until its three dependencies have completed, then runs once.
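The gating rule described above can be pictured as a small decision function. This is only a sketch under stated assumptions, not Baton's implementation: the module name, the :dependency_failed return value, and the 5-second snooze delay are hypothetical. The state strings are the ones this guide lists for step events, and {:snooze, seconds} is the standard Oban return value for rescheduling a job.

```elixir
defmodule GateSketch do
  # Illustrative only. Return values and the 5-second delay are assumptions,
  # not Baton's API.
  @failed ["discarded", "cancelled"]

  # dep_states: map of dependency step name => current state string.
  def decide(dep_states) do
    states = Map.values(dep_states)

    cond do
      # Any dependency that can no longer complete blocks the step for good.
      Enum.any?(states, &(&1 in @failed)) -> :dependency_failed
      # All dependencies done (also true for a root step with no deps): run.
      Enum.all?(states, &(&1 == "completed")) -> :run
      # Otherwise check again later.
      true -> {:snooze, 5}
    end
  end
end

GateSketch.decide(%{
  "summarize" => "completed",
  "keywords" => "executing",
  "credibility" => "completed"
})
# => {:snooze, 5}
```

For compile_brief, this means the step keeps returning a snooze while any of its three dependencies is still in flight, and runs exactly once after the last one completes.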

Tip

To run the same worker across several models and synthesize the outputs, use Baton.MultiModel.fan_out/4 instead of adding the parallel steps by hand — see the multi-model guide.
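As an aside on the graph validation mentioned in this section (no cycles, all deps exist), the following standalone sketch shows one common way to perform such a check. It is not Baton's code; the DagCheck module and its error tuples are hypothetical and exist only to make the two rules concrete.

```elixir
defmodule DagCheck do
  # Illustrative only: not Baton's implementation.
  # steps: map of step name => list of dependency names.
  def validate(steps) do
    with :ok <- check_deps_exist(steps) do
      peel(steps, MapSet.new())
    end
  end

  # Every name listed as a dependency must itself be a step.
  defp check_deps_exist(steps) do
    missing =
      for {name, deps} <- steps, dep <- deps, not Map.has_key?(steps, dep), do: {name, dep}

    if missing == [], do: :ok, else: {:error, {:missing_deps, Enum.sort(missing)}}
  end

  # Repeatedly remove steps whose dependencies have all been removed already.
  # If a pass removes nothing while steps remain, the remaining steps are in
  # a cycle or downstream of one.
  defp peel(remaining, _done) when map_size(remaining) == 0, do: :ok

  defp peel(remaining, done) do
    ready =
      for {name, deps} <- remaining, Enum.all?(deps, &MapSet.member?(done, &1)), do: name

    case ready do
      [] -> {:error, {:cycle, remaining |> Map.keys() |> Enum.sort()}}
      _ -> peel(Map.drop(remaining, ready), Enum.into(ready, done))
    end
  end
end

diamond = %{
  fetch_sources: [],
  summarize: [:fetch_sources],
  keywords: [:fetch_sources],
  credibility: [:fetch_sources],
  compile_brief: [:summarize, :keywords, :credibility]
}

DagCheck.validate(diamond)
# => :ok
DagCheck.validate(%{a: [:b], b: [:a]})
# => {:error, {:cycle, [:a, :b]}}
```

Checking before insertion matters because a cyclic or dangling dependency would otherwise leave steps waiting forever on something that can never complete.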

4. Set up pruning

Baton's tables (workflow_nodes, workflow_step_stats, workflow_debug_logs, workflow_completions) have no foreign key to oban_jobs, so Oban's Pruner won't clean them up. Turn on pruning in the plugin so Baton rows are removed once their backing Oban job has been pruned:

plugins: [
  {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},     # 24h — see note below
  {Baton.Plugin,
    interval: :timer.seconds(60),
    prune: true,                       # off by default
    debug_log_max_age: 60 * 60 * 24}   # optional: cap debug logs at 24h (seconds)
]

  • prune: true deletes orphaned Baton rows on each sweep, where "orphaned" means the row's Oban job no longer exists. This piggybacks on Oban's Pruner, so you have a single retention policy.
  • debug_log_max_age gives workflow_debug_logs, which holds the largest rows, a separate, shorter age cap.

The Pruner must outlast your workflows

Set the Pruner's max_age longer than your slowest workflow's total runtime. Baton resolves a dependency's result from its Oban job; if Oban prunes a completed step while the workflow is still running, downstream steps will treat that dependency as failed. A day is generous for most workflows.
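The rule above is simple arithmetic, and it can be worth asserting somewhere visible (a test or a boot-time check) so it does not drift as workflows grow. The helper below is a hypothetical sketch, not part of Baton or Oban; the safety margin of 2x and the example runtime estimate are assumptions you would replace with your own numbers.

```elixir
defmodule RetentionCheck do
  # Illustrative helper, not part of Baton or Oban.
  # Both durations are in seconds, matching the Pruner's max_age unit.
  # `margin` is an assumed safety factor on top of the slowest runtime.
  def check(pruner_max_age, slowest_workflow_seconds, margin \\ 2) do
    minimum = slowest_workflow_seconds * margin

    if pruner_max_age >= minimum do
      :ok
    else
      {:error, {:max_age_too_short, minimum}}
    end
  end
end

# 24h retention against a workflow estimated (hypothetically) at 2h.
RetentionCheck.check(60 * 60 * 24, 2 * 60 * 60)
# => :ok
```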

5. Render progress in a LiveView

Baton broadcasts every step transition over Phoenix.PubSub, plus a single terminal event when the whole workflow settles. Point Baton at your app's PubSub server:

config :baton, pubsub: MyApp.PubSub

Each transition is published on two topics — "workflow:all" and "workflow:<workflow_id>" — as {:workflow_step_updated, payload}. When the last step settles, a one-shot {:workflow_finished, payload} is published on the same topics. Subscribe with Baton.Events:

defmodule MyAppWeb.ResearchLive do
  use MyAppWeb, :live_view
  alias Baton.Events

  @impl true
  def mount(%{"id" => workflow_id}, _session, socket) do
    if connected?(socket), do: Events.subscribe_workflow(workflow_id)

    {:ok,
     socket
     |> assign(:workflow_id, workflow_id)
     |> assign(:steps, %{})       # step_name => latest event
     |> assign(:outcome, nil)}
  end

  # A step changed state (executing / completed / retryable / discarded / ...).
  @impl true
  def handle_info({:workflow_step_updated, %{step_name: name} = event}, socket) do
    {:noreply, update(socket, :steps, &Map.put(&1, name, event))}
  end

  # Fired once, when the whole workflow is done.
  def handle_info({:workflow_finished, %{outcome: outcome, failed_steps: failed}}, socket) do
    {:noreply,
     socket
     |> assign(:outcome, outcome)        # :completed | :failed
     |> assign(:failed_steps, failed)}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <h1>Research brief <small>{@workflow_id}</small></h1>

    <p :if={@outcome}>Workflow {@outcome}</p>

    <ul>
      <li :for={{name, ev} <- Enum.sort_by(@steps, &elem(&1, 0))}>
        <strong>{name}</strong>: {ev.state}
        <span :if={ev.error}>— {ev.error}</span>
      </li>
    </ul>
    """
  end
end

The {:workflow_step_updated, _} payload carries step_name, state, worker, attempt, has_result, and error; state is one of "executing", "snoozed", "completed", "retryable", "discarded", or "cancelled".
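Since state arrives as one of a fixed set of strings, a small presentation helper keeps the template free of string comparisons. The sketch below is hypothetical: the StepBadge module and its label text are made up for illustration, and only the state strings come from the list above. The settled states follow Oban's own job lifecycle, where completed, discarded, and cancelled are final.

```elixir
defmodule StepBadge do
  # Hypothetical presentation helper; labels are illustrative.
  def label("executing"), do: "Running"
  def label("snoozed"), do: "Waiting"
  def label("completed"), do: "Done"
  def label("retryable"), do: "Retrying"
  def label("discarded"), do: "Failed"
  def label("cancelled"), do: "Cancelled"
  # Fall back to the raw value so an unexpected state is still visible.
  def label(other), do: to_string(other)

  # True once a step can no longer change state.
  def settled?(state), do: state in ["completed", "discarded", "cancelled"]
end

StepBadge.label("retryable")
# => "Retrying"
StepBadge.settled?("retryable")
# => false
```

In the template above, {ev.state} could then become {StepBadge.label(ev.state)}.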

Seed initial state on mount

PubSub only delivers events that occur after mount. To reflect a workflow that's already running (or finished) when the page loads, seed @steps from Baton.Query.get_workflow_jobs/1 in mount, then let incoming events keep it current.
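A minimal sketch of the seeding step follows. It assumes each row exposes :step_name and :state (and optionally :error); the actual return shape of Baton.Query.get_workflow_jobs/1 is not shown in this guide, so treat the field access as a placeholder and adapt it to what the function really returns. The StepSeed module is hypothetical.

```elixir
defmodule StepSeed do
  # Assumption: each row is a map or struct with :step_name and :state,
  # and optionally :error. Adjust to the real return shape.
  # Produces the same step_name => event map the LiveView keeps in @steps.
  def from_rows(rows) do
    Map.new(rows, fn row ->
      {row.step_name,
       %{step_name: row.step_name, state: row.state, error: Map.get(row, :error)}}
    end)
  end
end

StepSeed.from_rows([
  %{step_name: "summarize", state: "completed"},
  %{step_name: "keywords", state: "retryable", error: "timeout"}
])
```

In mount, the result would replace the empty map assigned to :steps. Subscribing before querying avoids a gap in which an event could be missed; an event that arrives twice is harmless because each one simply overwrites the entry for its step.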

The crash-case terminal event relies on Baton.Plugin being in your Oban plugins: list — see Baton.Plugin and the README's Integrating with Phoenix LiveView section.

What you built

  • A diamond DAG with one fan-out point and one fan-in point, with results flowing across every edge — no manual coordination.
  • Pruning that keeps Baton's tables bounded, tied to Oban's own retention.
  • A LiveView that updates in real time and reacts to workflow completion.

For LLM-specific concerns (cost tracking, idempotent retries, context capture, multi-model fan-out) see Baton.LLMWorker, Baton.Stats, Baton.Debug, and the multi-model guide.