# Building a Workflow

This guide walks through a complete Baton workflow end to end: a **fan-out /
fan-in** (diamond) DAG, the **pruning** setup that keeps its data from
accumulating, and a **LiveView** that renders progress live over PubSub.

The example is a small "research brief" pipeline:

```
            ┌──────────────────┐
            │   fetch_sources  │            (root)
            └────────┬─────────┘
        ┌────────────┼────────────┐         fan-out: 3 steps run in parallel
        ▼            ▼            ▼
  ┌──────────┐ ┌───────────┐ ┌──────────────┐
  │summarize │ │ keywords  │ │ credibility  │
  └────┬─────┘ └─────┬─────┘ └──────┬───────┘
       └─────────────┼──────────────┘         fan-in: waits for all three
                     ▼
            ┌──────────────────┐
            │  compile_brief   │
            └──────────────────┘
```

`fetch_sources` **fans out** to three independent analyses that run
concurrently; `compile_brief` **fans them in**, running only once all three have
finished, and reads each of their results.

## 1. Setup

Install the schema and register the plugin (see also the
[getting started guide](getting_started.md)):

```elixir
defmodule MyApp.Repo.Migrations.AddBaton do
  use Ecto.Migration
  def up, do: Baton.Migration.up()
  def down, do: Baton.Migration.down()
end
```

```elixir
# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 20],
  plugins: [
    {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},
    {Baton.Plugin, interval: :timer.seconds(60)}
  ]
```

## 2. Define the steps

Each step is a `Baton.Worker`. A worker implements `perform_workflow/1` and
returns `{:ok, result}`; the result is stored on the step's node and made
available to downstream steps. Downstream steps read their dependencies'
results with `Baton.Results.get_result/2` (or `get_all_results/1`).

```elixir
defmodule MyApp.Research.FetchSources do
  use Baton.Worker, queue: :default

  @impl true
  def perform_workflow(%Oban.Job{args: %{"topic" => topic}}) do
    # In a real pipeline this would hit an API; `MyApp.Search.fetch/1` is a
    # stand-in that returns some text for the topic.
    {:ok, %{"documents" => MyApp.Search.fetch(topic)}}
  end
end
```

The three fan-out steps each depend on `fetch_sources` and read its result:

```elixir
defmodule MyApp.Research.Summarize do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"summary" => MyApp.LLM.summarize(docs)}}
  end
end

defmodule MyApp.Research.Keywords do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"keywords" => MyApp.LLM.keywords(docs)}}
  end
end

defmodule MyApp.Research.Credibility do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    {:ok, %{"documents" => docs}} = Results.get_result(job, :fetch_sources)
    {:ok, %{"score" => MyApp.LLM.credibility(docs)}}
  end
end
```

The fan-in step depends on **all three** and reads each result.
`get_all_results/1` returns every dependency's result in a single map keyed by
step name; in this example the keys are strings:

```elixir
defmodule MyApp.Research.CompileBrief do
  use Baton.Worker, queue: :default
  alias Baton.Results

  @impl true
  def perform_workflow(%Oban.Job{} = job) do
    %{
      "summarize" => %{"summary" => summary},
      "keywords" => %{"keywords" => keywords},
      "credibility" => %{"score" => score}
    } = Results.get_all_results(job)

    brief = %{
      summary: summary,
      keywords: keywords,
      credibility: score
    }

    {:ok, %{"brief" => brief}}
  end
end
```
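
The pattern match in `CompileBrief` raises a `MatchError` if any dependency's result lacks the expected key. As a rough sketch of a more defensive variant, the assembly can be pulled into a pure function that returns tagged tuples and can be tested without Oban. `MyApp.Research.BriefBuilder` and `build/1` are hypothetical names, not part of Baton; the input is assumed to have the same shape as the map matched above.

```elixir
defmodule MyApp.Research.BriefBuilder do
  @moduledoc """
  Hypothetical helper (not part of Baton): assembles the brief from a map of
  dependency results shaped like the one matched in `CompileBrief`.
  """

  # {step name, key inside that step's result, key in the final brief}
  @fields [
    {"summarize", "summary", :summary},
    {"keywords", "keywords", :keywords},
    {"credibility", "score", :credibility}
  ]

  @spec build(map()) ::
          {:ok, map()} | {:error, {:missing_result, String.t(), String.t()}}
  def build(results) when is_map(results) do
    Enum.reduce_while(@fields, {:ok, %{}}, fn {step, key, brief_key}, {:ok, brief} ->
      case results do
        # The step is present and its result carries the expected key.
        %{^step => %{^key => value}} ->
          {:cont, {:ok, Map.put(brief, brief_key, value)}}

        # Stop at the first missing piece and report which one it was.
        _ ->
          {:halt, {:error, {:missing_result, step, key}}}
      end
    end)
  end
end
```

Inside `perform_workflow/1` this would be called as `BriefBuilder.build(Results.get_all_results(job))`. How Baton treats a return value other than `{:ok, result}` is not covered in this guide, so check the `Baton.Worker` docs before returning the error tuple directly.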

## 3. Build and insert the workflow

`deps:` is what wires the DAG. The fan-out is three steps sharing one
dependency; the fan-in is one step depending on all three. Baton validates the
graph (no cycles, all deps exist) before inserting anything.

```elixir
alias MyApp.Research.{FetchSources, Summarize, Keywords, Credibility, CompileBrief}

{:ok, jobs} =
  Baton.new(workflow_name: "research:#{topic}")
  |> Baton.add(:fetch_sources, FetchSources.new(%{topic: topic}))
  # fan-out — all three depend only on fetch_sources, so they run in parallel
  |> Baton.add(:summarize,   Summarize.new(%{}),   deps: [:fetch_sources])
  |> Baton.add(:keywords,    Keywords.new(%{}),    deps: [:fetch_sources])
  |> Baton.add(:credibility, Credibility.new(%{}), deps: [:fetch_sources])
  # fan-in — runs only after all three finish
  |> Baton.add(:compile_brief, CompileBrief.new(%{}),
       deps: [:summarize, :keywords, :credibility])
  |> Baton.insert()
```

That's it — Baton handles ordering, parallelism, and result passing. Each step
gates itself at runtime: the fan-in step snoozes until its three dependencies
have completed, then runs once.
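
To make "no cycles, all deps exist" concrete, here is a minimal sketch of that kind of check over a plain `step => deps` map. It is an illustration only and not Baton's implementation: `DagSketch` and `waves/1` are made-up names, and the "waves" it returns are a static view of what could run in parallel, whereas Baton gates each step at runtime as described above.

```elixir
defmodule DagSketch do
  @moduledoc """
  Illustration only, not Baton's implementation: validates a `step => deps`
  map and groups the steps into waves that could run in parallel.
  """

  @spec waves(%{atom() => [atom()]}) ::
          {:ok, [[atom()]]} | {:error, {:unknown_deps, [atom()]} | {:cycle, [atom()]}}
  def waves(graph) when is_map(graph) do
    # Every name used as a dependency must also be defined as a step.
    unknown =
      graph
      |> Map.values()
      |> List.flatten()
      |> Enum.uniq()
      |> Enum.reject(&Map.has_key?(graph, &1))
      |> Enum.sort()

    case unknown do
      [] -> layer(graph, MapSet.new(), [])
      _ -> {:error, {:unknown_deps, unknown}}
    end
  end

  # All steps placed: return the waves in execution order.
  defp layer(graph, _done, acc) when map_size(graph) == 0, do: {:ok, Enum.reverse(acc)}

  defp layer(graph, done, acc) do
    # A step is ready once every one of its dependencies is in `done`.
    ready =
      for {step, deps} <- graph, Enum.all?(deps, &MapSet.member?(done, &1)), do: step

    case Enum.sort(ready) do
      # Steps remain but none is ready, so the remaining steps contain a cycle.
      [] ->
        {:error, {:cycle, graph |> Map.keys() |> Enum.sort()}}

      wave ->
        layer(Map.drop(graph, wave), MapSet.union(done, MapSet.new(wave)), [wave | acc])
    end
  end
end
```

For the research brief, the input would be `%{fetch_sources: [], summarize: [:fetch_sources], keywords: [:fetch_sources], credibility: [:fetch_sources], compile_brief: [:summarize, :keywords, :credibility]}`, which yields three waves: the root, the three parallel analyses, and the fan-in step.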

> #### Tip {: .tip}
>
> To run the *same* worker across several models and synthesize the outputs,
> use `Baton.MultiModel.fan_out/4` instead of adding the parallel steps by hand
> — see the [multi-model guide](multi_model.md).

## 4. Set up pruning

Baton's tables (`workflow_nodes`, `workflow_step_stats`, `workflow_debug_logs`,
`workflow_completions`) have no foreign key to `oban_jobs`, so Oban's `Pruner`
won't clean them up. Turn on pruning in the plugin so Baton rows are removed
once their backing Oban job has been pruned:

```elixir
plugins: [
  {Oban.Plugins.Pruner, max_age: 60 * 60 * 24},     # 24h — see note below
  {Baton.Plugin,
    interval: :timer.seconds(60),
    prune: true,                       # off by default
    debug_log_max_age: 60 * 60 * 24}   # optional: cap debug logs at 24h (seconds)
]
```

- `prune: true` deletes orphaned Baton rows on each sweep. A row is orphaned
  once its Oban job is gone, so Baton's retention follows the `Pruner`'s
  `max_age` and you maintain a single retention policy.
- `debug_log_max_age` gives `workflow_debug_logs`, the largest rows, a separate
  age cap in seconds. It can be shorter than the `Pruner`'s `max_age`; the
  example above sets both to 24h.

> #### The Pruner must outlast your workflows {: .warning}
>
> Set the `Pruner`'s `max_age` longer than your slowest workflow's total
> runtime. Baton resolves a dependency's result from its Oban job; if Oban prunes
> a completed step while the workflow is still running, downstream steps will
> treat that dependency as failed. A day is generous for most workflows.
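
The rule in the warning above is simple arithmetic, sketched here as a small helper. `MyApp.Retention` is a hypothetical module, and the one-day floor and the safety factor of 2 are illustrative choices rather than Baton or Oban recommendations. The runtime estimate should include time spent in retries and snoozes.

```elixir
defmodule MyApp.Retention do
  @moduledoc """
  Hypothetical helper: derives a Pruner `max_age` (in seconds) from the
  slowest workflow's expected total runtime.
  """

  # One day in seconds, used as the lower bound.
  @day 60 * 60 * 24

  @spec pruner_max_age(pos_integer(), pos_integer()) :: pos_integer()
  def pruner_max_age(slowest_runtime_seconds, safety_factor \\ 2)
      when slowest_runtime_seconds > 0 and safety_factor >= 1 do
    # Never go below a day; otherwise leave headroom above the slowest run.
    max(@day, slowest_runtime_seconds * safety_factor)
  end
end
```

The result is a plain integer for the `Pruner`'s `max_age`. Config files are evaluated before application modules are compiled, so compute the value ahead of time or inline the same arithmetic in `config/config.exs`.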

## 5. Render progress in a LiveView

Baton broadcasts every step transition over `Phoenix.PubSub`, plus a single
terminal event when the whole workflow settles. Point Baton at your app's
PubSub server:

```elixir
config :baton, pubsub: MyApp.PubSub
```

Each transition is published on two topics — `"workflow:all"` and
`"workflow:<workflow_id>"` — as `{:workflow_step_updated, payload}`. When the
last step settles, a one-shot `{:workflow_finished, payload}` is published on
the same topics. Subscribe with `Baton.Events`:

```elixir
defmodule MyAppWeb.ResearchLive do
  use MyAppWeb, :live_view
  alias Baton.Events

  @impl true
  def mount(%{"id" => workflow_id}, _session, socket) do
    if connected?(socket), do: Events.subscribe_workflow(workflow_id)

    {:ok,
     socket
     |> assign(:workflow_id, workflow_id)
     |> assign(:steps, %{})          # step_name => latest event
     |> assign(:outcome, nil)        # set by :workflow_finished
     |> assign(:failed_steps, [])}   # set by :workflow_finished
  end

  # A step changed state (executing / completed / retryable / discarded / ...).
  @impl true
  def handle_info({:workflow_step_updated, %{step_name: name} = event}, socket) do
    {:noreply, update(socket, :steps, &Map.put(&1, name, event))}
  end

  # Fired once, when the whole workflow is done.
  def handle_info({:workflow_finished, %{outcome: outcome, failed_steps: failed}}, socket) do
    {:noreply,
     socket
     |> assign(:outcome, outcome)        # :completed | :failed
     |> assign(:failed_steps, failed)}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <h1>Research brief <small>{@workflow_id}</small></h1>

    <p :if={@outcome}>Workflow {@outcome}</p>

    <ul>
      <li :for={{name, ev} <- Enum.sort_by(@steps, &elem(&1, 0))}>
        <strong>{name}</strong>: {ev.state}
        <span :if={ev.error}>— {ev.error}</span>
      </li>
    </ul>
    """
  end
end
```

The `{:workflow_step_updated, _}` payload carries `step_name`, `state`,
`worker`, `attempt`, `has_result`, and `error`; `state` is one of `"executing"`,
`"snoozed"`, `"completed"`, `"retryable"`, `"discarded"`, or `"cancelled"`.
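
Keeping the event handling in a pure module makes it testable without a running LiveView. The sketch below folds payloads into the `step_name => event` map and derives a progress summary. `MyAppWeb.ResearchProgress` is a hypothetical helper; it relies only on the `step_name` and `state` fields listed above, and it assumes that `"completed"`, `"discarded"`, and `"cancelled"` are the states after which a step will not run again. The total step count is passed in by the caller, since events only arrive for steps that have changed state.

```elixir
defmodule MyAppWeb.ResearchProgress do
  @moduledoc """
  Hypothetical helper: folds `:workflow_step_updated` payloads into a
  `step_name => event` map and summarizes it.
  """

  # Assumption: a step in one of these states will not run again.
  @settled ~w(completed discarded cancelled)

  @spec apply_event(map(), map()) :: map()
  def apply_event(steps, %{step_name: name} = event), do: Map.put(steps, name, event)

  @spec summary(map(), pos_integer()) :: %{
          completed: non_neg_integer(),
          settled: non_neg_integer(),
          percent: non_neg_integer()
        }
  def summary(steps, total_steps) when total_steps > 0 do
    states = steps |> Map.values() |> Enum.map(& &1.state)
    settled = Enum.count(states, &(&1 in @settled))

    %{
      completed: Enum.count(states, &(&1 == "completed")),
      settled: settled,
      # Integer percentage of steps that have settled, rounded down.
      percent: div(settled * 100, total_steps)
    }
  end
end
```

In the LiveView, the step handler would become `update(socket, :steps, &MyAppWeb.ResearchProgress.apply_event(&1, event))`, and `summary(@steps, 5)` could drive a progress bar for this five-step workflow.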

> #### Seed initial state on mount {: .info}
>
> PubSub only delivers events that occur *after* `mount`. To reflect a workflow
> that's already running (or finished) when the page loads, seed `@steps` from
> `Baton.Query.get_workflow_jobs/1` in `mount`, then let incoming events keep it
> current.
>
> The terminal `{:workflow_finished, _}` event in the crash case relies on
> `Baton.Plugin` being in your Oban `plugins:` list; see
> [`Baton.Plugin`](Baton.Plugin.html) and the README's
> *Integrating with Phoenix LiveView* section.

## What you built

- A diamond DAG with one **fan-out** point and one **fan-in** point, with
  results flowing across every edge — no manual coordination.
- **Pruning** that keeps Baton's tables bounded, tied to Oban's own retention.
- A **LiveView** that updates in real time and reacts to workflow completion.

For LLM-specific concerns (cost tracking, idempotent retries, context capture,
multi-model fan-out) see `Baton.LLMWorker`, `Baton.Stats`, `Baton.Debug`, and
the [multi-model guide](multi_model.md).
