Plugin Pipeline 执行器。
按 priority/0 从小到大依次调用各 plugin 的 handle_event/3,
遇到短路 action(abort / block_tool / skip)立即停止。
执行语义
| Action | 管道行为 | 结果体现 |
|---|---|---|
continue | 继续执行下一个 plugin | action: :continue |
intervene | 收集 prompt,继续执行后续 plugin | action: :intervene,interventions 累积 |
abort | 短路,立即停止 | action: :abort,halted_by 有值 |
skip | 短路,跳过所有后续 plugin | action: :skip,halted_by 有值 |
block_tool | 短路,阻止工具执行(仅 before_tool 有效) | action: :block_tool,halted_by 有值 |
replace_tool_args | 替换工具参数,继续执行(最后一次替换生效) | replaced_args 被覆盖 |
replace_tool_result | 替换工具结果,继续执行(仅 after_tool,最后一次生效) | replaced_result 被覆盖 |
emit | 收集自定义事件,继续执行 | emitted_events 累积 |
switch_model | 请求切换 LLM 模型,继续执行(最后一次生效) | model_switch 被覆盖 |
使用示例
# 构建已排序的 plugin 列表
plugins = Pipeline.sort([{MyPlugin, plugin_state}, {OtherPlugin, other_state}])
# 执行 Pipeline
{:ok, result} = Pipeline.run(plugins, {:before_tool, "shell", %{"cmd" => "rm -rf /"}}, ctx)
case result.action do
:block_tool -> {:error, result.halt_reason}
:abort -> {:error, result.halt_reason}
_ -> :ok
end
Summary
Functions
判断 Pipeline 结果是否被短路(abort / block_tool / skip)。
从 Pipeline 结果中获取所有 intervention prompt,合并为单个字符串。
执行 Pipeline:按顺序对所有 plugin 调用 handle_event/3。
对 plugin 列表按 priority/0 升序排序,返回排好序的 plugin_entry 列表。
Types
@type plugin_entry() :: {module(), CMDC.Plugin.plugin_state()}
Pipeline 中的单个 plugin 条目:{模块, 运行时状态}。
@type result() :: %{ action: :continue | :intervene | :abort | :skip | :block_tool, plugin_states: %{required(module()) => CMDC.Plugin.plugin_state()}, interventions: [%{plugin: module(), prompt: String.t()}], emitted_events: [{atom(), term()}], replaced_args: map() | nil, replaced_result: term() | nil, model_switch: String.t() | {String.t(), keyword()} | nil, halted_by: module() | nil, halt_reason: String.t() | nil }
Pipeline 执行结果。
action— 最终 action 类型(:continue|:intervene|:abort|:skip|:block_tool)plugin_states— 执行后各 plugin 的最新状态interventions— 收集到的干预 prompt 列表,含来源 plugin 模块emitted_events— 收集到的自定义事件列表replaced_args— 替换后的工具参数(仅before_tool事件可能有值)replaced_result— 替换后的工具结果(仅after_tool事件可能有值,v0.4.1+)model_switch— 待切换的目标 model(v0.2 RFC C8);nil表示不切换;String.t()表示只切 model(v0.2 形式);{model, provider_opts}表示同时切 model 与 provider_opts (v0.3 RFC 11G #A17,provider_opts :: keyword()); 多个 plugin 同时 switch_model 时取最后执行(priority 最大)的值halted_by— 触发短路的 plugin 模块(nil 表示未短路)halt_reason— 短路原因文本(nil 表示未短路)
Functions
判断 Pipeline 结果是否被短路(abort / block_tool / skip)。
示例
iex> Pipeline.halted?(%{halted_by: nil, action: :continue, ...})
false
iex> Pipeline.halted?(%{halted_by: MyPlugin, action: :abort, ...})
true
从 Pipeline 结果中获取所有 intervention prompt,合并为单个字符串。
无 intervention 时返回 nil。
示例
iex> Pipeline.merged_interventions(%{interventions: [], ...})
nil
iex> Pipeline.merged_interventions(%{interventions: [%{plugin: MyPlugin, prompt: "请检查输出"}], ...})
"[Elixir.MyPlugin] 请检查输出"
@spec run([plugin_entry()], CMDC.Plugin.event(), CMDC.Context.t()) :: {:ok, result()} | {:error, term()}
执行 Pipeline:按顺序对所有 plugin 调用 handle_event/3。
Plugin 列表应预先通过 sort/1 排好序。
遇到短路 action(abort / block_tool / skip)时立即停止,
不再调用后续 plugin。
单个 plugin 抛出异常时,记录警告日志后跳过该 plugin,继续执行后续 plugin。
参数
plugins— 已排好序的[{module, plugin_state}]列表event— 当前事件,见CMDC.Plugin.event/0ctx— Agent 执行上下文
返回
{:ok, result()}— 执行成功(含所有 action 类型,包括 abort){:error, {:pipeline_crash, exception}}— Pipeline 内部异常(极少发生)
@spec sort([plugin_entry()]) :: [plugin_entry()]
对 plugin 列表按 priority/0 升序排序,返回排好序的 plugin_entry 列表。
在调用 run/3 之前应先调用此函数。建议在 Agent 初始化时排好序并缓存,
避免每次事件触发时重复排序。
参数
plugins—[{module, plugin_state}]列表,顺序任意
示例
sorted = Pipeline.sort([{MyPlugin, %{}}, {SecurityGuard, %{}}])
# SecurityGuard priority: 10 排在前面