CMDC 多 Agent 编排引擎 — WorkflowSpec + Run API + DAG 驱动的 AgentOps 编排。

cmdc_orchestratorcmdc 之上提供「图驱动」的多 Agent 编排能力:把 工作流描述成 DAG(节点 + 边),由执行器按拓扑顺序调度,并把上游节点的输出 自动注入到下游节点的 prompt 中。v0.4 起新增可持久化 WorkflowSpec,用于企业 Workflow Designer、发布校验、dry run 和 Trace Viewer 接入。v0.5 新增异步 Run API、 RunStore 事件账本、condition/signal 分支和 output_key 上下文合并。v0.6 补齐 gen_statem 可恢复执行器、运行控制 API、retry/timeout/fallback policy、 :fork / :join 聚合、:human_task 暂停审批恢复和 AgentOps 企业接入契约。

理论基石参考 Agentic Design Patterns 第 2 / 3 / 7 / 12 / 15 章(路由、并行化、 多 Agent 协作、异常处理与恢复、Inter-Agent 通信)。

安装

Hex 依赖:

def deps do
  [
    {:cmdc, "~> 0.5"},
    {:cmdc_orchestrator, "~> 0.6"}
  ]
end

monorepo 内部通过 path 依赖:

def deps do
  [
    {:cmdc_orchestrator, path: "../cmdc_orchestrator"}
  ]
end

核心概念

概念说明
Orchestration一个 DAG(有向无环图),定义节点和依赖
WorkflowSpecv0.4 新增的可序列化工作流规格,面向保存、校验、展示、发布
NodeDAG 节点,12 种内置类型::agent / :aggregator / :router / :condition / :human_task / :fork / :join / :gate / :tool / :eval_gate / :debate / :hierarchy
Edge节点间的数据流依赖 %{from:, to:, branch?:, signal?:}
Run一次编排执行,v0.5 起可通过 start_run/2 取得异步句柄
RuntimeDAG 全程共享的运行时容器(v0.3+),管理 Agent 会话池与 SubAgent 生命周期

WorkflowSpec(v0.4)

WorkflowSpec 是面向企业平台保存和发布的稳定配置层;旧 %DAG{} 仍可直接 execute/2

spec = %{
  "workflow_id" => "wf.contract_review",
  "version" => "2026.06.001",
  "nodes" => [
    %{"id" => "draft", "type" => "agent", "config" => %{"prompt" => "审阅合同"}},
    %{"id" => "gate", "type" => "gate", "config" => %{"criteria" => ["完整"]}}
  ],
  "edges" => [%{"from" => "draft", "to" => "gate"}]
}

{:ok, workflow, warnings} = CMDCOrchestrator.validate_workflow(spec)
{:ok, dry_report} = CMDCOrchestrator.dry_run(workflow)
{:ok, dag} = CMDCOrchestrator.to_dag(workflow)
{:ok, results} = CMDCOrchestrator.execute(workflow, agent_opts)

校验覆盖稳定 node id、edge 引用、DAG 环路、router branch、节点 preflight、 孤岛节点 warning,以及匿名函数 / pid / reference / tuple 等不可持久化配置。

Run API(v0.5+)

企业 Run Console / Trace Viewer 推荐使用异步 Run API:

{:ok, run_id} =
  CMDCOrchestrator.start_run(workflow,
    input: %{"amount" => 128},
    trigger_source: :webhook,
    metadata: %{tenant_id: "t_001"}
  )

{:ok, %{run: run, node_runs: node_runs, events: events}} =
  CMDCOrchestrator.status(run_id)

{:ok, final_run} = CMDCOrchestrator.await_run(run_id, timeout: 120_000)
{:ok, events} =
  CMDCOrchestrator.events(run_id,
    type: "orchestrator.node.completed",
    limit: 100,
    after_id: last_event_id
  )
{:ok, _paused} = CMDCOrchestrator.pause_run(run_id, :operator_pause)
{:ok, _running} = CMDCOrchestrator.resume_run(run_id, idempotency_key: "resume-req-001")
{:ok, _cancelled} = CMDCOrchestrator.cancel(run_id, :user_cancelled)

start_run/2 支持 :idempotency_key,用于 HTTP 重试或重复点击时返回同一个 run:

{:ok, run_id} = CMDCOrchestrator.start_run(workflow, idempotency_key: "trigger-001")
{:ok, ^run_id} = CMDCOrchestrator.start_run(workflow, idempotency_key: "trigger-001")

