Observability & Custom Telemetry
This guide shows how to build custom observability middleware that emits telemetry events, OpenTelemetry spans, or any other instrumentation from your agent's LLM and tool execution lifecycle.
Why Middleware?
Sagents provides a callbacks/1 middleware callback that lets you hook into every LLM event — token usage, tool execution, message processing, errors, and more. Rather than building a one-size-fits-all telemetry integration, Sagents leaves this to you as a custom middleware because:
- Your telemetry metadata is unique: customer IDs, tenant context, billing tiers, feature flags
- Your instrumentation stack is unique: :telemetry, OpenTelemetry, StatsD, Prometheus, Datadog
- Your event shapes are unique: what you measure and how you label it depends on your domain
The callbacks/1 callback gives you direct access to LLM lifecycle events, and your middleware's init/1 config is the natural place to carry your application-specific context.
Quick Start
Here's a minimal observability middleware that emits :telemetry events for token usage:
```elixir
defmodule MyApp.Middleware.Observability do
  @behaviour Sagents.Middleware

  @impl true
  def init(opts) do
    {:ok, %{service_name: Keyword.get(opts, :service_name, "agent")}}
  end

  @impl true
  def callbacks(config) do
    %{
      on_llm_token_usage: fn chain, usage ->
        %{model: model_name} = chain.llm

        :telemetry.execute(
          [:myapp, :llm, :token_usage],
          %{input: usage.input, output: usage.output},
          %{service: config.service_name, model: model_name}
        )
      end
    }
  end
end
```

Add it to your agent:
```elixir
{:ok, agent} = Sagents.Agent.new(%{
  model: model,
  middleware: [{MyApp.Middleware.Observability, service_name: "support-agent"}]
})
```

That's it. Every LLM call made by this agent will now emit a `[:myapp, :llm, :token_usage]` telemetry event with your service name attached.
Passing Application-Specific Context
The real power of middleware-based observability is that init/1 receives your application context and callbacks/1 closes over it. This means every event you emit can carry metadata that is specific to your system — customer IDs, user IDs, billing tiers, or anything else.
```elixir
defmodule MyApp.Middleware.Observability do
  @behaviour Sagents.Middleware

  @impl true
  def init(opts) do
    {:ok, %{
      service_name: Keyword.get(opts, :service_name, "agent"),
      customer_id: Keyword.fetch!(opts, :customer_id),
      user_id: Keyword.fetch!(opts, :user_id)
    }}
  end

  @impl true
  def callbacks(config) do
    %{
      on_llm_token_usage: fn chain, usage ->
        %{model: model_name} = chain.llm

        :telemetry.execute(
          [:myapp, :llm, :token_usage],
          %{input: usage.input, output: usage.output},
          %{
            service: config.service_name,
            model: model_name,
            customer_id: config.customer_id,
            user_id: config.user_id
          }
        )
      end,
      on_tool_execution_started: fn _chain, tool_call, _function ->
        :telemetry.execute(
          [:myapp, :tool, :started],
          %{system_time: System.system_time()},
          %{
            tool: tool_call.name,
            customer_id: config.customer_id,
            user_id: config.user_id
          }
        )
      end,
      on_tool_execution_completed: fn _chain, tool_call, _tool_result ->
        :telemetry.execute(
          [:myapp, :tool, :completed],
          %{system_time: System.system_time()},
          %{
            tool: tool_call.name,
            customer_id: config.customer_id,
            user_id: config.user_id
          }
        )
      end,
      on_tool_execution_failed: fn _chain, tool_call, error ->
        :telemetry.execute(
          [:myapp, :tool, :failed],
          %{system_time: System.system_time()},
          %{
            tool: tool_call.name,
            error: inspect(error),
            customer_id: config.customer_id,
            user_id: config.user_id
          }
        )
      end
    }
  end
end
```

When creating the agent, pass the context from your application:
```elixir
# In your Coordinator, Factory, or wherever agents are created
{:ok, agent} = Sagents.Agent.new(%{
  model: model,
  middleware: [
    {MyApp.Middleware.Observability,
     service_name: "support-agent",
     customer_id: customer.id,
     user_id: current_user.id},
    # ... other middleware
  ]
})
```

Now every telemetry event carries the customer and user context, so you can slice metrics by customer, track per-user usage for billing, or correlate tool failures with specific accounts.
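To show one way of slicing by that metadata, here is a minimal sketch of a `:telemetry` handler module. The module is hypothetical `MyApp.Telemetry` and is not part of Sagents. It keeps running input/output token totals per `{customer_id, model}` in a public ETS table. The sketch assumes the event names and metadata keys emitted by the middleware above, and you would attach it with `:telemetry.attach_many/4`. The table name, `setup/0`, and `totals/2` are illustrative only. A production setup would more likely feed a metrics library.

```elixir
defmodule MyApp.Telemetry do
  require Logger

  @table :myapp_token_usage

  # Create the table once at startup (e.g. from Application.start/2), before
  # attaching handlers. It is :public because :telemetry runs handlers in
  # whichever process emits the event.
  def setup do
    :ets.new(@table, [:named_table, :public, :set])
    :ok
  end

  # Accumulate input/output tokens per {customer_id, model}.
  def handle_event([:myapp, :llm, :token_usage], measurements, metadata, _config) do
    key = {metadata[:customer_id], metadata[:model]}
    input = measurements[:input] || 0
    output = measurements[:output] || 0
    :ets.update_counter(@table, key, [{2, input}, {3, output}], {key, 0, 0})
    :ok
  end

  # Surface tool failures per customer in the logs.
  def handle_event([:myapp, :tool, :failed], _measurements, metadata, _config) do
    Logger.warning("tool #{metadata[:tool]} failed for customer #{inspect(metadata[:customer_id])}")
  end

  # Ignore every other event.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok

  # Read back the running totals for one customer and model.
  def totals(customer_id, model) do
    case :ets.lookup(@table, {customer_id, model}) do
      [{_key, input, output}] -> %{input: input, output: output}
      [] -> %{input: 0, output: 0}
    end
  end
end
```

`:telemetry` invokes handlers synchronously in the emitting process, which is why this handler stays cheap: each event costs one ETS counter update.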
Available Callback Keys
The callbacks/1 function returns a map of callback keys to handler functions. Here is the complete list of available keys with their signatures:
Model-Level Callbacks
| Key | Signature | When it fires |
|---|---|---|
| `:on_llm_new_delta` | `fn chain, [delta] -> any()` | Each streaming token/delta received |
| `:on_llm_new_message` | `fn chain, message -> any()` | Complete message from LLM (non-streaming) |
| `:on_llm_ratelimit_info` | `fn chain, info_map -> any()` | Rate limit headers from provider |
| `:on_llm_token_usage` | `fn chain, token_usage -> any()` | Token usage for the request |
| `:on_llm_response_headers` | `fn chain, headers_map -> any()` | Raw HTTP response headers |
Chain-Level Callbacks
| Key | Signature | When it fires |
|---|---|---|
| `:on_message_processed` | `fn chain, message -> any()` | Message fully processed (streaming or not) |
| `:on_message_processing_error` | `fn chain, message -> any()` | Error while processing a message |
| `:on_error_message_created` | `fn chain, message -> any()` | Automated error response message created |
| `:on_tool_call_identified` | `fn chain, tool_call, function -> any()` | Tool call detected during streaming |
| `:on_tool_execution_started` | `fn chain, tool_call, function -> any()` | Tool begins executing (fires in the parent chain process, before any per-tool async Task is spawned) |
| `:on_tool_pre_execution` | `fn chain, tool_call, function -> any()` | Fires inside the process that runs the tool, immediately before invocation. For `async: true` tools this is the Task spawned by `Task.async/1`; for sync tools and HITL-resumed tools it is the chain's own process. Use this for code that depends on per-process state (OpenTelemetry context, Sentry scope, tenancy, Logger metadata) |
| `:on_tool_execution_completed` | `fn chain, tool_call, tool_result -> any()` | Tool finished successfully |
| `:on_tool_execution_failed` | `fn chain, tool_call, error -> any()` | Tool execution errored |
| `:on_tool_response_created` | `fn chain, message -> any()` | Tool response message created |
| `:on_retries_exceeded` | `fn chain -> any()` | Max retries exhausted |
All handler return values are discarded. Callbacks are for observation only — they cannot modify the chain or its state.
Tip: Extracting the model name. The `chain` argument is an `LLMChain` struct. You can extract the model name with `%{model: model_name} = chain.llm`. This works regardless of which chat model provider is being used (Anthropic, OpenAI, etc.), since they all have a `:model` field. This is especially useful in token usage callbacks for cost tracking across different models.
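For cost tracking, a minimal sketch could map the extracted model name to per-token prices. The module name `MyApp.LLMCost`, the model names, and the prices below are hypothetical placeholders, not real provider pricing. Substitute your own rate table.

```elixir
defmodule MyApp.LLMCost do
  # Hypothetical USD prices per million tokens (placeholders, NOT real pricing).
  @prices %{
    "example-large" => %{input: 3.0, output: 15.0},
    "example-small" => %{input: 0.25, output: 1.25}
  }

  # `llm` is any map or struct with a :model field (e.g. chain.llm);
  # `usage` is anything with :input and :output token counts.
  # Returns {:ok, cost_in_usd} or :unknown_model.
  def estimate(%{model: model}, %{input: input, output: output}) do
    case Map.fetch(@prices, model) do
      {:ok, price} ->
        {:ok, ((input || 0) * price.input + (output || 0) * price.output) / 1_000_000}

      :error ->
        :unknown_model
    end
  end
end
```

Inside `on_llm_token_usage` you could then call `MyApp.LLMCost.estimate(chain.llm, usage)`. The pattern `%{model: model}` matches any chat model struct with a `:model` field.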
Fan-Out Behavior
When multiple middleware declare callbacks, all handlers fire for each event. If two middleware both define on_llm_token_usage, both handlers execute. This means you can have separate middleware for metrics, logging, and tracing without them interfering with each other.
```elixir
{:ok, agent} = Sagents.Agent.new(%{
  model: model,
  middleware: [
    {MyApp.Middleware.Metrics, customer_id: customer.id},
    {MyApp.Middleware.AuditLog, user_id: user.id},
    # ... other middleware
  ]
})
```

Both `Metrics` and `AuditLog` can declare `callbacks/1` and both will fire.
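Here is a small plain-Elixir model of the fan-out rule. Handlers from several `callbacks/1` maps are grouped by key. Dispatching an event calls every handler for that key and ignores the return values. This sketch illustrates the documented behavior only and is not Sagents' internal implementation. `CallbackFanOut` is a hypothetical module name.

```elixir
defmodule CallbackFanOut do
  # Group handlers from several callbacks/1 maps by key, preserving order.
  def merge(callback_maps) do
    Enum.reduce(callback_maps, %{}, fn map, acc ->
      Enum.reduce(map, acc, fn {key, handler}, acc ->
        Map.update(acc, key, [handler], &(&1 ++ [handler]))
      end)
    end)
  end

  # Call every handler registered for `key`; return values are discarded.
  def dispatch(merged, key, args) do
    merged
    |> Map.get(key, [])
    |> Enum.each(fn handler -> apply(handler, args) end)
  end
end
```

In this model, `CallbackFanOut.dispatch(merged, :on_llm_token_usage, [chain, usage])` would call the `Metrics` handler and then the `AuditLog` handler.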
Sub-Agent Propagation
By default, an agent's middleware stack is passed down to any sub-agents it spawns. This means your observability middleware automatically covers the entire agent tree — the parent agent and all of its sub-agents — without any extra configuration.
If your parent agent is configured with:
```elixir
{:ok, agent} = Sagents.Agent.new(%{
  model: model,
  middleware: [
    {MyApp.Middleware.Observability,
     service_name: "support-agent",
     customer_id: customer.id,
     user_id: current_user.id},
    Sagents.Middleware.SubAgent,
    # ... other middleware
  ]
})
```

When this agent spawns sub-agents, those sub-agents inherit the same middleware stack, including your observability middleware. Token usage, tool execution, and errors from sub-agents all emit the same telemetry events with the same customer and user context as the parent. You get full visibility across the entire agent interaction without any additional wiring.
Full Example: Comprehensive Observability
Here's a more complete example that covers the most useful events for production observability:
```elixir
defmodule MyApp.Middleware.Observability do
  @behaviour Sagents.Middleware
  require Logger

  @impl true
  def init(opts) do
    {:ok, %{
      service_name: Keyword.get(opts, :service_name, "agent"),
      customer_id: Keyword.get(opts, :customer_id),
      user_id: Keyword.get(opts, :user_id)
    }}
  end

  @impl true
  def callbacks(config) do
    metadata = %{
      service: config.service_name,
      customer_id: config.customer_id,
      user_id: config.user_id
    }

    %{
      # Track token usage for cost monitoring and billing
      on_llm_token_usage: fn chain, usage ->
        %{model: model_name} = chain.llm

        :telemetry.execute(
          [:myapp, :llm, :token_usage],
          %{input: usage.input, output: usage.output},
          Map.put(metadata, :model, model_name)
        )
      end,

      # Track rate limits to detect throttling
      on_llm_ratelimit_info: fn _chain, info ->
        :telemetry.execute(
          [:myapp, :llm, :ratelimit],
          info,
          metadata
        )
      end,

      # Track tool execution lifecycle
      on_tool_execution_started: fn _chain, tool_call, _function ->
        :telemetry.execute(
          [:myapp, :tool, :started],
          %{system_time: System.system_time()},
          Map.put(metadata, :tool, tool_call.name)
        )
      end,
      on_tool_execution_completed: fn _chain, tool_call, _tool_result ->
        :telemetry.execute(
          [:myapp, :tool, :completed],
          %{system_time: System.system_time()},
          Map.put(metadata, :tool, tool_call.name)
        )
      end,
      on_tool_execution_failed: fn _chain, tool_call, error ->
        Logger.warning("Tool #{tool_call.name} failed: #{inspect(error)}",
          customer_id: config.customer_id
        )

        :telemetry.execute(
          [:myapp, :tool, :failed],
          %{system_time: System.system_time()},
          Map.merge(metadata, %{tool: tool_call.name, error: inspect(error)})
        )
      end,

      # Track when retries are exhausted (potential reliability issue)
      on_retries_exceeded: fn _chain ->
        :telemetry.execute(
          [:myapp, :llm, :retries_exceeded],
          %{count: 1},
          metadata
        )
      end
    }
  end
end
```

Attaching Telemetry Handlers
Wire up the telemetry events in your application startup:
```elixir
# In your Application.start/2 or a dedicated Telemetry module
:telemetry.attach_many(
  "myapp-agent-metrics",
  [
    [:myapp, :llm, :token_usage],
    [:myapp, :llm, :ratelimit],
    [:myapp, :tool, :started],
    [:myapp, :tool, :completed],
    [:myapp, :tool, :failed],
    [:myapp, :llm, :retries_exceeded]
  ],
  &MyApp.Telemetry.handle_event/4,
  nil
)
```

Testing Your Observability Middleware
Test that your callbacks fire and emit the expected telemetry events:
```elixir
defmodule MyApp.Middleware.ObservabilityTest do
  use ExUnit.Case

  alias MyApp.Middleware.Observability

  test "callbacks/1 returns expected callback keys" do
    {:ok, config} = Observability.init(
      service_name: "test",
      customer_id: "cust-1",
      user_id: "user-1"
    )

    callbacks = Observability.callbacks(config)

    assert is_function(callbacks[:on_llm_token_usage], 2)
    assert is_function(callbacks[:on_tool_execution_started], 3)
    assert is_function(callbacks[:on_tool_execution_completed], 3)
    assert is_function(callbacks[:on_tool_execution_failed], 3)
  end

  test "on_llm_token_usage emits telemetry event" do
    {:ok, config} = Observability.init(
      service_name: "test",
      customer_id: "cust-1",
      user_id: "user-1"
    )

    ref = :telemetry_test.attach_event_handlers(self(), [
      [:myapp, :llm, :token_usage]
    ])
    on_exit(fn -> :telemetry.detach(ref) end)

    callbacks = Observability.callbacks(config)
    usage = %LangChain.TokenUsage{input: 100, output: 50}

    # The callback reads chain.llm.model, so pass a minimal stand-in chain
    # rather than nil (nil.llm would raise).
    fake_chain = %{llm: %{model: "test-model"}}
    callbacks.on_llm_token_usage.(fake_chain, usage)

    assert_received {[:myapp, :llm, :token_usage], ^ref,
                     %{input: 100, output: 50},
                     %{service: "test", model: "test-model",
                       customer_id: "cust-1", user_id: "user-1"}}
  end
end
```

Propagating Caller Context Across Process Boundaries
Observability that depends on per-process state — OpenTelemetry trace context, Sentry context, request-scoped logger metadata, multi-tenant context — runs into a structural problem: a Sagents agent crosses three process boundaries during a single invocation, and per-process state does not cross any of them automatically.
The boundaries:
- Caller → AgentServer GenServer. The agent's lifecycle hooks run inside a supervised GenServer, not the process that created the agent.
- AgentServer → chain Task. Each LLM turn spawns a `Task` for the chain run.
- Chain Task → per-tool async Task. `LangChain.Function`s declared with `async: true` run in fresh `Task.async/1` processes.
Without explicit propagation, an OpenTelemetry span started by your tool runs detached from the parent trace, a Sentry exception captured during tool execution arrives without user/request context, and a tenant-scoped DB query in a tool raises because Process.get(:org_id) returns nil.
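The process-dictionary half of the problem is easy to reproduce with plain Elixir. In this sketch, `BoundaryDemo` and the `:org_id` key are hypothetical. A value put in the caller is invisible inside `Task.async/1`. It only becomes visible if it is captured before the spawn and re-applied inside the task. That capture/apply pattern is what the rest of this section builds on.

```elixir
defmodule BoundaryDemo do
  # The Task is a new process with its own, empty process dictionary.
  def without_propagation do
    Process.put(:org_id, 42)
    Task.async(fn -> Process.get(:org_id) end) |> Task.await()
  end

  # Capture in the caller, re-apply on the receiving side of the boundary.
  def with_propagation do
    Process.put(:org_id, 42)
    captured = Process.get(:org_id)

    Task.async(fn ->
      Process.put(:org_id, captured)
      Process.get(:org_id)
    end)
    |> Task.await()
  end
end
```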
Sagents.Middleware.ProcessContext
The built-in Sagents.Middleware.ProcessContext middleware closes all three boundaries with one configuration block. Add it as the first middleware in your stack so its before_model/2 runs before any other middleware that might query the Repo or open a span.
```elixir
{:ok, agent} = Sagents.Agent.new(%{
  model: model,
  middleware: [
    {Sagents.Middleware.ProcessContext,
     keys: [:sentry_context],
     propagators: [
       {&OpenTelemetry.Ctx.get_current/0, &OpenTelemetry.Ctx.attach/1},
       {&MyApp.Tenancy.get_context/0, &MyApp.Tenancy.set_context/1}
     ]},
    # ... your other middleware (Observability, TodoList, etc.)
  ]
})
```

Two configuration options, both optional, freely combined:

- `:keys`: a list of process-dictionary keys (atoms). Each key's value is captured via `Process.get/1` in the caller's process at `init/1` time. It is then re-applied with `Process.put/2` on the receiving side of every boundary. Use it for state that genuinely lives in the process dictionary, like `:sentry_context`.
- `:propagators`: a list of `{capture_fn, apply_fn}` pairs. `capture_fn` is 0-arity and is called once at `init/1` in the caller's process. `apply_fn` is 1-arity and is called on the receiving side of each boundary with the captured value. Use it for state that lives somewhere other than the process dictionary: OpenTelemetry's context stash, ETS-backed contexts, application-specific tenancy modules.

The propagator pairs read like English at the call site. `{&OpenTelemetry.Ctx.get_current/0, &OpenTelemetry.Ctx.attach/1}` says "capture the current OTel context, re-apply it on the other side." There is no hidden behavior: the middleware does exactly what the pair specifies.
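The following sketch models those semantics in plain Elixir, so you can see the capture/apply split in isolation. It is a hypothetical reimplementation for illustration, not the `ProcessContext` source. `ContextSnapshot` is an invented name. `MyApp.Tenancy` is a stand-in for the tenancy module in the configuration above and simply stores its context in the process dictionary.

```elixir
defmodule MyApp.Tenancy do
  # Hypothetical tenancy module: current tenant lives under a private pdict key.
  def get_context, do: Process.get({__MODULE__, :tenant})
  def set_context(ctx), do: Process.put({__MODULE__, :tenant}, ctx)
end

defmodule ContextSnapshot do
  # Call in the caller's process: read each :keys entry and call each
  # capture_fn exactly once.
  def capture(keys, propagators) do
    %{
      keys: Enum.map(keys, fn key -> {key, Process.get(key)} end),
      propagators:
        Enum.map(propagators, fn {capture_fn, apply_fn} -> {apply_fn, capture_fn.()} end)
    }
  end

  # Call on the receiving side of a boundary: put the keys back and hand each
  # captured value to its apply_fn.
  def apply_snapshot(%{keys: keys, propagators: propagators}) do
    Enum.each(keys, fn {key, value} -> Process.put(key, value) end)
    Enum.each(propagators, fn {apply_fn, value} -> apply_fn.(value) end)
  end
end

# Usage: capture in this process, apply inside a fresh Task.
Process.put(:sentry_context, %{user: "user-1"})
MyApp.Tenancy.set_context(%{org_id: 7})

snapshot =
  ContextSnapshot.capture(
    [:sentry_context],
    [{&MyApp.Tenancy.get_context/0, &MyApp.Tenancy.set_context/1}]
  )

result =
  Task.async(fn ->
    ContextSnapshot.apply_snapshot(snapshot)
    {Process.get(:sentry_context), MyApp.Tenancy.get_context()}
  end)
  |> Task.await()
```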
Refreshing the snapshot for long-lived agents
init/1 captures once, at agent construction time. For agents that handle a single request and are then discarded — a fresh agent per LiveView mount, per Coordinator session, per Oban job — that one capture is the only one you need.
For long-lived agents that handle many user messages over time — a conversation-scoped AgentServer reused across hours of user interaction, for example — the captured snapshot goes stale. The OTel trace ID, the Sentry context, the active tenant might all be different by the time message #50 arrives.
update/1 refreshes it. The middleware already has the spec from init/1, so the caller only supplies the agent_id. Capture functions run in the caller of update/1 against its current process dictionary, then the new snapshot replaces the stored snapshot in the agent's state.metadata:
```elixir
# In a LiveView handle_event, an Oban worker, a Phoenix controller: anywhere
# a new request boundary is crossed before relaying a message to the agent.
# Assumes `alias LangChain.Message` in the enclosing module.
def handle_event("send_message", %{"text" => text}, socket) do
  # Re-capture keys/propagators from this process into the agent's snapshot
  Sagents.Middleware.ProcessContext.update(socket.assigns.agent_id)
  Sagents.AgentServer.add_message(socket.assigns.agent_id, Message.new_user!(text))
  {:noreply, socket}
end
```

Both the `update/1` call and the `add_message/2` call go through the same AgentServer mailbox in order, so the refresh always lands before the next execute begins.
update/1 returns:
- `:ok` on success
- `{:error, :not_found}` if no AgentServer is running for that `agent_id`
- `{:error, :no_process_context_middleware}` if the agent is running but doesn't have `ProcessContext` in its middleware stack
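A caller will usually want to handle the two error cases differently. `:not_found` is often benign (the agent is not running yet), while `:no_process_context_middleware` points at a configuration bug. The wrapper below is a minimal sketch of this. `MyApp.AgentContext.refresh/2` is hypothetical. The injectable `update_fun` argument is an assumption added only so the helper can be exercised without a running `AgentServer`.

```elixir
defmodule MyApp.AgentContext do
  require Logger

  # `update_fun` defaults to ProcessContext.update/1; pass a stub in tests.
  def refresh(agent_id, update_fun \\ &Sagents.Middleware.ProcessContext.update/1) do
    case update_fun.(agent_id) do
      :ok ->
        :ok

      {:error, :not_found} = error ->
        # No AgentServer for this id, so there is nothing to refresh.
        Logger.info("agent #{inspect(agent_id)} not running; skipped context refresh")
        error

      {:error, :no_process_context_middleware} = error ->
        # Configuration bug: the agent was built without ProcessContext.
        Logger.error("agent #{inspect(agent_id)} has no ProcessContext middleware")
        error
    end
  end
end
```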
Important: within-execute consistency
A single execute_loop is one logical request — one user message resolved by potentially many LLM turns and tool calls. Within that loop, the snapshot is intentionally frozen. An update/1 call arriving mid-execute does not retarget in-flight tools; the chain captured its custom_context snapshot when the loop began, and per-tool callbacks see that snapshot.
This is the right behavior. A single request should see one consistent context for its duration — interleaving partial OTel contexts or two different tenant scopes inside one logical operation would be a correctness disaster, not a feature. Refresh between requests, not during one.
If you genuinely need to retarget mid-flight (rare), interrupt the agent, call `update/1` to refresh the context, and then resume it with `Sagents.Agent.resume/3`.
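The frozen-snapshot rule described above can be modeled in a few lines. This is a simplified sketch, not Sagents' implementation. Elixir's built-in `Agent` (unrelated to `Sagents.Agent`) stands in for the agent's stored snapshot. The hypothetical `FrozenSnapshotDemo.execute_loop/3` reads that snapshot once when the loop begins. An update arriving during turn 1 therefore only takes effect for the next loop.

```elixir
defmodule FrozenSnapshotDemo do
  # Read the stored snapshot once when the loop begins; every simulated
  # LLM turn / tool call in this loop then sees that same value.
  def execute_loop(store, turns, on_turn) do
    snapshot = Agent.get(store, & &1)

    Enum.map(1..turns, fn turn ->
      # Something (e.g. a refresh) may change the stored snapshot mid-loop...
      on_turn.(turn)
      # ...but this loop keeps using the value it captured at the start.
      snapshot
    end)
  end
end

{:ok, store} = Agent.start_link(fn -> %{tenant: "tenant-a"} end)

# A refresh to tenant-b lands during turn 1 of the first loop.
first =
  FrozenSnapshotDemo.execute_loop(store, 3, fn
    1 -> Agent.update(store, fn _ -> %{tenant: "tenant-b"} end)
    _ -> :ok
  end)

# `first` holds tenant-a three times; the next loop starts with tenant-b.
second = FrozenSnapshotDemo.execute_loop(store, 1, fn _ -> :ok end)
```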
Sub-agent propagation
`ProcessContext` is just another middleware, and middleware stacks are inherited by sub-agents by default (see Sub-Agent Propagation above). Configure it once at the top-level agent and every sub-agent spawned by `Sagents.Middleware.SubAgent` inherits the same propagation behavior and the same `update/1` plumbing automatically.