CMDC.Agent (cmdc v0.5.2)

Copy Markdown View Source

Agent 状态机 — 使用 :gen_statem 编排四状态循环。

状态图

      prompt     stream   
     idle  running  streaming 
                          
                                                  
                                     finalize     
                                                  
                          
              tool_calls?          
              yes  executing_tools
              no   finish/retry   
            
                       
               
                executing_tools    Task 完成后
                collect results     回到 running
               
                        all done
                       
                  run_turn (loop)
                        no more tool_calls
        finish  idle

流式消息协议

Agent 通过 CMDC.Provider.stream/4 发起 LLM 请求。 CMDC.Provider.StreamBridge 将 req_llm 的 StreamResponse 转为消息:

消息处理
{:cmdc_stream_chunk, %StreamChunk{}}由内部 Stream 处理器更新状态
:cmdc_stream_done触发 finalize_response/1
{:cmdc_stream_error, reason}设置 stream_errored,触发错误恢复

关键改造(相对旧项目)

  • 移除 Agent.Registry 进程注册表(随 Orchestrator 走)
  • 移除 shadow 功能
  • Emitter 适配新 CMDC.EventBus(替代旧 CMDC.Events
  • Plugin.Registry 从旧 from_specs/1 移植,支持 Module{Module, opts} 两种格式
  • config 字段保持为 map(待 1.9 任务改为 Config.t()

使用示例

{:ok, pid} = CMDC.Agent.start_link(
  session_id: "abc",
  model: "anthropic:claude-sonnet-4-5",
  working_dir: "/project",
  tools: [CMDC.Tool.ReadFile, CMDC.Tool.Shell],
  provider_opts: [api_key: "sk-..."]
)

%{queued: false} = CMDC.Agent.prompt(pid, "Hello")

Summary

Functions

中止当前运行。

批准指定的工具审批请求。

运行期挂载新工具。

批量挂载多个工具(原子操作)。

取消通过 monitor/1 登记的崩溃监控。

运行期卸载工具。

批量卸载工具(原子操作)。

获取完整的消息列表(含系统提示词,按时间顺序)。

获取完整的消息列表(含系统提示词,按时间顺序)。

登记当前进程对 Agent 的崩溃监控。

发送用户 prompt。

拒绝指定的工具审批请求。

替换整张工具表(原子操作)。

启动 Agent 状态机。

获取 Agent 的状态快照(含运行期可观测字段)。

中段软中断(Steering)。

运行期切换 model。

Types

start_opts()

@type start_opts() :: [
  session_id: String.t(),
  model: CMDC.Provider.model(),
  working_dir: String.t(),
  blueprint_system_prompt: String.t(),
  tools: [module()],
  disabled_tools: [String.t()],
  plugins: [CMDC.Plugin.Registry.plugin_spec()],
  config: map(),
  provider_opts: keyword(),
  messages: [CMDC.Message.t()],
  sandbox: module() | nil,
  subagents: [map()],
  user_data: map(),
  prompt_mode: :full | :task | :minimal | :none,
  event_buffer_size: non_neg_integer(),
  hibernate_after_ms: pos_integer() | nil
]

Functions

abort(agent, opts \\ [])

@spec abort(
  GenServer.server(),
  keyword()
) :: :ok

中止当前运行。

选项

  • :reason :: term() — 中止原因,写入 {:agent_abort, reason} payload。 默认 nil(透传,发裸 :agent_abort 事件)。

    入参可以是 atom 或 string。下列 6 个标准 string 会自动归一为同名 atom, 防止前端通过 JSON 反序列化注入任意 atom 进 BEAM atom table:

    "user_cancelled" / "timeout" / "shutdown" /
    "budget_exceeded" / "permission_denied" / "provider_error"

    其他 string 一律归并为 :unknownLogger.warning。Atom 入参保持原样透传。 string 入参 Since v0.3

  • :clear_queue :: boolean() — 是否清空 pending_messages(排队中的 prompt); 清空时为每条被丢弃的 prompt emit {:prompt_dropped, text} 事件。 默认 true

  • :kill_tools :: :all | :killable | :none — 工具任务清理策略:

    • :all — brutal_kill 所有 in-flight 工具(含 interrupt_immune_tools)
    • :killable — 只杀非 immune 工具(与 Steering 一致),默认值
    • :none — 不杀任何工具,让它们自然完成 每杀一个工具 emit {:tool_killed, %{name, call_id, reason}} 事件。

4 状态行为

状态默认行为(killable):all:none
:idle仅 emit :agent_abort(no-op)
:runningcancel stream task仅 emit
:streamingcancel stream task仅 emit
:executing_tools杀非 immune 工具 + cancel stream杀全部工具留全部工具

无论何种状态,:agent_abort 事件保证发出(订阅方 100ms 内收到,BEAM 调度延迟)。

approve(agent, approval_id, opts \\ [])

@spec approve(GenServer.server(), String.t(), keyword()) :: :ok

批准指定的工具审批请求。

选项

  • :auto_resume — boolean,默认 true。当 Agent 已经处于 idle(被 block_tool 拦下后回到 idle), 审批通过后自动开启新 turn 让 LLM 重试被拦截的工具。设为 false 保持旧版行为 (需调用方再 prompt/2 才能续)。

自动续接成功时会 emit {:agent_resumed, %{trigger: :tool_approved, approval_id: id}}, 上层可订阅事件确认 Agent 已重新进入 running 状态。

attach_tool(agent, tool_module)

@spec attach_tool(GenServer.server(), module()) ::
  :ok | {:error, :already_attached | :invalid_tool}

运行期挂载新工具。

立即写入 state.tools,下一次 LLM 请求生效(重生成 tools schema)。 In-flight 请求不受影响。

  • 已存在同名 tool → {:error, :already_attached}
  • 模块未实现 CMDC.Tool behaviour → {:error, :invalid_tool}
  • 成功 → :ok + emit {:tool_attached, name}

attach_tools(agent, tool_modules)

@spec attach_tools(GenServer.server(), [module()]) ::
  {:ok, [String.t()]} | {:error, {:validation_failed, [{module(), atom()}]}}

批量挂载多个工具(原子操作)。

先对所有 tool 做 dry-run 校验(避免 invalid_tool / already_attached / 列表内重名); 任何一个失败 → 全部回滚,state.tools 完全不变。

  • 全部成功 → {:ok, [name, ...]} + 每个 emit {:tool_attached, name} + 汇总 emit {:tools_updated, %{attached: [...], detached: []}}
  • 失败 → {:error, {:validation_failed, failures}},其中 failures 是 [{module, reason}, ...]

典型场景:用户启用 MCP Server 时一次性挂载该 Server 提供的 N 个工具。

child_spec(opts)

@spec child_spec(start_opts()) :: Supervisor.child_spec()

demonitor(agent, ref)

@spec demonitor(GenServer.server(), reference()) :: :ok

取消通过 monitor/1 登记的崩溃监控。

detach_tool(agent, tool_name)

@spec detach_tool(GenServer.server(), String.t()) :: :ok | {:error, :not_found}

运行期卸载工具。

立即从 state.tools 移除(按 tool.name() 字符串匹配), 下一次 LLM 请求生效。In-flight tool 调用不受影响(仍跑完)。

  • 不存在同名 tool → {:error, :not_found}
  • 成功 → :ok + emit {:tool_detached, name}

注意:若 LLM 在 detach 后仍调用该 tool(已在 streaming 中或下一轮), Agent 会发 {:tool_call_unknown, name, call_id} 并自动注入 error tool_result,让 LLM 自我纠正。

detach_tools(agent, tool_names)

@spec detach_tools(GenServer.server(), [String.t()]) ::
  {:ok, [String.t()]} | {:error, {:validation_failed, [{String.t(), atom()}]}}

批量卸载工具(原子操作)。

先 dry-run 全部找到对应模块,任何一个 :not_found → 全回滚,state.tools 完全不变。

  • 全部成功 → {:ok, [name, ...]} + 每个 emit {:tool_detached, name} + 汇总 emit {:tools_updated, %{attached: [], detached: [...]}}
  • 失败 → {:error, {:validation_failed, [{name, :not_found}, ...]}}

get_messages(agent)

@spec get_messages(GenServer.server()) :: [CMDC.Message.t()]

获取完整的消息列表(含系统提示词,按时间顺序)。

messages(agent)

@spec messages(GenServer.server()) :: [CMDC.Message.t()]

获取完整的消息列表(含系统提示词,按时间顺序)。

monitor(agent)

@spec monitor(GenServer.server()) :: reference()

登记当前进程对 Agent 的崩溃监控。

Agent 退出(任何原因)时,观察者进程会收到:

{:cmdc_down, ref, session_id, structured_reason}

structured_reason 已结构化,常见值: :normal | :shutdown | {:exception, term},未来扩展: :max_turns_exceeded | :provider_timeout | {:plugin_aborted, name, why}

返回 reference(),用 demonitor/2 取消监听。

prompt(agent, text)

@spec prompt(GenServer.server(), String.t()) :: %{queued: boolean()}

发送用户 prompt。

  • idle 状态下立即处理,返回 %{queued: false}
  • 忙碌时入队,返回 %{queued: true}

reject(agent, approval_id, opts \\ [])

@spec reject(GenServer.server(), String.t(), keyword()) :: :ok

拒绝指定的工具审批请求。

选项

  • :auto_resume — boolean,默认 false。reject 默认不自动续 turn——HumanApproval Plugin 只是把 awaiting 清掉并 emit :approval_resolved (status=:rejected),调用方通常希望让 Agent 保持 idle 等待新 prompt(被拒命令的 ToolMessage 已记录为 is_error,下次 prompt 时 LLM 自然能感知到拒绝结果)。

    若希望 reject 后 Agent 立刻重新规划(例如让 LLM 走"被拒后换个方案"分支),传 auto_resume: true, 会和 approve 一样开新 turn + emit {:agent_resumed, %{trigger: :tool_rejected, ...}}

replace_tools(agent, tool_modules)

@spec replace_tools(GenServer.server(), [module()]) ::
  {:ok, %{attached: [String.t()], detached: [String.t()]}}
  | {:error, {:validation_failed, [{module(), atom()}]}}

替换整张工具表(原子操作)。

自动计算 diff:

  • 老 tools 在新列表里 → 保留
  • 老 tools 不在新列表里 → detach(emit {:tool_detached, name}
  • 新 tools 不在老列表里 → attach(emit {:tool_attached, name}

全部 attach validate 失败 → 全回滚,emit {:error, _}。 典型场景:MCP Server 重启 / 配置变更,新 tools 列表完全覆盖旧列表。

返回 {:ok, %{attached: [name, ...], detached: [name, ...]}} + 汇总 emit {:tools_updated, %{attached, detached}}

start_link(opts)

@spec start_link(start_opts()) :: GenServer.on_start()

启动 Agent 状态机。

支持 :hibernate_after_ms 选项,透传到 :gen_statem.start_link/3{:hibernate_after, ms} OTP 原生选项;nil 时不主动 hibernate。

status(agent)

@spec status(GenServer.server()) :: %{
  state: :idle | :running | :streaming | :executing_tools,
  session_id: String.t(),
  model: String.t(),
  turns: non_neg_integer(),
  turns_count: non_neg_integer(),
  tool_calls: non_neg_integer(),
  messages_count: non_neg_integer(),
  total_tokens: non_neg_integer(),
  cost_usd: float(),
  token_usage: CMDC.TokenUsage.t(),
  uptime_ms: non_neg_integer(),
  active_since_ms: integer(),
  timestamp_ms: integer(),
  pending_tools: [
    %{
      name: String.t(),
      call_id: String.t(),
      args: map(),
      started_at_ms: integer()
    }
  ],
  pending_approvals: [map()],
  queues: %{prompt_queue: non_neg_integer(), steering_queue: non_neg_integer()}
}

获取 Agent 的状态快照(含运行期可观测字段)。

返回结构:

  • :state — 当前 gen_statem 状态(:idle | :running | :streaming | :executing_tools

  • :session_id — 会话 ID

  • :model — 当前 LLM 模型字符串(可被 switch_model/2 改变)

  • :turns — 已完成轮次数(与 :turns_count 等同,保留兼容别名)

  • :turns_count — 已完成轮次数

  • :tool_calls — 已执行工具调用总次数

  • :messages_count — 当前 messages 列表长度(含 system + user + assistant + tool_result)

  • :active_since_ms — Agent 进程启动时间戳(毫秒,绝对时间排序; 与 :uptime_ms 互补,后者为运行时长)

  • :total_tokens — 累计 token 用量(整数,向后兼容字段)

  • :cost_usd — 累计美元成本

  • :token_usage%CMDC.TokenUsage{} struct,含 prompt_tokens / completion_tokens / total_tokens / cost_usd / cached_tokens

  • :uptime_ms — 运行时长(毫秒)

  • :timestamp_ms — 快照时间戳

  • :pending_tools — 当前正在执行的工具列表,每项包含 name / call_id / args / started_at_msstarted_at_ms 单位 System.system_time(:millisecond),用于无须自己埋点即可统计 tool 耗时)。

  • :pending_approvals — 当前等待人类审批的工具调用, 从启用了审批的 Plugin(如 CMDC.Plugin.Builtin.HumanApproval)的状态汇总; 每项包含 id / tool / args / session_id / requested_at 等字段。

  • :queues — 各种内部队列长度:

      %{
        prompt_queue:   non_neg_integer(),  # 排队等执行的 prompt 数
        steering_queue: non_neg_integer()   # 中段软中断 queue 长度
      }

steer(agent, ref, text)

@spec steer(GenServer.server(), reference(), String.t()) ::
  :ok | {:error, :queue_full | :rejected}

中段软中断(Steering)。

  • idle 状态:等同 prompt/2,立刻进入新 turn
  • 其他状态:text 入 :steering_queue,下个 turn 间隙合并注入

详见 guides/cookbook.md 中的「中段干预(Steering)」配方。

switch_model(agent, new_model)

@spec switch_model(GenServer.server(), CMDC.Provider.model()) :: :ok

运行期切换 model。

下一次 LLM 调用立即生效。当前 streaming / executing_tools 不会被打断 (语义:本轮跑完,下一轮再换;如果想立刻打断换模型,请先 abort/2switch_model/2 + prompt/2)。

选项

  • :provider_opts :: keyword() — 与 model 一并替换 provider 参数(base_url / api_key / timeout); 传 nil 或不传则保留现有 provider_opts; 典型场景:从 Anthropic 切到 OpenAI 自建网关,需同步 base_urlapi_key

行为

  • 不切换模型(new_model 与 state.model 相同且无 provider_opts 变化)→ no-op,不发事件
  • idle 状态:立即更新 state.model / state.config.provider_opts, emit {:model_switched, %{from, to, provider_opts_changed?}}
  • running / streaming / executing_tools:更新 state.model,本轮继续用旧模型, 下一轮自动用新模型;event 立即发出
  • messages / tools / plugin_states 都保留(同一会话,换模型继续)

兼容性警告

  • 不同模型对 system_prompt 的支持差异(如 OpenAI 的 system 消息 vs Anthropic 的 system 字段) 由 Provider 层处理,不在 switch_model 范围内
  • 上下文窗口差异由调用方自行评估(如从 200k token 模型切到 8k 模型,需先 compact)
  • 不同模型的 tool_calling schema 兼容性由 Provider 适配

switch_model(agent, new_model, opts)

@spec switch_model(GenServer.server(), CMDC.Provider.model(), keyword()) :: :ok