CMDC.Plugin.Pipeline (cmdc v0.3.0)

Copy Markdown View Source

Plugin Pipeline 执行器。

priority/0 从小到大依次调用各 plugin 的 handle_event/3, 遇到短路 action(abort / block_tool / skip)立即停止。

执行语义

Action管道行为结果体现
continue继续执行下一个 pluginaction: :continue
intervene收集 prompt,继续执行后续 pluginaction: :interveneinterventions 累积
abort短路,立即停止action: :aborthalted_by 有值
skip短路,跳过所有后续 pluginaction: :skiphalted_by 有值
block_tool短路,阻止工具执行(仅 before_tool 有效)action: :block_toolhalted_by 有值
replace_tool_args替换工具参数,继续执行(最后一次替换生效)replaced_args 被覆盖
emit收集自定义事件,继续执行emitted_events 累积
switch_model请求切换 LLM 模型,继续执行(最后一次生效)model_switch 被覆盖

使用示例

# 构建已排序的 plugin 列表
plugins = Pipeline.sort([{MyPlugin, plugin_state}, {OtherPlugin, other_state}])

# 执行 Pipeline
{:ok, result} = Pipeline.run(plugins, {:before_tool, "shell", %{"cmd" => "rm -rf /"}}, ctx)

case result.action do
  :block_tool -> {:error, result.halt_reason}
  :abort -> {:error, result.halt_reason}
  _ -> :ok
end

Summary

Types

Pipeline 中的单个 plugin 条目:{模块, 运行时状态}。

Pipeline 执行结果。

Functions

判断 Pipeline 结果是否被短路(abort / block_tool / skip)。

从 Pipeline 结果中获取所有 intervention prompt,合并为单个字符串。

执行 Pipeline:按顺序对所有 plugin 调用 handle_event/3

对 plugin 列表按 priority/0 升序排序,返回排好序的 plugin_entry 列表。

Types

plugin_entry()

@type plugin_entry() :: {module(), CMDC.Plugin.plugin_state()}

Pipeline 中的单个 plugin 条目:{模块, 运行时状态}。

result()

@type result() :: %{
  action: :continue | :intervene | :abort | :skip | :block_tool,
  plugin_states: %{required(module()) => CMDC.Plugin.plugin_state()},
  interventions: [%{plugin: module(), prompt: String.t()}],
  emitted_events: [{atom(), term()}],
  replaced_args: map() | nil,
  model_switch: String.t() | {String.t(), keyword()} | nil,
  halted_by: module() | nil,
  halt_reason: String.t() | nil
}

Pipeline 执行结果。

  • action — 最终 action 类型(:continue | :intervene | :abort | :skip | :block_tool

  • plugin_states — 执行后各 plugin 的最新状态
  • interventions — 收集到的干预 prompt 列表,含来源 plugin 模块
  • emitted_events — 收集到的自定义事件列表
  • replaced_args — 替换后的工具参数(仅 before_tool 事件可能有值)
  • model_switch — 待切换的目标 model(v0.2 RFC C8); nil 表示不切换; String.t() 表示只切 model(v0.2 形式); {model, provider_opts} 表示同时切 model 与 provider_opts (v0.3 RFC 11G #A17,provider_opts :: keyword()); 多个 plugin 同时 switch_model 时取最后执行(priority 最大)的值
  • halted_by — 触发短路的 plugin 模块(nil 表示未短路)
  • halt_reason — 短路原因文本(nil 表示未短路)

Functions

halted?(map)

@spec halted?(result()) :: boolean()

判断 Pipeline 结果是否被短路(abort / block_tool / skip)。

示例

iex> Pipeline.halted?(%{halted_by: nil, action: :continue, ...})
false

iex> Pipeline.halted?(%{halted_by: MyPlugin, action: :abort, ...})
true

merged_interventions(map)

@spec merged_interventions(result()) :: String.t() | nil

从 Pipeline 结果中获取所有 intervention prompt,合并为单个字符串。

无 intervention 时返回 nil

示例

iex> Pipeline.merged_interventions(%{interventions: [], ...})
nil

iex> Pipeline.merged_interventions(%{interventions: [%{plugin: MyPlugin, prompt: "请检查输出"}], ...})
"[Elixir.MyPlugin] 请检查输出"

run(plugins, event, ctx)

@spec run([plugin_entry()], CMDC.Plugin.event(), CMDC.Context.t()) ::
  {:ok, result()} | {:error, term()}

执行 Pipeline:按顺序对所有 plugin 调用 handle_event/3

Plugin 列表应预先通过 sort/1 排好序。

遇到短路 action(abort / block_tool / skip)时立即停止, 不再调用后续 plugin。

单个 plugin 抛出异常时,记录警告日志后跳过该 plugin,继续执行后续 plugin。

参数

  • plugins — 已排好序的 [{module, plugin_state}] 列表
  • event — 当前事件,见 CMDC.Plugin.event/0
  • ctx — Agent 执行上下文

返回

  • {:ok, result()} — 执行成功(含所有 action 类型,包括 abort)
  • {:error, {:pipeline_crash, exception}} — Pipeline 内部异常(极少发生)

sort(plugins)

@spec sort([plugin_entry()]) :: [plugin_entry()]

对 plugin 列表按 priority/0 升序排序,返回排好序的 plugin_entry 列表。

在调用 run/3 之前应先调用此函数。建议在 Agent 初始化时排好序并缓存, 避免每次事件触发时重复排序。

参数

  • plugins[{module, plugin_state}] 列表,顺序任意

示例

sorted = Pipeline.sort([{MyPlugin, %{}}, {SecurityGuard, %{}}])
# SecurityGuard priority: 10 排在前面