Normandy.Coordination.AgentProcess (normandy v1.1.0)

GenServer wrapper for running BaseAgent instances as supervised processes.

AgentProcess enables agents to run as long-lived processes that can:

  • Maintain state across multiple invocations
  • Be supervised and restarted on failure
  • Receive messages asynchronously
  • Integrate with process registries

Example

# Start an agent process
{:ok, pid} = AgentProcess.start_link(
  agent: my_agent,
  name: :research_agent
)

# Execute synchronously
{:ok, result} = AgentProcess.run(pid, "Analyze this data")

# Execute asynchronously
:ok = AgentProcess.cast(pid, "Process in background", reply_to: self())

# Get current agent state
agent = AgentProcess.get_agent(pid)

Durable turn engine (:server mode)

Pass turn_engine: :server to route turns through the durable Normandy.Agents.Turn.Session/Turn.Server engine (approval parking, passivation, persistence) instead of the synchronous BaseAgent.run/2 path. The default is :inline, and its behavior is unchanged.

{:ok, pid} = AgentProcess.start_link(agent: config, turn_engine: :server)

In :server mode:

  • run/3 and cast/3 route through Turn.Session. run/3 is non-blocking internally, so the GenServer stays responsive while a turn is parked, and approve/2 delivers human-approval decisions to a parked turn.
  • The SessionStore owns conversation memory: get_agent/1 reconstructs it from the store, and update_agent/2 updates only the config template (model/temperature/behaviours/tools); memory mutations are ignored.
  • Session infrastructure (:store, :registry, :supervisor) may be supplied via start_link; if omitted, the process starts and owns in-memory defaults that terminate with it. :subscriber, :handlers, :approval_timeout_ms, and :idle_timeout_ms are forwarded to Turn.Session when supplied.
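
As an illustration of the options above, here is a minimal sketch of assembling a :server-mode option list. The module name ServerModeOpts, the build/1 helper, and both timeout values are assumptions made for this example, not Normandy defaults. :store, :registry, and :supervisor are deliberately left out, so the process would start and own its in-memory defaults.

```elixir
defmodule ServerModeOpts do
  # Hypothetical helper (not part of Normandy): builds the keyword list
  # that would be passed to AgentProcess.start_link/1 for :server mode.
  def build(agent_config) do
    [
      agent: agent_config,
      turn_engine: :server,
      # Forwarded to Turn.Session when supplied; the values below are
      # arbitrary examples, not documented defaults.
      approval_timeout_ms: 60_000,
      idle_timeout_ms: 300_000
    ]
  end
end

# Usage, assuming `config` is the agent config from the example above:
# {:ok, pid} = AgentProcess.start_link(ServerModeOpts.build(config))
```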

Summary

Functions

approve(server, decisions)
  Delivers approval decisions to a turn parked awaiting human approval (:server mode only). decisions maps each tool_call_id to :approve or :reject; any parked id that is missing from the map or mapped to :reject is treated as rejected (fail-closed).

cast(server, input, opts \\ [])
  Executes the agent asynchronously.

child_spec(init_arg)
  Returns a specification to start this module under a supervisor.

get_agent(server)
  Returns the current agent state.

get_id(server)
  Returns the agent ID.

get_stats(server)
  Returns agent statistics and metadata.

run(server, input, opts \\ [])
  Executes the agent synchronously.

start_link(opts)
  Starts an AgentProcess GenServer.

stop(server)
  Stops the agent process gracefully.

update_agent(server, update_fn)
  Updates the agent state.

Types

agent_id()

@type agent_id() :: String.t()

run_opts()

@type run_opts() :: [timeout: non_neg_integer(), async: boolean(), reply_to: pid()]

Functions

approve(server, decisions)

@spec approve(GenServer.server(), %{optional(String.t()) => :approve | :reject}) ::
  :ok | {:error, :no_session} | {:error, :inline_mode}

Delivers approval decisions to a turn parked awaiting human approval (:server mode only). decisions maps each tool_call_id to :approve or :reject; any parked id that is missing from the map or mapped to :reject is treated as rejected (fail-closed).

Returns :ok, {:error, :no_session} if no live parked session exists, or {:error, :inline_mode} when the process runs in :inline mode.
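
The fail-closed rule and the three return values can be sketched as follows. ApprovalSketch and both of its functions are hypothetical helpers written for this example; Normandy applies the real rule internally, and its implementation may differ. The tool call ids are made up.

```elixir
defmodule ApprovalSketch do
  # Hypothetical illustration of the fail-closed rule: every parked id is
  # rejected unless the decisions map explicitly approves it.
  def effective_decisions(parked_ids, decisions) do
    Map.new(parked_ids, fn id ->
      case Map.get(decisions, id) do
        :approve -> {id, :approve}
        # Missing ids and explicit :reject both end up rejected.
        _other -> {id, :reject}
      end
    end)
  end

  # Maps each documented return value of approve/2 to a short description.
  def describe(:ok), do: "approve/2 returned :ok"
  def describe({:error, :no_session}), do: "no live parked session"
  def describe({:error, :inline_mode}), do: "process runs in :inline mode"
end

ApprovalSketch.effective_decisions(["call_1", "call_2"], %{"call_1" => :approve})
#=> %{"call_1" => :approve, "call_2" => :reject}
```

With a running process, the second helper could be used as `pid |> AgentProcess.approve(%{"call_1" => :approve}) |> ApprovalSketch.describe()`.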

cast(server, input, opts \\ [])

@spec cast(GenServer.server(), term(), keyword()) :: :ok

Executes the agent asynchronously.

The agent runs in the background. If :reply_to is provided, {:agent_result, agent_id, result} is sent to that PID when the run completes.

Options

  • :reply_to - PID to send result to (optional)

Example

:ok = AgentProcess.cast(pid, input, reply_to: self())

receive do
  {:agent_result, agent_id, result} ->
    IO.inspect(result)
end
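
The receive above blocks forever if no result arrives. A minimal sketch of a bounded wait follows; AwaitResult is a hypothetical helper, and the 5-second default is an arbitrary choice. It matches only the {:agent_result, agent_id, result} message for the given agent id and leaves other messages in the mailbox.

```elixir
defmodule AwaitResult do
  # Hypothetical helper: waits for the {:agent_result, agent_id, result}
  # message sent to :reply_to, giving up after `timeout_ms` milliseconds.
  # `result` is passed through exactly as the agent process sent it.
  def await(agent_id, timeout_ms \\ 5_000) do
    receive do
      {:agent_result, ^agent_id, result} -> {:ok, result}
    after
      timeout_ms -> {:error, :timeout}
    end
  end
end

# Usage, assuming `pid` is a running AgentProcess:
# :ok = AgentProcess.cast(pid, input, reply_to: self())
# AwaitResult.await(AgentProcess.get_id(pid), 30_000)
```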

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
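
A minimal sketch of supervising an agent process. It assumes child_spec/1 is the default generated by `use GenServer`, so the options in the tuple are passed on to start_link/1; the source does not confirm this. AgentSupervisorSketch is a hypothetical module, and `agent` stands for a BaseAgent struct.

```elixir
defmodule AgentSupervisorSketch do
  use Supervisor

  # Hypothetical supervisor; `agent` is assumed to be a BaseAgent struct.
  def start_link(agent) do
    Supervisor.start_link(__MODULE__, agent, name: __MODULE__)
  end

  # Tuple-form child spec: at startup the supervisor calls
  # Normandy.Coordination.AgentProcess.child_spec/1 with these options.
  def children(agent) do
    [{Normandy.Coordination.AgentProcess, agent: agent, name: :research_agent}]
  end

  @impl true
  def init(agent) do
    # Restart only the crashed child; other strategies are possible.
    Supervisor.init(children(agent), strategy: :one_for_one)
  end
end
```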

get_agent(server)

@spec get_agent(GenServer.server()) :: struct()

Returns the current agent state.

Example

agent = AgentProcess.get_agent(pid)

get_id(server)

@spec get_id(GenServer.server()) :: agent_id()

Returns the agent ID.

Example

agent_id = AgentProcess.get_id(pid)
#=> "agent_1"

get_stats(server)

@spec get_stats(GenServer.server()) :: map()

Returns agent statistics and metadata.

Example

stats = AgentProcess.get_stats(pid)
#=> %{
#=>   agent_id: "agent_1",
#=>   run_count: 42,
#=>   last_run: ~U[2024-01-15 10:30:00Z],
#=>   total_runtime_ms: 15420
#=> }
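
The stats map can feed simple derived metrics. The sketch below computes the average runtime per run, assuming the map carries the :run_count and :total_runtime_ms keys shown in the example above. StatsSketch is a hypothetical helper, not part of Normandy.

```elixir
defmodule StatsSketch do
  # Hypothetical helper: average runtime per run in milliseconds.
  # Returns nil when the agent has not run yet, avoiding division by zero.
  def avg_runtime_ms(%{run_count: 0}), do: nil

  def avg_runtime_ms(%{run_count: n, total_runtime_ms: total}) when n > 0 do
    total / n
  end
end

# Usage, assuming `pid` is a running AgentProcess:
# pid |> AgentProcess.get_stats() |> StatsSketch.avg_runtime_ms()
```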

run(server, input, opts \\ [])

@spec run(GenServer.server(), term(), keyword()) :: {:ok, term()} | {:error, term()}

Executes the agent synchronously.

Runs the agent with the given input and returns {:ok, result} or {:error, reason}.

Options

  • :timeout - Call timeout in ms (default: 60_000)

Example

{:ok, result} = AgentProcess.run(pid, "What is AI?")
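
If run/3 is built on GenServer.call/3 (the "call timeout" wording suggests it, but the source does not confirm it), an elapsed :timeout makes the calling process exit instead of returning {:error, reason}. The sketch below turns that exit into a tagged tuple. RunTimeoutSketch is a hypothetical wrapper, and the 120_000 ms value in the usage comment is an arbitrary example.

```elixir
defmodule RunTimeoutSketch do
  # Hypothetical wrapper: runs `fun` and converts a call-timeout exit into
  # {:error, :timeout}. Any other result or exit is passed through untouched.
  def with_timeout_as_error(fun) when is_function(fun, 0) do
    fun.()
  catch
    # GenServer.call/3 exits with {:timeout, {GenServer, :call, args}}.
    :exit, {:timeout, _call_info} -> {:error, :timeout}
  end
end

# Usage, assuming `pid` is a running AgentProcess:
# RunTimeoutSketch.with_timeout_as_error(fn ->
#   AgentProcess.run(pid, "What is AI?", timeout: 120_000)
# end)
```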

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts an AgentProcess GenServer.

Options

  • :agent - BaseAgent struct (required)
  • :name - Register the process with a name (optional)
  • :agent_id - Unique identifier for this agent (default: UUID)
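  • :context_pid - StatefulContext process to use (optional)
  • :turn_engine - :inline (default) or :server; see "Durable turn engine (:server mode)"
  • :store, :registry, :supervisor - Session infrastructure for :server mode (optional; in-memory defaults are started if omitted)
  • :subscriber, :handlers, :approval_timeout_ms, :idle_timeout_ms - Forwarded to Turn.Session in :server mode when supplied (optional)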

Example

{:ok, pid} = AgentProcess.start_link(
  agent: my_agent,
  name: :my_agent,
  agent_id: "agent_1"
)

stop(server)

@spec stop(GenServer.server()) :: :ok

Stops the agent process gracefully.

Example

:ok = AgentProcess.stop(pid)

update_agent(server, update_fn)

@spec update_agent(GenServer.server(), (struct() -> struct())) :: :ok

Updates the agent state.

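Useful for modifying configuration or resetting state. In :server mode only the config template (model/temperature/behaviours/tools) is updated; memory mutations are ignored.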

Example

:ok = AgentProcess.update_agent(pid, fn agent ->
  %{agent | config: new_config}
end)