v0.6 M1 开始,Run Console 可使用:

  • pause_run/3:当前节点结束后停在 :paused
  • resume_run/2:进程存在则唤醒,不存在则从 RunStore 的 DAG snapshot 重建;
  • retry_run/2:从失败 run 的恢复点继续;
  • retry_node/3:清掉指定节点及其下游结果后重跑;
  • rerun/2:基于旧 run 的 DAG snapshot 创建新 run。

v0.6 M2 增加 human_task 决策 API:

{:ok, _running} =
  CMDCOrchestrator.complete_human_task(run_id, "legal-review",
    actor_ref: "role:legal",
    idempotency_key: "approval-click-001"
  )

{:ok, _waiting} =
  CMDCOrchestrator.submit_human_task_decision(
    run_id,
    "legal-review",
    %{action: :progress, comment: "需要补充附件"},
    actor_ref: "role:legal"
  )

{:ok, _running} = CMDCOrchestrator.expire_waiting_tasks(run_id)

默认 RunStore 是 ETS 后端,适合开发和测试。v0.6 M1 还提供 CMDCOrchestrator.RunStore.Checkpoint,可复用 CMDC.Checkpoint.Backend.ETS/DETScmdc_memory_pg 的 checkpoint backend 做轻持久化。企业 Phoenix 平台可以实现 CMDCOrchestrator.RunStore behaviour,把 run、node run、event ledger 映射到 自己的 Ecto schema、租户、RBAC 和审计系统。

企业 Phoenix AgentOps 接入建议见 guides/agentops_integration.md,覆盖 Workflow Designer JSON DSL、RunService API、Approval Center、Gateway SSE 事件映射、 RunStore Ecto 注意事项、Oban/Temporal 边界和 Hive 迁移对照。

run_sync/2 是新 Run API 的同步 wrapper,成功时仍返回旧 execute/2 风格 {:ok, results}

{:ok, results} = CMDCOrchestrator.run_sync(workflow, timeout: 120_000)

节点类型

:agent

启动一个 CMDC Agent 执行配置好的 prompt,自动把 dep_results 拼到 prompt 末尾作为「上游节点输出」上下文。

%{
  id: "research",
  type: :agent,
  config: %{
    prompt: "调研 AI Agent 最新进展",
    system_prompt: "你是研究员",
    max_turns: 5,
    mode: :standalone        # v0.3+::standalone | :pool | :subagent
  }
}

v0.3 新增:三种执行模式

mode说明适用场景
:standalone(默认)每次 ephemeral 会话,不复用单点任务,无需历史
:poolpool_key 共享会话,对话累积Debater / 多轮审稿等需要"角色记忆"
:subagent通过 Runtime 注册独立长生命周期会话需要 SubAgent 监督和生命周期跟踪

:pool 模式可指定 config[:pool_key](默认 node.id);:subagent 模式 由 Runtime 在 DAG 结束时统一回收。

:aggregator

合并多个上游结果。三种策略:"concat" 拼接 / "merge" 合并 map / "vote" 多数 投票。

%{id: "merge", type: :aggregator, config: %{strategy: "concat"}}

:router

按策略分发到不同分支,配合边的 :branch 标签由 Executor 自动剪枝 (ADP 第 2 章 Routing)。三种策略:

# 1. rule —— 顺序匹配 pattern
%{
  id: "classify",
  type: :router,
  config: %{
    strategy: "rule",
    rules: [
      %{pattern: "技术", branch: "tech_branch"},
      %{pattern: "商业", branch: "biz_branch"}
    ]
  }
}

# 2. random —— 在 branches 中均匀随机
%{config: %{strategy: "random", branches: ["a", "b", "c"]}}

# 3. llm(v0.3+)—— LLM 选择 + fallback
%{
  id: "intent_router",
  type: :router,
  config: %{
    strategy: "llm",
    branches: ["weather", "news", "search"],
    fallback: "search",
    router_agent: %{name: "Router", model: "openai:gpt-4o-mini"}
  }
}

:condition(v0.5 新增)

确定性条件节点,返回 "true" / "false" signal,下游边按 :signal:branch 匹配:

%{
  id: "risk_check",
  type: :condition,
  config: %{
    left: "{{amount}}",
    operator: "gte",
    right: 100_000,
    output_key: "risk_check"
  }
}

