Normandy.Agents.BaseAgent (normandy v0.6.2)

View Source

Core agent implementation providing conversational AI capabilities.

BaseAgent manages agent state, memory, and LLM interactions through a stateful configuration approach.

Summary

Functions

Adds an MCP server configuration for server-side MCP.

Gets a tool from the agent's tool registry by name.

Checks if the agent has any tools registered.

Lists all tools available to the agent.

Process multiple inputs through the agent concurrently.

Process a batch and return detailed statistics.

Discovers and registers client-side MCP tools in the agent's tool registry.

Registers a tool in the agent's tool registry.

Run the agent with optional streaming support.

Run the agent with tool calling support.

Stream a response from the LLM with real-time callbacks.

Stream responses with tool calling support.

Types

config_input()

@type config_input() :: %{
  :client => struct(),
  :model => String.t(),
  :temperature => float(),
  optional(:input_schema) => struct(),
  optional(:output_schema) => struct(),
  optional(:memory) => map(),
  optional(:prompt_specification) =>
    Normandy.Components.PromptSpecification.t(),
  optional(:max_tokens) => pos_integer() | nil,
  optional(:tool_registry) => Normandy.Tools.Registry.t(),
  optional(:max_tool_iterations) => pos_integer(),
  optional(:max_tool_concurrency) => integer(),
  optional(:retry_options) => keyword(),
  optional(:enable_circuit_breaker) => boolean(),
  optional(:circuit_breaker_options) => keyword(),
  optional(:enable_json_retry) => boolean(),
  optional(:json_retry_max_attempts) => pos_integer(),
  optional(:input_guardrails) => [Normandy.Guardrails.spec()],
  optional(:output_guardrails) => [Normandy.Guardrails.spec()],
  optional(:output_guardrails_streaming_mode) => :accumulate | :incremental,
  optional(:output_guardrails_chunk_size) => pos_integer()
}

Functions

add_mcp_server(config, server)

Adds an MCP server configuration for server-side MCP.

The server config is passed to the Anthropic API, which connects to the MCP server on your behalf.

Examples

alias Normandy.MCP.ServerConfig

server = ServerConfig.new("my_server", "https://mcp.example.com/sse")
agent = BaseAgent.add_mcp_server(agent, server)

delete_context_provider(config, provider_name)

get_context_provider(base_agent_config, provider_name)

@spec get_context_provider(Normandy.Agents.BaseAgentConfig.t(), atom()) :: struct()

get_response(config, response_model \\ nil)

@spec get_response(Normandy.Agents.BaseAgentConfig.t(), struct() | nil) :: struct()

get_tool(base_agent_config, tool_name)

@spec get_tool(Normandy.Agents.BaseAgentConfig.t(), String.t()) ::
  {:ok, struct()} | :error

Gets a tool from the agent's tool registry by name.

Examples

iex> agent = BaseAgent.register_tool(agent, %Calculator{})
iex> BaseAgent.get_tool(agent, "calculator")
{:ok, %Calculator{}}

has_tools?(base_agent_config)

@spec has_tools?(Normandy.Agents.BaseAgentConfig.t()) :: boolean()

Checks if the agent has any tools registered.

Examples

iex> BaseAgent.has_tools?(agent)
true

init(config)

list_tools(base_agent_config)

@spec list_tools(Normandy.Agents.BaseAgentConfig.t()) :: [struct()]

Lists all tools available to the agent.

Examples

iex> BaseAgent.list_tools(agent)
[%Calculator{}, %StringManipulator{}]

process_batch(agent, inputs, opts \\ [])

@spec process_batch(Normandy.Agents.BaseAgentConfig.t(), [term()], keyword()) ::
  {:ok, [term()] | map()}

Process multiple inputs through the agent concurrently.

Provides efficient batch processing with configurable concurrency. Delegates to Normandy.Batch.Processor.process_batch/3.

Options

  • :max_concurrency - Maximum concurrent tasks (default: 10)
  • :ordered - Preserve input order in results (default: true)
  • :timeout - Timeout per task in milliseconds (default: 300_000)
  • :on_progress - Callback function called after each completion
  • :on_error - Callback function called on each error

Examples

# Simple batch
inputs = [
  %{chat_message: "Hello"},
  %{chat_message: "How are you?"}
]
{:ok, results} = BaseAgent.process_batch(agent, inputs)

# With options
{:ok, results} = BaseAgent.process_batch(
  agent,
  inputs,
  max_concurrency: 5
)

process_batch_with_stats(agent, inputs, opts \\ [])

@spec process_batch_with_stats(
  Normandy.Agents.BaseAgentConfig.t(),
  [term()],
  keyword()
) :: {:ok, map()}

Process a batch and return detailed statistics.

Returns success/error breakdown with counts.

