Plugin behaviour 定义。所有插件实现此 behaviour。
Plugin 系统是拦截 Agent 生命周期的核心切面:14 个事件 × 8 种 action。
本 moduledoc 给出速查表与最小实现示例。完整范例见 guides/cookbook.md。
14 种事件(event)— 速查表
| 事件 | 触发时机 | 可用 action |
|---|---|---|
:session_start | Agent 会话刚启动 | continue/abort/emit |
:session_end | Agent 会话正常结束 | continue/emit |
{:after_turn, payload} | Agent 即将回 idle 前(finish + abort 两种路径) | continue/emit |
{:before_prompt, text} | 用户 prompt 提交前 | continue/intervene/abort/skip/emit |
{:before_request, messages} | 发送 LLM 请求前,可修改消息列表 | continue/intervene/abort/skip/emit/switch_model |
{:after_response, assistant_msg} | 收到 LLM 回复后,可分析内容 | continue/intervene/abort/skip/emit/switch_model |
{:before_tool, name, args} | 工具执行前,可阻止或替换参数 | continue/block_tool/replace_tool_args/abort/emit |
{:on_tool_error, name, call_id, error, attempt} | 工具执行失败、retry 前 | continue/abort/skip/emit/switch_model |
{:after_tool, name, call_id, result} | 单个工具执行完毕 | continue/intervene/abort/emit/switch_model |
{:after_tool_batch, results} | 一批工具全部执行完毕 | continue/intervene/abort/emit/switch_model |
:before_finish | Agent 准备返回最终结果前,可拦截注入新 prompt | continue/intervene/abort/emit |
{:before_compact, messages} | 上下文压缩前,可修改待压缩消息 | continue/skip/emit |
{:before_steering, text} | CMDC.steer/2 中段软中断入队前 | continue/intervene/abort/emit |
{:before_plugin_opts_update, plugin_module, new_opts} | Plugin opts 热更前(仅在 idle flush 时触发) | continue/abort/skip/emit |
:after_turn payload 字段
Plugin 通过 handle_event({:after_turn, payload}, state, ctx) 接收。
payload :: map() 含:
:outcome—:finished | :aborted:abort_reason—term() | nil(仅在:aborted时非 nil):messages_diff—[CMDC.Message.t()]本次 prompt cycle 新增的消息(按时间顺序):token_usage_diff—%CMDC.TokenUsage{}本次增量:started_at_ms/:ended_at_ms/:duration_ms
与 :session_end 的区别:
:session_end无 payload,保持现状(不破坏 EventLogger 等老 Plugin):after_turn带结构化 payload,新 Plugin 推荐使用;典型场景: 把本轮对话写入审计 / 长期记忆 / 计费系统
8 种 Action
| Action | 元组形式 | 含义 | 典型使用者 |
|---|---|---|---|
continue | {:continue, state} | 继续执行下一个 plugin | 所有 plugin 的默认返回 |
intervene | {:intervene, prompt, state} | 注入新 prompt,Agent 继续工作 | SelfVerify、ReviewLoop |
abort | {:abort, reason, state} | 强制停止 Agent | SecurityGuard、BudgetGuard |
skip | {:skip, state} | 跳过后续 plugin(短路) | 短路优化 |
block_tool | {:block_tool, reason, state} | 阻止当前工具执行(仅 before_tool) | SecurityGuard、HumanApproval |
replace_tool_args | {:replace_tool_args, new_args, state} | 替换工具参数(仅 before_tool) | SecurityGuard、Sandbox 重写 |
emit | {:emit, {event_name, payload}, state} | 广播自定义事件 | 任何 plugin |
switch_model | {:switch_model, model_string, state} 或 {:switch_model, model_string, state, opts} | 运行期切换 LLM 模型。4 元组形态接受 opts 含 :provider_opts :: keyword() 同步切换 provider 参数(base_url / api_key / timeout) | Recovery、ModelRouter(运行期降级) |
Hook × Action 完整矩阵
下表标 ✓ 表示该 hook 接受该 action;- 表示该 hook 不支持该 action(Pipeline 内部
忽略并继续处理后续 plugin,不抛错)。每个 hook 都隐含支持 :continue 和 :emit。
| Hook \ Action | continue | intervene | abort | skip | block_tool | replace_args | emit | switch_model |
|---|---|---|---|---|---|---|---|---|
:session_start | ✓ | - | ✓ | - | - | - | ✓ | - |
:session_end | ✓ | - | - | - | - | - | ✓ | - |
{:after_turn, p} | ✓ | - | - | - | - | - | ✓ | - |
{:before_prompt, t} | ✓ | ✓ | ✓ | ✓ | - | - | ✓ | - |
{:before_request, m} | ✓ | ✓ | ✓ | ✓ | - | - | ✓ | ✓ |
{:after_response, m} | ✓ | ✓ | ✓ | ✓ | - | - | ✓ | ✓ |
{:before_tool, n, a} | ✓ | - | ✓ | - | ✓ | ✓ | ✓ | ✓ |
{:on_tool_error, ...} | ✓ | - | ✓ | ✓ | - | - | ✓ | -* |
{:after_tool, ...} | ✓ | ✓ | ✓ | - | - | - | ✓ | ✓ |
{:after_tool_batch, r} | ✓ | ✓ | ✓ | - | - | - | ✓ | ✓ |
:before_finish | ✓ | ✓ | ✓ | - | - | - | ✓ | - |
{:before_compact, m} | ✓ | - | - | ✓ | - | - | ✓ | - |
{:before_steering, t} | ✓ | ✓ | ✓ | - | - | - | ✓ | - |
{:before_plugin_opts_update, ...} | ✓ | - | ✓ | ✓ | - | - | ✓ | - |
*
:on_tool_error在 Task retry 内部 Pipeline 触发,无完整 Agent state 上下文,:switch_modelaction 被收集但不应用(避免脏写 state.model)。 若需基于工具失败切 model,请在:after_tool钩子中匹配{:error, _}result。
Action 失败/嵌套语义补充
:abort短路:Pipeline 立即停止,后续 plugin 不再执行;Agent 直接回 idle- 广播
{:agent_abort, reason}+ 触发:after_turn(outcome=:aborted)
- 广播
:block_tool短路(仅:before_tool):当前工具不执行,注入 synthetic error tool_result;后续 plugin 也不再执行;其他工具继续按 batch 调度:skip短路:跳过所有后续 plugin,但不影响 Agent 主流程(Agent 按"无 action" 继续推进):intervene累积式:多个 plugin 同时 intervene 时,prompt 按 priority 顺序 拼接(分隔),最终一次性注入;不短路:replace_tool_args覆盖式:多个 plugin 同时返回时取最后执行(priority 最大) 的值;不短路:emit累积式:所有 plugin 的 emit 事件按时序追加到emitted_events, Pipeline 结束后 Agent 统一 broadcast;不短路;支持 4 种形态:{:emit, {name, payload}, state}— 单事件{:emit, [{name, payload}, ...], state}— 多事件{:emit, name, payload, state}— 4 元简写{:emit, {name, a, b}, state}— 3 元 tuple payload(如:update_system_context)
:switch_model覆盖式:多个 plugin 同时返回时取最后执行(priority 最大)的值; 支持 3 元和 4 元两种形态;4 元{:switch_model, model, state, provider_opts: [...]}同步切 provider 参数
Plugin emit 自动注入 user_data
emit 出来的 {:plugin_event, name, payload} 事件,当 payload 是 map 时,
Agent 会自动 merge state.user_data 到 :user_data 字段(仅当 payload 不含
:user_data 时)。Plugin 可在 payload 中加 :_no_user_data 键 opt-out
(broadcast 前自动 pop)。订阅方可以直接从 payload.user_data 反查业务账号。
优先级范围
- P0 安全类:0-99(最先执行)
- P1 核心类:100-299
- P2 质量类:300-599
- P3 智能类:600-899
- 自定义:900+(最后执行)
运行期热更 Plugin opts(v0.6+)
集成方可通过 CMDC.update_plugin_opts/3 在长会话中调整已注册 Plugin 的运行时
参数,无需停 Agent 重启。改动语义如下:
- 入队即返:
update_plugin_opts/3是 cast,立即返回:ok,新 opts 进state.pending_plugin_opts队列(同一 plugin 多次更新 LWW 覆盖) - 仅 idle 生效:Agent 进入
idle状态时统一 flush,期间状态机不动;running/streaming/executing_tools期间排队等待,避免热路径状态错乱 :before_plugin_opts_updatehook:flush 时先走一次 Plugin Pipeline,其它 Plugin 可:abort/:skip拒绝(典型场景:SecurityGuard 校验新 opts 合法性)- 可选
on_config_update/2callback:未实现的 Plugin 走默认替换语义 (新 plugin_state =new_opts当 map,否则直接取 new_opts) - 结果广播:每条 flush 完成后 emit
{:plugin_opts_updated, %{plugin, success?, error}}
何时实现 on_config_update/2
仅当 Plugin 内部 state 是结构化的复合数据(含计数器 / 缓存 / 子结构)、并且热更 时希望"保留运行时累积值、只覆盖配置部分"时需要实现。简单"opts == state"的 Plugin 让默认实现兜底即可。
最小实现示例
defmodule MyApp.AuditPlugin do
@moduledoc "记录所有工具调用。"
@behaviour CMDC.Plugin
@impl true
def init(_opts), do: {:ok, %{calls: []}}
@impl true
def priority, do: 500
@impl true
def handle_event({:before_tool, name, args}, state, _ctx) do
{:continue, %{state | calls: [{name, args} | state.calls]}}
end
def handle_event(_event, state, _ctx), do: {:continue, state}
end5 个完整 Cookbook(DangerousCommandGuard / PromptAuditLog / BudgetGuard / ToolFallbackGuard / SteeringContentGuard)见
guides/cookbook.md。
Summary
Types
Plugin 返回的控制 action。
Plugin 接收的事件类型。
单个 emit 事件,被 Pipeline 累加到 emitted_events 并最终被 Agent 广播。
插件自身的运行时状态,格式由各插件自行定义。
Callbacks
返回插件的人类可读描述,用于调试和日志(可选)。
处理事件并返回一个 action。
初始化插件状态。
Plugin opts 热更回调(可选,v0.6+)。
会话结束时的清理操作(可选)。
插件优先级。数值越小越先执行。
Functions
获取 action 的类型标签(原子)。
对 Plugin 执行 opts 热更,调用其 on_config_update/2(若实现),否则走默认替换语义。
从 action 元组中提取 plugin_state。
检查给定模块是否实现了 CMDC.Plugin behaviour。
判断 action 是否为短路类型。
Types
@type action() :: {:continue, plugin_state()} | {:intervene, prompt :: String.t(), plugin_state()} | {:abort, reason :: String.t(), plugin_state()} | {:skip, plugin_state()} | {:block_tool, reason :: String.t(), plugin_state()} | {:replace_tool_args, new_args :: map(), plugin_state()} | {:emit, emit_event(), plugin_state()} | {:emit, [emit_event()], plugin_state()} | {:emit, event_name :: atom(), payload :: term(), plugin_state()} | {:switch_model, new_model :: String.t(), plugin_state()} | {:switch_model, new_model :: String.t(), plugin_state(), opts :: [{:provider_opts, keyword()}]}
Plugin 返回的控制 action。
共 8 种大类:
{:continue, state}— 继续执行下一个 plugin(默认){:intervene, prompt, state}— 注入新 prompt,Agent 继续工作{:abort, reason, state}— 强制停止 Agent{:skip, state}— 跳过后续所有 plugin(短路){:block_tool, reason, state}— 阻止工具执行(仅 before_tool 有效){:replace_tool_args, new_args, state}— 替换工具参数(仅 before_tool 有效){:emit, event, state}/{:emit, [event], state}/{:emit, name, payload, state}— 广播自定义事件,支持单个、列表、4 元组三种等价形式{:switch_model, model_string, state}— 运行期切换 LLM 模型; 多个 plugin 同时返回时,priority 较大者(后执行)的值生效; Agent 收到后立即调用switch_model/2,emit{:model_switched, %{from, to}}{:switch_model, model_string, state, opts}— 4 元组形态;opts :: keyword()支持:provider_opts :: keyword()(同步切换 provider 参数, 如从 Anthropic 切到 OpenAI 自建网关,可同时改base_url、api_key、timeout); 与 3 元组共存,Pipeline 根据元组长度自动分发;:model_switchedpayload 含provider_opts_changed?: boolean()
@type after_turn_payload() :: %{ outcome: :finished | :aborted, abort_reason: term() | nil, messages_diff: [CMDC.Message.t()], token_usage_diff: CMDC.TokenUsage.t(), started_at_ms: integer(), ended_at_ms: integer(), duration_ms: non_neg_integer() }
Plugin 接收的事件类型。
共 14 种,覆盖 Agent 完整生命周期:
:session_start— 会话启动:session_end— 会话结束{:after_turn, payload}— Agent 回 idle 前; payload 含:outcome :: :finished | :aborted、:abort_reason、:messages_diff、:token_usage_diff、:started_at_ms、:ended_at_ms、:duration_ms{:before_prompt, text}— 用户 prompt 提交前{:before_request, messages}— 发送 LLM 请求前{:after_response, assistant_msg}— 收到 LLM 回复后{:before_tool, name, args}— 工具执行前{:on_tool_error, name, call_id, error, attempt}— 工具执行失败、retry 前{:after_tool, name, call_id, result}— 单工具执行后{:after_tool_batch, results}— 批次工具全部执行后:before_finish— Agent 准备返回最终结果前{:before_compact, messages}— 上下文压缩前{:before_steering, text}—CMDC.steer/2中段软中断入队前{:before_plugin_opts_update, plugin_module, new_opts}— Plugin opts 热更前 (CMDC.update_plugin_opts/3触发,仅 idle flush 时执行)
@type emit_event() :: {event_name :: atom(), payload :: term()} | {event_name :: atom(), a :: term(), b :: term()}
单个 emit 事件,被 Pipeline 累加到 emitted_events 并最终被 Agent 广播。
- 2 元组
{name, payload}会被广播为{:plugin_event, name, payload} - 3 元组
{name, a, b}会被广播为{:plugin_event, name, {a, b}}典型用途:{:update_system_context, key, text}更新 Agent.state.dynamic_context_sections
@type event() :: :session_start | :session_end | {:after_turn, after_turn_payload()} | {:before_prompt, text :: String.t()} | {:before_request, messages :: [CMDC.Message.t()]} | {:after_response, assistant_msg :: CMDC.Message.t()} | {:before_tool, tool_name :: String.t(), args :: map()} | {:on_tool_error, tool_name :: String.t(), call_id :: String.t(), error :: String.t(), attempt :: pos_integer()} | {:after_tool, tool_name :: String.t(), call_id :: String.t(), result :: {:ok, String.t()} | {:error, String.t()}} | {:after_tool_batch, results :: [ {tool_name :: String.t(), {:ok, String.t()} | {:error, String.t()}} ]} | :before_finish | {:before_compact, messages :: [CMDC.Message.t()]} | {:before_steering, text :: String.t()} | {:before_plugin_opts_update, plugin_module :: module(), new_opts :: keyword()}
@type plugin_state() :: term()
插件自身的运行时状态,格式由各插件自行定义。
Callbacks
返回插件的人类可读描述,用于调试和日志(可选)。
可以返回简单字符串,也可以返回结构化 map(含 :name、:version、:description、
:events 等字段),便于 Plugin Registry / 调试工具展示。
@callback handle_event(event(), plugin_state(), CMDC.Context.t()) :: action()
处理事件并返回一个 action。
Pipeline 按 priority/0 从小到大依次调用各 plugin 的 handle_event/3。
遇到短路 action(abort、block_tool、skip)时,Pipeline 停止后续调用。
参数
event— 当前事件,见event/0类型定义state— 当前插件的运行时状态ctx— Agent 执行上下文(含 session_id、working_dir、model 等)
@callback init(opts :: keyword()) :: {:ok, plugin_state()} | {:error, term()}
初始化插件状态。
在 Agent 会话启动时被调用一次。返回 {:ok, state} 表示初始化成功,
返回 {:error, reason} 将阻止 Agent 启动。
参数
opts— 注册 plugin 时传入的选项(keyword list)
@callback on_config_update(new_opts :: keyword(), state :: plugin_state()) :: {:ok, new_state :: plugin_state()} | {:error, term()}
Plugin opts 热更回调(可选,v0.6+)。
CMDC.update_plugin_opts/3 触发,仅在 Agent 进入 idle 状态 flush 队列时
被调用一次。返回 {:ok, new_plugin_state} 替换当前 plugin state,返回
{:error, reason} 则保留旧 state 并 emit {:plugin_opts_updated, %{success?: false, error: reason}}。
参数
new_opts— 调用方传入的新 opts(keyword list)state— 当前 plugin 的运行时 state
默认实现语义
未实现该 callback 的 Plugin 走默认替换:
- 若
new_opts是 keyword list 且老 state 是 map,自动将 new_opts 转 map 后Map.merge(old_state, new_opts_map) - 否则直接
new_opts替换为新 state
何时需要自定义实现
- Plugin state 含运行时累积字段(计数器 / 缓存 / 子结构)需要保留
- Plugin state 与 opts 结构不同(需要做转换)
- 新 opts 需要校验合法性(拒绝非法配置返回
{:error, reason})
@callback on_session_end(plugin_state(), CMDC.Context.t()) :: :ok
会话结束时的清理操作(可选)。
@callback priority() :: non_neg_integer()
插件优先级。数值越小越先执行。
建议范围:
- 0-99:安全/防护类
- 100-299:核心功能类
- 300-599:质量监控类
- 600-899:智能增强类
- 900+:自定义扩展
Functions
获取 action 的类型标签(原子)。
示例
iex> CMDC.Plugin.action_type({:continue, %{}})
:continue
iex> CMDC.Plugin.action_type({:block_tool, "危险路径", %{}})
:block_tool
@spec apply_config_update(module(), keyword() | term(), plugin_state()) :: {:ok, plugin_state()} | {:error, term()}
对 Plugin 执行 opts 热更,调用其 on_config_update/2(若实现),否则走默认替换语义。
默认替换语义(与 callback 文档保持一致):
new_opts是 keyword list 且老 state 是 map →Map.merge(old_state, Map.new(new_opts))- 其他情况 → 直接以
new_opts作为新 state
返回 {:ok, new_state} 或 {:error, term()}。
示例
iex> CMDC.Plugin.apply_config_update(String, [a: 1], %{a: 0, b: 2})
{:ok, %{a: 1, b: 2}}
iex> CMDC.Plugin.apply_config_update(String, %{x: 9}, :anything)
{:ok, %{x: 9}}
@spec extract_state(action()) :: plugin_state()
从 action 元组中提取 plugin_state。
无论哪种 action 类型,state 始终是最后一个元素。
示例
iex> CMDC.Plugin.extract_state({:continue, %{count: 1}})
%{count: 1}
iex> CMDC.Plugin.extract_state({:abort, "stop", %{reason: "budget"}})
%{reason: "budget"}
检查给定模块是否实现了 CMDC.Plugin behaviour。
示例
iex> CMDC.Plugin.plugin?(CMDC.Plugin.Builtin.EventLogger)
true
iex> CMDC.Plugin.plugin?(String)
false
判断 action 是否为短路类型。
短路 action 会让 Pipeline 停止调用后续 plugin:
{:abort, ...}— 强制停止 Agent{:block_tool, ...}— 阻止工具执行{:skip, ...}— 跳过后续 plugin
示例
iex> CMDC.Plugin.short_circuit?({:abort, "危险操作", %{}})
true
iex> CMDC.Plugin.short_circuit?({:continue, %{}})
false