DAG 执行器 — 拓扑分层并行调度 + Router 剪枝。
算法
- 用 Kahn 算法把 DAG 切成 层(每层节点彼此独立)
- 层内:用
Task.async_stream/3并行执行(受:max_concurrency控制) - 层与层:串行(下游必须等上游全部完成)
- Router 剪枝:当上游为
:router节点 + 边带:branch标签时,仅route匹配的下游会被激活,其他分支自动剪枝并传染给所有下游 - 任一节点失败 → halt 整个 DAG(v0.2 默认)
对应 Agentic Design Patterns:
- 第 2 章 Routing
- 第 3 章 Parallelization
- 第 12 章 Exception Handling and Recovery(节点级 retry/fallback 在 AgentNode)
选项
:max_concurrency— 单层最大并行度,默认 4:node_timeout— 单节点最大执行时长(毫秒),默认 120_000- 其它键作为
agent_opts透传给:agent节点
Summary
Functions
@spec run( CMDCOrchestrator.DAG.t(), keyword() ) :: {:ok, map()} | {:error, term()}