CMDC 多 Agent 编排引擎 — DAG 驱动的 Agent 协作。
核心概念
- Orchestration:一个 DAG(有向无环图),定义 Agent 节点和依赖关系
- Node:DAG 中的节点,支持 agent / router / condition / fork / join 等内置类型
- Edge:节点间的数据流依赖,可按 signal 选择分支
- Run:一次编排执行,可异步启动、暂停、恢复、重试和查询事件账本
使用示例
dag = %CMDCOrchestrator.DAG{
nodes: [
%{id: "research", type: :agent, config: %{prompt: "调研 AI 趋势"}},
%{id: "write", type: :agent, config: %{prompt: "写一篇博客"}},
%{id: "review", type: :gate, config: %{criteria: ["准确性"]}},
],
edges: [
%{from: "research", to: "write"},
%{from: "write", to: "review"}
]
}
{:ok, results} = CMDCOrchestrator.execute(dag, agent_opts)
Summary
Functions
等待 run 到达终态。
取消未完成的 run。
把 human_task 标记为通过。
执行无副作用 dry run。
读取 run event ledger。
扫描指定 run 中已经超时的 human_task 并执行 timeout 策略。
把 human_task 标记为拒绝。
暂停未完成的 run。
基于已有 run 的 DAG 快照重新创建一个新 run。
恢复 paused/waiting/failed run。
从指定节点重试,清掉该节点及其下游结果。
从失败 run 的恢复点重试。
同步执行新 Run API,成功时返回旧 execute/2 风格 results。
启动异步 workflow run,返回 run_id。
返回 run / node_runs / events 状态快照。
提交 human_task 决策;终态决策会自动恢复 waiting run。
把 WorkflowSpec 转为旧 DAG 执行结构。
校验可持久化 WorkflowSpec。
Functions
@spec await_run( String.t(), keyword() ) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
等待 run 到达终态。
@spec cancel(String.t(), term(), keyword()) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
取消未完成的 run。
@spec complete_human_task(String.t(), String.t(), keyword()) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
把 human_task 标记为通过。
@spec dry_run( map() | keyword() | CMDCOrchestrator.WorkflowSpec.t(), keyword() ) :: {:ok, map()} | {:error, map()}
执行无副作用 dry run。
@spec events( String.t(), keyword() ) :: {:ok, [CMDCOrchestrator.RunEvent.t()]} | {:error, term()}
读取 run event ledger。
@spec execute( CMDCOrchestrator.DAG.t() | CMDCOrchestrator.WorkflowSpec.t(), keyword() ) :: {:ok, map()} | {:error, term()}
@spec expire_waiting_tasks( String.t(), keyword() ) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
扫描指定 run 中已经超时的 human_task 并执行 timeout 策略。
@spec fail_human_task(String.t(), String.t(), term(), keyword()) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
把 human_task 标记为拒绝。
@spec pause_run(String.t(), term(), keyword()) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
暂停未完成的 run。
基于已有 run 的 DAG 快照重新创建一个新 run。
@spec resume_run( String.t(), keyword() ) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
恢复 paused/waiting/failed run。
@spec retry_node(String.t(), String.t(), keyword()) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
从指定节点重试,清掉该节点及其下游结果。
@spec retry_run( String.t(), keyword() ) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
从失败 run 的恢复点重试。
@spec run_sync( CMDCOrchestrator.DAG.t() | CMDCOrchestrator.WorkflowSpec.t() | map() | keyword(), keyword() ) :: {:ok, map()} | {:error, term()}
同步执行新 Run API,成功时返回旧 execute/2 风格 results。
@spec start_run( CMDCOrchestrator.DAG.t() | CMDCOrchestrator.WorkflowSpec.t() | map() | keyword(), keyword() ) :: {:ok, String.t()} | {:error, term()}
启动异步 workflow run,返回 run_id。
0.5 起推荐企业 Run Console 使用本 API,再通过 status/2 与 events/2
查询运行状态和事件账本。
返回 run / node_runs / events 状态快照。
@spec submit_human_task_decision( String.t(), String.t(), map() | atom() | String.t(), keyword() ) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}
提交 human_task 决策;终态决策会自动恢复 waiting run。
@spec to_dag(map() | keyword() | CMDCOrchestrator.WorkflowSpec.t()) :: {:ok, CMDCOrchestrator.DAG.t()} | {:error, term()}
把 WorkflowSpec 转为旧 DAG 执行结构。
@spec validate_workflow(map() | keyword() | CMDCOrchestrator.WorkflowSpec.t()) :: {:ok, CMDCOrchestrator.WorkflowSpec.t(), [CMDCOrchestrator.WorkflowSpec.validation_issue()]} | {:error, [CMDCOrchestrator.WorkflowSpec.validation_issue()], [CMDCOrchestrator.WorkflowSpec.validation_issue()]}
校验可持久化 WorkflowSpec。