Examples

{:ok, stats} = BaseAgent.process_batch_with_stats(agent, inputs)
# stats =>
# %{
#   success: [result1, result2],
#   errors: [{input3, error}],
#   total: 3,
#   success_count: 2,
#   error_count: 1
# }

register_context_provider(config, provider_name, provider)

@spec register_context_provider(Normandy.Agents.BaseAgentConfig.t(), atom(), struct()) ::
  Normandy.Agents.BaseAgentConfig.t()

register_mcp_tools(config, adapter, client, opts \\ [])

@spec register_mcp_tools(
  Normandy.Agents.BaseAgentConfig.t(),
  module(),
  term(),
  keyword()
) ::
  {:ok, Normandy.Agents.BaseAgentConfig.t()} | {:error, term()}

Discovers and registers client-side MCP tools in the agent's tool registry.

Connects to an MCP server via the specified adapter, discovers available tools, and registers them as Normandy tools that can be used in the agent's tool loop.

Options

  • :prefix - Namespace prefix for tool names (e.g., "my_server")

Examples

{:ok, agent} = BaseAgent.register_mcp_tools(agent, MyAdapter, mcp_client, prefix: "server1")

register_tool(config, tool)

Registers a tool in the agent's tool registry.

Creates a new registry if one doesn't exist.

Examples

iex> agent = BaseAgent.init(config)
iex> tool = %Normandy.Tools.Examples.Calculator{}
iex> agent = BaseAgent.register_tool(agent, tool)

reset_memory(config)

run(config, user_input \\ nil)

run(config, user_input, opts)

Run the agent with optional streaming support.

Accepts a keyword list as the third argument with options:

  • :stream - Boolean, enables streaming mode
  • :on_chunk - Callback function for streaming chunks

Example

BaseAgent.run(agent, %{chat_message: "Hello"}, stream: true, on_chunk: fn chunk ->
  IO.write(chunk)
end)

run_with_tools(config, user_input \\ nil)

Run the agent with tool calling support.

This function handles the full tool execution loop:

  1. Send user input to LLM
  2. If LLM requests tool calls, execute them
  3. Send tool results back to LLM
  4. Repeat until LLM provides final response or max iterations reached

Parameters

  • config: Agent configuration
  • user_input: Optional user input to start the conversation

Returns

Tuple of {updated_config, final_response}

Examples

iex> {config, response} = BaseAgent.run_with_tools(agent, user_input)

stream_response(config, user_input \\ nil, callback)

Stream a response from the LLM with real-time callbacks.

This function enables streaming mode, allowing you to process LLM responses as they're generated. The callback is invoked once per event, receiving the event type and its data.

Parameters

  • config: Agent configuration
  • user_input: Optional user input (can be nil to continue conversation)
  • callback: Function (event_type, data) -> :ok called for each event

Event Types

  • :text_delta - Incremental text content
  • :tool_use_start - Tool call beginning
  • :thinking_delta - Extended thinking content
  • :message_start - Stream beginning
  • :message_stop - Stream complete

Returns

{config, final_response} - Updated config and accumulated response

Example

callback = fn
  :text_delta, text -> IO.write(text)
  :tool_use_start, tool -> IO.puts("\nCalling tool: #{tool["name"]}")
  _, _ -> :ok
end

{agent, response} = BaseAgent.stream_response(agent, input, callback)

stream_with_tools(config, user_input \\ nil, callback)

Stream responses with tool calling support.

Combines streaming and tool execution - as the LLM streams its response, tool calls are detected and executed, with results fed back into the stream.

Parameters

  • config: Agent configuration
  • user_input: Optional user input to start the conversation
  • callback: Function (event_type, data) -> :ok called for each event

Event Types

  • :text_delta - Incremental text content
  • :tool_use_start - Tool call beginning
  • :tool_result - Tool execution result (custom event)
  • :thinking_delta - Extended thinking content
  • :message_start - Stream beginning
  • :message_stop - Stream complete
  • :guardrail_violation - Output guardrail violation

Returns

{config, final_response} - Updated config and accumulated response

Guardrail Semantics

If :output_guardrails_streaming_mode is :incremental and a violation fires mid-stream, the current iteration is halted and any in-flight tool_use content block is stripped from the returned response — the caller won't execute a tool whose arguments were still streaming. Tool results from earlier iterations remain in memory; memory commits happen after each stream ends, not after the loop completes.

Example

callback = fn
  :text_delta, text -> IO.write(text)
  :tool_use_start, tool -> IO.puts("\nCalling tool: #{tool["name"]}")
  :tool_result, result -> IO.puts("Tool result: #{inspect(result)}")
  _, _ -> :ok
end

{agent, response} = BaseAgent.stream_with_tools(agent, input, callback)