CMDC — Elixir Agent Kernel 公共 API 入口。
提供从零创建 Agent 会话到收集回复的完整链路。
内部通过 CMDC.EventBus 实现异步解耦。
公开 API 矩阵
所有公开 API 同时接受 session pid 和 session_id 字符串。
字符串入参通过 CMDC.SessionRegistry(:via 注册表)反查 pid。
| 能力 | API | 入参类型 | 备注 |
|---|---|---|---|
| 创建会话 | create_agent/1 | keyword | Options.t() | 返回 {:ok, pid} |
| 发送 prompt | prompt/2 | pid | string | 立刻或入队 |
| 收集回复 | collect_reply/2 | pid | string | 阻塞等待 |
| 中段干预 | steer/2 | pid | string | 软中断 |
| 中止 | abort/1 | pid | string | Agent → idle |
| 停止 | stop/2 | pid | string | 关闭 Supervisor |
| 订阅事件 | subscribe/1 | pid | string | 当前进程接收事件 |
| 取消订阅 | unsubscribe/1 | pid | string | — |
| 取 session_id | session_id/1 | pid | string | 字符串入参直接返回 |
| 取 Agent pid | agent_pid/1 | pid | string | — |
| 状态快照 | status/1 | pid | string | %{state, turns, ...} |
| HITL 批准 | approve/3 | pid | string | 默认 auto_resume |
| HITL 拒绝 | reject/3 | pid | string | 默认不续 turn |
| 用户回应 | user_respond/3 | pid | string | 应答 AskUser 工具 |
字符串入参对应的 session 进程必须存活;不存在时 raise ArgumentError。
快速开始
# 1. 创建 Agent 会话
{:ok, session} = CMDC.create_agent(
session_id: "my-session",
model: "anthropic:claude-sonnet-4-5",
tools: [CMDC.Tool.ReadFile, CMDC.Tool.Shell],
system_prompt: "你是一个专业的代码助手。",
working_dir: "/my/project"
)
# 2. 发送 prompt(pid 或 session_id 字符串都行)
CMDC.prompt(session, "帮我列出当前目录的文件")
CMDC.prompt("my-session", "帮我列出当前目录的文件")
# 3. 等待回复(阻塞直到 Agent 回到 idle)
{:ok, reply} = CMDC.collect_reply(session, timeout: 30_000)
IO.puts(reply)
# 4. 停止会话
CMDC.stop(session)流式接收
CMDC.subscribe(session)
CMDC.prompt(session, "写一首诗")
receive do
{:cmdc_event, _sid, {:message_delta, %{delta: chunk}}} -> IO.write(chunk)
{:cmdc_event, _sid, {:agent_end, _msgs, _usage}} -> :done
endHITL 审批
CMDC.subscribe(session)
CMDC.prompt(session, "删除所有日志文件")
receive do
{:cmdc_event, _sid, {:approval_required, %{id: ref, tool: tool, args: args}}} ->
IO.inspect({tool, args}, label: "批准工具调用?")
CMDC.approve(session, ref) # 或 CMDC.reject(session, ref)
endAskUser 工具回应
receive do
{:cmdc_event, sid, {:ask_user, ^sid, question, options, ref}} ->
# 通过 string session_id 也可以
CMDC.user_respond(sid, ref, "我的答案")
end
Summary
Functions
中止当前 Agent 运行中的任务,Agent 回到 :idle 状态。
返回会话对应的主 Agent pid。
批准一个待审批的工具调用(HITL 审批流)。
运行期挂载新工具。
批量挂载工具(原子操作)。
等待并收集 Agent 的最终文本回复。
创建并启动一个 Agent 会话。
取消通过 monitor/1 登记的 CMDC 崩溃监控。
运行期卸载工具。
批量卸载工具(原子操作)。
获取会话的完整消息列表(按时间顺序,含 system / user / assistant / tool_result)。
登记当前进程对 Agent 的崩溃监控。
向 Agent 发送用户 prompt。
拒绝一个待审批的工具调用(HITL 审批流)。
替换整张工具表(原子操作)。
返回会话的 session_id 字符串。
获取 Agent 当前状态快照。
中段软中断(Steering):在 Agent 执行中段追加新指引,不需要先 abort 再 prompt。
停止 Agent 会话,清理所有资源。
订阅当前进程接收指定 Agent 会话的所有事件。
运行期切换 LLM 模型。
运行期切换 LLM 模型并同步替换 provider 参数(base_url / api_key / timeout 等)。
取消当前进程对指定 Agent 会话的事件订阅。
应答 CMDC.Tool.AskUser 工具的等待请求(HITL 输入流)。
Types
@type create_opts() :: keyword() | CMDC.Options.t()
@type pid_lookup() :: {:ok, pid()} | {:error, :invalid_session | :not_alive}
normalize_to_pid 返回类型。
会话引用 — 既支持 SessionServer pid,也支持 session_id 字符串。
字符串入参会通过 CMDC.SessionRegistry 反查 pid;
当 session 不存在或已死亡时,所有公开 API 会 raise ArgumentError。
Functions
中止当前 Agent 运行中的任务,Agent 回到 :idle 状态。
选项
:reason :: term()— 中止原因。如设置则事件为{:agent_abort, reason}, 否则为裸:agent_abort(向后兼容)。:clear_queue :: boolean()— 是否清空 prompt pending queue,默认true; 清空时为每条被丢弃的 user prompt emit{:prompt_dropped, text}。:kill_tools :: :all | :killable | :none— 工具任务清理策略,默认:killable::all— 杀全部 in-flight 工具:killable— 只杀非 immune 工具(与 Steering 一致):none— 不杀,让工具自然完成 每杀一个工具 emit{:tool_killed, %{name, call_id, reason}}。
状态行为
| 调用时状态 | :agent_abort 是否发出 | 工具行为(默认 :killable) |
|---|---|---|
:idle | ✅ no-op,但仍发出便于订阅方对账 | — |
:running | ✅ | cancel stream task |
:streaming | ✅ | cancel stream task |
:executing_tools | ✅ | 杀非 immune 工具,immune 工具继续 |
事件保证:所有状态下 :agent_abort 在 100ms 内到达订阅方
(BEAM 调度延迟,本地 EventBus 通常 <10ms)。多次 abort 幂等。
返回会话对应的主 Agent pid。
批准一个待审批的工具调用(HITL 审批流)。
approval_id 来自 {:approval_required, approval_map} 事件中的 approval_map.id。
选项
:auto_resume— boolean,默认true。Agent 处于 idle 时(被block_tool拦下后回到 idle), 审批通过后自动开新 turn 让 LLM 重试,无需调用方再prompt/2。 设为false保持旧版"只放行不续 turn"行为。
自动续接成功时会广播 {:agent_resumed, %{trigger: :tool_approved, approval_id: id}},
上层可订阅事件确认 Agent 已重新进入 running 状态。
示例
receive do
{:cmdc_event, _sid, {:approval_required, %{id: aid}}} ->
:ok = CMDC.approve(session, aid)
end
receive do
{:cmdc_event, _sid, {:agent_resumed, %{approval_id: ^aid}}} ->
IO.puts("Agent 已自动续 turn")
end关闭自动续(保留旧行为,需要自行 prompt):
CMDC.approve(session, aid, auto_resume: false)
CMDC.prompt(session, "继续刚才被拦截的操作")
@spec attach_tool(session(), module()) :: :ok | {:error, :already_attached | :invalid_tool | :invalid_session | :not_alive}
运行期挂载新工具。
立即写入 state.tools,下一次 LLM 请求生效(重生成 tools schema)。
In-flight 请求不受影响。
返回
:ok— 挂载成功,emit{:tool_attached, name}{:error, :already_attached}— 已存在同名 tool(按tool.name()比较){:error, :invalid_tool}— 模块未实现CMDC.Toolbehaviour 必需回调
示例
:ok = CMDC.attach_tool(session, MyApp.Tools.GitHubMCP)
# → 下一次 prompt 时 LLM 即可看到该 tool 的 schema 并调用适用场景
- MCP 热加载:用户对话中途说"装个 GitHub MCP",无需重启 Agent
- Skill 进化:升级一个 Skill 不必重启会话
- 基于上下文动态扩展:根据 LLM 当前任务挂载相关工具
@spec attach_tools(session(), [module()]) :: {:ok, [String.t()]} | {:error, {:validation_failed, [{module(), atom()}]}} | {:error, :invalid_session | :not_alive}
批量挂载工具(原子操作)。
dry-run 全部 validate 通过才一次性挂载;任何一个失败 → 全回滚。 典型场景:用户启用 MCP Server 时一次性挂上 N 个工具。
返回 {:ok, [name, ...]} 或 {:error, {:validation_failed, [{module, reason}, ...]}},
reason: :invalid_tool | :already_attached。
emit 单个 {:tool_attached, name} × N + 汇总 {:tools_updated, %{attached, detached: []}}。
@spec collect_reply( session(), keyword() ) :: {:ok, String.t()} | {:error, :timeout | :invalid_session | term()}
等待并收集 Agent 的最终文本回复。
订阅当前会话的 EventBus,等待 {:agent_end, messages, _usage} 事件,
从最后一条 assistant 消息中提取文本内容返回。
选项
:timeout— 等待超时时间(毫秒),默认 60_000(60 秒)
返回
{:ok, text}— 成功收到回复文本{:error, :timeout}— 超时{:error, reason}— Agent 出错或被中止
示例
CMDC.prompt(session, "你好")
{:ok, reply} = CMDC.collect_reply(session, timeout: 30_000)
IO.puts(reply)
@spec create_agent(create_opts()) :: {:ok, session()} | {:error, term()}
创建并启动一个 Agent 会话。
支持三种调用方式:
1. keyword list(推荐)
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
tools: [CMDC.Tool.Shell],
working_dir: "/project"
)2. CMDC.Options.t() struct
opts = CMDC.Options.new!(model: "anthropic:claude-sonnet-4-5")
{:ok, session} = CMDC.create_agent(opts)3. Blueprint 模块
{:ok, session} = CMDC.create_agent(blueprint: MyApp.CodingAgent)必填选项
:model— 模型标识符,如"anthropic:claude-sonnet-4-5"(Blueprint 启动时可省略)
可选选项
:session_id— 会话唯一标识符,默认自动生成:blueprint— Blueprint 模块或CMDC.Blueprint.t()struct:working_dir— 工作目录,默认当前目录:system_prompt— 系统提示词:tools— Tool 模块列表:plugins— Plugin 列表:config—CMDC.Config.t()struct:provider_opts— 传给 req_llm 的选项,如[api_key: "sk-..."]:max_turns— 最大轮数,默认 100:user_data— 业务上下文 map,原样写入ctx.user_data,SubAgent 自动继承:messages— 历史消息列表([CMDC.Message.t()]),按时间顺序传入; Agent 启动时直接写入state.messages,跳过让 LLM 重读历史的轮次, 适用于 idle 休眠唤醒 / 跨进程恢复
历史消息恢复
history = [
CMDC.Message.user("帮我审核这段代码"),
CMDC.Message.assistant("好的,请贴上来", [], [])
]
{:ok, session} = CMDC.create_agent(
model: "anthropic:claude-sonnet-4-5",
messages: history
)
CMDC.prompt(session, "def hello, do: :world")
# → 续接历史,不会让 LLM 重新读 history返回 {:ok, session_pid} 或 {:error, reason}。
取消通过 monitor/1 登记的 CMDC 崩溃监控。
@spec detach_tool(session(), String.t()) :: :ok | {:error, :not_found | :invalid_session | :not_alive}
运行期卸载工具。
立即从 state.tools 移除(按 tool.name() 字符串匹配),
下一次 LLM 请求生效。已 in-flight 的 tool 调用不受影响(继续执行完)。
返回
:ok— 卸载成功,emit{:tool_detached, name}{:error, :not_found}— 不存在同名 tool
示例
:ok = CMDC.detach_tool(session, "shell")注意
若 LLM 在 detach 后仍调用该 tool(已在 streaming 中或下一轮),
Agent 会发 {:tool_call_unknown, name, call_id} 事件,
并在工具结果中注入 {:error, "tool not found"} 让 LLM 自我纠正。
@spec detach_tools(session(), [String.t()]) :: {:ok, [String.t()]} | {:error, {:validation_failed, [{String.t(), atom()}]}} | {:error, :invalid_session | :not_alive}
批量卸载工具(原子操作)。
返回 {:ok, [name, ...]} 或 {:error, {:validation_failed, [{name, :not_found}, ...]}}。
emit 单个 {:tool_detached, name} × N + 汇总 {:tools_updated, %{attached: [], detached}}。
@spec messages(session()) :: [CMDC.Message.t()] | {:error, :invalid_session | :not_alive}
获取会话的完整消息列表(按时间顺序,含 system / user / assistant / tool_result)。
适用场景:
- 切换模型前查看历史
- 持久化对话用于断点恢复
- 审计日志
登记当前进程对 Agent 的崩溃监控。
与 Process.monitor/1 的区别:CMDC.monitor 返回的 reference() 是 CMDC 自维护的,
Agent 进程退出时观察者收到结构化 reason:
{:cmdc_down, ref, session_id, structured_reason}Structured Reason 表
:normal— 正常退出(supervisor shutdown /CMDC.stop/2明确调用){:exception, exception}— 未捕获异常(含 RuntimeError / ArgumentError 等):max_turns_exceeded— 超出Options.max_turns(预留):provider_timeout— Provider.stream 超时(预留){:plugin_aborted, plugin_name, why}— Plugin 返回:abort导致 Agent 终止 (预留;当前 plugin abort 仅将 Agent 回到 idle 不退出进程)- 其他原子/元组 — 原样透传(未知 reason)
示例
ref = CMDC.monitor(session)
receive do
{:cmdc_down, ^ref, sid, :normal} ->
IO.puts("Agent #{sid} exited normally")
{:cmdc_down, ^ref, sid, {:exception, e}} ->
IO.puts("Agent #{sid} crashed: #{Exception.message(e)}")
end与 Process.monitor/1 区别
Process.monitor只给{:DOWN, ref, :process, pid, reason},reason 是 raw 的:shutdown/{:shutdown, _}/ 异常元组,需要调用方自行解析CMDC.monitor返回 CMDC 自己的 ref,reason 已结构化,可直接做 pattern match
@spec prompt(session(), String.t()) :: %{queued: boolean()} | {:error, :invalid_session | :not_alive}
向 Agent 发送用户 prompt。
- 若 Agent 处于 idle,立即开始处理,返回
%{queued: false} - 若 Agent 正忙,消息入队,处理完当前任务后自动处理,返回
%{queued: true}
示例
%{queued: false} = CMDC.prompt(session, "帮我写一个 Elixir GenServer")
拒绝一个待审批的工具调用(HITL 审批流)。
选项
:auto_resume— boolean,默认false。reject 通常希望让 Agent 保持 idle 等待新 prompt, 被拒命令的 ToolMessage 已记录为 is_error,下次用户 prompt 时 LLM 自然能感知拒绝结果。若希望 reject 后 Agent 立刻重新规划(让 LLM 走"被拒后换个方案"分支),传
auto_resume: true, 会广播{:agent_resumed, %{trigger: :tool_rejected, approval_id: id}}并自动续 turn。
示例
:ok = CMDC.reject(session, approval_id)
:ok = CMDC.reject(session, approval_id, auto_resume: true)
@spec replace_tools(session(), [module()]) :: {:ok, %{attached: [String.t()], detached: [String.t()]}} | {:error, {:validation_failed, [{module(), atom()}]}} | {:error, :invalid_session | :not_alive}
替换整张工具表(原子操作)。
自动 diff:保留已存在的(按 tool.name())、attach 新的、detach 缺失的。
全部 validate 失败 → 全回滚。典型场景:MCP Server 重启 / 配置变更。
返回 {:ok, %{attached: [name, ...], detached: [name, ...]}} 或
{:error, {:validation_failed, [{module, :invalid_tool | :duplicate_in_list}, ...]}}。
emit 单个 {:tool_attached/detached, name} × N + 汇总 {:tools_updated, %{attached, detached}}。
返回会话的 session_id 字符串。
字符串入参直接返回(不查注册表,便于纯 string 流水线)。 pid 入参时会查询 Agent 状态获取 session_id。
获取 Agent 当前状态快照。
返回:
%{
state: :idle | :running | :streaming | :executing_tools,
session_id: String.t(),
turns: non_neg_integer(),
cost_usd: float(),
uptime_ms: non_neg_integer(),
timestamp_ms: integer()
}
@spec steer(session(), String.t()) :: {:ok, reference()} | {:error, :queue_full | :rejected | :invalid_text | :invalid_session | :not_alive}
中段软中断(Steering):在 Agent 执行中段追加新指引,不需要先 abort 再 prompt。
详见 guides/cookbook.md 中的「中段干预(Steering)」配方。
行为
idle状态:等同prompt/2,立刻进入新 turnrunning/streaming/executing_tools:入:steering_queue,下个 turn 间隙合并注入executing_tools同时杀掉 killable in-flight tool(白名单豁免见Options.interrupt_immune_tools)
返回
{:ok, ref}— 已入队(不等同已生效;生效信号通过:steering_applied事件广播){:error, :queue_full}— Steering queue 已满(默认 3,可由Options.max_steering_queue配置){:error, :rejected}— Plugin 拦截:before_steering返回:abort{:error, :invalid_text}— 入参非合法字符串
示例
CMDC.subscribe(session)
CMDC.prompt(session, "搜索 Elixir gen_statem 教程,分析每个并整理成文档")
# 一段时间后用户改主意
{:ok, _ref} = CMDC.steer(session, "改成只看官方文档,不要第三方教程")
receive do
{:cmdc_event, _sid, {:steering_applied, %{count: n}}} ->
IO.puts("已注入 #{n} 条 steering")
end
停止 Agent 会话,清理所有资源。
优雅停止 SessionServer 监督树(级联停止 SubAgent.Supervisor 和主 Agent)。
选项
:reason— 停止原因,默认:normal:timeout— 等待停止超时(毫秒),默认 5_000
示例
:ok = CMDC.stop(session)
订阅当前进程接收指定 Agent 会话的所有事件。
订阅后,当前进程将收到 {:cmdc_event, session_id, event} 格式的消息。
常见事件:
{:stream_chunk, session_id, chunk}— 流式 token{:tool_start, session_id, tool_name, args}— 工具开始{:tool_end, session_id, tool_name, result}— 工具完成{:agent_end, messages, %CMDC.TokenUsage{}}— 回复完成{:approval_required, session_id, tool_name, args, ref}— 等待审批
选项
:since :: non_neg_integer()— 从该 index 之后的事件开始 replay。 要求 Agent 启动时已开启 ring buffer(Options.event_buffer_size > 0)。 用于 WebSocket / Channel 重连补帧,避免断网那一秒丢的 stream_chunk / tool_end 永远丢失。 可通过CMDC.EventBus.last_index/1获取当前最新 index 作为下次重连的:since。
示例
CMDC.subscribe(session)
CMDC.prompt(session, "hello")
receive do
{:cmdc_event, _sid, {:agent_end, _msgs, _usage}} -> IO.puts("done")
end
# 重连补帧(需 Options.event_buffer_size > 0)
CMDC.subscribe(session, since: last_seen_index)
@spec switch_model(session(), CMDC.Provider.model()) :: :ok | {:error, :invalid_session | :not_alive}
运行期切换 LLM 模型。
下一次 LLM 请求立即用新模型,不重启 Agent,messages / tools / plugin 状态全部保留。
行为
- idle 状态:立即更新
state.model,emit{:model_switched, %{from, to}}, 后续prompt/2用新模型 - running / streaming:当前请求继续用旧模型完成;
state.model已更新;{:model_switched, ...}立即发出;下一个 turn 用新模型 - executing_tools:同上,工具继续执行,下一轮 LLM 调用用新模型
- 同模型切换(new_model == 当前 model):no-op,不发事件
触发方式
方式一:调用方主动切换
:ok = CMDC.switch_model(session, "openai:gpt-4o-mini")方式二:Plugin Action
Plugin 在 :on_tool_error / :after_response / :after_tool_batch 等钩子中返回:
def handle_event({:on_tool_error, _name, _call_id, _err, _attempt}, state, _ctx) do
{:switch_model, "openai:gpt-4o-mini", state}
endPipeline 收集后由 Agent 自动执行(无需调用方介入)。
兼容性警告
- system_prompt 兼容性:不同 Provider 对 system prompt 的支持差异由 Provider 层处理
- 上下文窗口差异:从 200k 模型切到 8k 模型时,调用方需自行评估是否要先压缩上下文
- tool_calling schema 差异:由 Provider 适配(OpenAI / Anthropic / Gemini 各自不同)
事件
{:model_switched, %{from: "anthropic:claude-sonnet-4-5", to: "openai:gpt-4o-mini"}}
@spec switch_model(session(), CMDC.Provider.model(), keyword()) :: :ok | {:error, :invalid_session | :not_alive}
运行期切换 LLM 模型并同步替换 provider 参数(base_url / api_key / timeout 等)。
选项
:provider_opts :: keyword()— 与 model 一并替换 provider 参数(base_url / api_key / timeout);nil或不传则保留现有 provider_opts; 典型场景:从 Anthropic 切到 OpenAI 自建网关,需同步 base_url + api_key
事件 {:model_switched, %{from, to, provider_opts_changed?}} 含 provider_opts_changed?
字段,便于订阅方判断是否需要刷新连接池。
示例
:ok = CMDC.switch_model(session, "openai:gpt-4o-mini",
provider_opts: [base_url: "https://api.custom.com/v1", api_key: "sk-..."])
@spec unsubscribe(session()) :: :ok | {:error, :invalid_session}
取消当前进程对指定 Agent 会话的事件订阅。
应答 CMDC.Tool.AskUser 工具的等待请求(HITL 输入流)。
AskUser 工具会广播 {:ask_user, session_id, question, options, ref} 并阻塞等待
{:user_responded, session_id, ref, response} 事件。本函数提供与 approve/3 /
reject/3 对称的公开 API,避免上层直接调用 EventBus.broadcast/2。
参数
session— pid 或 session_id 字符串ref—:ask_user事件中的ref(字符串)response— 任意可序列化的应答(字符串、字符串列表、map)
示例
receive do
{:cmdc_event, sid, {:ask_user, ^sid, _question, _options, ref}} ->
CMDC.user_respond(sid, ref, "我的回答")
end