CMDC.Plugin behaviour (cmdc v0.5.0)

Copy Markdown View Source

Plugin behaviour 定义。所有插件实现此 behaviour。

Plugin 系统是拦截 Agent 生命周期的核心切面:13 个事件 × 8 种 action。 本 moduledoc 给出速查表与最小实现示例。完整范例见 guides/cookbook.md

13 种事件(event)— 速查表

事件触发时机可用 action
:session_startAgent 会话刚启动continue/abort/emit
:session_endAgent 会话正常结束continue/emit
{:after_turn, payload}Agent 即将回 idle 前(finish + abort 两种路径)continue/emit
{:before_prompt, text}用户 prompt 提交前continue/intervene/abort/skip/emit
{:before_request, messages}发送 LLM 请求前,可修改消息列表continue/intervene/abort/skip/emit/switch_model
{:after_response, assistant_msg}收到 LLM 回复后,可分析内容continue/intervene/abort/skip/emit/switch_model
{:before_tool, name, args}工具执行前,可阻止或替换参数continue/block_tool/replace_tool_args/abort/emit
{:on_tool_error, name, call_id, error, attempt}工具执行失败、retry 前continue/abort/skip/emit/switch_model
{:after_tool, name, call_id, result}单个工具执行完毕continue/intervene/abort/emit/switch_model
{:after_tool_batch, results}一批工具全部执行完毕continue/intervene/abort/emit/switch_model
:before_finishAgent 准备返回最终结果前,可拦截注入新 promptcontinue/intervene/abort/emit
{:before_compact, messages}上下文压缩前,可修改待压缩消息continue/skip/emit
{:before_steering, text}CMDC.steer/2 中段软中断入队前continue/intervene/abort/emit

:after_turn payload 字段

Plugin 通过 handle_event({:after_turn, payload}, state, ctx) 接收。 payload :: map() 含:

  • :outcome:finished | :aborted

  • :abort_reasonterm() | nil(仅在 :aborted 时非 nil)

  • :messages_diff[CMDC.Message.t()] 本次 prompt cycle 新增的消息(按时间顺序)
  • :token_usage_diff%CMDC.TokenUsage{} 本次增量
  • :started_at_ms / :ended_at_ms / :duration_ms

:session_end 的区别:

  • :session_end 无 payload,保持现状(不破坏 EventLogger 等老 Plugin)
  • :after_turn 带结构化 payload,新 Plugin 推荐使用;典型场景: 把本轮对话写入审计 / 长期记忆 / 计费系统

8 种 Action

Action元组形式含义典型使用者
continue{:continue, state}继续执行下一个 plugin所有 plugin 的默认返回
intervene{:intervene, prompt, state}注入新 prompt,Agent 继续工作SelfVerify、ReviewLoop
abort{:abort, reason, state}强制停止 AgentSecurityGuard、BudgetGuard
skip{:skip, state}跳过后续 plugin(短路)短路优化
block_tool{:block_tool, reason, state}阻止当前工具执行(仅 before_tool)SecurityGuard、HumanApproval
replace_tool_args{:replace_tool_args, new_args, state}替换工具参数(仅 before_tool)SecurityGuard、Sandbox 重写
emit{:emit, {event_name, payload}, state}广播自定义事件任何 plugin
switch_model{:switch_model, model_string, state}{:switch_model, model_string, state, opts}运行期切换 LLM 模型。4 元组形态接受 opts:provider_opts :: keyword() 同步切换 provider 参数(base_url / api_key / timeout)Recovery、ModelRouter(运行期降级)

Hook × Action 完整矩阵

下表标 表示该 hook 接受该 action;- 表示该 hook 不支持该 action(Pipeline 内部 忽略并继续处理后续 plugin,不抛错)。每个 hook 都隐含支持 :continue:emit

Hook \ Actioncontinueinterveneabortskipblock_toolreplace_argsemitswitch_model
:session_start-----
:session_end------
{:after_turn, p}------
{:before_prompt, t}---
{:before_request, m}--
{:after_response, m}--
{:before_tool, n, a}--
{:on_tool_error, ...}----*
{:after_tool, ...}---
{:after_tool_batch, r}---
:before_finish----
{:before_compact, m}-----
{:before_steering, t}----