支持算子:eq / neq / gt / gte / lt / lte / contains / not_contains / is_truthy / is_falsy

边示例:

[
  %{from: "risk_check", to: "legal_review", signal: "true"},
  %{from: "risk_check", to: "auto_report", signal: "false"}
]

:fork / :join(v0.6 M1)

fork 把多个分支放入同一个可恢复 run 内并发执行;join 聚合分支结果。分支使用 branch-local context,完成后一次性写回 run snapshot:

%{id: "split", type: :fork, config: %{output_key: "parallel_review"}}
%{id: "legal", type: :agent, config: %{prompt: "法务审阅"}}
%{id: "finance", type: :agent, config: %{prompt: "财务审阅"}}
%{id: "join", type: :join, config: %{mode: :all, fail_strategy: :fail_fast}}

join.config[:mode] 支持 :all / :any / :n_of_mfail_strategy 支持 :fail_fast / :wait_all / :tolerateWorkflowSpec.validate/1 会拒绝 dangling join、join 入边不足、嵌套 fork 和多 join 歧义。

:human_task(v0.6 M2)

human_task 把 workflow 挂起到人工审批或补录等待点。库内只保存任务描述、 assignee_refs、决策聚合状态、resume signal 与事件账本;审批 UI、RBAC/ABAC、 通知和企业审计表由 Phoenix 平台实现。导入 :human_approval / :approval 会归一化为 :human_task

%{
  id: "legal_review",
  type: :human_task,
  config: %{
    task_id: "legal-review",
    title: "法务审批:{{risk_review.summary}}",
    assignee_refs: ["role:legal"],
    approval_mode: :quorum,
    required_count: 2,
    timeout_ms: 86_400_000,
    on_timeout: :proceed_with_default,
    default_signal: "timeout",
    default_output: %{timed_out: true},
    output_key: "legal_review"
  }
}

决策支持 :approve / :reject / :request_changes / :progress。终局决策会把 human_task 输出写入 Run.completedRun.context_data,再按 "approved" / "rejected" / "request_changes" / timeout signal 继续下游;progress 只更新 任务和事件账本,不恢复 run。CMDCOrchestrator.AssigneeResolver 只定义可选 解析 behaviour,通用包不查询 Accounts/Roles。

统一 Policy(v0.6 M1)

节点可通过 policy 配置执行策略:

%{
  id: "risk_check",
  type: :tool,
  config: %{tool_name: "risk_score"},
  policy: %{
    retries: 2,
    timeout_ms: 5_000,
    backoff: :exponential,
    on_error: :emit_signal,
    signal: "needs_review"
  }
}

on_error 支持 :fail / :continue / :skip / :fallback / :emit_signal:fallback 可配 fallback_output 返回确定性输出。

:gate

质量检查点,criteria 全部通过才继续,否则中止整个 DAG。

%{id: "review", type: :gate, config: %{criteria: ["accurate", "concise"]}}

:tool(v0.4 新增)

直接调用已注册的 CMDC.Tool 模块,输入可从上游节点结果模板渲染。

%{
  id: "rag_search",
  type: :tool,
  config: %{
    tool_name: "enterprise_rag_search",
    args: %{"query" => "{{classify.query}}"},
    output_key: "evidence"
  }
}

运行时通过 agent_opts[:tool_registry] 注入:

CMDCOrchestrator.execute(dag,
  tool_registry: %{"enterprise_rag_search" => MyApp.Tools.RAGSearch}
)

:eval_gate(v0.4 新增)

离线评测门禁,适合 AgentSpec / RAG preset / Workflow 发布前阻断。

%{
  id: "release_gate",
  type: :eval_gate,
  config: %{
    metrics: %{groundedness: 0.91, unauthorized_source_count: 0},
    thresholds: %{groundedness: 0.85, unauthorized_source_count: 0}
  }
}

也可以配置 gate_module 委托给企业侧 CMDCEvalCMDCRAGArcana.Eval.Gate 风格模块的 check/2

:debate(v0.3 新增)

多 Agent 辩论 + Judge 模式,对应 ADP 第 7 章 Multi-Agent Collaboration。

%{
  id: "debate",
  type: :debate,
  config: %{
    topic: "Elixir vs Python on Agents?",
    debaters: [%{name: "ProElixir"}, %{name: "ProPython"}],
    judge: %{name: "Judge"},
    max_rounds: 3,
    consensus_fn: fn rounds -> length(rounds) >= 2 end  # 可选提前终止
  }
}
  • 每轮内部按列表顺序串行执行,后发言者能看到本轮先发言者的内容;
  • 默认每个 debater / judge 走 :pool 模式,自动累积自己角色的对话历史;
  • 输出包含 topic / rounds / consensus / verdict / terminated_at_round

