CMDC.Plugin behaviour (cmdc v0.6.1)

Copy Markdown View Source

Plugin behaviour 定义。所有插件实现此 behaviour。

Plugin 系统是拦截 Agent 生命周期的核心切面:14 个事件 × 8 种 action。 本 moduledoc 给出速查表与最小实现示例。完整范例见 guides/cookbook.md

14 种事件(event)— 速查表

事件触发时机可用 action
:session_startAgent 会话刚启动continue/abort/emit
:session_endAgent 会话正常结束continue/emit
{:after_turn, payload}Agent 即将回 idle 前(finish + abort 两种路径)continue/emit
{:before_prompt, text}用户 prompt 提交前continue/intervene/abort/skip/emit
{:before_request, messages}发送 LLM 请求前,可修改消息列表continue/intervene/abort/skip/emit/switch_model
{:after_response, assistant_msg}收到 LLM 回复后,可分析内容continue/intervene/abort/skip/emit/switch_model
{:before_tool, name, args}工具执行前,可阻止或替换参数continue/block_tool/replace_tool_args/abort/emit
{:on_tool_error, name, call_id, error, attempt}工具执行失败、retry 前continue/abort/skip/emit/switch_model
{:after_tool, name, call_id, result}单个工具执行完毕continue/intervene/abort/emit/switch_model
{:after_tool_batch, results}一批工具全部执行完毕continue/intervene/abort/emit/switch_model
:before_finishAgent 准备返回最终结果前,可拦截注入新 promptcontinue/intervene/abort/emit
{:before_compact, messages}上下文压缩前,可修改待压缩消息continue/skip/emit
{:before_steering, text}CMDC.steer/2 中段软中断入队前continue/intervene/abort/emit
{:before_plugin_opts_update, plugin_module, new_opts}Plugin opts 热更前(仅在 idle flush 时触发)continue/abort/skip/emit

:after_turn payload 字段

Plugin 通过 handle_event({:after_turn, payload}, state, ctx) 接收。 payload :: map() 含:

  • :outcome:finished | :aborted

  • :abort_reasonterm() | nil(仅在 :aborted 时非 nil)

  • :messages_diff[CMDC.Message.t()] 本次 prompt cycle 新增的消息(按时间顺序)
  • :token_usage_diff%CMDC.TokenUsage{} 本次增量
  • :started_at_ms / :ended_at_ms / :duration_ms

:session_end 的区别:

  • :session_end 无 payload,保持现状(不破坏 EventLogger 等老 Plugin)
  • :after_turn 带结构化 payload,新 Plugin 推荐使用;典型场景: 把本轮对话写入审计 / 长期记忆 / 计费系统

8 种 Action

Action元组形式含义典型使用者
continue{:continue, state}继续执行下一个 plugin所有 plugin 的默认返回
intervene{:intervene, prompt, state}注入新 prompt,Agent 继续工作SelfVerify、ReviewLoop
abort{:abort, reason, state}强制停止 AgentSecurityGuard、BudgetGuard
skip{:skip, state}跳过后续 plugin(短路)短路优化
block_tool{:block_tool, reason, state}阻止当前工具执行(仅 before_tool)SecurityGuard、HumanApproval
replace_tool_args{:replace_tool_args, new_args, state}替换工具参数(仅 before_tool)SecurityGuard、Sandbox 重写
emit{:emit, {event_name, payload}, state}广播自定义事件任何 plugin
switch_model{:switch_model, model_string, state}{:switch_model, model_string, state, opts}运行期切换 LLM 模型。4 元组形态接受 opts:provider_opts :: keyword() 同步切换 provider 参数(base_url / api_key / timeout)Recovery、ModelRouter(运行期降级)

Hook × Action 完整矩阵

下表标 表示该 hook 接受该 action;- 表示该 hook 不支持该 action(Pipeline 内部 忽略并继续处理后续 plugin,不抛错)。每个 hook 都隐含支持 :continue:emit

