planck_agent is an OTP-based agent runtime for Elixir built on top of
planck_ai. It drives the full LLM loop —
stream a response, collect tool calls, execute them concurrently, append results,
re-stream — inside a supervised GenServer per agent, with Phoenix.PubSub
broadcasting at every step.
Installation
# mix.exs
{:planck_agent, "~> 0.1"}planck_agent is a pure library — it has no runtime config module. Callers
pass paths (sessions dir, skills/tools dirs) explicitly. planck_headless
owns config resolution for the full Planck stack.
Quick start
alias Planck.AI
alias Planck.Agent
alias Planck.Agent.Tool
# 1. Get a model from planck_ai
{:ok, model} = AI.get_model(:anthropic, "claude-sonnet-4-6")
# 2. Define a tool
read_file =
Tool.new(
name: "read_file",
description: "Read a file from disk",
parameters: %{
"type" => "object",
"properties" => %{"path" => %{"type" => "string"}},
"required" => ["path"]
},
execute_fn: fn _id, %{"path" => path} ->
case File.read(path) do
{:ok, content} -> {:ok, content}
{:error, reason} -> {:error, "could not read #{path}: #{reason}"}
end
end
)
# 3. Start an agent
{:ok, agent} =
DynamicSupervisor.start_child(
Planck.Agent.AgentSupervisor,
{Agent,
id: "agent-1",
model: model,
system_prompt: "You are a helpful coding assistant.",
tools: [read_file]}
)
# 4. Subscribe and prompt
Agent.subscribe(agent)
Agent.prompt(agent, "What does lib/app.ex do?")
# 5. Receive events
receive do
{:agent_event, :turn_end, %{message: msg, usage: usage}} ->
IO.inspect(msg.content)
IO.inspect(usage)
endPub/Sub events
Subscribe to {:agent_event, type, payload} messages via Agent.subscribe/1.
Events are broadcast on two topics: "agent:#{id}" and, when a session_id is
set, "session:#{session_id}".
| Event | Payload keys | When |
|---|---|---|
:turn_start | index | New LLM turn begins |
:turn_end | message, usage | Turn complete, no pending tools |
:text_delta | text | Streaming text chunk |
:thinking_delta | text | Streaming thinking chunk |
:usage_delta | delta, total | Token usage from each LLM response |
:tool_start | id, name, args | Tool execution begins |
:tool_end | id, name, result, error | Tool finished |
:worker_exit | pid, reason | Worker process exited (orchestrator only) |
:error | reason | Stream error; agent returns to :idle |
Agent.subscribe("agent-1")
receive do
{:agent_event, :text_delta, %{text: chunk}} -> IO.write(chunk)
{:agent_event, :tool_start, %{name: name}} -> IO.puts("→ #{name}")
{:agent_event, :turn_end, %{usage: u}} -> IO.inspect(u)
endSubscribe to the session topic to receive events from all agents in a session:
Phoenix.PubSub.subscribe(Planck.Agent.PubSub, "session:#{session_id}")Agent lifecycle
prompt/2
idle ──────────────────► streaming ──► stream events
▲ │ (text, thinking, tool calls)
│ ▼
│ stream done
│ / \
│ no tools tool calls pending
│ │ │
│◄──── turn_end ─┘ ▼
│ executing_tools
│ (Task.async_stream)
│ │
│◄─── append tool_result ────────────────┘
re-stream (loop)abort/1 cancels in-flight streaming from any state and returns the agent to
:idle. stop/1 shuts it down cleanly.
Roles
An agent's role is determined solely by its tool list at start time:
- Orchestrator — has a tool named
"spawn_agent". Owns ateam_id; the entire team is terminated when the orchestrator exits. - Worker — no
"spawn_agent"tool. Receives tasks, executes them, reports back.
Teams
Agents with the same team_id form a team. The orchestrator owns the team;
all workers are process-linked to it and exit when it does.
Built-in inter-agent tools
These tools are wired up by the caller — see Planck.Agent.OrchestratorTools
and Planck.Agent.WorkerTools for the Tool structs to include.
Available to all agents:
| Tool | Behaviour |
|---|---|
ask_agent | Blocking — sends a prompt to a team member and waits for :turn_end |
delegate_task | Non-blocking — sends a task and returns immediately |
send_response | Non-blocking — routes a result back to the delegator |
list_team | Returns all agents in the team with type, name, status, and turn index |
Orchestrator only:
| Tool | Behaviour |
|---|---|
spawn_agent | Spawns a new worker under the same team_id and session_id |
destroy_agent | Terminates a worker permanently |
interrupt_agent | Aborts a worker's current turn; worker stays alive |
list_models | Returns the available_models list passed at start time |
Built-in file and shell tools
Planck.Agent.BuiltinTools provides four ready-made Tool structs that cover
file-system access and shell execution:
tools = [
Planck.Agent.BuiltinTools.read(),
Planck.Agent.BuiltinTools.write(),
Planck.Agent.BuiltinTools.edit(),
Planck.Agent.BuiltinTools.bash()
]| Tool | Description |
|---|---|
read | Read a file. Accepts optional offset (lines to skip) and limit (max lines). |
write | Write content to a file, creating missing parent directories. |
edit | Replace an exact unique string in a file. Errors if not found or ambiguous. |
bash | Run a shell command. Optional cwd and timeout (ms) as runtime JSON args. |
bash captures both stdout and stderr; stderr is appended under a STDERR: header when non-empty. Shell execution is managed by erlexec, which cleans up process groups on timeout or termination.
Granting tools to spawned workers
When building the orchestrator's tools, pass a grantable_tools list to
Planck.Agent.Tools.orchestrator_tools/5. The orchestrator can then grant any
subset of those tools to workers it spawns by including a "tools" key in the
spawn_agent call:
{
"type": "reviewer",
"name": "Reviewer",
"tools": ["read"]
}Workers always receive the standard worker tools (ask_agent, delegate_task,
send_response, list_team). Granted tools are added on top. Names not in the
orchestrator's grantable_tools list are silently ignored — workers cannot
escalate beyond what the orchestrator was given.
Spawning a team manually
alias Planck.Agent.{Agent, AgentSpec, Compactor, Team}
{:ok, model} = Planck.AI.get_model(:anthropic, "claude-sonnet-4-6")
team_id = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
session_id = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
on_compact = Compactor.build(model)
orchestrator_opts = [
id: "orch-#{team_id}",
type: "orchestrator",
model: model,
system_prompt: "You coordinate a team of agents.",
tools: orchestrator_tools,
team_id: team_id,
session_id: session_id,
on_compact: on_compact,
available_models: Planck.AI.list_models(:anthropic)
]
DynamicSupervisor.start_child(Planck.Agent.AgentSupervisor, {Agent, orchestrator_opts})Workers are typically spawned by the orchestrator via spawn_agent at runtime,
or pre-spawned from a team template (see below).
Session persistence
Start a session before starting any agents that should persist messages:
alias Planck.Agent.Session
{:ok, _pid} = Session.start(session_id, name: "my-session", dir: sessions_dir)Agents with a matching session_id append every message automatically.
Each call to append/3 is synchronous and returns the SQLite row id, which
becomes Message.id — unifying the in-memory id with the DB primary key.
Messages are stored in a SQLite file at <sessions_dir>/<id>_<name>.db.
Retrieving messages
# All messages in insertion order
{:ok, rows} = Session.messages(session_id)
{:ok, rows} = Session.messages(session_id, agent_id: "agent-1")
# Each row: %{db_id: pos_integer(), agent_id: String.t(), message: Message.t(), inserted_at: integer()}Checkpoint-based pagination
Summary messages (role {:custom, :summary}) are stored as checkpoints,
enabling efficient "load recent / load more" pagination for long sessions:
# Initial load — latest summary checkpoint + all messages after it
{:ok, rows, checkpoint_id} = Session.messages_from_latest_checkpoint(session_id)
# Load more — previous chapter
{:ok, rows, prev_id} = Session.messages_before_checkpoint(session_id, checkpoint_id)
# prev_id == nil → no more history to loadPass agent_id: to either function to filter to a specific agent.
Context compaction
Planck.Agent.Compactor.build/2 returns an on_compact hook. When the
estimated token count of the message history exceeds the threshold, it calls
the LLM to produce a summary that preserves the active goal and recent context.
on_compact = Compactor.build(model,
ratio: 0.8, # compact when history reaches 80% of context_window
keep_recent: 10 # keep the last 10 messages verbatim
)When compaction triggers, the summary is inserted as a {:custom, :summary}
message in the agent's history and persisted to the session. Future LLM calls
are built from the latest summary onward — the full history remains in the
session for audit and UI pagination.
Bring your own compaction strategy by implementing the Planck.Agent.Compactor
behaviour in a sidecar module (see Sidecars below), then passing
sidecar_node: and compactor: to build/2:
on_compact = Compactor.build(model,
sidecar_node: sidecar_node,
compactor: "MySidecar.Compactors.Builder"
)The remote compactor falls back to the local LLM-based compactor if the sidecar is unavailable.
The hook receives messages since the last summary checkpoint and must return
either {:compact, summary_msg, kept_messages} or :skip.
Sidecars
A sidecar is a separate OTP application that extends planck_headless with
custom tools and compactors over distributed Erlang. The entry-point module
implements the Planck.Agent.Sidecar behaviour:
defmodule MySidecar.Planck do
use Planck.Agent.Sidecar
@impl true
def tools do
[
Planck.Agent.Tool.new(
name: "run_tests",
description: "Run the test suite.",
parameters: %{"type" => "object", "properties" => %{}},
execute_fn: fn _id, _args ->
{output, 0} = System.cmd("mix", ["test"])
{:ok, output}
end
)
]
end
endPlanck.Agent.Sidecar itself provides the RPC entry points planck_headless
calls on the sidecar node:
discover/0— finds the entry module by scanning loaded OTP applicationslist_tools/0— discovers the entry module and returns serialisablePlanck.AI.Tool.t()structsexecute_tool/3— discovers the entry module and calls the matching tool
See specs/sidecar.md for the full design including startup sequence, compactor
integration, and configuration.
Team templates
Define a team in JSON and load it at runtime:
[
{
"type": "planner",
"name": "Planner",
"description": "Breaks tasks into steps",
"provider": "anthropic",
"model_id": "claude-sonnet-4-6",
"system_prompt": "You are an expert planner.",
"opts": { "temperature": 0.5 }
},
{
"type": "coder",
"name": "Coder",
"description": "Writes and edits code",
"provider": "ollama",
"model_id": "llama3.2",
"system_prompt": "prompts/coder.md"
}
]system_prompt accepts an inline string or a .md/.txt path resolved
relative to the template file. Valid providers are "anthropic", "openai",
"google", "ollama", "llama_cpp".
The optional "tools" array lists tool names the agent should receive. Names are
resolved at start time from the tool_pool: keyword passed to AgentSpec.to_start_opts/2:
{ "type": "coder", "tools": ["read", "write", "bash"], ... }pool = Planck.Agent.BuiltinTools.all() ++ Planck.Agent.ExternalTool.load_all(dirs)
start_opts = AgentSpec.to_start_opts(spec,
tool_pool: pool,
team_id: team_id,
session_id: session_id
)Unknown names are silently ignored. When spec.tools is empty, to_start_opts/2
falls back to the tools: keyword — the behaviour before this feature was added.
alias Planck.Agent.{Agent, AgentSpec, Compactor, Team}
{:ok, team} = Team.load(".planck/teams/my-team")
tools_by_type = %{
"planner" => [list_team_tool, delegate_task_tool, spawn_agent_tool],
"coder" => [read_file_tool, write_file_tool, bash_tool, send_response_tool]
}
team_id = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
session_id = :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
Enum.each(team.members, fn spec ->
{:ok, model} = Planck.AI.get_model(spec.provider, spec.model_id)
tools = Map.get(tools_by_type, spec.type, [])
start_opts =
AgentSpec.to_start_opts(spec,
tools: tools,
team_id: team_id,
session_id: session_id,
on_compact: Compactor.build(model)
)
DynamicSupervisor.start_child(Planck.Agent.AgentSupervisor, {Agent, start_opts})
end)Skills
Skills are reusable agent capabilities stored on the filesystem. Each skill is a
directory containing a SKILL.md with YAML-style frontmatter:
.planck/skills/
code_review/
SKILL.md
resources/
rubric.md---
name: code_review
description: Reviews code for correctness, style, and performance.
---
Review the provided code...Load skills and inject them into an agent's system prompt:
alias Planck.Agent.Skill
skills = Skill.load_all(["~/.planck/skills"])
# Per-agent skill scoping is driven by spec.skills and skill_pool: in
# AgentSpec.to_start_opts/2 — see the Teams section.Each skill entry includes the path to its SKILL.md file and its
resources directory.
Dynamic tool management
Add and remove tools without restarting the agent:
Agent.add_tool(agent, new_tool)
Agent.remove_tool(agent, "tool_name")Editing history
Truncate both the session and in-memory history to strictly before a given
message. The message id is the SQLite row id (Message.id == db_id for
persisted messages). A no-op for ephemeral agents.
Agent.rewind_to_message(agent, message_id)Typically called via Planck.Headless.rewind_to_message/3 which also
re-prompts the orchestrator with the edited text.
Configuration
planck_agent has no runtime configuration module. Every function that reads
from disk (Session.start/2, Skill.load_all/1, ExternalTool.load_all/1,
Compactor.load/1) takes its path(s) as an explicit argument. Applications
using this library should resolve those paths themselves — or depend on
planck_headless, which exposes Planck.Headless.Config for the full Planck
stack.
Supervision tree
Planck.Agent.Supervisor (strategy: :one_for_all)
├── Phoenix.PubSub (name: Planck.Agent.PubSub)
├── Registry (keys: :duplicate, name: Planck.Agent.Registry)
├── Task.Supervisor (name: Planck.Agent.TaskSupervisor)
├── DynamicSupervisor (name: Planck.Agent.SessionSupervisor)
│ └── Planck.Agent.Session (restart: :temporary)
└── DynamicSupervisor (name: Planck.Agent.AgentSupervisor)
├── Planck.Agent (role: :orchestrator)
└── Planck.Agent (role: :worker):one_for_all on the top-level supervisor ensures the Registry and PubSub
always restart together — a stale Registry after a crash would leave agents
unable to find each other.