Baton.Stats (Baton v0.1.0)


Record and query LLM usage statistics per workflow step.

Recording stats (from a worker)

Call record/2 with the current job and a usage map after a successful LLM call. LLMWorker calls this automatically when perform_workflow/1 returns {:ok, result} and the result contains an "llm_usage" key.

You can also call it manually for more control:

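alias Baton.Stats

def perform_workflow(%Oban.Job{} = job) do
  # Hypothetical: where the prompt comes from depends on your job's args.
  prompt = job.args["prompt"]

  # Time the call with the monotonic clock so wall-clock adjustments
  # cannot skew the measured latency.
  start = System.monotonic_time(:millisecond)
  {:ok, response} = MyApp.LLM.complete(prompt)
  latency = System.monotonic_time(:millisecond) - start

  # Record usage explicitly instead of embedding llm_usage in the result.
  Stats.record(job, %{
    model: response.model,
    input_tokens: response.usage.input_tokens,
    output_tokens: response.usage.output_tokens,
    cache_read_tokens: response.usage.cache_read_input_tokens,
    cache_write_tokens: response.usage.cache_creation_input_tokens,
    latency_ms: latency
  })

  {:ok, %{text: response.content}}
end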
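If several workers time their LLM calls the same way, the measurement can be pulled into a small helper. The following is only a sketch: TimingSketch and timed/1 are hypothetical names, not part of Baton.

```elixir
defmodule TimingSketch do
  # Hypothetical helper, not part of Baton.
  # Runs `fun` and returns {result, elapsed_ms}, measured with the
  # monotonic clock so system clock changes do not affect the result.
  def timed(fun) when is_function(fun, 0) do
    start = System.monotonic_time(:millisecond)
    result = fun.()
    {result, System.monotonic_time(:millisecond) - start}
  end
end

# Usage: wrap any expensive call; here a cheap stand-in for an LLM request.
{result, latency_ms} = TimingSketch.timed(fn -> Enum.sum(1..1_000) end)
IO.inspect({result, latency_ms}, label: "result and latency")
```

The latency_ms value could then be passed as :latency_ms in the usage map.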

Convention: embed usage in the result map

The easiest approach is to embed llm_usage in the result you return from perform_workflow/1. LLMWorker.__handle_result__/2 will strip it out before passing the result to downstream steps and record stats automatically:

def perform_workflow(job) do
  {:ok, response} = call_llm(job)

  {:ok, %{
    text: response.text,
    llm_usage: %{                          # ← picked up automatically
      model: response.model,
      input_tokens: response.usage.input_tokens,
      output_tokens: response.usage.output_tokens,
      latency_ms: response.latency_ms
    }
  }}
end

The llm_usage key is stripped before the result is stored in meta and before it's returned to downstream steps, so they never see it.
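To make the stripping step concrete, here is a minimal sketch of how a result map might be split into usage and payload. It is an illustration only: UsageSplitSketch is a hypothetical module, and the actual behaviour of LLMWorker.__handle_result__/2 (including whether it accepts atom keys, string keys, or both) is not shown in this page.

```elixir
defmodule UsageSplitSketch do
  # Hypothetical illustration, not Baton's implementation.
  # Returns {usage_or_nil, result_without_usage}.
  # Assumption: both :llm_usage and "llm_usage" are treated as the usage key.
  def split_usage(result) when is_map(result) do
    case Map.pop(result, :llm_usage) do
      # No atom key present: fall back to the string key.
      {nil, _unchanged} -> Map.pop(result, "llm_usage")
      {usage, rest} -> {usage, rest}
    end
  end
end

{usage, clean} =
  UsageSplitSketch.split_usage(%{text: "hi", llm_usage: %{model: "example-model"}})

IO.inspect(usage, label: "usage")
IO.inspect(clean, label: "passed downstream")
```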

Querying

# Aggregate totals (tokens and cost) for a workflow
Stats.workflow_totals("my-workflow-uuid")
# => %{input_tokens: 12_430, output_tokens: 3_100, cost_usd: #Decimal<0.12>, ...}

# Per-step breakdown
Stats.steps_for_workflow("my-workflow-uuid")

# Cost by model across all workflows in a time window
Stats.cost_by_model(~U[2025-01-01 00:00:00Z], ~U[2025-02-01 00:00:00Z])

Summary

Functions

cost_by_model(from_dt, to_dt)

Cost grouped by model across all workflows in a time window. Useful for a cost-by-model trend dashboard.

cost_by_workflow_label(from_dt, to_dt)

Cost grouped by workflow label across a time window. Useful for "which workflow type is most expensive?" queries.

daily_cost(from_dt, to_dt)

Daily cost aggregation over a time window, for sparkline/trend charts. Returns %{date: ~D[...], cost_usd: Decimal} per day.

record(job, usage)

Record usage stats for a single step execution.

steps_for_workflow(workflow_id)

Per-step stats for a workflow, ordered by insertion time. Useful for displaying a step-by-step cost breakdown in the dashboard.

workflow_totals(workflow_id)

Aggregate totals for a single workflow: tokens and cost across all steps.

Functions

cost_by_model(from_dt, to_dt)

@spec cost_by_model(DateTime.t(), DateTime.t()) :: [map()]

Cost grouped by model across all workflows in a time window. Useful for a cost-by-model trend dashboard.

Returns a list of %{model, total_cost_usd, total_tokens, call_count}.
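As an example of post-processing these rows, the sketch below derives average tokens per call for each model. ModelReportSketch and the sample rows are hypothetical; only the key names come from the return shape described above, and the cost field is left as nil in the sample because it would normally be a Decimal.

```elixir
defmodule ModelReportSketch do
  # Hypothetical helper, not part of Baton.
  # Takes rows shaped like the cost_by_model/2 result and returns
  # [{model, avg_tokens_per_call}], largest average first.
  def avg_tokens_per_call(rows) do
    rows
    |> Enum.reject(&(&1.call_count == 0))
    |> Enum.map(fn row -> {row.model, div(row.total_tokens, row.call_count)} end)
    |> Enum.sort_by(fn {_model, avg} -> avg end, :desc)
  end
end

# Made-up sample data for illustration only.
rows = [
  %{model: "model-a", total_cost_usd: nil, total_tokens: 1_000, call_count: 10},
  %{model: "model-b", total_cost_usd: nil, total_tokens: 9_000, call_count: 3}
]

IO.inspect(ModelReportSketch.avg_tokens_per_call(rows))
```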

cost_by_workflow_label(from_dt, to_dt)

@spec cost_by_workflow_label(DateTime.t(), DateTime.t()) :: [map()]

Cost grouped by workflow label across a time window. Useful for "which workflow type is most expensive?" queries.

daily_cost(from_dt, to_dt)

@spec daily_cost(DateTime.t(), DateTime.t()) :: [map()]

Daily cost aggregation over a time window, for sparkline/trend charts. Returns %{date: ~D[...], cost_usd: Decimal} per day.
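This page does not say whether days without any recorded usage appear in the result. If they turn out to be omitted, a sparkline usually needs them filled in. The following sketch shows one way to do that; DailySeriesSketch and fill_gaps/4 are hypothetical, and the sample uses plain integers as a stand-in for Decimal costs.

```elixir
defmodule DailySeriesSketch do
  # Hypothetical helper, not part of Baton.
  # Returns one row per calendar day from `from` to `to` (inclusive),
  # using `zero` as the cost for days that are absent from `rows`.
  # Assumes `from` is not after `to`.
  def fill_gaps(rows, %Date{} = from, %Date{} = to, zero) do
    by_date = Map.new(rows, fn %{date: date, cost_usd: cost} -> {date, cost} end)

    for date <- Date.range(from, to) do
      %{date: date, cost_usd: Map.get(by_date, date, zero)}
    end
  end
end

# Made-up sample rows; integers stand in for Decimal values here.
rows = [
  %{date: ~D[2025-01-01], cost_usd: 5},
  %{date: ~D[2025-01-03], cost_usd: 7}
]

IO.inspect(DailySeriesSketch.fill_gaps(rows, ~D[2025-01-01], ~D[2025-01-03], 0))
```

With real data the zero argument would presumably be a Decimal zero so that every row carries the same type.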

record(job, usage)

@spec record(Oban.Job.t(), map()) ::
  {:ok, Baton.StepStat.t()} | {:error, Ecto.Changeset.t()}

Record usage stats for a single step execution.

usage map keys (all optional except where noted):

  • :model — model string, e.g. "claude-sonnet-4-20250514" (required for cost calc)
  • :input_tokens — standard input token count
  • :output_tokens — output token count
  • :cache_read_tokens — Anthropic cache read tokens (cheaper than input)
  • :cache_write_tokens — Anthropic cache write tokens
  • :latency_ms — wall-clock ms from request to first byte / full response
  • :from_cache — true if result was served from idempotency cache (no API call)

Cost is calculated automatically from the model + token counts using Pricing. If the model is unknown, cost_usd is stored as nil.
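The general shape of such a calculation can be sketched as follows. This is not Baton's Pricing module: PricingSketch, the model name, and every rate below are invented for illustration, and the sketch works in integer micro-USD rather than Decimal to stay dependency-free.

```elixir
defmodule PricingSketch do
  # Hypothetical rates in micro-USD per token. These are NOT real prices
  # and do not reflect Baton's Pricing table.
  @rates %{
    "example-model" => %{input: 3, output: 15, cache_read: 1, cache_write: 4}
  }

  # Returns the cost in micro-USD, or nil when the model is unknown
  # (mirroring the documented nil cost for unknown models).
  def cost_micro_usd(usage) when is_map(usage) do
    case Map.fetch(@rates, Map.get(usage, :model)) do
      {:ok, rate} ->
        # Missing token counts are treated as zero, since the keys are optional.
        Map.get(usage, :input_tokens, 0) * rate.input +
          Map.get(usage, :output_tokens, 0) * rate.output +
          Map.get(usage, :cache_read_tokens, 0) * rate.cache_read +
          Map.get(usage, :cache_write_tokens, 0) * rate.cache_write

      :error ->
        nil
    end
  end
end

usage = %{model: "example-model", input_tokens: 1_000, output_tokens: 200}
IO.inspect(PricingSketch.cost_micro_usd(usage), label: "micro-USD")
```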

steps_for_workflow(workflow_id)

@spec steps_for_workflow(String.t()) :: [Baton.StepStat.t()]

Per-step stats for a workflow, ordered by insertion time. Useful for displaying a step-by-step cost breakdown in the dashboard.

workflow_totals(workflow_id)

@spec workflow_totals(String.t()) :: map()

Aggregate totals for a single workflow — tokens and cost across all steps.

Returns a map with:

  • :input_tokens — total standard input tokens
  • :output_tokens — total output tokens
  • :cache_read_tokens — total cache read tokens
  • :cache_write_tokens — total cache write tokens
  • :total_tokens — input + output (excludes cache tokens)
  • :cost_usd — total cost as Decimal (nil if any step has unknown model)
  • :latency_ms — sum of all step latencies
  • :step_count — number of steps recorded
  • :cached_steps — steps served from idempotency cache (no API spend)
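The aggregation rules listed above (total_tokens excluding cache tokens, a nil cost as soon as any step has no cost) can be illustrated in plain Elixir. TotalsSketch is a hypothetical in-memory stand-in, not the query Baton runs; it covers only a subset of the keys, uses a :cost key with integers in place of Decimal, and assumes each step is a map with optional fields.

```elixir
defmodule TotalsSketch do
  # Hypothetical in-memory aggregation, not Baton's implementation.
  def totals(steps) when is_list(steps) do
    input = sum(steps, :input_tokens)
    output = sum(steps, :output_tokens)

    %{
      input_tokens: input,
      output_tokens: output,
      # Cache tokens are deliberately left out of total_tokens.
      total_tokens: input + output,
      cost: total_cost(steps),
      step_count: length(steps),
      cached_steps: Enum.count(steps, &Map.get(&1, :from_cache, false))
    }
  end

  # Missing or nil counts are treated as zero.
  defp sum(steps, key), do: steps |> Enum.map(&(Map.get(&1, key) || 0)) |> Enum.sum()

  # One step without a cost makes the whole total unknown (nil).
  defp total_cost(steps) do
    costs = Enum.map(steps, &Map.get(&1, :cost))
    if Enum.any?(costs, &is_nil/1), do: nil, else: Enum.sum(costs)
  end
end

# Made-up sample steps for illustration only.
steps = [
  %{input_tokens: 100, output_tokens: 20, cost: 5},
  %{input_tokens: 50, output_tokens: 10, cost: 0, from_cache: true}
]

IO.inspect(TotalsSketch.totals(steps))
```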