Hook \ Actioncontinueinterveneabortskipblock_toolreplace_argsemitswitch_model
:session_start-----
:session_end------
{:after_turn, p}------
{:before_prompt, t}---
{:before_request, m}--
{:after_response, m}--
{:before_tool, n, a}--
{:on_tool_error, ...}----*
{:after_tool, ...}---
{:after_tool_batch, r}---
:before_finish----
{:before_compact, m}-----
{:before_steering, t}----
{:before_plugin_opts_update, ...}----

*:on_tool_error 在 Task retry 内部 Pipeline 触发,无完整 Agent state 上下文,:switch_model action 被收集但不应用(避免脏写 state.model)。 若需基于工具失败切 model,请在 :after_tool 钩子中匹配 {:error, _} result。

Action 失败/嵌套语义补充

  • :abort 短路:Pipeline 立即停止,后续 plugin 不再执行;Agent 直接回 idle
    • 广播 {:agent_abort, reason} + 触发 :after_turn (outcome=:aborted)
  • :block_tool 短路(仅 :before_tool):当前工具不执行,注入 synthetic error tool_result;后续 plugin 也不再执行;其他工具继续按 batch 调度
  • :skip 短路:跳过所有后续 plugin,但不影响 Agent 主流程(Agent 按"无 action" 继续推进)
  • :intervene 累积式:多个 plugin 同时 intervene 时,prompt 按 priority 顺序 拼接( 分隔),最终一次性注入;不短路
  • :replace_tool_args 覆盖式:多个 plugin 同时返回时取最后执行(priority 最大) 的值;不短路
  • :emit 累积式:所有 plugin 的 emit 事件按时序追加到 emitted_events, Pipeline 结束后 Agent 统一 broadcast;不短路;支持 4 种形态:
    • {:emit, {name, payload}, state} — 单事件
    • {:emit, [{name, payload}, ...], state} — 多事件
    • {:emit, name, payload, state} — 4 元简写
    • {:emit, {name, a, b}, state} — 3 元 tuple payload(如 :update_system_context
  • :switch_model 覆盖式:多个 plugin 同时返回时取最后执行(priority 最大)的值; 支持 3 元和 4 元两种形态;4 元 {:switch_model, model, state, provider_opts: [...]} 同步切 provider 参数

Plugin emit 自动注入 user_data

emit 出来的 {:plugin_event, name, payload} 事件,当 payload 是 map 时, Agent 会自动 merge state.user_data:user_data 字段(仅当 payload 不含 :user_data 时)。Plugin 可在 payload 中加 :_no_user_data 键 opt-out (broadcast 前自动 pop)。订阅方可以直接从 payload.user_data 反查业务账号。

优先级范围

  • P0 安全类:0-99(最先执行)
  • P1 核心类:100-299
  • P2 质量类:300-599
  • P3 智能类:600-899
  • 自定义:900+(最后执行)

运行期热更 Plugin opts(v0.6+)

集成方可通过 CMDC.update_plugin_opts/3 在长会话中调整已注册 Plugin 的运行时 参数,无需停 Agent 重启。改动语义如下:

  • 入队即返update_plugin_opts/3 是 cast,立即返回 :ok,新 opts 进 state.pending_plugin_opts 队列(同一 plugin 多次更新 LWW 覆盖)
  • 仅 idle 生效:Agent 进入 idle 状态时统一 flush,期间状态机不动; running / streaming / executing_tools 期间排队等待,避免热路径状态错乱
  • :before_plugin_opts_update hook:flush 时先走一次 Plugin Pipeline,其它 Plugin 可 :abort / :skip 拒绝(典型场景:SecurityGuard 校验新 opts 合法性)
  • 可选 on_config_update/2 callback:未实现的 Plugin 走默认替换语义 (新 plugin_state = new_opts 当 map,否则直接取 new_opts)
  • 结果广播:每条 flush 完成后 emit {:plugin_opts_updated, %{plugin, success?, error}}

何时实现 on_config_update/2

仅当 Plugin 内部 state 是结构化的复合数据(含计数器 / 缓存 / 子结构)、并且热更 时希望"保留运行时累积值、只覆盖配置部分"时需要实现。简单"opts == state"的 Plugin 让默认实现兜底即可。

最小实现示例

defmodule MyApp.AuditPlugin do
  @moduledoc "记录所有工具调用。"
  @behaviour CMDC.Plugin

  @impl true
  def init(_opts), do: {:ok, %{calls: []}}

  @impl true
  def priority, do: 500

  @impl true
  def handle_event({:before_tool, name, args}, state, _ctx) do
    {:continue, %{state | calls: [{name, args} | state.calls]}}
  end

  def handle_event(_event, state, _ctx), do: {:continue, state}
end

5 个完整 Cookbook(DangerousCommandGuard / PromptAuditLog / BudgetGuard / ToolFallbackGuard / SteeringContentGuard)见 guides/cookbook.md

Summary

Types

Plugin 返回的控制 action。

Plugin 接收的事件类型。

单个 emit 事件,被 Pipeline 累加到 emitted_events 并最终被 Agent 广播。

插件自身的运行时状态,格式由各插件自行定义。

Callbacks

返回插件的人类可读描述,用于调试和日志(可选)。

处理事件并返回一个 action。

初始化插件状态。

Plugin opts 热更回调(可选,v0.6+)。

会话结束时的清理操作(可选)。

插件优先级。数值越小越先执行。

Functions

获取 action 的类型标签(原子)。

对 Plugin 执行 opts 热更,调用其 on_config_update/2(若实现),否则走默认替换语义。

从 action 元组中提取 plugin_state。

检查给定模块是否实现了 CMDC.Plugin behaviour。

判断 action 是否为短路类型。

Types

action()

@type action() ::
  {:continue, plugin_state()}
  | {:intervene, prompt :: String.t(), plugin_state()}
  | {:abort, reason :: String.t(), plugin_state()}
  | {:skip, plugin_state()}
  | {:block_tool, reason :: String.t(), plugin_state()}
  | {:replace_tool_args, new_args :: map(), plugin_state()}
  | {:emit, emit_event(), plugin_state()}
  | {:emit, [emit_event()], plugin_state()}
  | {:emit, event_name :: atom(), payload :: term(), plugin_state()}
  | {:switch_model, new_model :: String.t(), plugin_state()}
  | {:switch_model, new_model :: String.t(), plugin_state(),
     opts :: [{:provider_opts, keyword()}]}

Plugin 返回的控制 action。

共 8 种大类:

  • {:continue, state} — 继续执行下一个 plugin(默认)
  • {:intervene, prompt, state} — 注入新 prompt,Agent 继续工作
  • {:abort, reason, state} — 强制停止 Agent
  • {:skip, state} — 跳过后续所有 plugin(短路)
  • {:block_tool, reason, state} — 阻止工具执行(仅 before_tool 有效)
  • {:replace_tool_args, new_args, state} — 替换工具参数(仅 before_tool 有效)
  • {:emit, event, state} / {:emit, [event], state} / {:emit, name, payload, state} — 广播自定义事件,支持单个、列表、4 元组三种等价形式
  • {:switch_model, model_string, state} — 运行期切换 LLM 模型; 多个 plugin 同时返回时,priority 较大者(后执行)的值生效; Agent 收到后立即调用 switch_model/2,emit {:model_switched, %{from, to}}
  • {:switch_model, model_string, state, opts} — 4 元组形态; opts :: keyword() 支持 :provider_opts :: keyword()(同步切换 provider 参数, 如从 Anthropic 切到 OpenAI 自建网关,可同时改 base_urlapi_keytimeout); 与 3 元组共存,Pipeline 根据元组长度自动分发; :model_switched payload 含 provider_opts_changed?: boolean()

after_turn_payload()

@type after_turn_payload() :: %{
  outcome: :finished | :aborted,
  abort_reason: term() | nil,
  messages_diff: [CMDC.Message.t()],
  token_usage_diff: CMDC.TokenUsage.t(),
  started_at_ms: integer(),
  ended_at_ms: integer(),
  duration_ms: non_neg_integer()
}

Plugin 接收的事件类型。

共 14 种,覆盖 Agent 完整生命周期:

  • :session_start — 会话启动
  • :session_end — 会话结束
  • {:after_turn, payload} — Agent 回 idle 前; payload 含 :outcome :: :finished | :aborted:abort_reason:messages_diff:token_usage_diff:started_at_ms:ended_at_ms:duration_ms
  • {:before_prompt, text} — 用户 prompt 提交前
  • {:before_request, messages} — 发送 LLM 请求前
  • {:after_response, assistant_msg} — 收到 LLM 回复后
  • {:before_tool, name, args} — 工具执行前
  • {:on_tool_error, name, call_id, error, attempt} — 工具执行失败、retry 前
  • {:after_tool, name, call_id, result} — 单工具执行后
  • {:after_tool_batch, results} — 批次工具全部执行后
  • :before_finish — Agent 准备返回最终结果前
  • {:before_compact, messages} — 上下文压缩前
  • {:before_steering, text}CMDC.steer/2 中段软中断入队前
  • {:before_plugin_opts_update, plugin_module, new_opts} — Plugin opts 热更前 (CMDC.update_plugin_opts/3 触发,仅 idle flush 时执行)

emit_event()

@type emit_event() ::
  {event_name :: atom(), payload :: term()}
  | {event_name :: atom(), a :: term(), b :: term()}

单个 emit 事件,被 Pipeline 累加到 emitted_events 并最终被 Agent 广播。

  • 2 元组 {name, payload} 会被广播为 {:plugin_event, name, payload}
  • 3 元组 {name, a, b} 会被广播为 {:plugin_event, name, {a, b}} 典型用途:{:update_system_context, key, text} 更新 Agent.state.dynamic_context_sections

event()

@type event() ::
  :session_start
  | :session_end
  | {:after_turn, after_turn_payload()}
  | {:before_prompt, text :: String.t()}
  | {:before_request, messages :: [CMDC.Message.t()]}
  | {:after_response, assistant_msg :: CMDC.Message.t()}
  | {:before_tool, tool_name :: String.t(), args :: map()}
  | {:on_tool_error, tool_name :: String.t(), call_id :: String.t(),
     error :: String.t(), attempt :: pos_integer()}
  | {:after_tool, tool_name :: String.t(), call_id :: String.t(),
     result :: {:ok, String.t()} | {:error, String.t()}}
  | {:after_tool_batch,
     results :: [
       {tool_name :: String.t(), {:ok, String.t()} | {:error, String.t()}}
     ]}
  | :before_finish
  | {:before_compact, messages :: [CMDC.Message.t()]}
  | {:before_steering, text :: String.t()}
  | {:before_plugin_opts_update, plugin_module :: module(),
     new_opts :: keyword()}

plugin_state()

@type plugin_state() :: term()

插件自身的运行时状态,格式由各插件自行定义。

Callbacks

describe()

(optional)
@callback describe() :: String.t() | map()

返回插件的人类可读描述,用于调试和日志(可选)。

可以返回简单字符串,也可以返回结构化 map(含 :name:version:description:events 等字段),便于 Plugin Registry / 调试工具展示。

handle_event(event, plugin_state, t)

@callback handle_event(event(), plugin_state(), CMDC.Context.t()) :: action()

处理事件并返回一个 action。

Pipeline 按 priority/0 从小到大依次调用各 plugin 的 handle_event/3。 遇到短路 action(abortblock_toolskip)时,Pipeline 停止后续调用。

参数

  • event — 当前事件,见 event/0 类型定义
  • state — 当前插件的运行时状态
  • ctx — Agent 执行上下文(含 session_id、working_dir、model 等)

init(opts)

@callback init(opts :: keyword()) :: {:ok, plugin_state()} | {:error, term()}

初始化插件状态。

在 Agent 会话启动时被调用一次。返回 {:ok, state} 表示初始化成功, 返回 {:error, reason} 将阻止 Agent 启动。

参数

  • opts — 注册 plugin 时传入的选项(keyword list)

on_config_update(new_opts, state)

(optional)
@callback on_config_update(new_opts :: keyword(), state :: plugin_state()) ::
  {:ok, new_state :: plugin_state()} | {:error, term()}

Plugin opts 热更回调(可选,v0.6+)。

CMDC.update_plugin_opts/3 触发,仅在 Agent 进入 idle 状态 flush 队列时 被调用一次。返回 {:ok, new_plugin_state} 替换当前 plugin state,返回 {:error, reason} 则保留旧 state 并 emit {:plugin_opts_updated, %{success?: false, error: reason}}

参数

  • new_opts — 调用方传入的新 opts(keyword list)
  • state — 当前 plugin 的运行时 state

默认实现语义

未实现该 callback 的 Plugin 走默认替换:

  • new_opts 是 keyword list 且老 state 是 map,自动将 new_opts 转 map 后 Map.merge(old_state, new_opts_map)
  • 否则直接 new_opts 替换为新 state

何时需要自定义实现

  • Plugin state 含运行时累积字段(计数器 / 缓存 / 子结构)需要保留
  • Plugin state 与 opts 结构不同(需要做转换)
  • 新 opts 需要校验合法性(拒绝非法配置返回 {:error, reason}

on_session_end(plugin_state, t)

(optional)
@callback on_session_end(plugin_state(), CMDC.Context.t()) :: :ok

会话结束时的清理操作(可选)。

priority()

@callback priority() :: non_neg_integer()

插件优先级。数值越小越先执行。

建议范围:

  • 0-99:安全/防护类
  • 100-299:核心功能类
  • 300-599:质量监控类
  • 600-899:智能增强类
  • 900+:自定义扩展

Functions

action_type(arg)

@spec action_type(action()) :: atom()

获取 action 的类型标签(原子)。

示例

iex> CMDC.Plugin.action_type({:continue, %{}})
:continue

iex> CMDC.Plugin.action_type({:block_tool, "危险路径", %{}})
:block_tool

apply_config_update(module, new_opts, state)

@spec apply_config_update(module(), keyword() | term(), plugin_state()) ::
  {:ok, plugin_state()} | {:error, term()}

对 Plugin 执行 opts 热更,调用其 on_config_update/2(若实现),否则走默认替换语义。

默认替换语义(与 callback 文档保持一致):

  • new_opts 是 keyword list 且老 state 是 map → Map.merge(old_state, Map.new(new_opts))
  • 其他情况 → 直接以 new_opts 作为新 state

返回 {:ok, new_state}{:error, term()}

示例

iex> CMDC.Plugin.apply_config_update(String, [a: 1], %{a: 0, b: 2})
{:ok, %{a: 1, b: 2}}

iex> CMDC.Plugin.apply_config_update(String, %{x: 9}, :anything)
{:ok, %{x: 9}}

extract_state(arg)

@spec extract_state(action()) :: plugin_state()

从 action 元组中提取 plugin_state。

无论哪种 action 类型,state 始终是最后一个元素。

示例

iex> CMDC.Plugin.extract_state({:continue, %{count: 1}})
%{count: 1}

iex> CMDC.Plugin.extract_state({:abort, "stop", %{reason: "budget"}})
%{reason: "budget"}

plugin?(module)

@spec plugin?(module()) :: boolean()

检查给定模块是否实现了 CMDC.Plugin behaviour。

示例

iex> CMDC.Plugin.plugin?(CMDC.Plugin.Builtin.EventLogger)
true

iex> CMDC.Plugin.plugin?(String)
false

short_circuit?(arg1)

@spec short_circuit?(action()) :: boolean()

判断 action 是否为短路类型。

短路 action 会让 Pipeline 停止调用后续 plugin:

  • {:abort, ...} — 强制停止 Agent
  • {:block_tool, ...} — 阻止工具执行
  • {:skip, ...} — 跳过后续 plugin

示例

iex> CMDC.Plugin.short_circuit?({:abort, "危险操作", %{}})
true

iex> CMDC.Plugin.short_circuit?({:continue, %{}})
false