Planck.Agent (Planck.Agent v0.1.0)

OTP-based LLM agent.

Each agent is a GenServer that drives the LLM loop: stream a response → collect tool calls → execute them concurrently → append results → re-stream until the model stops.
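The loop described above can be sketched as a plain recursive function. This is a minimal illustration and not Planck's implementation: `LoopSketch`, `stream_fun`, the message maps, and the tool-call shape are all hypothetical stand-ins, and streaming is reduced to a single function call.

```elixir
defmodule LoopSketch do
  # Hypothetical sketch of the agent loop; not Planck's actual code.
  # `stream_fun` stands in for the LLM call. It takes the message list and
  # returns %{text: String.t(), tool_calls: [%{id: _, name: _, args: _}]}.
  # `tools` maps a tool name to a one-argument function.
  def run(messages, stream_fun, tools) do
    response = stream_fun.(messages)
    messages = messages ++ [%{role: :assistant, content: response.text}]

    case response.tool_calls do
      [] ->
        # The model requested no tools, so the loop ends.
        messages

      calls ->
        # Execute all tool calls concurrently, keeping results in call order.
        results =
          calls
          |> Task.async_stream(fn call -> execute(call, tools) end, ordered: true)
          |> Enum.map(fn {:ok, result} -> result end)

        # Append the results and ask the model again.
        run(messages ++ results, stream_fun, tools)
    end
  end

  defp execute(%{id: id, name: name, args: args}, tools) do
    case Map.fetch(tools, name) do
      {:ok, fun} -> %{role: :tool_result, id: id, content: fun.(args)}
      :error -> %{role: :tool_result, id: id, content: "unknown tool: #{name}"}
    end
  end
end
```

The real agent runs this cycle inside a GenServer and emits events along the way; the sketch only shows the control flow.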

Roles

An agent's role is derived from its tool list at start time:

  • Orchestrator — has a tool named "spawn_agent" in its list. Owns a team_id; all agents sharing that team_id are terminated when this agent exits.
  • Worker — no "spawn_agent" tool. Receives tasks and reports back.
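As a rough illustration of the rule above, role derivation could look like the following. `RoleSketch` is a hypothetical module, and the sketch assumes each tool exposes a `:name` field, which this page does not confirm.

```elixir
defmodule RoleSketch do
  # Assumption: each tool is a map or struct with a :name field.
  def derive_role(tools) do
    if Enum.any?(tools, &(&1.name == "spawn_agent")) do
      :orchestrator
    else
      :worker
    end
  end
end

RoleSketch.derive_role([%{name: "read"}, %{name: "spawn_agent"}])
```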

Events

Subscribers receive {:agent_event, type, payload} messages:

Event            Payload keys
:turn_start      index
:turn_end        message, usage
:text_delta      text
:thinking_delta  text
:usage_delta     delta (input_tokens, output_tokens, cost), total (input_tokens, output_tokens, cost), context_tokens
:tool_start      id, name, args
:tool_end        id, name, result, error
:worker_spawned
:worker_exit     pid, reason
:error           reason
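A subscriber typically handles these messages in a receive loop. The sketch below collects streamed text until the turn ends. It assumes payloads are maps keyed by the atoms listed above (for example `%{text: "..."}`), which is a guess about the payload shape; `EventSketch` is a hypothetical module, and the messages are simulated with `send/2` rather than produced by a real agent.

```elixir
defmodule EventSketch do
  # Collects :text_delta payloads until :turn_end arrives.
  # Assumption: payloads are maps such as %{text: "..."} and %{reason: term}.
  def collect_text(acc \\ "", timeout \\ 5_000) do
    receive do
      {:agent_event, :text_delta, %{text: text}} -> collect_text(acc <> text, timeout)
      {:agent_event, :turn_end, _payload} -> {:ok, acc}
      {:agent_event, :error, %{reason: reason}} -> {:error, reason}
      # Ignore event types this consumer does not care about.
      {:agent_event, _type, _payload} -> collect_text(acc, timeout)
    after
      timeout -> {:error, :timeout}
    end
  end
end

# Simulate the messages a subscriber might receive.
send(self(), {:agent_event, :turn_start, %{index: 0}})
send(self(), {:agent_event, :text_delta, %{text: "Hello, "}})
send(self(), {:agent_event, :text_delta, %{text: "world"}})
send(self(), {:agent_event, :turn_end, %{message: nil, usage: %{}}})

EventSketch.collect_text()
```

A run that uses tools may span several turns, so a real consumer would likely keep listening after the first :turn_end instead of returning.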

Example

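# `model` (a Planck.AI.Model.t()) and `read_tool` (a Planck.Agent.Tool.t())
# are assumed to be defined earlier.
{:ok, pid} = DynamicSupervisor.start_child(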
  Planck.Agent.AgentSupervisor,
  {Planck.Agent,
    id: "agent-1",
    model: model,
    system_prompt: "You are helpful.",
    tools: [read_tool]}
)

Planck.Agent.subscribe(pid)
Planck.Agent.prompt(pid, "What is in lib/app.ex?")

Summary

Types

agent()

A reference to a running agent — pid, registered name, or via-tuple.

t()

Internal GenServer state for an agent.

Functions

abort(agent)

Cancel in-flight streaming and tool execution. Blocks until the agent has returned to :idle (or started a follow-up turn for any queued messages).

add_tool(agent, tool)

Add a tool at runtime.

change_model(agent, model)

Replace the model used for subsequent LLM turns without interrupting the current state.

estimate_tokens(agent)

Estimate the number of tokens currently in the agent's context window.

get_info(agent)

Lightweight summary: id, name, description, type, role, status, turn_index, usage.

get_state(agent)

Synchronous state snapshot.

nudge(agent)

Trigger the agent to run an LLM turn without adding a new user message.

prompt(agent, content, opts \\ [])

Send a user message and kick off the agent loop. Returns once the agent status is :streaming.

remove_tool(agent, name)

Remove a tool by name at runtime.

rewind_to_message(agent, message_id)

Truncate the session to strictly before message_id, then reload the agent's in-memory message history from the DB (the source of truth). turn_checkpoints is rebuilt from the reloaded message list.

start_link(opts)

Start an agent under a supervisor.

stop(agent)

Stop the agent. Cancels any in-flight work and removes it from the supervisor.

subscribe(agent_id)

Subscribe the calling process to {:agent_event, type, payload} messages.

whereis(id)

Resolve an agent id to its pid via the Registry.

Types

agent()

@type agent() :: pid() | atom() | {:via, module(), term()}

A reference to a running agent — pid, registered name, or via-tuple.

t()

@type t() :: %Planck.Agent{
  available_models: [Planck.AI.Model.t()],
  cost: float(),
  cwd: String.t(),
  delegator_id: String.t() | nil,
  description: String.t() | nil,
  id: String.t(),
  messages: [Planck.Agent.Message.t()],
  model: Planck.AI.Model.t() | nil,
  name: String.t() | nil,
  on_compact:
    ([Planck.Agent.Message.t()] ->
       {:compact, Planck.Agent.Message.t(), [Planck.Agent.Message.t()]} | :skip)
    | nil,
  opts: keyword(),
  pending_tool_calls: [map()],
  role: :orchestrator | :worker,
  running_tools: %{required(String.t()) => map()},
  session_id: String.t() | nil,
  status: :idle | :streaming | :executing_tools,
  stream_ref: reference() | nil,
  stream_start: non_neg_integer(),
  stream_task: Task.t() | nil,
  system_prompt: String.t(),
  team_id: String.t() | nil,
  text_buffer: String.t(),
  thinking_buffer: String.t(),
  tool_results_acc: list(),
  tools: %{required(String.t()) => Planck.Agent.Tool.t()},
  turn_checkpoints: [non_neg_integer()],
  turn_index: non_neg_integer(),
  type: String.t() | nil,
  usage: %{input_tokens: non_neg_integer(), output_tokens: non_neg_integer()}
}

Internal GenServer state for an agent.

Public fields (readable via get_state/1 or get_info/1):

  • id — unique agent identifier
  • name / description / type — display metadata set at start time
  • team_id — registry namespace shared by all agents in the same team
  • session_id — SQLite session this agent persists messages to; nil for ephemeral agents
  • delegator_id — id of the orchestrator that spawned this worker; nil for orchestrators
  • role — :orchestrator (has spawn_agent tool) or :worker
  • model — the Planck.AI.Model the agent is configured to use
  • system_prompt — prepended to every LLM context
  • cwd — working directory for the session; used to locate AGENTS.md
  • messages — full in-memory conversation history (Message.t() list)
  • tools — map of tool name → Tool.t() available to this agent
  • status — :idle, :streaming, or :executing_tools
  • turn_index — monotonically increasing turn counter
  • usage — accumulated %{input_tokens, output_tokens} for this session
  • cost — accumulated cost in USD; never decreases (rewinding messages does not reduce it)
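To make the usage and cost fields concrete, here is one way the accumulation could work. This is a sketch under assumptions: `UsageSketch` and `apply_delta/2` are hypothetical names, and the delta map mirrors the `delta` keys of the :usage_delta event (input_tokens, output_tokens, cost).

```elixir
defmodule UsageSketch do
  # Adds one usage delta to the running totals.
  # Assumption: `delta` has :input_tokens, :output_tokens and :cost keys.
  def apply_delta(%{usage: usage, cost: cost} = state, delta) do
    %{
      state
      | usage: %{
          input_tokens: usage.input_tokens + delta.input_tokens,
          output_tokens: usage.output_tokens + delta.output_tokens
        },
        cost: cost + delta.cost
    }
  end
end

state = %{usage: %{input_tokens: 10, output_tokens: 2}, cost: 0.25}
UsageSketch.apply_delta(state, %{input_tokens: 5, output_tokens: 5, cost: 0.5})
```

Because cost is only ever added to, an operation that rewinds messages would leave it unchanged, which matches the "never decreases" note above.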

Internal fields (not part of the public API):

  • stream_task / stream_ref — in-flight async LLM stream
  • stream_start — length of messages when the current stream began; used to detect messages appended during streaming that the LLM did not see
  • turn_checkpoints — message-count stack used internally
  • pending_tool_calls — tool calls waiting for execution after stream end
  • text_buffer / thinking_buffer — partial text accumulated during streaming
  • on_compact — optional compaction callback
  • opts — pass-through keyword options (e.g. tool_timeout)
  • available_models — model catalog used by list_models and spawn_agent
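The on_compact type in the spec above is a function from a message list to either `{:compact, message, messages}` or `:skip`. A callback of that shape might look like the sketch below. The interpretation of the tuple (a summary message followed by the messages to keep) is an assumption, as are the `:role` and `:content` fields, since `Planck.Agent.Message.t()` is not documented in this section. `CompactSketch` is a hypothetical module.

```elixir
defmodule CompactSketch do
  # Number of most recent messages to keep verbatim (arbitrary for this sketch).
  @keep 4

  # Short histories are left alone.
  def on_compact(messages) when length(messages) <= @keep, do: :skip

  # Longer histories: replace everything but the last @keep messages with a
  # single placeholder summary. A real callback would probably ask a model
  # to write the summary.
  def on_compact(messages) do
    {old, recent} = Enum.split(messages, length(messages) - @keep)
    summary = %{role: :user, content: "Summary of #{length(old)} earlier messages."}
    {:compact, summary, recent}
  end
end
```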

Functions

abort(agent)

@spec abort(agent()) :: :ok

Cancel in-flight streaming and tool execution. Blocks until the agent has returned to :idle (or started a follow-up turn for any queued messages).

add_tool(agent, tool)

@spec add_tool(agent(), Planck.Agent.Tool.t()) :: :ok

Add a tool at runtime.

change_model(agent, model)

@spec change_model(agent(), Planck.AI.Model.t()) :: :ok

Replace the model used for subsequent LLM turns without interrupting the current state.

estimate_tokens(agent)

@spec estimate_tokens(agent()) :: non_neg_integer()

Estimate the number of tokens currently in the agent's context window.

get_info(agent)

@spec get_info(agent()) :: map()

Lightweight summary: id, name, description, type, role, status, turn_index, usage.
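Conceptually, the summary is a projection of the state onto the listed keys. A minimal sketch, assuming the state can be treated as a map (`InfoSketch` is a hypothetical module, not part of Planck):

```elixir
defmodule InfoSketch do
  @info_keys [:id, :name, :description, :type, :role, :status, :turn_index, :usage]

  # Keeps only the summary keys and drops everything else, such as :messages.
  def summarize(state), do: Map.take(state, @info_keys)
end
```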

get_state(agent)

@spec get_state(agent()) :: map()

Synchronous state snapshot.

nudge(agent)

@spec nudge(agent()) :: :ok

Trigger the agent to run an LLM turn without adding a new user message.

Used after session resume when a recovery context message is already present in the agent's history and just needs to be acted upon.

prompt(agent, content, opts \\ [])

@spec prompt(agent(), String.t() | [Planck.AI.Message.content_part()], keyword()) ::
  :ok

Send a user message and kick off the agent loop. Returns once the agent status is :streaming.

remove_tool(agent, name)

@spec remove_tool(agent(), String.t()) :: :ok

Remove a tool by name at runtime.
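Since the state keeps tools in a map keyed by tool name, add_tool/2 and remove_tool/2 plausibly reduce to map updates. The sketch below illustrates that idea only; `ToolMapSketch` is hypothetical and it assumes a tool exposes a `:name` field.

```elixir
defmodule ToolMapSketch do
  # Adding a tool under an existing name replaces the previous entry.
  def add(tools, %{name: name} = tool), do: Map.put(tools, name, tool)

  # Removing an unknown name leaves the map unchanged.
  def remove(tools, name), do: Map.delete(tools, name)
end

tools = ToolMapSketch.add(%{}, %{name: "read"})
ToolMapSketch.remove(tools, "read")
```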

rewind_to_message(agent, message_id)

@spec rewind_to_message(agent(), pos_integer()) :: :ok

Truncate the session to strictly before message_id, then reload the agent's in-memory message history from the DB (the source of truth). turn_checkpoints is rebuilt from the reloaded message list.

Only meaningful for agents with a session_id. A no-op for ephemeral agents.
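"Strictly before" means the message with the given id is itself discarded. A small in-memory sketch of that truncation follows, assuming messages carry ascending integer `:id` values (a hypothetical field). The real function truncates the persisted session and reloads from the DB, which this sketch does not attempt.

```elixir
defmodule RewindSketch do
  # Keeps only messages whose id is strictly less than `message_id`.
  # Assumption: ids are integers in ascending order.
  def truncate_before(messages, message_id) do
    Enum.take_while(messages, fn msg -> msg.id < message_id end)
  end
end

messages = for id <- 1..5, do: %{id: id, content: "message #{id}"}
RewindSketch.truncate_before(messages, 3)
```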

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start an agent under a supervisor.

stop(agent)

@spec stop(agent()) :: :ok

Stop the agent. Cancels any in-flight work and removes it from the supervisor.

subscribe(agent_id)

@spec subscribe(String.t() | agent()) :: :ok | {:error, term()}

Subscribe the calling process to {:agent_event, type, payload} messages.

Accepts either an agent id string or a pid/name. The pid form resolves the id via get_info/1 — prefer passing the id directly when available.

whereis(id)

@spec whereis(String.t()) :: {:ok, pid()} | {:error, :not_found}

Resolve an agent id to its pid via the Registry.
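A lookup with this return shape can be built on Elixir's standard Registry. The sketch below is illustrative only: `WhereisSketch` and its registry name are hypothetical, and this page does not say how Planck names or configures its own Registry.

```elixir
defmodule WhereisSketch do
  # Hypothetical registry name used only for this sketch.
  @registry WhereisSketch.Registry

  def start_registry, do: Registry.start_link(keys: :unique, name: @registry)

  # Registers the calling process under `id`.
  def register(id), do: Registry.register(@registry, id, nil)

  def whereis(id) do
    case Registry.lookup(@registry, id) do
      [{pid, _value}] -> {:ok, pid}
      [] -> {:error, :not_found}
    end
  end
end

{:ok, _registry} = WhereisSketch.start_registry()
{:ok, _owner} = WhereisSketch.register("agent-1")
WhereisSketch.whereis("agent-1")
```

With a unique-key Registry, the entry disappears automatically when the registered process exits, so a stale id resolves to {:error, :not_found}.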