CMDC Agent 是一个 :gen_statem,4 个状态、7 大类事件。看完本章你能精确预测
任何 API 在任何状态下的行为。
4 个状态
| 状态 | 含义 | 主要外部 API 行为 |
|---|---|---|
:idle | 空闲,等下一条 prompt | prompt/2 立即开新 turn;steer/3 退化为 prompt |
:running | 已发出 LLM 请求,等首个 chunk | prompt/2 入队;steer/3 入 steering queue |
:streaming | 正在接收 LLM 流式输出 | 同上 |
:executing_tools | LLM 已停,工具批次执行中 | 同上 + 当前批次结束才合并 steering |
状态转换图
┌─────────────────────────────┐
│ │
▼ │
┌──────────┐ prompt/2 ┌──────────┐ stream chunk │
│ idle │────────────▶│ running │───────────┐ │
└──────────┘ └──────────┘ ▼ │
▲ │ ┌──────────┐
│ agent_end / │ stream │streaming │
│ abort │ done └──────────┘
│ ▼ │
│ ┌────────────────┐ │ has tool_calls
│ │ finalize + │◀────┘
│ │ decide next │
│ └────────────────┘
│ │ has tool_calls
│ ▼
│ ┌──────────────────┐
└──────────────────│ executing_tools │
batch done └──────────────────┘
+next turn4 状态接收行为
prompt/2
| 状态 | 行为 |
|---|---|
:idle | 立即开新 turn,转 :running |
| 其他 | 入 pending_messages 队列;本轮结束后自动消费下一条 |
steer/3(中段干预)
| 状态 | 行为 |
|---|---|
:idle | 退化为 prompt/2 |
:running | 入 steering queue,下一 turn 间隙合并注入 |
:streaming | 同上 |
:executing_tools | 同上 + 当前批次内 killable 工具立即被 brutal_kill |
队列满(默认 3)时返回 {:error, :queue_full},发出 :steering_received
事件 status=:rejected_full。Plugin 可在 {:before_steering, text} hook
返回 :abort 拒绝某条 steering。
abort/2
| 状态 | 默认行为(kill_tools: :killable) |
|---|---|
:idle | no-op,但仍 emit :agent_abort 便于订阅方对账 |
:running / :streaming | cancel stream task |
:executing_tools | 杀非 immune 工具,immune 工具继续;同时 cancel stream |
无论何种状态都保证 :agent_abort 事件 100ms 内到达订阅方。
approve/3 / reject/3
HumanApproval Plugin 在 :before_tool 拦下危险工具,把 Agent 暂时退到
:idle 等审批。approve/reject 在 idle 状态下:
:auto_resume默认true(approve)/false(reject)- auto_resume 后 Agent emit
{:agent_resumed, %{trigger: ...}},重新进:running让 LLM 重试
switch_model/2 / 3
| 状态 | 行为 |
|---|---|
:idle | 立即切;下一次 prompt/2 用新模型;emit :model_switched |
:running / :streaming / :executing_tools | state.model 立即更新 + emit;本轮跑完,下一轮才用新模型 |
切到同一模型 = no-op,不发事件。messages / tools / plugin_states 全保留。
完整事件清单
事件外层格式:{:cmdc_event, session_id, event}。下面只列 event 部分。
会话生命周期
:agent_start— Agent 开始处理 prompt{:agent_end, messages, %CMDC.TokenUsage{}}— 本轮完成{:agent_abort, reason}/:agent_abort— 中止(reason 为 nil 时发裸 atom){:agent_resumed, %{trigger}}— 因外部信号续命;trigger∈:tool_approved | :tool_rejected | :tool_approval_timeout | :user_respond | :steering{:prompt_received, text}— 收到用户 prompt{:prompt_queued, text}— 忙碌中入队{:prompt_dropped, text}—abort(:clear_queue)丢弃的 prompt{:prompt_rejected, reason}— Plugin 拒绝
流式响应
:message_start{:message_delta, %{delta: text}}{:response_complete, %CMDC.Message{}}:thinking_start{:thinking_delta, %{delta}}{:status_update, text}/{:title_generated, title}—<status>...</status>/<title>...</title>内联 XML 标签提取
Provider / 请求
{:request_start, %{model, messages}}{:stream_error, reason}/{:stream_stalled, elapsed_s}{:retry, attempt, delay_ms, reason}{:context_overflow, reason}
工具执行
{:tool_calls, count}— LLM 请求了 N 个工具{:tool_execution_start, name, call_id, args}{:tool_execution_end, name, call_id, result}— result 为{:ok, _} | {:error, _}{:tool_execution_metrics, name, call_id, %{started_at_ms, ended_at_ms, duration_ms}}— 自动埋点{:tool_blocked, name, call_id, reason}— Plugin block_tool{:tool_killed, %{name, call_id, reason}}— abort 杀掉的{:tool_skipped_for_steering, %{name, call_id, reason}}— Steering 跳过{:tool_attached, name}/{:tool_detached, name}—attach_tool/2/detach_tool/2调用结果{:tools_updated, %{attached, detached}}— 批量原子操作汇总{:tool_call_unknown, name, call_id}— LLM 引用了已 detach 的工具{:loop_detected, %{type, ...}}— 内建循环检测;type 为:repeat_pattern | :file_loop_warn | :file_loop_abort
人机交互
{:approval_required, approval_map}— 等待审批{:approval_resolved, approval_map}— 已决定(含:status∈:approved | :rejected | :timeout){:tool_approved_always, %{tool, command_family}}—approve_always白名单写入{:ask_user, sid, question, options, ref}— Agent 主动提问{:user_responded, sid, ref, response}— 用户已答
中段干预
{:steering_received, %{ref, text, queued_at, status}}—steer/2调用结果{:steering_applied, %{refs, count}}— queue 已合并到下一 turn{:tool_skipped_for_steering, ...}— 工具因 steering 取消
上下文压缩
{:compact_start, sid}/{:compact_end, sid, removed_count}{:before_compact, messages}— Plugin hook 同名
子代理
{:subagent_start, sid, child_sid, description}{:subagent_end, sid, child_sid, result}{:sub_agent_event, call_id, child_sid, event}— 子代理内部事件透传
模型 / 计划 / 通知
{:model_switched, %{from, to, provider_opts_changed?}}{:todo_change, sid, todos}—WriteTodos工具更新{:plan_generated, %CMDC.Plan{}}—PlanningPlugin 解析{:memory_flushed, %{facts, count, sid}}—MemoryFlush写入{:large_result_offloaded, %{tool, call_id, path, bytes}}{:content_policy_violated, %{summary, triggered_policies}}{:plugin_event, name, payload}— 自定义事件(payload map 自动注入 user_data){:intervention, prompt}— Agent 注入干预 prompt
订阅模板
defmodule MyApp.AgentObserver do
use GenServer
def start_link(session) do
GenServer.start_link(__MODULE__, session)
end
def init(session) do
CMDC.subscribe(session)
CMDC.monitor(session)
{:ok, %{session: session, deltas: []}}
end
def handle_info({:cmdc_event, _sid, {:message_delta, %{delta: t}}}, st) do
IO.write(t)
{:noreply, %{st | deltas: [t | st.deltas]}}
end
def handle_info({:cmdc_event, _sid, {:tool_execution_start, name, _, _}}, st) do
IO.puts("\n [tool] #{name} ...")
{:noreply, st}
end
def handle_info({:cmdc_event, _sid, {:agent_end, _msgs, usage}}, st) do
IO.puts("\n[done] tokens=#{usage.total_tokens} cost=#{usage.cost_usd}")
{:noreply, st}
end
def handle_info({:cmdc_down, _ref, _sid, reason}, st) do
IO.puts("[crashed] #{inspect(reason)}")
{:stop, :normal, st}
end
def handle_info(_, st), do: {:noreply, st}
end重连补帧(断网恢复)
启用 ring buffer:
{:ok, session} = CMDC.create_agent(
model: "...",
event_buffer_size: 200 # 默认 0 = 关闭
)订阅时给 :since:
last_index = MyApp.LastSeen.read(session_id)
{:ok, _} = CMDC.subscribe(session, since: last_index, types: [:message_delta, :agent_end])可选 :types 白名单只 replay 关心的事件,省网络流量。
下一步
- 写一个 Plugin — 在 13 个 hook 拦截 + 收集事件
- 常见配方 — 流式 UI / HITL / Checkpoint 等组合范例