CMDCOrchestrator.Executor (cmdc_orchestrator v0.5.0)

Copy Markdown View Source

DAG 执行器 — 拓扑分层并行调度 + Router 剪枝。

算法

  1. 用 Kahn 算法把 DAG 切成 (每层节点彼此独立)
  2. 层内:用 Task.async_stream/3 并行执行(受 :max_concurrency 控制)
  3. 层与层:串行(下游必须等上游全部完成)
  4. Router 剪枝:当上游为 :router 节点 + 边带 :branch 标签时,仅 route 匹配的下游会被激活,其他分支自动剪枝并传染给所有下游
  5. 任一节点失败 → halt 整个 DAG(v0.2 默认)

对应 Agentic Design Patterns

  • 第 2 章 Routing
  • 第 3 章 Parallelization
  • 第 12 章 Exception Handling and Recovery(节点级 retry/fallback 在 AgentNode)

选项

  • :max_concurrency — 单层最大并行度,默认 4
  • :node_timeout — 单节点最大执行时长(毫秒),默认 120_000
  • 其它键作为 agent_opts 透传给 :agent 节点

Summary

Functions

run(dag, agent_opts)

@spec run(
  CMDCOrchestrator.DAG.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}