Baton.Worker behaviour (Baton v0.1.0)


Drop-in replacement for Oban.Worker with DAG dependency gating, result passing, PubSub broadcasting, and retry idempotency.

Retry idempotency

Before running dependency checks or calling perform_workflow/1, the worker checks whether this job already stored a result on a previous attempt. If it did, the worker returns :ok immediately, with no LLM call, no side effects, and no token spend.

This handles the case where:

  • The LLM responded successfully
  • store_result wrote to the DB
  • The node crashed before Oban marked the job completed
  • Oban rescues the job and retries it

Without this guard, the retry would re-call the LLM unnecessarily.
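
The guard described above can be pictured with a small, self-contained sketch. It is not Baton's implementation: the module name IdempotencySketch, the Agent-backed store, and the fetch_result/2 helper are hypothetical stand-ins for the database-backed storage the library uses.

```elixir
defmodule IdempotencySketch do
  # Hypothetical in-memory store: a map of job_id => result held in an Agent.
  # The real library persists results in the database instead.
  def start_store, do: Agent.start_link(fn -> %{} end)

  # Returns {:ok, result} if an earlier attempt stored one, otherwise :error.
  def fetch_result(store, job_id), do: Agent.get(store, &Map.fetch(&1, job_id))

  def store_result(store, job_id, result),
    do: Agent.update(store, &Map.put(&1, job_id, result))

  # Runs `work` only when no result exists from a previous attempt.
  def run(store, job_id, work) when is_function(work, 0) do
    case fetch_result(store, job_id) do
      {:ok, _cached} ->
        # A previous attempt already succeeded, so skip the expensive call.
        :ok

      :error ->
        {:ok, result} = work.()
        store_result(store, job_id, result)
        :ok
    end
  end
end
```

Calling IdempotencySketch.run/3 twice with the same job id invokes the work function only once; the second call takes the cached branch and returns :ok without repeating the call.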

For LLM workers, the one additional requirement is that perform_workflow/1 performs no side effects before the LLM call that would be harmful to repeat. Side effects after a successful LLM call (e.g. writing to another table) should be guarded separately if needed, because a crash before the result is stored causes them to run again on retry.
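
One way to guard a post-LLM side effect is to key the write on the job id so that repeating it is harmless. The sketch below uses a plain map as a stand-in for a database table; SideEffectGuardSketch and record_once/3 are hypothetical names, and a real worker would more likely rely on a unique constraint or an upsert.

```elixir
defmodule SideEffectGuardSketch do
  # `table` is a plain map standing in for a database table keyed by job id.
  # An existing row is left untouched, so a retried job cannot write twice.
  def record_once(table, job_id, row) do
    if Map.has_key?(table, job_id) do
      {:skipped, table}
    else
      {:inserted, Map.put(table, job_id, row)}
    end
  end
end

# First attempt writes the row; a retry with the same job id is a no-op.
{:inserted, table} = SideEffectGuardSketch.record_once(%{}, 7, %{summary: "first"})
{:skipped, ^table} = SideEffectGuardSketch.record_once(table, 7, %{summary: "second"})
```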

Lifecycle broadcasts

Each step broadcasts to "workflow:{workflow_id}" and "workflow:all":

  • :executing - immediately when perform/1 is invoked
  • :snoozed - when deps aren't ready yet (no retry consumed)
  • :completed - after perform_workflow/1 returns :ok / {:ok, result}, or when a cached result is returned on retry
  • :retryable - after perform_workflow/1 returns {:error, _} with retries remaining
  • :discarded - after perform_workflow/1 returns {:error, _} with no retries left
  • :cancelled - when a dep was cancelled/discarded/pruned
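
The relationship between a perform_workflow/1 return value and the event in the list above can be sketched as a pure function. This is an illustration only: LifecycleSketch, topics/1, and event_for/3 are hypothetical names, and the sketch assumes "retries remaining" means the job's attempt count is below its max_attempts, as with the attempt and max_attempts fields on Oban.Job.

```elixir
defmodule LifecycleSketch do
  # The two topics each step broadcasts to: one per workflow, one global.
  def topics(workflow_id), do: ["workflow:#{workflow_id}", "workflow:all"]

  # Maps a perform_workflow/1 return value to the event broadcast afterwards.
  def event_for(:ok, _attempt, _max_attempts), do: :completed
  def event_for({:ok, _result}, _attempt, _max_attempts), do: :completed

  def event_for({:error, _reason}, attempt, max_attempts)
      when attempt < max_attempts,
      do: :retryable

  def event_for({:error, _reason}, _attempt, _max_attempts), do: :discarded
end

LifecycleSketch.topics("wf_1")
# => ["workflow:wf_1", "workflow:all"]

LifecycleSketch.event_for({:error, :timeout}, 1, 3)
# => :retryable

LifecycleSketch.event_for({:error, :timeout}, 3, 3)
# => :discarded
```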

Usage

For general workflow steps:

defmodule MyApp.Workers.SomeStep do
  use Baton.Worker, queue: :default, max_attempts: 3

  @impl true
  def perform_workflow(_job) do
    {:ok, %{result: "something"}}
  end
end

For LLM steps, prefer Baton.LLMWorker, which automatically sets the :llm queue, jittered backoff, and a configurable timeout/1.

Summary

Callbacks

on_dep_cancelled(reason, job)

@callback on_dep_cancelled(reason :: String.t(), job :: Oban.Job.t()) :: :ok

on_dep_discarded(reason, job)

@callback on_dep_discarded(reason :: String.t(), job :: Oban.Job.t()) :: :ok

perform_workflow(t)

@callback perform_workflow(Oban.Job.t()) ::
  :ok
  | {:ok, term()}
  | {:error, term()}
  | {:snooze, pos_integer()}
  | {:discard, term()}
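
To make the return shapes in the typespec concrete, here is a minimal sketch that produces several of them from pattern-matched clauses. It uses a plain map in place of an %Oban.Job{} so it runs without Oban or Baton; the module name, the argument keys, and the 30-second snooze are all hypothetical. It also assumes the tuples carry the same meaning as in Oban.Worker (for example, a snooze period in seconds).

```elixir
defmodule ReturnShapesSketch do
  # `job` is a plain map here; in a real worker it would be an %Oban.Job{}.

  # Input that can never succeed: give up without retrying.
  def perform_workflow(%{args: %{"input" => nil}}), do: {:discard, :missing_input}

  # Not ready yet: ask to be run again later (period assumed to be seconds).
  def perform_workflow(%{args: %{"rate_limited" => true}}), do: {:snooze, 30}

  # Success with a result for downstream steps to consume.
  def perform_workflow(%{args: %{"input" => input}}) when is_binary(input) do
    {:ok, %{length: String.length(input)}}
  end

  # Anything else is reported as an error.
  def perform_workflow(_job), do: {:error, :unexpected_args}
end

ReturnShapesSketch.perform_workflow(%{args: %{"input" => "hello"}})
# => {:ok, %{length: 5}}
```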