Normandy.Agents.BaseAgent
(normandy v0.6.1)
View Source
Core agent implementation providing conversational AI capabilities.
BaseAgent manages agent state, memory, and LLM interactions through a stateful configuration approach.
Summary
Functions
Adds an MCP server configuration for server-side MCP.
Gets a tool from the agent's tool registry by name.
Checks if the agent has any tools registered.
Lists all tools available to the agent.
Process multiple inputs through the agent concurrently.
Process a batch and return detailed statistics.
Discovers and registers client-side MCP tools in the agent's tool registry.
Registers a tool in the agent's tool registry.
Run the agent with optional streaming support.
Run the agent with tool calling support.
Stream a response from the LLM with real-time callbacks.
Stream responses with tool calling support.
Types
@type config_input() :: %{ :client => struct(), :model => String.t(), :temperature => float(), optional(:input_schema) => struct(), optional(:output_schema) => struct(), optional(:memory) => map(), optional(:prompt_specification) => Normandy.Components.PromptSpecification.t(), optional(:max_tokens) => pos_integer() | nil, optional(:tool_registry) => Normandy.Tools.Registry.t(), optional(:max_tool_iterations) => pos_integer(), optional(:max_tool_concurrency) => integer(), optional(:retry_options) => keyword(), optional(:enable_circuit_breaker) => boolean(), optional(:circuit_breaker_options) => keyword(), optional(:enable_json_retry) => boolean(), optional(:json_retry_max_attempts) => pos_integer(), optional(:input_guardrails) => [Normandy.Guardrails.spec()], optional(:output_guardrails) => [Normandy.Guardrails.spec()], optional(:output_guardrails_streaming_mode) => :accumulate | :incremental, optional(:output_guardrails_chunk_size) => pos_integer() }
Functions
@spec add_mcp_server(Normandy.Agents.BaseAgentConfig.t(), struct() | map()) :: Normandy.Agents.BaseAgentConfig.t()
Adds an MCP server configuration for server-side MCP.
The server config is passed to the Anthropic API, which connects to the MCP server on your behalf.
Examples
alias Normandy.MCP.ServerConfig
server = ServerConfig.new("my_server", "https://mcp.example.com/sse")
agent = BaseAgent.add_mcp_server(agent, server)
@spec delete_context_provider(Normandy.Agents.BaseAgentConfig.t(), atom()) :: Normandy.Agents.BaseAgentConfig.t()
@spec get_context_provider(Normandy.Agents.BaseAgentConfig.t(), atom()) :: struct()
@spec get_response(Normandy.Agents.BaseAgentConfig.t(), struct() | nil) :: struct()
@spec get_tool(Normandy.Agents.BaseAgentConfig.t(), String.t()) :: {:ok, struct()} | :error
Gets a tool from the agent's tool registry by name.
Examples
iex> agent = BaseAgent.register_tool(agent, %Calculator{})
iex> BaseAgent.get_tool(agent, "calculator")
{:ok, %Calculator{}}
@spec has_tools?(Normandy.Agents.BaseAgentConfig.t()) :: boolean()
Checks if the agent has any tools registered.
Examples
iex> BaseAgent.has_tools?(agent)
true
@spec init(config_input()) :: Normandy.Agents.BaseAgentConfig.t()
@spec list_tools(Normandy.Agents.BaseAgentConfig.t()) :: [struct()]
Lists all tools available to the agent.
Examples
iex> BaseAgent.list_tools(agent)
[%Calculator{}, %StringManipulator{}]
@spec process_batch(Normandy.Agents.BaseAgentConfig.t(), [term()], keyword()) :: {:ok, [term()] | map()}
Process multiple inputs through the agent concurrently.
Provides efficient batch processing with configurable concurrency.
Delegates to Normandy.Batch.Processor.process_batch/3.
Options
:max_concurrency- Maximum concurrent tasks (default: 10):ordered- Preserve input order in results (default: true):timeout- Timeout per task in milliseconds (default: 300_000ms):on_progress- Callback function called after each completion:on_error- Callback function called on each error
Examples
# Simple batch
inputs = [
%{chat_message: "Hello"},
%{chat_message: "How are you?"}
]
{:ok, results} = BaseAgent.process_batch(agent, inputs)
# With options
{:ok, results} = BaseAgent.process_batch(
agent,
inputs,
max_concurrency: 5
)
@spec process_batch_with_stats( Normandy.Agents.BaseAgentConfig.t(), [term()], keyword() ) :: {:ok, map()}
Process a batch and return detailed statistics.
Returns success/error breakdown with counts.
Examples
{:ok, stats} = BaseAgent.process_batch_with_stats(agent, inputs)
#=> %{
success: [result1, result2],
errors: [{input3, error}],
total: 3,
success_count: 2,
error_count: 1
}
@spec register_context_provider(Normandy.Agents.BaseAgentConfig.t(), atom(), struct()) :: Normandy.Agents.BaseAgentConfig.t()
@spec register_mcp_tools( Normandy.Agents.BaseAgentConfig.t(), module(), term(), keyword() ) :: {:ok, Normandy.Agents.BaseAgentConfig.t()} | {:error, term()}
Discovers and registers client-side MCP tools in the agent's tool registry.
Connects to an MCP server via the specified adapter, discovers available tools, and registers them as Normandy tools that can be used in the agent's tool loop.
Options
:prefix- Namespace prefix for tool names (e.g.,"my_server")
Examples
agent = BaseAgent.register_mcp_tools(agent, MyAdapter, mcp_client, prefix: "server1")
@spec register_tool( Normandy.Agents.BaseAgentConfig.t(), struct() ) :: Normandy.Agents.BaseAgentConfig.t()
Registers a tool in the agent's tool registry.
Creates a new registry if one doesn't exist.
Examples
iex> agent = BaseAgent.init(config)
iex> tool = %Normandy.Tools.Examples.Calculator{}
iex> agent = BaseAgent.register_tool(agent, tool)
@spec reset_memory(Normandy.Agents.BaseAgentConfig.t()) :: Normandy.Agents.BaseAgentConfig.t()
@spec run(Normandy.Agents.BaseAgentConfig.t(), struct() | map() | nil) :: {Normandy.Agents.BaseAgentConfig.t(), struct()}
Run the agent with optional streaming support.
Accepts a keyword list as the third argument with options:
:stream- Boolean, enables streaming mode:on_chunk- Callback function for streaming chunks
Example
BaseAgent.run(agent, %{chat_message: "Hello"}, stream: true, on_chunk: fn chunk ->
IO.write(chunk)
end)
@spec run_with_tools(Normandy.Agents.BaseAgentConfig.t(), struct() | nil) :: {Normandy.Agents.BaseAgentConfig.t(), struct()}
Run the agent with tool calling support.
This method handles the full tool execution loop:
- Send user input to LLM
- If LLM requests tool calls, execute them
- Send tool results back to LLM
- Repeat until LLM provides final response or max iterations reached
Parameters
- config: Agent configuration
- user_input: Optional user input to start the conversation
Returns
Tuple of {updated_config, final_response}
Examples
iex> {config, response} = BaseAgent.run_with_tools(agent, user_input)
@spec stream_response(Normandy.Agents.BaseAgentConfig.t(), struct() | nil, function()) :: {Normandy.Agents.BaseAgentConfig.t(), map()}
Stream a response from the LLM with real-time callbacks.
This method enables streaming mode, allowing you to process LLM responses as they're generated. Callbacks are invoked for each event type.
Parameters
- config: Agent configuration
- user_input: Optional user input (can be nil to continue conversation)
- callback: Function
(event_type, data) -> :okcalled for each event
Event Types
:text_delta- Incremental text content:tool_use_start- Tool call beginning:thinking_delta- Extended thinking content:message_start- Stream beginning:message_stop- Stream complete
Returns
{config, final_response} - Updated config and accumulated response
Example
callback = fn
:text_delta, text -> IO.write(text)
:tool_use_start, tool -> IO.puts("\nCalling tool: #{tool["name"]}")
_, _ -> :ok
end
{agent, response} = BaseAgent.stream_response(agent, input, callback)
@spec stream_with_tools( Normandy.Agents.BaseAgentConfig.t(), struct() | nil, function() ) :: {Normandy.Agents.BaseAgentConfig.t(), struct()}
Stream responses with tool calling support.
Combines streaming and tool execution - as the LLM streams its response, tool calls are detected and executed, with results fed back into the stream.
Parameters
- config: Agent configuration
- user_input: Optional user input to start the conversation
- callback: Function
(event_type, data) -> :okcalled for each event
Event Types
:text_delta- Incremental text content:tool_use_start- Tool call beginning:tool_result- Tool execution result (custom event):thinking_delta- Extended thinking content:message_start- Stream beginning:message_stop- Stream complete:guardrail_violation- Output guardrail violation
Returns
{config, final_response} - Updated config and accumulated response
Guardrail Semantics
If :output_guardrails_streaming_mode is :incremental and a violation
fires mid-stream, the current iteration is halted and any in-flight
tool_use content block is stripped from the returned response — the
caller won't execute a tool whose arguments were still streaming. Tool
results from earlier iterations remain in memory; memory commits
happen after each stream ends, not after the loop completes.
Example
callback = fn
:text_delta, text -> IO.write(text)
:tool_use_start, tool -> IO.puts("\nCalling tool: #{tool["name"]}")
:tool_result, result -> IO.puts("Tool result: #{inspect(result)}")
_, _ -> :ok
end
{agent, response} = BaseAgent.stream_with_tools(agent, input, callback)