Agent 状态机 — 使用 :gen_statem 编排四状态循环。
状态图
┌──────┐ prompt ┌─────────┐ stream ┌───────────┐
│ idle │──────────▶│ running │─────────▶│ streaming │
└──┬───┘ └────▲────┘ └─────┬─────┘
│ │ │
│ │ finalize │
│ │ │
│ ┌──────────────┴────────┐ │
│ │ tool_calls? │◀─────────────┘
│ │ yes → executing_tools│
│ │ no → finish/retry │
│ └──────────┬────────────┘
│ │
│ ┌───────▼──────────┐
│ │ executing_tools │ ← Task 完成后
│ │ collect results │ 回到 running
│ └───────┬──────────┘
│ │ all done
│ ▼
│ run_turn (loop)
│ │ no more tool_calls
└────────────────┘ finish → idle流式消息协议
Agent 通过 CMDC.Provider.stream/4 发起 LLM 请求。
CMDC.Provider.StreamBridge 将 req_llm 的 StreamResponse 转为消息:
| 消息 | 处理 |
|---|---|
{:cmdc_stream_chunk, %StreamChunk{}} | 由内部 Stream 处理器更新状态 |
:cmdc_stream_done | 触发 finalize_response/1 |
{:cmdc_stream_error, reason} | 设置 stream_errored,触发错误恢复 |
关键改造(相对旧项目)
- 移除
Agent.Registry进程注册表(随 Orchestrator 走) - 移除 shadow 功能
- Emitter 适配新
CMDC.EventBus(替代旧CMDC.Events) - Plugin.Registry 从旧
from_specs/1移植,支持Module和{Module, opts}两种格式 config字段保持为 map(待 1.9 任务改为Config.t())
使用示例
{:ok, pid} = CMDC.Agent.start_link(
session_id: "abc",
model: "anthropic:claude-sonnet-4-5",
working_dir: "/project",
tools: [CMDC.Tool.ReadFile, CMDC.Tool.Shell],
provider_opts: [api_key: "sk-..."]
)
%{queued: false} = CMDC.Agent.prompt(pid, "Hello")
Summary
Types
@type start_opts() :: [ session_id: String.t(), model: CMDC.Provider.model(), working_dir: String.t(), blueprint_system_prompt: String.t(), tools: [module()], disabled_tools: [String.t()], plugins: [CMDC.Plugin.Registry.plugin_spec()], config: map(), provider_opts: keyword(), messages: [CMDC.Message.t()], sandbox: module() | nil, subagents: [map()], user_data: map(), prompt_mode: :full | :task | :minimal | :none, event_buffer_size: non_neg_integer(), hibernate_after_ms: pos_integer() | nil ]
Functions
@spec abort( GenServer.server(), keyword() ) :: :ok
中止当前运行。
选项
:reason :: term()— 中止原因,写入{:agent_abort, reason}payload。 默认nil(透传,发裸:agent_abort事件)。入参可以是 atom 或 string。下列 6 个标准 string 会自动归一为同名 atom, 防止前端通过 JSON 反序列化注入任意 atom 进 BEAM atom table:
"user_cancelled" / "timeout" / "shutdown" / "budget_exceeded" / "permission_denied" / "provider_error"其他 string 一律归并为
:unknown并Logger.warning。Atom 入参保持原样透传。 string 入参 Since v0.3。:clear_queue :: boolean()— 是否清空pending_messages(排队中的 prompt); 清空时为每条被丢弃的 prompt emit{:prompt_dropped, text}事件。 默认true。:kill_tools :: :all | :killable | :none— 工具任务清理策略::all— brutal_kill 所有 in-flight 工具(含 interrupt_immune_tools):killable— 只杀非 immune 工具(与 Steering 一致),默认值:none— 不杀任何工具,让它们自然完成 每杀一个工具 emit{:tool_killed, %{name, call_id, reason}}事件。
4 状态行为
| 状态 | 默认行为(killable) | :all | :none |
|---|---|---|---|
:idle | 仅 emit :agent_abort(no-op) | 同 | 同 |
:running | cancel stream task | 同 | 仅 emit |
:streaming | cancel stream task | 同 | 仅 emit |
:executing_tools | 杀非 immune 工具 + cancel stream | 杀全部工具 | 留全部工具 |
无论何种状态,:agent_abort 事件保证发出(订阅方 100ms 内收到,BEAM 调度延迟)。
@spec approve(GenServer.server(), String.t(), keyword()) :: :ok
批准指定的工具审批请求。
选项
:auto_resume— boolean,默认true。当 Agent 已经处于 idle(被 block_tool 拦下后回到 idle), 审批通过后自动开启新 turn 让 LLM 重试被拦截的工具。设为false保持旧版行为 (需调用方再prompt/2才能续)。
自动续接成功时会 emit {:agent_resumed, %{trigger: :tool_approved, approval_id: id}},
上层可订阅事件确认 Agent 已重新进入 running 状态。
@spec attach_tool(GenServer.server(), module()) :: :ok | {:error, :already_attached | :invalid_tool}
运行期挂载新工具。
立即写入 state.tools,下一次 LLM 请求生效(重生成 tools schema)。
In-flight 请求不受影响。
- 已存在同名 tool →
{:error, :already_attached} - 模块未实现
CMDC.Toolbehaviour →{:error, :invalid_tool} - 成功 →
:ok+ emit{:tool_attached, name}
@spec attach_tools(GenServer.server(), [module()]) :: {:ok, [String.t()]} | {:error, {:validation_failed, [{module(), atom()}]}}
批量挂载多个工具(原子操作)。
先对所有 tool 做 dry-run 校验(避免 invalid_tool / already_attached / 列表内重名); 任何一个失败 → 全部回滚,state.tools 完全不变。
- 全部成功 →
{:ok, [name, ...]}+ 每个 emit{:tool_attached, name}+ 汇总 emit{:tools_updated, %{attached: [...], detached: []}} - 失败 →
{:error, {:validation_failed, failures}},其中 failures 是[{module, reason}, ...]
典型场景:用户启用 MCP Server 时一次性挂载该 Server 提供的 N 个工具。
@spec child_spec(start_opts()) :: Supervisor.child_spec()
@spec demonitor(GenServer.server(), reference()) :: :ok
取消通过 monitor/1 登记的崩溃监控。
@spec detach_tool(GenServer.server(), String.t()) :: :ok | {:error, :not_found}
运行期卸载工具。
立即从 state.tools 移除(按 tool.name() 字符串匹配),
下一次 LLM 请求生效。In-flight tool 调用不受影响(仍跑完)。
- 不存在同名 tool →
{:error, :not_found} - 成功 →
:ok+ emit{:tool_detached, name}
注意:若 LLM 在 detach 后仍调用该 tool(已在 streaming 中或下一轮), Agent 会发
{:tool_call_unknown, name, call_id}并自动注入 error tool_result,让 LLM 自我纠正。
@spec detach_tools(GenServer.server(), [String.t()]) :: {:ok, [String.t()]} | {:error, {:validation_failed, [{String.t(), atom()}]}}
批量卸载工具(原子操作)。
先 dry-run 全部找到对应模块,任何一个 :not_found → 全回滚,state.tools 完全不变。
- 全部成功 →
{:ok, [name, ...]}+ 每个 emit{:tool_detached, name}+ 汇总 emit{:tools_updated, %{attached: [], detached: [...]}} - 失败 →
{:error, {:validation_failed, [{name, :not_found}, ...]}}
@spec get_messages(GenServer.server()) :: [CMDC.Message.t()]
获取完整的消息列表(含系统提示词,按时间顺序)。
@spec messages(GenServer.server()) :: [CMDC.Message.t()]
获取完整的消息列表(含系统提示词,按时间顺序)。
@spec monitor(GenServer.server()) :: reference()
登记当前进程对 Agent 的崩溃监控。
Agent 退出(任何原因)时,观察者进程会收到:
{:cmdc_down, ref, session_id, structured_reason}structured_reason 已结构化,常见值:
:normal | :shutdown | {:exception, term},未来扩展:
:max_turns_exceeded | :provider_timeout | {:plugin_aborted, name, why}。
返回 reference(),用 demonitor/2 取消监听。
@spec prompt(GenServer.server(), String.t()) :: %{queued: boolean()}
发送用户 prompt。
- idle 状态下立即处理,返回
%{queued: false} - 忙碌时入队,返回
%{queued: true}
@spec reject(GenServer.server(), String.t(), keyword()) :: :ok
拒绝指定的工具审批请求。
选项
:auto_resume— boolean,默认false。reject 默认不自动续 turn——HumanApproval Plugin 只是把 awaiting 清掉并 emit:approval_resolved(status=:rejected),调用方通常希望让 Agent 保持 idle 等待新 prompt(被拒命令的 ToolMessage 已记录为 is_error,下次 prompt 时 LLM 自然能感知到拒绝结果)。若希望 reject 后 Agent 立刻重新规划(例如让 LLM 走"被拒后换个方案"分支),传
auto_resume: true, 会和 approve 一样开新 turn + emit{:agent_resumed, %{trigger: :tool_rejected, ...}}。
@spec replace_tools(GenServer.server(), [module()]) :: {:ok, %{attached: [String.t()], detached: [String.t()]}} | {:error, {:validation_failed, [{module(), atom()}]}}
替换整张工具表(原子操作)。
自动计算 diff:
- 老 tools 在新列表里 → 保留
- 老 tools 不在新列表里 → detach(emit
{:tool_detached, name}) - 新 tools 不在老列表里 → attach(emit
{:tool_attached, name})
全部 attach validate 失败 → 全回滚,emit {:error, _}。
典型场景:MCP Server 重启 / 配置变更,新 tools 列表完全覆盖旧列表。
返回 {:ok, %{attached: [name, ...], detached: [name, ...]}} + 汇总 emit
{:tools_updated, %{attached, detached}}。
@spec start_link(start_opts()) :: GenServer.on_start()
启动 Agent 状态机。
支持 :hibernate_after_ms 选项,透传到 :gen_statem.start_link/3 的
{:hibernate_after, ms} OTP 原生选项;nil 时不主动 hibernate。
@spec status(GenServer.server()) :: %{ state: :idle | :running | :streaming | :executing_tools, session_id: String.t(), model: String.t(), turns: non_neg_integer(), turns_count: non_neg_integer(), tool_calls: non_neg_integer(), messages_count: non_neg_integer(), total_tokens: non_neg_integer(), cost_usd: float(), token_usage: CMDC.TokenUsage.t(), uptime_ms: non_neg_integer(), active_since_ms: integer(), timestamp_ms: integer(), pending_tools: [ %{ name: String.t(), call_id: String.t(), args: map(), started_at_ms: integer() } ], pending_approvals: [map()], queues: %{prompt_queue: non_neg_integer(), steering_queue: non_neg_integer()} }
获取 Agent 的状态快照(含运行期可观测字段)。
返回结构:
:state— 当前 gen_statem 状态(:idle | :running | :streaming | :executing_tools):session_id— 会话 ID:model— 当前 LLM 模型字符串(可被switch_model/2改变):turns— 已完成轮次数(与:turns_count等同,保留兼容别名):turns_count— 已完成轮次数:tool_calls— 已执行工具调用总次数:messages_count— 当前 messages 列表长度(含 system + user + assistant + tool_result):active_since_ms— Agent 进程启动时间戳(毫秒,绝对时间排序; 与:uptime_ms互补,后者为运行时长):total_tokens— 累计 token 用量(整数,向后兼容字段):cost_usd— 累计美元成本:token_usage—%CMDC.TokenUsage{}struct,含prompt_tokens / completion_tokens / total_tokens / cost_usd / cached_tokens。:uptime_ms— 运行时长(毫秒):timestamp_ms— 快照时间戳:pending_tools— 当前正在执行的工具列表,每项包含name / call_id / args / started_at_ms(started_at_ms单位System.system_time(:millisecond),用于无须自己埋点即可统计 tool 耗时)。:pending_approvals— 当前等待人类审批的工具调用, 从启用了审批的 Plugin(如CMDC.Plugin.Builtin.HumanApproval)的状态汇总; 每项包含id / tool / args / session_id / requested_at等字段。:queues— 各种内部队列长度:%{ prompt_queue: non_neg_integer(), # 排队等执行的 prompt 数 steering_queue: non_neg_integer() # 中段软中断 queue 长度 }
@spec steer(GenServer.server(), reference(), String.t()) :: :ok | {:error, :queue_full | :rejected}
中段软中断(Steering)。
idle状态:等同prompt/2,立刻进入新 turn- 其他状态:text 入
:steering_queue,下个 turn 间隙合并注入
详见 guides/cookbook.md 中的「中段干预(Steering)」配方。
@spec switch_model(GenServer.server(), CMDC.Provider.model()) :: :ok
运行期切换 model。
下一次 LLM 调用立即生效。当前 streaming / executing_tools 不会被打断
(语义:本轮跑完,下一轮再换;如果想立刻打断换模型,请先 abort/2 再 switch_model/2 + prompt/2)。
选项
:provider_opts :: keyword()— 与 model 一并替换 provider 参数(base_url / api_key / timeout); 传nil或不传则保留现有 provider_opts; 典型场景:从 Anthropic 切到 OpenAI 自建网关,需同步base_url、api_key
行为
- 不切换模型(new_model 与 state.model 相同且无 provider_opts 变化)→ no-op,不发事件
- idle 状态:立即更新 state.model / state.config.provider_opts,
emit
{:model_switched, %{from, to, provider_opts_changed?}} - running / streaming / executing_tools:更新 state.model,本轮继续用旧模型, 下一轮自动用新模型;event 立即发出
- messages / tools / plugin_states 都保留(同一会话,换模型继续)
兼容性警告
- 不同模型对 system_prompt 的支持差异(如 OpenAI 的 system 消息 vs Anthropic 的 system 字段) 由 Provider 层处理,不在 switch_model 范围内
- 上下文窗口差异由调用方自行评估(如从 200k token 模型切到 8k 模型,需先 compact)
- 不同模型的 tool_calling schema 兼容性由 Provider 适配
@spec switch_model(GenServer.server(), CMDC.Provider.model(), keyword()) :: :ok