Drop-in replacement for Oban.Worker with DAG dependency gating,
result passing, PubSub broadcasting, and retry idempotency.
Retry idempotency
Before running dep checks or calling perform_workflow/1, the worker checks
whether this job already stored a result on a previous attempt. If it did,
it returns :ok immediately — no LLM call, no side effects, no token spend.
This handles the case where:
- The LLM responded successfully
- store_result wrote to the DB
- The node crashed before Oban marked the job completed
- Oban rescues the job and retries it
Without this guard, the retry would re-call the LLM unnecessarily.
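The guard can be sketched in plain Elixir. This is a minimal illustration under stated assumptions, not Baton's actual implementation: `ResultStore` is a hypothetical in-memory stand-in for the table that store_result writes to, and `GuardSketch.run/2` is an assumed name for the wrapper that runs before perform_workflow/1.

```elixir
defmodule ResultStore do
  # Hypothetical in-memory stand-in for the table that store_result writes to.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Returns {:ok, result} if a result was stored for this job, :error otherwise.
  def fetch(job_id), do: Agent.get(__MODULE__, &Map.fetch(&1, job_id))

  def put(job_id, result), do: Agent.update(__MODULE__, &Map.put(&1, job_id, result))
end

defmodule GuardSketch do
  # Assumed wrapper: looks for a stored result before doing any work.
  def run(job_id, perform_fun) when is_function(perform_fun, 0) do
    case ResultStore.fetch(job_id) do
      {:ok, _cached} ->
        # A previous attempt already stored a result: skip the expensive call.
        :ok

      :error ->
        case perform_fun.() do
          {:ok, result} ->
            ResultStore.put(job_id, result)
            :ok

          other ->
            # Errors and other return values pass through unchanged.
            other
        end
    end
  end
end
```

Calling `GuardSketch.run/2` twice with the same job id invokes the supplied function only once; the second call returns :ok from the stored result.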
For LLM workers, the only additional requirement is that your
perform_workflow/1 has no side effects before the LLM call that would be
harmful to repeat, since a retry after a failed call runs them again.
Side effects after a successful LLM call (e.g. writing to another table)
should be guarded separately if needed.
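One way to guard such a side effect is a run-once check keyed on the job and the effect. The sketch below is illustrative only: `EffectGuard` and its key shape are hypothetical, and the `MapSet` stands in for what would more plausibly be a database table with a unique index on the key.

```elixir
defmodule EffectGuard do
  # Hypothetical helper. `done` is the set of keys whose effects already ran;
  # in a real system this would likely be a table with a unique index.
  def run_once(done, key, effect_fun) when is_function(effect_fun, 0) do
    if MapSet.member?(done, key) do
      # Already applied on an earlier attempt: do nothing.
      {:skipped, done}
    else
      effect_fun.()
      {:applied, MapSet.put(done, key)}
    end
  end
end
```

A key such as `{job_id, :write_summary}` keeps separate effects within the same job independent of one another.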
Lifecycle broadcasts
Each step broadcasts to "workflow:{workflow_id}" and "workflow:all":
- :executing - immediately when perform/1 is invoked
- :snoozed - when deps aren't ready yet (no retry consumed)
- :completed - after perform_workflow/1 returns :ok / {:ok, result}, or when a cached result is returned on retry
- :retryable - after perform_workflow/1 returns {:error, _} with retries remaining
- :discarded - after perform_workflow/1 returns {:error, _} with no retries left
- :cancelled - when a dep was cancelled/discarded/pruned
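The mapping from a perform_workflow/1 return value to the event broadcast afterwards can be sketched as a pure function. `LifecycleSketch` is a hypothetical module, and the `attempt < max_attempts` comparison is an assumption about how "retries remaining" is decided; the argument names mirror the `attempt` and `max_attempts` fields on an Oban job. The {:snooze, _} and {:discard, _} return values are left out because the list above does not say which event they produce.

```elixir
defmodule LifecycleSketch do
  # Topics named in the docs: one per workflow plus a global one.
  def topics(workflow_id), do: ["workflow:#{workflow_id}", "workflow:all"]

  # Maps a perform_workflow/1 return value to the event broadcast afterwards.
  # Assumption: retries remain while attempt < max_attempts.
  def event_for(:ok, _attempt, _max_attempts), do: :completed
  def event_for({:ok, _result}, _attempt, _max_attempts), do: :completed

  def event_for({:error, _reason}, attempt, max_attempts) when attempt < max_attempts,
    do: :retryable

  def event_for({:error, _reason}, _attempt, _max_attempts), do: :discarded
end
```

A subscriber interested in a single workflow would listen on the first topic; a dashboard covering every workflow would listen on "workflow:all".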
Usage
For general workflow steps:
    defmodule MyApp.Workers.SomeStep do
      use Baton.Worker, queue: :default, max_attempts: 3

      @impl true
      def perform_workflow(_job) do
        # Do the step's work here, then return :ok or {:ok, result}.
        {:ok, %{result: "something"}}
      end
    end
For LLM steps, prefer Baton.LLMWorker, which sets the :llm queue,
jittered backoff, and a configurable timeout/1 automatically.
Summary
Callbacks
@callback on_dep_cancelled(reason :: String.t(), job :: Oban.Job.t()) :: :ok
@callback on_dep_discarded(reason :: String.t(), job :: Oban.Job.t()) :: :ok
@callback perform_workflow(Oban.Job.t()) :: :ok | {:ok, term()} | {:error, term()} | {:snooze, pos_integer()} | {:discard, term()}