Normandy.Coordination.AgentProcess
(normandy v1.0.0)
GenServer wrapper for running BaseAgent instances as supervised processes.
AgentProcess enables agents to run as long-lived processes that can:
- Maintain state across multiple invocations
- Be supervised and restarted on failure
- Receive messages asynchronously
- Integrate with process registries
Example
# Start an agent process
{:ok, pid} = AgentProcess.start_link(
  agent: my_agent,
  name: :research_agent
)
# Execute synchronously
{:ok, result} = AgentProcess.run(pid, "Analyze this data")
# Execute asynchronously
:ok = AgentProcess.cast(pid, "Process in background", reply_to: self())
# Get current agent state
agent = AgentProcess.get_agent(pid)
Durable turn engine (:server mode)
Pass turn_engine: :server to route turns through the durable
Normandy.Agents.Turn.Session/Turn.Server engine (approval parking,
passivation, persistence) instead of the default synchronous :inline
BaseAgent.run/2 path. :inline is the default and is unchanged.
{:ok, pid} = AgentProcess.start_link(agent: config, turn_engine: :server)
In :server mode:
- run/3 and cast/3 route through Turn.Session; run/3 is non-blocking internally (the GenServer stays responsive while a turn is parked); approve/2 delivers human-approval decisions to a parked turn.
- The SessionStore owns conversation memory: get_agent/1 reconstructs it from the store, and update_agent/2 updates only the config template (model/temperature/behaviours/tools); memory mutations are ignored.
- Session infra (:store, :registry, :supervisor) may be supplied via start_link; if omitted, the process starts and owns in-memory defaults that terminate with it. :subscriber, :handlers, :approval_timeout_ms, and :idle_timeout_ms are forwarded to Turn.Session when supplied.
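The config-only update rule for :server mode can be modelled with a small pure function. This is a sketch under stated assumptions, not Normandy's implementation: ConfigOnlyUpdateSketch is a hypothetical module, the agent is a plain map, and the key names (:model, :temperature, :behaviours, :tools, :memory) are taken from the wording above and may not match the real struct fields.

```elixir
defmodule ConfigOnlyUpdateSketch do
  # Hypothetical model of "update_agent/2 updates only the config template".
  # The key names below are assumptions derived from the prose, not a
  # confirmed description of the BaseAgent struct.
  @template_keys [:model, :temperature, :behaviours, :tools]

  # Runs the caller's update function, then keeps only the template keys
  # from its result. Every other key (such as :memory) stays untouched.
  def apply_update(agent, fun) when is_map(agent) and is_function(fun, 1) do
    updated = fun.(agent)
    Map.merge(agent, Map.take(updated, @template_keys))
  end
end
```

With this model, an update function that changes both :temperature and :memory only has its :temperature change applied.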
Summary
Functions
approve/2
Delivers approval decisions to a turn parked awaiting human approval
(:server mode only). decisions maps tool_call_id to :approve | :reject;
any parked id absent or :reject is treated as rejected (fail-closed).
cast/3
Executes the agent asynchronously.
child_spec/1
Returns a specification to start this module under a supervisor.
get_agent/1
Returns the current agent state.
get_id/1
Returns the agent ID.
get_stats/1
Returns agent statistics and metadata.
run/3
Executes the agent synchronously.
start_link/1
Starts an AgentProcess GenServer.
stop/1
Stops the agent process gracefully.
update_agent/2
Updates the agent state.
Types
@type agent_id() :: String.t()
@type run_opts() :: [timeout: non_neg_integer(), async: boolean(), reply_to: pid()]
Functions
@spec approve(GenServer.server(), %{optional(String.t()) => :approve | :reject}) :: :ok | {:error, :no_session} | {:error, :inline_mode}
Delivers approval decisions to a turn parked awaiting human approval
(:server mode only). decisions maps tool_call_id to :approve | :reject;
any parked id absent or :reject is treated as rejected (fail-closed).
Returns :ok, {:error, :no_session} if no live parked session exists, or
{:error, :inline_mode} when the process runs in :inline mode.
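The fail-closed rule can be illustrated with a pure function. This is a minimal sketch, not the library's code: FailClosedSketch and resolve/2 are hypothetical names, and the list of parked ids is assumed to be available to the caller.

```elixir
defmodule FailClosedSketch do
  # Hypothetical helper illustrating fail-closed resolution.
  # parked_ids: tool_call_ids currently awaiting approval (assumed input).
  # decisions:  map of tool_call_id => :approve | :reject.
  def resolve(parked_ids, decisions) when is_list(parked_ids) and is_map(decisions) do
    Map.new(parked_ids, fn id ->
      case Map.get(decisions, id) do
        # Only an explicit :approve lets the tool call proceed.
        :approve -> {id, :approve}
        # A missing id, :reject, or any other value is treated as rejected.
        _other -> {id, :reject}
      end
    end)
  end
end
```

Under this rule, an id left out of the decisions map resolves to :reject, so forgetting to answer never approves a tool call.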
@spec cast(GenServer.server(), term(), keyword()) :: :ok
Executes the agent asynchronously.
The agent runs in the background. If :reply_to is provided,
sends {:agent_result, agent_id, result} when complete.
Options
- :reply_to - PID to send the result to (optional)
Example
:ok = AgentProcess.cast(pid, input, reply_to: self())
receive do
  {:agent_result, agent_id, result} ->
    IO.inspect(result)
end
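A bare receive blocks forever if no result arrives. A caller may prefer to bound the wait and to match only the agent it is interested in. The sketch below assumes the {:agent_result, agent_id, result} message shape described above; AwaitSketch and await_result/2 are hypothetical names, and the shape of result itself is not assumed.

```elixir
defmodule AwaitSketch do
  # Hypothetical helper: waits for the result of one specific agent.
  # The pin (^agent_id) leaves messages from other agents in the mailbox.
  def await_result(agent_id, timeout_ms \\ 5_000) do
    receive do
      {:agent_result, ^agent_id, result} -> {:ok, result}
    after
      # Give up after timeout_ms instead of blocking indefinitely.
      timeout_ms -> {:error, :timeout}
    end
  end
end
```

Usage, after a cast with reply_to: self(), might look like AwaitSketch.await_result(agent_id, 30_000), where agent_id comes from AgentProcess.get_id(pid).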
Returns a specification to start this module under a supervisor.
See Supervisor.
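A child specification is what allows a module to appear in a supervisor's child list. The sketch below shows the pattern with a stand-in GenServer so that it runs without Normandy: StandInAgent and SupervisionSketch are hypothetical modules. With the real module the child entry would presumably be {AgentProcess, agent: my_agent, name: :research_agent}, which is an assumption based on the start_link options documented here.

```elixir
defmodule StandInAgent do
  # Hypothetical stand-in for AgentProcess; it only stores its options.
  # `use GenServer` generates a default child_spec/1 for this module.
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name))
  end

  @impl true
  def init(opts), do: {:ok, opts}
end

defmodule SupervisionSketch do
  # Starts a one_for_one supervisor with a single named child.
  def start(name) do
    children = [{StandInAgent, name: name}]
    Supervisor.start_link(children, strategy: :one_for_one)
  end

  # Kills the named child, then polls until the supervisor has registered
  # a replacement process under the same name.
  def crash_and_await_restart(name, attempts \\ 50) do
    old_pid = Process.whereis(name)
    ref = Process.monitor(old_pid)
    Process.exit(old_pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
    end

    await_new_pid(name, old_pid, attempts)
  end

  defp await_new_pid(_name, _old_pid, 0), do: {:error, :not_restarted}

  defp await_new_pid(name, old_pid, attempts) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old_pid ->
        {:ok, pid}

      _not_yet ->
        Process.sleep(10)
        await_new_pid(name, old_pid, attempts - 1)
    end
  end
end
```

Because the replacement has a new pid, callers that address the process by its registered name rather than by pid keep working across restarts.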
@spec get_agent(GenServer.server()) :: struct()
Returns the current agent state.
Example
agent = AgentProcess.get_agent(pid)
@spec get_id(GenServer.server()) :: agent_id()
Returns the agent ID.
Example
agent_id = AgentProcess.get_id(pid)
#=> "agent_1"
@spec get_stats(GenServer.server()) :: map()
Returns agent statistics and metadata.
Example
stats = AgentProcess.get_stats(pid)
#=> %{
#     agent_id: "agent_1",
#     run_count: 42,
#     last_run: ~U[2024-01-15 10:30:00Z],
#     total_runtime_ms: 15420
#   }
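The stats map can be used to derive simple metrics. A minimal sketch, assuming the :run_count and :total_runtime_ms keys shown in the example above are present; StatsSketch is a hypothetical helper and not part of Normandy.

```elixir
defmodule StatsSketch do
  # Hypothetical helper; relies only on the :run_count and
  # :total_runtime_ms keys from the example stats map.

  # No runs yet: there is no meaningful average.
  def average_runtime_ms(%{run_count: 0}), do: nil

  # Mean runtime per run, in milliseconds, as a float.
  def average_runtime_ms(%{run_count: count, total_runtime_ms: total})
      when is_integer(count) and count > 0 do
    total / count
  end
end
```

Usage might look like pid |> AgentProcess.get_stats() |> StatsSketch.average_runtime_ms().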
@spec run(GenServer.server(), term(), keyword()) :: {:ok, term()} | {:error, term()}
Executes the agent synchronously.
Runs the agent with the given input and returns the result.
Options
- :timeout - Call timeout in ms (default: 60_000)
Example
{:ok, result} = AgentProcess.run(pid, "What is AI?")
@spec start_link(keyword()) :: GenServer.on_start()
Starts an AgentProcess GenServer.
Options
- :agent - BaseAgent struct (required)
- :name - Register the process with a name (optional)
- :agent_id - Unique identifier for this agent (default: UUID)
- :context_pid - StatefulContext process to use (optional)
Example
{:ok, pid} = AgentProcess.start_link(
  agent: my_agent,
  name: :my_agent,
  agent_id: "agent_1"
)
@spec stop(GenServer.server()) :: :ok
Stops the agent process gracefully.
Example
:ok = AgentProcess.stop(pid)
@spec update_agent(GenServer.server(), (struct() -> struct())) :: :ok
Updates the agent state.
Useful for modifying configuration or resetting state.
Example
:ok = AgentProcess.update_agent(pid, fn agent ->
  %{agent | config: new_config}
end)