CMDCOrchestrator (cmdc_orchestrator v0.6.0)

Copy Markdown View Source

CMDC 多 Agent 编排引擎 — DAG 驱动的 Agent 协作。

核心概念

  • Orchestration:一个 DAG(有向无环图),定义 Agent 节点和依赖关系
  • Node:DAG 中的节点,支持 agent / router / condition / fork / join 等内置类型
  • Edge:节点间的数据流依赖,可按 signal 选择分支
  • Run:一次编排执行,可异步启动、暂停、恢复、重试和查询事件账本

使用示例

dag = %CMDCOrchestrator.DAG{
  nodes: [
    %{id: "research", type: :agent, config: %{prompt: "调研 AI 趋势"}},
    %{id: "write", type: :agent, config: %{prompt: "写一篇博客"}},
    %{id: "review", type: :gate, config: %{criteria: ["准确性"]}},
  ],
  edges: [
    %{from: "research", to: "write"},
    %{from: "write", to: "review"}
  ]
}

{:ok, results} = CMDCOrchestrator.execute(dag, agent_opts)

Summary

Functions

等待 run 到达终态。

把 human_task 标记为通过。

执行无副作用 dry run。

读取 run event ledger。

扫描指定 run 中已经超时的 human_task 并执行 timeout 策略。

把 human_task 标记为拒绝。

基于已有 run 的 DAG 快照重新创建一个新 run。

恢复 paused/waiting/failed run。

从指定节点重试,清掉该节点及其下游结果。

从失败 run 的恢复点重试。

同步执行新 Run API,成功时返回旧 execute/2 风格 results。

启动异步 workflow run,返回 run_id。

返回 run / node_runs / events 状态快照。

提交 human_task 决策;终态决策会自动恢复 waiting run。

把 WorkflowSpec 转为旧 DAG 执行结构。

校验可持久化 WorkflowSpec。

Functions

await_run(run_id, opts \\ [])

@spec await_run(
  String.t(),
  keyword()
) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

等待 run 到达终态。

cancel(run_id, reason \\ :cancelled, opts \\ [])

@spec cancel(String.t(), term(), keyword()) ::
  {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

取消未完成的 run。

complete_human_task(run_id, task_id, opts \\ [])

@spec complete_human_task(String.t(), String.t(), keyword()) ::
  {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

把 human_task 标记为通过。

dry_run(input, opts \\ [])

@spec dry_run(
  map() | keyword() | CMDCOrchestrator.WorkflowSpec.t(),
  keyword()
) :: {:ok, map()} | {:error, map()}

执行无副作用 dry run。

events(run_id, opts \\ [])

@spec events(
  String.t(),
  keyword()
) :: {:ok, [CMDCOrchestrator.RunEvent.t()]} | {:error, term()}

读取 run event ledger。

execute(orchestration, agent_opts \\ [])

@spec execute(
  CMDCOrchestrator.DAG.t() | CMDCOrchestrator.WorkflowSpec.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

expire_waiting_tasks(run_id, opts \\ [])

@spec expire_waiting_tasks(
  String.t(),
  keyword()
) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

扫描指定 run 中已经超时的 human_task 并执行 timeout 策略。

fail_human_task(run_id, task_id, reason, opts \\ [])

@spec fail_human_task(String.t(), String.t(), term(), keyword()) ::
  {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

把 human_task 标记为拒绝。

pause_run(run_id, reason \\ :paused, opts \\ [])

@spec pause_run(String.t(), term(), keyword()) ::
  {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

暂停未完成的 run。

rerun(run_id, opts \\ [])

@spec rerun(
  String.t(),
  keyword()
) :: {:ok, String.t()} | {:error, term()}

基于已有 run 的 DAG 快照重新创建一个新 run。

resume_run(run_id, opts \\ [])

@spec resume_run(
  String.t(),
  keyword()
) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

恢复 paused/waiting/failed run。

retry_node(run_id, node_id, opts \\ [])

@spec retry_node(String.t(), String.t(), keyword()) ::
  {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

从指定节点重试,清掉该节点及其下游结果。

retry_run(run_id, opts \\ [])

@spec retry_run(
  String.t(),
  keyword()
) :: {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

从失败 run 的恢复点重试。

run_sync(orchestration, opts \\ [])

@spec run_sync(
  CMDCOrchestrator.DAG.t()
  | CMDCOrchestrator.WorkflowSpec.t()
  | map()
  | keyword(),
  keyword()
) :: {:ok, map()} | {:error, term()}

同步执行新 Run API,成功时返回旧 execute/2 风格 results。

start_run(orchestration, opts \\ [])

@spec start_run(
  CMDCOrchestrator.DAG.t()
  | CMDCOrchestrator.WorkflowSpec.t()
  | map()
  | keyword(),
  keyword()
) :: {:ok, String.t()} | {:error, term()}

启动异步 workflow run,返回 run_id。

0.5 起推荐企业 Run Console 使用本 API,再通过 status/2events/2 查询运行状态和事件账本。

status(run_id, opts \\ [])

@spec status(
  String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

返回 run / node_runs / events 状态快照。

submit_human_task_decision(run_id, task_id, decision, opts \\ [])

@spec submit_human_task_decision(
  String.t(),
  String.t(),
  map() | atom() | String.t(),
  keyword()
) ::
  {:ok, CMDCOrchestrator.Run.t()} | {:error, term()}

提交 human_task 决策;终态决策会自动恢复 waiting run。

to_dag(input)

@spec to_dag(map() | keyword() | CMDCOrchestrator.WorkflowSpec.t()) ::
  {:ok, CMDCOrchestrator.DAG.t()} | {:error, term()}

把 WorkflowSpec 转为旧 DAG 执行结构。

validate_workflow(input)

校验可持久化 WorkflowSpec。