Sagents.Middleware behaviour (Sagents v0.8.0-rc.8)
Behavior for DeepAgent middleware components.
Middleware provides a composable pattern for adding capabilities to agents. Each middleware component can contribute:
- System prompt additions
- Tools (Functions)
- State schema modifications
- Pre/post processing hooks
- LLM callback handlers (token usage, tool execution, message processing)
Middleware Lifecycle
- Initialization - init/1 is called when middleware is configured
- Tool Collection - tools/1 provides tools to add to the agent
- Prompt Assembly - system_prompt/1 contributes to the system prompt
- Callback Collection - callbacks/1 provides LLM event handlers
- Before Model - before_model/2 preprocesses state before LLM call
- After Model - after_model/2 postprocesses state after LLM response
Example
defmodule MyMiddleware do
  @behaviour Sagents.Middleware

  # Logger macros must be required before use
  require Logger

  @impl true
  def init(opts) do
    config = %{enabled: Keyword.get(opts, :enabled, true)}
    {:ok, config}
  end

  @impl true
  def system_prompt(_config) do
    "You have access to custom capabilities."
  end

  @impl true
  def tools(_config) do
    # my_custom_tool/0 is a placeholder for a function that returns a tool
    [my_custom_tool()]
  end

  @impl true
  def callbacks(_config) do
    %{
      on_llm_token_usage: fn _chain, usage ->
        Logger.info("Token usage: #{inspect(usage)}")
      end
    }
  end

  @impl true
  def before_model(state, _config) do
    # Preprocess state
    {:ok, state}
  end
end
Middleware Configuration
Middleware can be specified as:
- Module name: MyMiddleware
- Tuple with options: {MyMiddleware, [enabled: true]}
Restorable interrupts
When an agent is interrupted (e.g. an ask_user question, a HITL approval
request, a sub-agent waiting on the user), the active ToolResult carries
is_interrupt: true plus a payload in the (virtual) interrupt_data field.
Persistence round-trips that payload through JSONB. interrupt_data is encoded
opaquely as a Base64 binary alongside the rest of the tool result and decoded
on the way back. Because the data is just bytes to the storage layer, the
framework needs middleware help to decide whether each persisted interrupt
is safe to resume after a process restart.
Implement restorable_interrupt?/1 if your middleware can resume one of
its own interrupt types from a cold start. The framework calls each
middleware's callback at state-load time; if no middleware claims the data,
the interrupted tool result is converted to an error result so the LLM sees
a recoverable failure and the conversation continues.
Convention
Middleware "owns" an interrupt type by pattern-matching on the :type
atom that it puts into interrupt_data. Pattern-match on that same atom
in restorable_interrupt?/1:
def restorable_interrupt?(%{type: :ask_user_question}), do: true
def restorable_interrupt?(_), do: false
The default is false (conservative: opting in is a deliberate decision).
When NOT to opt in
Only opt in if your middleware's resume path requires no in-memory context
beyond what is in interrupt_data itself. Specifically: no GenServer
references, no PIDs, no monitors, no ETS handles. If the original process
is gone, can the data alone tell the middleware what to do? If yes, opt in.
If no, leave the default and let the framework demote.
Example: Sagents.Middleware.SubAgent deliberately does not implement
this callback — its interrupts reference a sub-agent process that is gone
with the BEAM, so demotion is the only correct behaviour.
Schema evolution
If you change the shape of your interrupt_data map, old persisted rows
may either:
- Fail to decode (unknown atoms: Erlang's binary_to_term(_, [:safe]) refuses to materialize atoms it doesn't already know).
- Decode but fail at resume time because the shape no longer matches.
Both failures are caught and converted to an error result. The conversation
continues, the LLM re-asks if needed. This is the intended failure mode.
Do not work around it by decoding without the :safe option (that would
re-introduce atom-table exhaustion as a DoS vector).
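The decode failure described above can be reproduced with plain Erlang/Elixir calls. The sketch below is only an illustration: the InterruptCodec module is hypothetical, and it assumes the payload is produced with :erlang.term_to_binary/1 followed by Base64 encoding, which the text implies but does not spell out. It shows a safe decode turning an unknown atom into an error tuple instead of creating the atom.

```elixir
defmodule InterruptCodec do
  # Hypothetical codec, not part of Sagents. Assumes term_to_binary + Base64.
  def encode(data), do: data |> :erlang.term_to_binary() |> Base.encode64()

  # Decodes with the :safe option so unknown atoms are never created.
  def decode(encoded) do
    case Base.decode64(encoded) do
      {:ok, binary} -> {:ok, :erlang.binary_to_term(binary, [:safe])}
      :error -> {:error, :invalid_base64}
    end
  rescue
    # binary_to_term/2 raises ArgumentError for unknown atoms or malformed input
    ArgumentError -> {:error, :undecodable_interrupt_data}
  end
end
```

A caller that receives {:error, _} here would treat the interrupt as not resumable, which matches the demotion behaviour described in this section.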
Data-only restriction
Anything you place in interrupt_data should be data, not behaviour:
atoms, strings, numbers, lists, maps, tuples are fine. Functions, PIDs,
references, ports — never. None of those round-trip through serialization.
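One way to enforce the data-only rule during development is a small recursive check run before returning an interrupt. The helper below is a hypothetical sketch (InterruptDataCheck is not part of Sagents) and assumes interrupt_data contains only proper lists.

```elixir
defmodule InterruptDataCheck do
  # Hypothetical helper: returns true when a term contains only plain data.

  # Functions, PIDs, references and ports are tied to a running VM.
  def data_only?(term)
      when is_function(term) or is_pid(term) or is_reference(term) or is_port(term),
      do: false

  # Structs are checked field by field (they are not enumerable directly).
  def data_only?(term) when is_struct(term),
    do: term |> Map.from_struct() |> data_only?()

  def data_only?(term) when is_map(term),
    do: Enum.all?(term, fn {key, value} -> data_only?(key) and data_only?(value) end)

  def data_only?(term) when is_list(term), do: Enum.all?(term, &data_only?/1)

  def data_only?(term) when is_tuple(term),
    do: term |> Tuple.to_list() |> data_only?()

  # Atoms, numbers and binaries
  def data_only?(_term), do: true
end
```

A middleware test suite could assert data_only?/1 on every interrupt_data value it produces.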
Summary
Callbacks
- after_model/2 - Process state after receiving LLM response.
- before_model/2 - Process state before it's sent to the LLM.
- callbacks/1 - Provide LangChain callback handlers for this middleware.
- debug_summary/1 - Optional. Returns a debugger-friendly representation of this middleware's runtime config, suitable for rendering in the live debugger's Middleware tab.
- handle_message/3 - Handle messages sent to this middleware.
- handle_resume/5 - Handle a resume after an interrupt.
- init/1 - Initialize middleware with configuration options.
- on_server_start/2 - Called when the AgentServer starts or restarts.
- restorable_interrupt?/1 - Return true if this middleware can resume an interrupted tool call from the given interrupt_data after a process restart (no in-memory context required), or false otherwise.
- state_schema/0 - Provide the state schema module for this middleware.
- system_prompt/1 - Provide system prompt text for this middleware.
- tools/1 - Provide tools (Functions) that this middleware adds to the agent.
Functions
- apply_after_model/2 - Apply after_model hook from middleware.
- apply_before_model/2 - Apply before_model hook from middleware.
- apply_handle_message/3 - Apply handle_message callback from middleware.
- apply_handle_resume/5 - Apply handle_resume callback from middleware.
- apply_on_server_start/2 - Apply on_server_start callback from middleware.
- apply_restorable_interrupt?/2 - Ask a middleware whether it can resume the given interrupt_data from a cold start. Returns false if the callback isn't implemented, returns false if the callback raises (a buggy middleware must not crash the load path), and otherwise returns the callback's boolean result.
- collect_callbacks/1 - Collect callback handler maps from all middleware.
- get_callbacks/1 - Get LLM callback handlers from middleware.
- Get system prompt from middleware.
- Get tools from middleware.
- Initialize a middleware module with its configuration. Returns a MiddlewareEntry struct.
- Normalize middleware specification to {module, config} tuple.
Types
@type config() :: keyword()
@type middleware_config() :: any()
@type middleware_result() :: {:ok, Sagents.State.t()} | {:interrupt, Sagents.State.t(), any()} | {:error, term()}
Callbacks
@callback after_model(Sagents.State.t(), middleware_config()) :: middleware_result()
Process state after receiving LLM response.
Receives the state after the LLM has responded and can modify the response, extract information, or update state.
Defaults to {:ok, state} if not implemented.
Parameters
- state - The current Sagents.State struct (with LLM response)
- config - The middleware configuration from init/1
Returns
- {:ok, updated_state} - Success with potentially modified state
- {:interrupt, state, interrupt_data} - Pause execution for human intervention
- {:error, reason} - Failure, halts execution
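As an illustration of the {:interrupt, state, interrupt_data} return, the sketch below pauses when a pending tool call needs approval. Everything in it is an assumption made for the example: the state is a plain map standing in for Sagents.State, and the :pending_tool key, the :needs_approval config key, and the :approval_required type are hypothetical names.

```elixir
defmodule ApprovalGateExample do
  # A real middleware would declare `@behaviour Sagents.Middleware`;
  # it is omitted here so the sketch runs without the library.

  # Interrupt when the pending tool is on the approval list.
  def after_model(%{pending_tool: tool} = state, %{needs_approval: names}) do
    if tool in names do
      # interrupt_data is plain data and carries a :type atom this middleware owns
      {:interrupt, state, %{type: :approval_required, tool: tool}}
    else
      {:ok, state}
    end
  end

  # No pending tool: nothing to do.
  def after_model(state, _config), do: {:ok, state}
end
```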
@callback before_model(Sagents.State.t(), middleware_config()) :: middleware_result()
Process state before it's sent to the LLM.
Receives the current agent state and can modify messages, add context, or perform validation before the LLM is invoked.
Defaults to {:ok, state} if not implemented.
Parameters
- state - The current Sagents.State struct
- config - The middleware configuration from init/1
Returns
- {:ok, updated_state} - Success with potentially modified state
- {:error, reason} - Failure, halts execution
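A validation-style before_model/2 might look like the following sketch. It is illustrative only: the state is a plain map with a :messages list standing in for Sagents.State, and the :max_messages option is a hypothetical setting.

```elixir
defmodule MessageLimitExample do
  # A real middleware would declare `@behaviour Sagents.Middleware`;
  # it is omitted here so the sketch runs without the library.

  def init(opts), do: {:ok, %{max_messages: Keyword.get(opts, :max_messages, 50)}}

  # Reject the LLM call when the conversation has grown past the limit.
  def before_model(%{messages: messages} = state, %{max_messages: max}) do
    count = length(messages)

    if count > max do
      {:error, {:too_many_messages, count}}
    else
      {:ok, state}
    end
  end
end
```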
@callback callbacks(middleware_config()) :: map()
Provide LangChain callback handlers for this middleware.
Receives the middleware configuration from init/1 and returns a callback
handler map compatible with LangChain.Chains.LLMChain.add_callback/2.
This allows middleware to observe LLM events such as token usage, tool
execution, and message processing.
When multiple middleware declare callbacks, all handlers are collected and fire in fan-out fashion (every matching handler from every middleware fires).
Defaults to empty map (%{}) if not implemented. Return %{} for no callbacks.
Parameters
- config - The middleware configuration from init/1
Returns
- A map of callback keys to handler functions
Available Callback Keys
These are the LangChain-native keys supported by LLMChain. Use only these
keys in your callback map (see LangChain.Chains.ChainCallbacks for full
type signatures):
Model-level callbacks:
- :on_llm_new_delta - Streaming token/delta received
- :on_llm_new_message - Complete message from LLM
- :on_llm_ratelimit_info - Rate limit headers from provider
- :on_llm_token_usage - Token usage information
- :on_llm_response_headers - Raw response headers
Chain-level callbacks:
- :on_message_processed - Message fully processed by chain
- :on_message_processing_error - Error processing a message
- :on_error_message_created - Error message created
- :on_tool_call_identified - Tool call detected during streaming
- :on_tool_execution_started - Tool begins executing
- :on_tool_execution_completed - Tool finished successfully
- :on_tool_execution_failed - Tool execution errored
- :on_tool_response_created - Tool response message created
- :on_retries_exceeded - Max retries exhausted
Example
def callbacks(_config) do
  %{
    on_llm_token_usage: fn _chain, usage ->
      Logger.info("Token usage: #{inspect(usage)}")
    end,
    on_message_processed: fn _chain, message ->
      Logger.info("Message: #{inspect(message)}")
    end
  }
end
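The fan-out behaviour for multiple middleware can be pictured with a small dispatcher. This is a conceptual sketch, not the LangChain or Sagents implementation; FanOut and fire/3 are hypothetical names.

```elixir
defmodule FanOut do
  # Hypothetical dispatcher: fires every handler registered under `key`
  # across all handler maps, in list order, and returns their results.
  def fire(handler_maps, key, args) do
    handler_maps
    |> Enum.map(&Map.get(&1, key))
    |> Enum.filter(&is_function(&1, length(args)))
    |> Enum.map(&apply(&1, args))
  end
end
```

With two middleware both declaring :on_llm_token_usage, both handlers run for each usage event; maps that do not declare the key are skipped.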
@callback debug_summary(middleware_config()) :: map() | String.t()
Optional. Returns a debugger-friendly representation of this middleware's runtime config, suitable for rendering in the live debugger's Middleware tab.
Use this when your config holds large structures (compiled agents, caches, big maps) that would dominate the inspect output and slow down the debugger UI. Keep the result small — this is for human reading.
Return either a map (rendered as a key/value config table) or a string (rendered verbatim in a code block). If not implemented, the debugger falls back to inspecting the raw config map with bounded limits.
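A minimal sketch of a debug_summary/1 implementation follows. The config shape (:enabled, :model, and a large :cache map) is assumed purely for illustration.

```elixir
defmodule CacheMiddlewareExample do
  # A real middleware would declare `@behaviour Sagents.Middleware`;
  # it is omitted here so the sketch runs without the library.

  # Summarize the large cache by its size instead of returning its contents.
  def debug_summary(config) do
    %{
      enabled: config.enabled,
      model: config.model,
      cache_entries: map_size(config.cache)
    }
  end
end
```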
@callback handle_message(message :: term(), Sagents.State.t(), middleware_config()) :: {:ok, Sagents.State.t()} | {:error, term()}
Handle messages sent to this middleware.
Messages are routed to a specific middleware by ID through the AgentServer's
middleware registry. Any process can send a targeted message to a middleware
using AgentServer.notify_middleware/3.
This enables two primary patterns:
1. External notifications
LiveViews, controllers, or other processes can send context updates to a
running middleware. The middleware updates state metadata, which before_model/2
reads on the next LLM call.
# In a LiveView: the user switched to editing a different blog post
AgentServer.notify_middleware(agent_id, MyApp.UserContext, {:post_changed, %{
  slug: "/blog/getting-started-with-elixir",
  title: "Getting Started with Elixir"
}})

# In the middleware
def handle_message({:post_changed, post_info}, state, _config) do
  {:ok, State.put_metadata(state, "current_post", post_info)}
end
2. Async task results
Middleware that spawns background tasks sends results back to itself for state updates.
def handle_message({:title_generated, title}, state, _config) do
  {:ok, State.put_metadata(state, "conversation_title", title)}
end
Defaults to {:ok, state} if not implemented.
Parameters
- message - The message payload (any term, typically a tagged tuple)
- state - The current Sagents.State struct
- config - The middleware configuration from init/1
Returns
- {:ok, updated_state} - Success with potentially modified state
- {:error, reason} - Failure (logged but does not halt agent execution)
@callback handle_resume(
  Sagents.Agent.t(),
  Sagents.State.t(),
  resume_data :: term(),
  middleware_config(),
  opts :: keyword()
) ::
  {:ok, Sagents.State.t()}
  | {:cont, Sagents.State.t()}
  | {:interrupt, Sagents.State.t(), interrupt_data :: map()}
  | {:error, term()}
Handle a resume after an interrupt.
Called by Sagents.Agent.resume/3 to give each middleware a chance to claim and handle
an interrupt. The middleware should pattern-match on state.interrupt_data to
decide whether the interrupt belongs to it.
Return Values
- {:cont, state} - "Not mine, pass to next middleware." Default when not implemented.
- {:ok, updated_state} - "Handled. State is ready for re-execution." Halts the chain.
- {:interrupt, state, new_interrupt_data} - "Handled, but needs another round." Halts the chain.
- {:error, reason} - "Handled, but invalid." Halts the chain.
Parameters
- agent - The Sagents.Agent struct
- state - The current Sagents.State struct (with interrupt_data set)
- resume_data - The data provided by the caller to resume execution (polymorphic)
- config - The middleware configuration from init/1
- opts - Options from Sagents.Agent.resume/3 (includes :callbacks for LLMChain event handlers)
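The claim-or-pass semantics of the return values can be sketched with Enum.reduce_while/3. This is a simplified model, not the Sagents.Agent.resume/3 implementation: each handler is a one-argument function standing in for a middleware's handle_resume/5, and the {:unclaimed, state} result for the case where no middleware claims the interrupt is an assumption of this sketch.

```elixir
defmodule ResumeChain do
  # Hypothetical dispatcher: walks the handlers in order until one claims the interrupt.
  def run(handlers, state) do
    Enum.reduce_while(handlers, {:unclaimed, state}, fn handler, {:unclaimed, acc} ->
      case handler.(acc) do
        # "Not mine": keep going with the (possibly updated) state
        {:cont, next_state} -> {:cont, {:unclaimed, next_state}}
        # {:ok, _}, {:interrupt, _, _} and {:error, _} all halt the chain
        result -> {:halt, result}
      end
    end)
  end
end
```

Handlers after the first claiming one are never invoked, which is why a middleware should return {:cont, state} for any interrupt it does not own.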
@callback init(config()) :: {:ok, middleware_config()} | {:error, term()}
Initialize middleware with configuration options.
Called once when the middleware is added to an agent. Returns configuration that will be passed to other callbacks.
Convention
- Input: opts as keyword list
- Output: config as map for efficient runtime access
Defaults to converting opts to a map if not implemented.
Example
def init(opts) do
  config = %{
    enabled: Keyword.get(opts, :enabled, true),
    max_retries: Keyword.get(opts, :max_retries, 3)
  }

  {:ok, config}
end
@callback on_server_start(Sagents.State.t(), middleware_config()) :: {:ok, Sagents.State.t()} | {:error, term()}
Called when the AgentServer starts or restarts.
This allows middleware to perform initialization actions that require the AgentServer to be running, such as broadcasting initial state to subscribers (e.g., TODOs for UI display).
Receives the current state and middleware config.
Returns {:ok, state} (state is not typically modified here but could be).
Defaults to {:ok, state} if not implemented.
Parameters
- state - The current Sagents.State struct
- config - The middleware configuration from init/1
Returns
- {:ok, state} - Success (state typically unchanged)
- {:error, reason} - Failure (logged but does not halt agent)
Example
def on_server_start(state, _config) do
  # Broadcast initial todos when AgentServer starts
  broadcast_todos(state.agent_id, state.todos)
  {:ok, state}
end
@callback restorable_interrupt?(interrupt_data :: map()) :: boolean()
Return true if this middleware can resume an interrupted tool call from
the given interrupt_data after a process restart (no in-memory context
required), or false otherwise.
Called at state-load time for every LangChain.Message.ToolResult with
is_interrupt: true. The framework asks each middleware in the agent's
middleware list; if no middleware says true, the interrupt is converted
to an error result and the LLM is told the prior call could not be resumed.
Default: false (every middleware must opt in).
Recommended pattern — pattern-match on the middleware's own :type value
in interrupt_data and return true only for that type:
def restorable_interrupt?(%{type: :ask_user_question}), do: true
def restorable_interrupt?(_), do: false
See the "Restorable interrupts" section in the module documentation for the full guidance, including the data-only restriction and schema evolution caveats.
@callback state_schema() :: module() | nil
Provide the state schema module for this middleware.
If the middleware needs to add fields to the agent state, it should return a module that defines those fields.
Defaults to nil if not implemented.
@callback system_prompt(middleware_config()) :: String.t() | [String.t()]
Provide system prompt text for this middleware.
Can return a single string or list of strings that will be joined.
Defaults to empty string if not implemented.
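Because system_prompt/1 may return either a string or a list of strings, a consumer has to flatten and join the pieces. The sketch below shows one way to do that; PromptAssembly is hypothetical, and the blank-line separator is an assumption, since the separator the framework uses is not stated here.

```elixir
defmodule PromptAssembly do
  # Hypothetical helper: joins a base prompt with middleware fragments,
  # each of which may be a string or a list of strings.
  def assemble(base, fragments, separator \\ "\n\n") do
    [base | fragments]
    |> List.flatten()
    # Drop empty contributions, such as the default empty string
    |> Enum.reject(&(&1 in [nil, ""]))
    |> Enum.join(separator)
  end
end
```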
@callback tools(middleware_config()) :: [LangChain.Function.t()]
Provide tools (Functions) that this middleware adds to the agent.
Defaults to empty list if not implemented.
Functions
@spec apply_after_model(Sagents.State.t(), Sagents.MiddlewareEntry.t()) :: middleware_result()
Apply after_model hook from middleware.
Parameters
- state - The current agent state (with LLM response)
- entry - MiddlewareEntry struct with module and config
Returns
- {:ok, updated_state} - Success with potentially modified state
- {:error, reason} - Error from middleware
@spec apply_before_model(Sagents.State.t(), Sagents.MiddlewareEntry.t()) :: middleware_result()
Apply before_model hook from middleware.
Parameters
- state - The current agent state
- entry - MiddlewareEntry struct with module and config
Returns
- {:ok, updated_state} - Success with potentially modified state
- {:error, reason} - Error from middleware
@spec apply_handle_message(term(), Sagents.State.t(), Sagents.MiddlewareEntry.t()) :: {:ok, Sagents.State.t()} | {:error, term()}
Apply handle_message callback from middleware.
Parameters
- message - The message payload to handle
- state - The current agent state
- entry - MiddlewareEntry struct with module and config
Returns
- {:ok, updated_state} - Success with potentially modified state
- {:error, reason} - Error from middleware
@spec apply_handle_resume(
  Sagents.Agent.t(),
  Sagents.State.t(),
  term(),
  Sagents.MiddlewareEntry.t(),
  keyword()
) ::
  {:ok, Sagents.State.t()}
  | {:cont, Sagents.State.t()}
  | {:interrupt, Sagents.State.t(), map()}
  | {:error, term()}
Apply handle_resume callback from middleware.
Returns {:cont, state} if the middleware does not implement the callback,
allowing the next middleware in the stack to try.
Parameters
- agent - The Agent struct
- state - The current agent state (with interrupt_data)
- resume_data - The polymorphic resume data from the caller
- entry - MiddlewareEntry struct with module and config
Returns
- {:cont, state} - Middleware does not handle this interrupt
- {:ok, updated_state} - Interrupt handled, state ready for re-execution
- {:interrupt, state, new_interrupt_data} - Handled but needs another round
- {:error, reason} - Handled but invalid
@spec apply_on_server_start(Sagents.State.t(), Sagents.MiddlewareEntry.t()) :: {:ok, Sagents.State.t()} | {:error, term()}
Apply on_server_start callback from middleware.
Called when the AgentServer starts to allow middleware to perform initialization actions like broadcasting initial state.
Parameters
- state - The current agent state
- entry - MiddlewareEntry struct with module and config
Returns
- {:ok, state} - Success (state typically unchanged)
- {:error, reason} - Error from middleware
@spec apply_restorable_interrupt?(Sagents.MiddlewareEntry.t(), map()) :: boolean()
Ask a middleware whether it can resume the given interrupt_data from a
cold start. Returns false if the callback isn't implemented, returns
false if the callback raises (a buggy middleware must not crash the load
path), and otherwise returns the callback's boolean result.
Parameters
- entry - MiddlewareEntry struct with module and config
- interrupt_data - The decoded interrupt data map from the tool result
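The three outcomes described above (not implemented, raises, returns a boolean) can be modelled as follows. This is a sketch under assumptions, not the library's code: RestoreCheck is a hypothetical module and it takes a bare module where the real function takes a MiddlewareEntry.

```elixir
defmodule RestoreCheck do
  # Hypothetical stand-in for the load-time check.
  def restorable?(module, interrupt_data) do
    if Code.ensure_loaded?(module) and function_exported?(module, :restorable_interrupt?, 1) do
      module.restorable_interrupt?(interrupt_data) == true
    else
      # Callback not implemented: conservative default
      false
    end
  rescue
    # A raising callback must not crash the load path
    _error -> false
  end

  # The interrupt survives a restart only if some middleware claims it.
  def claimed?(modules, interrupt_data) do
    Enum.any?(modules, &restorable?(&1, interrupt_data))
  end
end

defmodule AskUserExample do
  def restorable_interrupt?(%{type: :ask_user_question}), do: true
  def restorable_interrupt?(_), do: false
end

defmodule BuggyExample do
  def restorable_interrupt?(_data), do: raise("boom")
end

defmodule SilentExample do
  # Deliberately does not implement restorable_interrupt?/1
end
```

When claimed?/2 returns false, the caller would convert the interrupted tool result into an error result, as described in the "Restorable interrupts" section.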
@spec collect_callbacks([Sagents.MiddlewareEntry.t()]) :: [map()]
Collect callback handler maps from all middleware.
Calls get_callbacks/1 on each middleware entry and filters out nils.
Returns a list of callback handler maps suitable for passing
to LLMChain.add_callback/2.
Get LLM callback handlers from middleware.
Returns the callback handler map from the middleware's callbacks/1 callback,
or nil if the callback is not implemented.
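The relationship between get_callbacks/1 and collect_callbacks/1 can be sketched like this. The module below is a hypothetical model: it uses {module, config} tuples in place of MiddlewareEntry structs, and the function_exported?/3 check is an assumed way of detecting an unimplemented callback.

```elixir
defmodule CallbackCollector do
  # Returns the middleware's handler map, or nil when callbacks/1 is not implemented.
  def get_callbacks({module, config}) do
    if Code.ensure_loaded?(module) and function_exported?(module, :callbacks, 1) do
      module.callbacks(config)
    else
      nil
    end
  end

  # Collects one handler map per middleware and drops the nils.
  def collect_callbacks(entries) do
    entries
    |> Enum.map(&get_callbacks/1)
    |> Enum.reject(&is_nil/1)
  end
end

defmodule WithCallbacks do
  def callbacks(config) do
    %{on_llm_token_usage: fn _chain, usage -> {config.tag, usage} end}
  end
end

defmodule WithoutCallbacks do
  # Deliberately does not implement callbacks/1
end
```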
Get system prompt from middleware.
Get tools from middleware.
Initialize a middleware module with its configuration. Returns a MiddlewareEntry struct.
Configuration Convention
- Input opts should be a keyword list
- Returned config should be a map for efficient runtime access
Normalize middleware specification to {module, config} tuple.
Accepts:
- Module atom: MyMiddleware -> {MyMiddleware, []}
- Tuple with keyword list: {MyMiddleware, [key: value]} -> {MyMiddleware, [key: value]}
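Normalization and the documented init/1 default (converting opts to a map) can be sketched together. MiddlewareSpec is a hypothetical module; the real initializer returns a MiddlewareEntry struct, whereas this sketch returns {:ok, config} to stay self-contained.

```elixir
defmodule MiddlewareSpec do
  # A bare module becomes {module, []}; a tuple passes through unchanged.
  def normalize(module) when is_atom(module), do: {module, []}
  def normalize({module, opts}) when is_atom(module) and is_list(opts), do: {module, opts}

  # Calls the middleware's init/1 when present, otherwise converts opts to a map.
  def init(spec) do
    {module, opts} = normalize(spec)

    if Code.ensure_loaded?(module) and function_exported?(module, :init, 1) do
      module.init(opts)
    else
      {:ok, Map.new(opts)}
    end
  end
end

defmodule CustomInitExample do
  def init(opts), do: {:ok, %{max_retries: Keyword.get(opts, :max_retries, 3)}}
end

defmodule NoInitExample do
  # Deliberately does not implement init/1
end
```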