CMDCOrchestrator (cmdc_orchestrator v0.5.0)

Copy Markdown View Source

CMDC 多 Agent 编排引擎 — DAG 驱动的 Agent 协作。

核心概念

  • Orchestration:一个 DAG(有向无环图),定义 Agent 节点和依赖关系
  • Node:DAG 中的节点,4 种类型:agent / aggregator / router / gate
  • Edge:节点间的数据流依赖
  • Run:一次编排执行,按拓扑顺序调度节点

使用示例

dag = %CMDCOrchestrator.DAG{
  nodes: [
    %{id: "research", type: :agent, config: %{prompt: "调研 AI 趋势"}},
    %{id: "write", type: :agent, config: %{prompt: "写一篇博客"}},
    %{id: "review", type: :gate, config: %{criteria: ["准确性"]}},
  ],
  edges: [
    %{from: "research", to: "write"},
    %{from: "write", to: "review"}
  ]
}

{:ok, results} = CMDCOrchestrator.execute(dag, agent_opts)

Summary

Functions

等待 run 到达终态。

执行无副作用 dry run。

读取 run event ledger。

同步执行新 Run API,成功时返回旧 execute/2 风格 results。

启动异步 workflow run,返回 run_id。

返回 run / node_runs / events 状态快照。

把 WorkflowSpec 转为旧 DAG 执行结构。

校验可持久化 WorkflowSpec。

Functions

await_run(run_id, opts \\ [])

@spec await_run(
  String.t(),
  keyword()
) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

等待 run 到达终态。

cancel(run_id, reason \\ :cancelled, opts \\ [])

@spec cancel(String.t(), term(), keyword()) ::
  {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

取消未完成的 run。

dry_run(input, opts \\ [])

@spec dry_run(
  map() | keyword() | CMDCOrchestrator.WorkflowSpec.t(),
  keyword()
) :: {:ok, map()} | {:error, map()}

执行无副作用 dry run。

events(run_id, opts \\ [])

@spec events(
  String.t(),
  keyword()
) :: {:ok, [CMDCOrchestrator.RunEvent.t()]} | {:error, term()}

读取 run event ledger。

execute(orchestration, agent_opts \\ [])

@spec execute(
  CMDCOrchestrator.DAG.t() | CMDCOrchestrator.WorkflowSpec.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

run_sync(orchestration, opts \\ [])

@spec run_sync(
  CMDCOrchestrator.DAG.t()
  | CMDCOrchestrator.WorkflowSpec.t()
  | map()
  | keyword(),
  keyword()
) :: {:ok, map()} | {:error, term()}

同步执行新 Run API,成功时返回旧 execute/2 风格 results。

start_run(orchestration, opts \\ [])

@spec start_run(
  CMDCOrchestrator.DAG.t()
  | CMDCOrchestrator.WorkflowSpec.t()
  | map()
  | keyword(),
  keyword()
) :: {:ok, String.t()} | {:error, term()}

启动异步 workflow run,返回 run_id。

0.5 起推荐企业 Run Console 使用本 API,再通过 status/2events/2 查询运行状态和事件账本。

status(run_id, opts \\ [])

@spec status(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

返回 run / node_runs / events 状态快照。

to_dag(input)

@spec to_dag(map() | keyword() | CMDCOrchestrator.WorkflowSpec.t()) ::
  {:ok, CMDCOrchestrator.DAG.t()} | {:error, term()}

把 WorkflowSpec 转为旧 DAG 执行结构。

validate_workflow(input)

校验可持久化 WorkflowSpec。