CMDCOrchestrator.Runtime (cmdc_orchestrator v0.4.0)

Copy Markdown View Source

DAG 运行时句柄:承载一次 Executor.run/2 周期内共享的进程资源。

负责 v0.3 新增:pool / :subagent 模式:

  • :pool — 按用户指定的 pool_key 缓存一个 CMDC session,多个 AgentNode 若共享同一个 key 则复用同一个 session(历史会话累积, 类似 "ConversationalAgent")
  • :subagent — 每次 AgentNode 启动独立 session,但登记到 runtime 注册表;DAG 结束时由 runtime 统一优雅停止(每节点自己不 stop)

生命周期 严格绑定 Executor.run/2

runtime = Runtime.start(agent_opts)
try do
  execute_layers(...)
after
  Runtime.shutdown(runtime)
end

线程安全

注册表使用 ETS table,public + read_concurrency 标志,支持多个节点 在层内并行访问。获取 pool session 使用 :ets.update_counter/4 + 双重 查找避免竞态。

Summary

Types

t()

Runtime 内部 state。

Functions

:pool 模式 —— 按 pool_key 获取或创建共享 session。

当前 runtime 登记的 session 数量(方便调试 / 测试)。

优雅关闭 runtime:并发 stop 所有已登记的 session,然后销毁 ETS 表。

:subagent 模式 —— 启动独立 session 并登记到 runtime,由 runtime 负责清理。

主动提前释放一个已登记 session(可选优化)。

Types

t()

@type t() :: %CMDCOrchestrator.Runtime{agent_opts: keyword(), registry: :ets.tid()}

Runtime 内部 state。

Functions

get_or_create_pool(runtime, pool_key, agent_opts)

@spec get_or_create_pool(t(), term(), keyword()) :: {:ok, pid()} | {:error, term()}

:pool 模式 —— 按 pool_key 获取或创建共享 session。

不同 pool_key 的调用互不干扰;相同 pool_key 的第二次调用返回已缓存 pid。

session_count(runtime)

@spec session_count(t()) :: non_neg_integer()

当前 runtime 登记的 session 数量(方便调试 / 测试)。

shutdown(runtime)

@spec shutdown(t()) :: :ok

优雅关闭 runtime:并发 stop 所有已登记的 session,然后销毁 ETS 表。

幂等:同一个 runtime 被 shutdown 多次不报错。

start(agent_opts \\ [])

@spec start(keyword()) :: t()

start_subagent(runtime, agent_opts)

@spec start_subagent(
  t(),
  keyword()
) :: {:ok, pid(), term()} | {:error, term()}

:subagent 模式 —— 启动独立 session 并登记到 runtime,由 runtime 负责清理。

返回 {:ok, pid, key},其中 key 可用于 untrack/2 在节点成功后主动 提前释放(可选)。

untrack(runtime, key)

@spec untrack(t(), term()) :: :ok

主动提前释放一个已登记 session(可选优化)。