:hierarchy(v0.3 新增)

Manager → Workers → Synthesizer 三段式协作。

%{
  id: "hier",
  type: :hierarchy,
  config: %{
    goal: "为新 SaaS 制定市场调研报告",
    manager: %{name: "PM"},                                 # 可选
    workers: [%{name: "Researcher"}, %{name: "Analyst"}],
    synthesizer: %{name: "Writer"},                          # 可选
    worker_assign: :round_robin,                             # 或 :pairwise
    max_parallel: 4
  }
}

可选 :tasks / :split_fn 完全跳过 LLM 拆解,直接用静态子任务列表跑离线测试。

使用示例

dag = %CMDCOrchestrator.DAG{
  nodes: [
    %{id: "research", type: :agent, config: %{prompt: "调研 AI Agent"}},
    %{id: "write", type: :agent, config: %{prompt: "写一篇 800 字博客"}},
    %{id: "review", type: :gate, config: %{criteria: ["完整"]}}
  ],
  edges: [
    %{from: "research", to: "write"},
    %{from: "write", to: "review"}
  ]
}

{:ok, results} = CMDCOrchestrator.execute(dag,
  model: "openai:gpt-4o-mini",
  api_key: System.fetch_env!("OPENAI_API_KEY")
)

results["write"]

执行失败时返回结构化错误,方便上层 UI 聚焦失败点:

{:error, %{node_id: "review", reason: "门禁未通过: 完整", completed: %{...}}}

与 CMDC v0.2 的集成

:agent 节点支持透传 cmdc 0.2 的全部 Agent 选项::user_data / :prompt_mode / :plugins / :tools / :provider / :model 等。

v0.3 通过 Runtime 提供原生的会话池和 SubAgent 支持,无需用户手工管理 CMDC.SubAgent.Supervisor

  • :pool 模式 → Runtime 内部按 pool_key 复用会话;
  • :subagent 模式 → Runtime 注册独立长生命周期会话,DAG 结束统一 CMDC.stop/1

内置模板(v0.4)

CMDCOrchestrator.Templates.names()
#=> ["contract_review", "order_delay_diagnosis", "ticket_triage", "rag_release_gate", "debate_review"]

spec = CMDCOrchestrator.Templates.get!("contract_review")
{:ok, _report} = CMDCOrchestrator.dry_run(spec)

模板是 JSON-ready 的 WorkflowSpec 草稿,可作为 Workflow Designer 初始化数据。

Telemetry(v0.4)

执行器发出稳定事件,metadata 只包含结构化摘要,不放 prompt、chunk 或完整工具输出:

  • [:cmdc_orchestrator, :run, :start | :stop | :exception]

  • [:cmdc_orchestrator, :node, :start | :stop | :exception]

这些事件可由 cmdc_gateway 或企业 AgentOps Trace Viewer 映射为 SSE / timeline。

human_task 会额外写入 human_task.createdhuman_task.decision_recordedhuman_task.progresshuman_task.completedhuman_task.timeout 等事件,供 Approval Center、Run Console 和 Trace Viewer 关联展示。

开发

mix deps.get
mix test
mix format --check-formatted
mix compile --warnings-as-errors
mix credo --strict

路线图

  • v0.1 ✅ 基础 DAG + 4 种节点 + 串行执行
  • v0.2 ✅ 真并行(拓扑分层 + Task.async_stream)+ Router 真剪枝 + AgentNode Recovery
  • v0.3 ✅ Runtime + AgentNode 三模式(standalone/pool/subagent)+ DebateNode + HierarchyNode + RouterNode LLM 策略
  • v0.4 ✅ WorkflowSpec + validate/dry_run/to_dag + Node Registry + Tool/EvalGate + Templates + Telemetry
  • v0.5 ✅ Async Run API + RunStore + event ledger + condition/signal
  • v0.6 M1 ✅ 可恢复 Executor + Run Console controls + retry/timeout/fallback + fork/join
  • v0.6 M2 ✅ human_task / 审批暂停恢复 / timeout tick helper

详见 CHANGELOG.mdexample/multi_agent_debate_demo.exs

许可

Apache-2.0