*:on_tool_error 在 Task retry 内部 Pipeline 触发,无完整 Agent state 上下文,:switch_model action 被收集但不应用(避免脏写 state.model)。 若需基于工具失败切 model,请在 :after_tool 钩子中匹配 {:error, _} result。

Action 失败/嵌套语义补充

  • :abort 短路:Pipeline 立即停止,后续 plugin 不再执行;Agent 直接回 idle
    • 广播 {:agent_abort, reason} + 触发 :after_turn (outcome=:aborted)
  • :block_tool 短路(仅 :before_tool):当前工具不执行,注入 synthetic error tool_result;后续 plugin 也不再执行;其他工具继续按 batch 调度
  • :skip 短路:跳过所有后续 plugin,但不影响 Agent 主流程(Agent 按"无 action" 继续推进)
  • :intervene 累积式:多个 plugin 同时 intervene 时,prompt 按 priority 顺序 拼接( 分隔),最终一次性注入;不短路
  • :replace_tool_args 覆盖式:多个 plugin 同时返回时取最后执行(priority 最大) 的值;不短路
  • :emit 累积式:所有 plugin 的 emit 事件按时序追加到 emitted_events, Pipeline 结束后 Agent 统一 broadcast;不短路;支持 4 种形态:
    • {:emit, {name, payload}, state} — 单事件
    • {:emit, [{name, payload}, ...], state} — 多事件
    • {:emit, name, payload, state} — 4 元简写
    • {:emit, {name, a, b}, state} — 3 元 tuple payload(如 :update_system_context
  • :switch_model 覆盖式:多个 plugin 同时返回时取最后执行(priority 最大)的值; 支持 3 元和 4 元两种形态;4 元 {:switch_model, model, state, provider_opts: [...]} 同步切 provider 参数

Plugin emit 自动注入 user_data

emit 出来的 {:plugin_event, name, payload} 事件,当 payload 是 map 时, Agent 会自动 merge state.user_data:user_data 字段(仅当 payload 不含 :user_data 时)。Plugin 可在 payload 中加 :_no_user_data 键 opt-out (broadcast 前自动 pop)。订阅方可以直接从 payload.user_data 反查业务账号。

优先级范围

  • P0 安全类:0-99(最先执行)
  • P1 核心类:100-299
  • P2 质量类:300-599
  • P3 智能类:600-899
  • 自定义:900+(最后执行)

最小实现示例

defmodule MyApp.AuditPlugin do
  @moduledoc "记录所有工具调用。"
  @behaviour CMDC.Plugin

  @impl true
  def init(_opts), do: {:ok, %{calls: []}}

  @impl true
  def priority, do: 500

  @impl true
  def handle_event({:before_tool, name, args}, state, _ctx) do
    {:continue, %{state | calls: [{name, args} | state.calls]}}
  end

  def handle_event(_event, state, _ctx), do: {:continue, state}
end

5 个完整 Cookbook(DangerousCommandGuard / PromptAuditLog / BudgetGuard / ToolFallbackGuard / SteeringContentGuard)见 guides/cookbook.md

Summary

Types

Plugin 返回的控制 action。

Plugin 接收的事件类型。

单个 emit 事件,被 Pipeline 累加到 emitted_events 并最终被 Agent 广播。

插件自身的运行时状态,格式由各插件自行定义。

Callbacks

返回插件的人类可读描述,用于调试和日志(可选)。

处理事件并返回一个 action。

初始化插件状态。

会话结束时的清理操作(可选)。

插件优先级。数值越小越先执行。

Functions

获取 action 的类型标签(原子)。

从 action 元组中提取 plugin_state。

检查给定模块是否实现了 CMDC.Plugin behaviour。

判断 action 是否为短路类型。

Types

action()

@type action() ::
  {:continue, plugin_state()}
  | {:intervene, prompt :: String.t(), plugin_state()}
  | {:abort, reason :: String.t(), plugin_state()}
  | {:skip, plugin_state()}
  | {:block_tool, reason :: String.t(), plugin_state()}
  | {:replace_tool_args, new_args :: map(), plugin_state()}
  | {:emit, emit_event(), plugin_state()}
  | {:emit, [emit_event()], plugin_state()}
  | {:emit, event_name :: atom(), payload :: term(), plugin_state()}
  | {:switch_model, new_model :: String.t(), plugin_state()}
  | {:switch_model, new_model :: String.t(), plugin_state(),
     opts :: [{:provider_opts, keyword()}]}

Plugin 返回的控制 action。

共 8 种大类:

  • {:continue, state} — 继续执行下一个 plugin(默认)
  • {:intervene, prompt, state} — 注入新 prompt,Agent 继续工作
  • {:abort, reason, state} — 强制停止 Agent
  • {:skip, state} — 跳过后续所有 plugin(短路)
  • {:block_tool, reason, state} — 阻止工具执行(仅 before_tool 有效)
  • {:replace_tool_args, new_args, state} — 替换工具参数(仅 before_tool 有效)
  • {:emit, event, state} / {:emit, [event], state} / {:emit, name, payload, state} — 广播自定义事件,支持单个、列表、4 元组三种等价形式
  • {:switch_model, model_string, state} — 运行期切换 LLM 模型; 多个 plugin 同时返回时,priority 较大者(后执行)的值生效; Agent 收到后立即调用 switch_model/2,emit {:model_switched, %{from, to}}
  • {:switch_model, model_string, state, opts} — 4 元组形态; opts :: keyword() 支持 :provider_opts :: keyword()(同步切换 provider 参数, 如从 Anthropic 切到 OpenAI 自建网关,可同时改 base_urlapi_keytimeout); 与 3 元组共存,Pipeline 根据元组长度自动分发; :model_switched payload 含 provider_opts_changed?: boolean()

after_turn_payload()

@type after_turn_payload() :: %{
  outcome: :finished | :aborted,
  abort_reason: term() | nil,
  messages_diff: [CMDC.Message.t()],
  token_usage_diff: CMDC.TokenUsage.t(),
  started_at_ms: integer(),
  ended_at_ms: integer(),
  duration_ms: non_neg_integer()
}

Plugin 接收的事件类型。

共 13 种,覆盖 Agent 完整生命周期:

  • :session_start — 会话启动
  • :session_end — 会话结束
  • {:after_turn, payload} — Agent 回 idle 前; payload 含 :outcome :: :finished | :aborted:abort_reason:messages_diff:token_usage_diff:started_at_ms:ended_at_ms:duration_ms
  • {:before_prompt, text} — 用户 prompt 提交前
  • {:before_request, messages} — 发送 LLM 请求前
  • {:after_response, assistant_msg} — 收到 LLM 回复后
  • {:before_tool, name, args} — 工具执行前
  • {:on_tool_error, name, call_id, error, attempt} — 工具执行失败、retry 前
  • {:after_tool, name, call_id, result} — 单工具执行后
  • {:after_tool_batch, results} — 批次工具全部执行后
  • :before_finish — Agent 准备返回最终结果前
  • {:before_compact, messages} — 上下文压缩前
  • {:before_steering, text}CMDC.steer/2 中段软中断入队前

emit_event()

@type emit_event() ::
  {event_name :: atom(), payload :: term()}
  | {event_name :: atom(), a :: term(), b :: term()}

单个 emit 事件,被 Pipeline 累加到 emitted_events 并最终被 Agent 广播。

  • 2 元组 {name, payload} 会被广播为 {:plugin_event, name, payload}
  • 3 元组 {name, a, b} 会被广播为 {:plugin_event, name, {a, b}} 典型用途:{:update_system_context, key, text} 更新 Agent.state.dynamic_context_sections

event()

@type event() ::
  :session_start
  | :session_end
  | {:after_turn, after_turn_payload()}
  | {:before_prompt, text :: String.t()}
  | {:before_request, messages :: [CMDC.Message.t()]}
  | {:after_response, assistant_msg :: CMDC.Message.t()}
  | {:before_tool, tool_name :: String.t(), args :: map()}
  | {:on_tool_error, tool_name :: String.t(), call_id :: String.t(),
     error :: String.t(), attempt :: pos_integer()}
  | {:after_tool, tool_name :: String.t(), call_id :: String.t(),
     result :: {:ok, String.t()} | {:error, String.t()}}
  | {:after_tool_batch,
     results :: [
       {tool_name :: String.t(), {:ok, String.t()} | {:error, String.t()}}
     ]}
  | :before_finish
  | {:before_compact, messages :: [CMDC.Message.t()]}
  | {:before_steering, text :: String.t()}

plugin_state()

@type plugin_state() :: term()

插件自身的运行时状态,格式由各插件自行定义。

Callbacks

describe()

(optional)
@callback describe() :: String.t() | map()

返回插件的人类可读描述,用于调试和日志(可选)。

可以返回简单字符串,也可以返回结构化 map(含 :name:version:description:events 等字段),便于 Plugin Registry / 调试工具展示。

handle_event(event, plugin_state, t)

@callback handle_event(event(), plugin_state(), CMDC.Context.t()) :: action()

处理事件并返回一个 action。

Pipeline 按 priority/0 从小到大依次调用各 plugin 的 handle_event/3。 遇到短路 action(abortblock_toolskip)时,Pipeline 停止后续调用。

参数

  • event — 当前事件,见 event/0 类型定义
  • state — 当前插件的运行时状态
  • ctx — Agent 执行上下文(含 session_id、working_dir、model 等)

init(opts)

@callback init(opts :: keyword()) :: {:ok, plugin_state()} | {:error, term()}

初始化插件状态。

在 Agent 会话启动时被调用一次。返回 {:ok, state} 表示初始化成功, 返回 {:error, reason} 将阻止 Agent 启动。

参数

  • opts — 注册 plugin 时传入的选项(keyword list)

on_session_end(plugin_state, t)

(optional)
@callback on_session_end(plugin_state(), CMDC.Context.t()) :: :ok

会话结束时的清理操作(可选)。

priority()

@callback priority() :: non_neg_integer()

插件优先级。数值越小越先执行。

建议范围:

  • 0-99:安全/防护类
  • 100-299:核心功能类
  • 300-599:质量监控类
  • 600-899:智能增强类
  • 900+:自定义扩展

Functions

action_type(arg)

@spec action_type(action()) :: atom()

获取 action 的类型标签(原子)。

示例

iex> CMDC.Plugin.action_type({:continue, %{}})
:continue

iex> CMDC.Plugin.action_type({:block_tool, "危险路径", %{}})
:block_tool

extract_state(arg)

@spec extract_state(action()) :: plugin_state()

从 action 元组中提取 plugin_state。

无论哪种 action 类型,state 始终是最后一个元素。

示例

iex> CMDC.Plugin.extract_state({:continue, %{count: 1}})
%{count: 1}

iex> CMDC.Plugin.extract_state({:abort, "stop", %{reason: "budget"}})
%{reason: "budget"}

plugin?(module)

@spec plugin?(module()) :: boolean()

检查给定模块是否实现了 CMDC.Plugin behaviour。

示例

iex> CMDC.Plugin.plugin?(CMDC.Plugin.Builtin.EventLogger)
true

iex> CMDC.Plugin.plugin?(String)
false

short_circuit?(arg1)

@spec short_circuit?(action()) :: boolean()

判断 action 是否为短路类型。

短路 action 会让 Pipeline 停止调用后续 plugin:

  • {:abort, ...} — 强制停止 Agent
  • {:block_tool, ...} — 阻止工具执行
  • {:skip, ...} — 跳过后续 plugin

示例

iex> CMDC.Plugin.short_circuit?({:abort, "危险操作", %{}})
true

iex> CMDC.Plugin.short_circuit?({:continue, %{}})
false