Record and query LLM usage statistics per workflow step.
Recording stats (from a worker)
Call record/2 with the current job and a usage map after a successful
LLM call. LLMWorker calls this automatically when perform_workflow/1
returns {:ok, result} and the result contains a "llm_usage" key.
You can also call it manually for more control:

    def perform_workflow(%Oban.Job{} = job) do
      # Assumes the prompt is passed in the job args under "prompt".
      prompt = job.args["prompt"]

      # Time the full LLM call with the monotonic clock.
      start = System.monotonic_time(:millisecond)
      {:ok, response} = MyApp.LLM.complete(prompt)
      latency = System.monotonic_time(:millisecond) - start

      # Record token usage and latency for this step.
      Stats.record(job, %{
        model: response.model,
        input_tokens: response.usage.input_tokens,
        output_tokens: response.usage.output_tokens,
        cache_read_tokens: response.usage.cache_read_input_tokens,
        cache_write_tokens: response.usage.cache_creation_input_tokens,
        latency_ms: latency
      })

      {:ok, %{text: response.content}}
    end

Convention: embed usage in the result map
The easiest approach is to embed llm_usage in the result you return from
perform_workflow/1. LLMWorker.__handle_result__/2 will strip it out
before passing the result to downstream steps and record stats automatically:

    def perform_workflow(job) do
      {:ok, response} = call_llm(job)

      {:ok, %{
        text: response.text,
        llm_usage: %{ # ← picked up automatically
          model: response.model,
          input_tokens: response.usage.input_tokens,
          output_tokens: response.usage.output_tokens,
          latency_ms: response.latency_ms
        }
      }}
    end

The llm_usage key is stripped before the result is stored in meta and
before it's returned to downstream steps, so they never see it.
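
The stripping step can be pictured as a single Map.pop/2 call. The sketch below is not LLMWorker's actual implementation: the module name UsageSplit and the function split/1 are hypothetical, and it assumes the key is the atom :llm_usage, as in the example above.

```elixir
defmodule UsageSplit do
  # Hypothetical helper, not part of the library.
  # Returns {usage_or_nil, result_without_usage}.
  def split(result) when is_map(result) do
    Map.pop(result, :llm_usage)
  end
end

{usage, downstream} =
  UsageSplit.split(%{text: "hello", llm_usage: %{model: "example-model", input_tokens: 10}})

# usage is what would be recorded; downstream is what later steps would see.
IO.inspect(usage)
IO.inspect(downstream)
```

When the key is absent, Map.pop/2 returns nil for the usage and leaves the map unchanged, which matches the idea that recording is skipped if no usage was embedded.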
Querying

    # Total cost for a workflow
    Stats.workflow_totals("my-workflow-uuid")
    # => %{input_tokens: 12_430, output_tokens: 3_100, cost_usd: #Decimal<0.12>}

    # Per-step breakdown
    Stats.steps_for_workflow("my-workflow-uuid")

    # Cost by model across all workflows in a time window
    Stats.cost_by_model(~U[2025-01-01 00:00:00Z], ~U[2025-02-01 00:00:00Z])

Functions
@spec cost_by_model(DateTime.t(), DateTime.t()) :: [map()]
Cost grouped by model across all workflows in a time window. Useful for a cost-by-model trend dashboard.
Returns a list of maps with the keys :model, :total_cost_usd, :total_tokens, and :call_count.
@spec cost_by_workflow_label(DateTime.t(), DateTime.t()) :: [map()]
Cost grouped by workflow label across a time window. Useful for "which workflow type is most expensive?" queries.
@spec daily_cost(DateTime.t(), DateTime.t()) :: [map()]
Daily cost aggregation over a time window, for sparkline/trend charts.
Returns one %{date: ~D[...], cost_usd: Decimal} map per day.
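
To illustrate the shape of a per-day aggregation, here is a minimal in-memory sketch. It is not how daily_cost/2 is implemented (that presumably runs as a database query). The module DailyCostSketch, the :inserted_at field, and the use of integer micro-USD in place of Decimal are all assumptions made to keep the example dependency-free.

```elixir
defmodule DailyCostSketch do
  # Hypothetical in-memory version of a per-day cost aggregation.
  # Each row is %{inserted_at: DateTime.t(), cost_micro_usd: integer()}.
  def daily(rows) do
    rows
    |> Enum.group_by(&DateTime.to_date(&1.inserted_at))
    |> Enum.map(fn {date, day_rows} ->
      %{date: date, cost_micro_usd: day_rows |> Enum.map(& &1.cost_micro_usd) |> Enum.sum()}
    end)
    # Sort with the Date module so dates compare semantically.
    |> Enum.sort_by(& &1.date, Date)
  end
end

rows = [
  %{inserted_at: ~U[2025-01-02 10:00:00Z], cost_micro_usd: 500},
  %{inserted_at: ~U[2025-01-01 09:00:00Z], cost_micro_usd: 100},
  %{inserted_at: ~U[2025-01-02 23:00:00Z], cost_micro_usd: 250}
]

IO.inspect(DailyCostSketch.daily(rows))
```

Days with no recorded steps do not appear in this sketch's output, so a chart built on it would need to fill those gaps itself.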
@spec record(Oban.Job.t(), map()) :: {:ok, Baton.StepStat.t()} | {:error, Ecto.Changeset.t()}
Record usage stats for a single step execution.
usage map keys (all optional except where noted):
* :model - model string, e.g. "claude-sonnet-4-20250514" (required for cost calc)
* :input_tokens - standard input token count
* :output_tokens - output token count
* :cache_read_tokens - Anthropic cache read tokens (cheaper than input)
* :cache_write_tokens - Anthropic cache write tokens
* :latency_ms - wall-clock ms from request to first byte / full response
* :from_cache - true if result was served from idempotency cache (no API call)
Cost is calculated automatically from the model + token counts using Pricing.
If the model is unknown, cost_usd is stored as nil.
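
As a rough illustration of the lookup-then-multiply idea, consider the sketch below. It does not reproduce the real Pricing module: PricingSketch, its price table, and the use of integer micro-USD instead of Decimal are assumptions, the prices are invented placeholders, and cache tokens are ignored for brevity.

```elixir
defmodule PricingSketch do
  # Placeholder prices in micro-USD per one million tokens. These numbers are
  # invented for illustration and are not real prices for any model.
  @prices %{
    "example-model" => %{input: 3_000_000, output: 15_000_000}
  }

  # Returns the cost in micro-USD, or nil when the model has no price entry.
  def cost_micro_usd(usage) do
    case Map.get(@prices, Map.get(usage, :model)) do
      nil ->
        nil

      price ->
        input = Map.get(usage, :input_tokens, 0)
        output = Map.get(usage, :output_tokens, 0)
        div(input * price.input + output * price.output, 1_000_000)
    end
  end
end

IO.inspect(PricingSketch.cost_micro_usd(%{model: "example-model", input_tokens: 1000, output_tokens: 200}))
IO.inspect(PricingSketch.cost_micro_usd(%{model: "unknown-model", input_tokens: 1000}))
```

Returning nil rather than 0 for an unknown model keeps "no price available" distinguishable from "free".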
@spec steps_for_workflow(String.t()) :: [Baton.StepStat.t()]
Per-step stats for a workflow, ordered by insertion time. Useful for displaying a step-by-step cost breakdown in the dashboard.
workflow_totals/1
Aggregate totals for a single workflow: tokens and cost across all steps.
Returns a map with:
* :input_tokens - total standard input tokens
* :output_tokens - total output tokens
* :cache_read_tokens - total cache read tokens
* :cache_write_tokens - total cache write tokens
* :total_tokens - input + output (excludes cache tokens)
* :cost_usd - total cost as Decimal (nil if any step has unknown model)
* :latency_ms - sum of all step latencies
* :step_count - number of steps recorded
* :cached_steps - steps served from idempotency cache (no API spend)
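
The aggregation rules above, in particular total_tokens excluding cache tokens and the cost becoming nil when any step has an unknown model, can be sketched as a fold over the steps. This is an in-memory illustration only, not the library's query. TotalsSketch and the :cost field (integer micro-USD instead of Decimal) are hypothetical, and only a subset of the keys is shown.

```elixir
defmodule TotalsSketch do
  @zero %{input_tokens: 0, output_tokens: 0, total_tokens: 0, cost: 0, step_count: 0}

  # steps: list of maps with :input_tokens, :output_tokens and :cost
  # (integer micro-USD here, or nil for a step with an unknown model).
  def totals(steps) do
    Enum.reduce(steps, @zero, fn step, acc ->
      %{
        input_tokens: acc.input_tokens + step.input_tokens,
        output_tokens: acc.output_tokens + step.output_tokens,
        # Cache tokens are deliberately left out of total_tokens.
        total_tokens: acc.total_tokens + step.input_tokens + step.output_tokens,
        cost: add_cost(acc.cost, step.cost),
        step_count: acc.step_count + 1
      }
    end)
  end

  # nil is "sticky": once any step has an unknown cost, the total stays nil.
  defp add_cost(nil, _), do: nil
  defp add_cost(_, nil), do: nil
  defp add_cost(a, b), do: a + b
end

steps = [
  %{input_tokens: 10, output_tokens: 5, cost: 100},
  %{input_tokens: 20, output_tokens: 1, cost: nil}
]

IO.inspect(TotalsSketch.totals(steps))
```

Making nil sticky avoids reporting a partial sum as if it were the full cost of the workflow.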