CMDC.Provider.Registry (cmdc v0.6.0)

Copy Markdown View Source

命名 Provider Profile 注册表 — 多租户场景下的运行时 provider/opts 寻址中心。

让 Agent 创建路径上 model: "registry:tenant-A-anthropic:claude-sonnet-4-5" 字符串一句话替代 200 µs 量级的 per-Agent provider_opts 拼装。

Registry 是 CMDC.Config.providers(启动期静态多路由)的运行时动态补集, 二者并存:model 字符串带 "registry:" prefix 走 Registry,否则走 Config。

设计

  • 每节点本地 ETS:set + read_concurrency: true)—— lookup/1 ≤ 1 µs hot path
  • 写串行化 GenServer call — register/2 / unregister/1 走 GenServer 保证原子
  • 跨节点同步CMDC.Provider.Registry.Broadcaster behaviour 解耦 (默认 Broadcaster.PG best-effort,生产推荐 Phoenix.PubSub)
  • Local-first —— 节点重启 = 集成方自家持久层 init 时 re-register

5 公开 API

:ok = Registry.register("tenant-A-anthropic",
        provider: "anthropic",
        opts: [api_key: System.get_env("TENANT_A_KEY"),
               base_url: "https://litellm.tenant-a.internal"])

{:ok, %{provider: "anthropic", opts: [...]}} = Registry.lookup("tenant-A-anthropic")
{:error, :not_found} = Registry.lookup("not-exist")

:ok = Registry.unregister("tenant-A-anthropic")

["tenant-A-anthropic", "tenant-B-openai"] = Registry.list()

:ok = Registry.subscribe("tenant-A-anthropic")
# 本进程 mailbox 收 {:cmdc_registry, :profile_changed, name, new_opts | nil}

v0.6+ resolver_fn 选项

:ok = Registry.register("tenant-vault",
        provider: "anthropic",
        opts: [vault_path: "/secret/tenant-vault/anthropic"],
        resolver_fn: fn opts ->
          {path, rest} = Keyword.pop!(opts, :vault_path)
          {:ok, key} = MyApp.Vault.read(path)
          Keyword.put(rest, :api_key, key)
        end)

# lookup 命中时 resolver_fn 自动调用,opts 已解析为运行态
{:ok, %{opts: opts}} = Registry.lookup("tenant-vault")
assert opts[:api_key] # 已是真实 key,vault_path 已剥离

# resolver_fn 失败分支(v0.6+ 新增)
{:error, {:resolver_failed, exception}} = Registry.lookup("vault-down")
{:error, :resolver_invalid_return} = Registry.lookup("bad-resolver")

详见下方「Vault / 静态加密集成模式」段。

与 Agent / Checkpoint 的关系

  • Agent 启动时若 options.model"registry:profile:model_id"Agent.init/1 一次性 Registry.lookup/1 写入 state.config.provider_opts此后 profile 改动不再影响该 Agent(运行时一致性)
  • 想切到新 profile 走 CMDC.switch_model(sid, "registry:new-profile:model_id") 显式触发,所有切换都有 EventBus :model_switched 事件留痕
  • CMDC.Checkpoint.Snapshot.state.options.model 可能含 "registry:" prefix; resume 时该 profile 必须仍在 Registry,否则 CMDC.resume_session!/2{:error, {:registry_profile_missing, name}}

profile name 约束

  • 必须是非空 binary
  • 不能含 : 字符(否则 "registry:#{name}:#{model_id}" 解析时 name 会被吃掉一段)
  • 违反返 {:error, {:invalid_name, :contains_colon | :empty | :not_binary}}

Studio 现有 "tenant-#{tid}-#{provider}" 风格继续可用;UUID / 自定义 ID 含 : 时调用方需自行替换为 -_

Vault / 静态加密集成模式(v0.6+ :resolver_fn

生产环境一般不在内存里直接存明文 api_key,而是把密文 / vault 引用存进 opts,运行时通过 resolver_fn 懒解密。cmdc 不内置 vault 协议 / 全局 resolver / LRU cache,所有策略推回应用层;:resolver_fn 只是 per-profile 函数注入点,~30 行实现。

模式 1 — 启动时全量解密注入(推荐 99% 场景)

集成方在节点启动 init 时一次性把密文解密成明文 opts,不传 :resolver_fn:

defmodule MyApp.Boot.RegistryWarmup do
  def run do
    for tenant <- MyApp.Repo.all(Tenant) do
      {:ok, api_key} = MyApp.Vault.decrypt(tenant.encrypted_provider_key)

      Registry.register("tenant-#{tenant.id}",
        provider: tenant.provider,
        opts: [api_key: api_key, base_url: tenant.base_url]
      )
    end
  end
end

优点:

  • 简单 —— lookup hot path 零额外开销(~500 ns ETS read)
  • 故障早暴露 —— vault 不可达时启动直接 fail,不会等到首个 Agent 创建才报错
  • 与 Broadcaster 完全兼容 —— 明文 opts 直接跨节点同步

缺点:

  • 明文驻留 BEAM 堆,被 BEAM crash dump 暴露的风险
  • vault rotate 需要全节点 Registry.register 重灌

模式 2 — 懒解密 + 自管 cache(高安全 / 频繁 rotate 场景)

:resolver_fn 闭包里挂业务自定义解密 + cache(cmdc 不负责 cache 策略, 集成方按需要选 ETS / Cachex / persistent_term):

defmodule MyApp.RegistryWithVault do
  @cache_table :my_app_provider_opts_cache
  @ttl_ms 5 * 60 * 1000

  def warmup do
    :ets.new(@cache_table, [:set, :public, :named_table, read_concurrency: true])

    for tenant <- MyApp.Repo.all(Tenant) do
      Registry.register("tenant-#{tenant.id}",
        provider: tenant.provider,
        opts: [vault_path: tenant.vault_key_path, base_url: tenant.base_url],
        resolver_fn: &resolve_opts/1
      )
    end
  end

  defp resolve_opts(opts) do
    path = Keyword.fetch!(opts, :vault_path)
    now = System.monotonic_time(:millisecond)

    case :ets.lookup(@cache_table, path) do
      [{^path, decrypted, expires_at}] when expires_at > now ->
        Keyword.put(opts, :api_key, decrypted) |> Keyword.delete(:vault_path)

      _ ->
        {:ok, decrypted} = MyApp.Vault.read(path)
        :ets.insert(@cache_table, {path, decrypted, now + @ttl_ms})
        Keyword.put(opts, :api_key, decrypted) |> Keyword.delete(:vault_path)
    end
  end
end

优点:

  • 明文不长期驻留(cache TTL 控制)
  • vault rotate 自动生效(旧 TTL 过期即重新读 vault)
  • resolver 失败时 Agent 创建路径返 {:error, {:registry_resolver_failed, name, ...}}, 集成方可在 EventBus / monitor 捕获

缺点:

  • lookup hot path 多一次函数调用 + cache 检查(首次约 50µs,后续约 1µs)
  • resolver 闭包只在本地节点有效,多节点部署集成方需在每个节点 init 时各自传入相同的 &resolve_opts/1 引用

与 Broadcaster 的交互

:resolver_fn 闭包在跨节点 Broadcaster 同步时会被自动剥离——cmdc 内部 发往 broadcaster 的 {:register, name, opts} payload 不含 :resolver_fn, 远端节点接收后 profile.resolver_fn = nil,按"无 resolver"路径处理。 集成方多节点部署务必在每个节点初始化代码里各自调用一次 Registry.register/2 传入本节点版本的 resolver_fn。

跨节点同步语义

默认 Broadcaster.PG(基于 OTP :pg 模块)是 best-effort send-only

  • 无重试 —— send/2 失败丢消息
  • 无 ack —— 不知道远端是否收到
  • 网络分区会丢更新 —— 集群恢复后不会自动重放

生产多节点推荐实现 Broadcaster behaviour 接 Phoenix.PubSub / Redis Streams, 详见 CMDC.Provider.Registry.Broadcaster moduledoc。

配置

# config/config.exs(可选,默认 Broadcaster.PG)
config :cmdc, CMDC.Provider.Registry,
  broadcaster: MyApp.PhoenixPubSubBroadcaster

Telemetry

事件名metadata
[:cmdc, :provider, :registry, :lookup]%{name, hit?} + measurements %{duration_us}
[:cmdc, :provider, :registry, :register]%{name, broadcaster_called?}

详见 CMDC.Telemetry

Summary

Types

lookup/1 失败原因(含 v0.6+ resolver 失败分支)。

已注册的 Provider Profile 内部表示。

register/2 接受的入参 keyword。

register/2 校验失败原因。

Profile 解析函数(v0.6+,可选)。

Functions

列出所有已注册的 profile name(无序)。

按 name 精确查找 Profile(hot path,不走 GenServer,直接读 ETS)。

注册或覆盖一个 Provider Profile。同名 register 直接覆盖。

启动 Registry GenServer。CMDC.Application supervisor 树会自动拉起, 不需要业务代码手动调用。

当前进程订阅指定 profile 的变更通知。

注销 Profile。订阅者会收到 {:cmdc_registry, :profile_changed, name, nil}, 跨节点 broadcaster 也会派发 {:unregister, name} 事件。

Types

lookup_error()

@type lookup_error() ::
  :not_found | {:resolver_failed, term()} | :resolver_invalid_return

lookup/1 失败原因(含 v0.6+ resolver 失败分支)。

profile()

@type profile() :: %{
  provider: String.t(),
  opts: keyword(),
  resolver_fn: resolver_fn() | nil,
  registered_at: integer()
}

已注册的 Provider Profile 内部表示。

profile_opts()

@type profile_opts() :: [
  provider: String.t(),
  opts: keyword(),
  resolver_fn: resolver_fn() | nil
]

register/2 接受的入参 keyword。

register_error()

@type register_error() ::
  {:invalid_name, :contains_colon | :empty | :not_binary}
  | {:invalid_profile, atom()}

register/2 校验失败原因。

resolver_fn()

@type resolver_fn() :: (keyword() -> keyword())

Profile 解析函数(v0.6+,可选)。

lookup/1 命中时被调用,把存储态 opts 转成运行态 opts。 典型场景:opts 内存的是密文 / vault 路径,运行时解密为真实 api_key。

函数签名 (stored_opts :: keyword()) -> resolved_opts :: keyword()

注意:

  • 闭包只存在 register 该 profile 的本地节点,不参与 Broadcaster 跨节点同步
  • 集成方需在每个节点的初始化代码里各自 register/2 时传同样语义的 resolver_fn

Functions

list()

@spec list() :: [String.t()]

列出所有已注册的 profile name(无序)。

仅用于运维 / 调试场景,不要在 hot path 调用:ets.tab2list 是 O(N) 全表扫描)。

lookup(name)

@spec lookup(String.t()) :: {:ok, profile()} | {:error, lookup_error()}

按 name 精确查找 Profile(hot path,不走 GenServer,直接读 ETS)。

返回

  • {:ok, profile()} —— %{provider:, opts:, resolver_fn:, registered_at:} 若 register 时传入 :resolver_fnlookup 命中后会立即调用 resolver_fn, opts 字段已是解析后的运行态 opts;profile 内 :resolver_fn 仅作元信息保留
  • {:error, :not_found} —— profile 未注册或已 unregister
  • {:error, {:resolver_failed, exception | term()}} —— resolver_fn 抛异常

  • {:error, :resolver_invalid_return} —— resolver_fn 返回非 keyword

性能

ETS :set read_concurrency: true —— 单次查询 ~500 ns, 10000 次并发不阻塞写。Agent 创建路径仅一次 lookup。 resolver_fn 调用开销由集成方控制(如 Cloak 解密 ~50 µs,建议在 resolver 内做自管 cache)。

register(name, profile_opts)

@spec register(String.t(), profile_opts()) :: :ok | {:error, register_error()}

注册或覆盖一个 Provider Profile。同名 register 直接覆盖。

参数

  • name — profile 唯一标识(不能含 :,否则字符串协议解析会出错)
  • profile_opts
    • :provider — 必填,req_llm provider 标识(如 "anthropic" / "openai"
    • :opts — 默认 [],透传给 CMDC.Provider.stream/4 的选项 (api_key / base_url / receive_timeout / temperature ...)
    • :resolver_fn —(v0.6+,可选)arity-1 函数 (keyword() -> keyword())lookup/1 命中时被调用把存储态 opts 转成运行态 opts。典型用于 Vault / Cloak 等密钥懒解密;详见「Vault 集成模式」段。 resolver_fn 闭包只存本地节点 ETS,不参与 Broadcaster 跨节点同步; 多节点部署集成方需在每个节点 init 时各自传入。

返回

  • :ok — 注册成功
  • {:error, {:invalid_name, reason}} — name 不合法
  • {:error, {:invalid_profile, reason}} — profile_opts 不合法

Examples

iex> Registry.register("tenant-A-anthropic",
...>   provider: "anthropic",
...>   opts: [api_key: "sk-tenant-a", base_url: "https://litellm.a.internal"])
:ok

iex> Registry.register("invalid:name", provider: "openai", opts: [])
{:error, {:invalid_name, :contains_colon}}

resolver_fn 示例

:ok = Registry.register("tenant-A",
  provider: "anthropic",
  opts: [encrypted_key: "AES.GCM.v1.xxx", base_url: "https://litellm.a.internal"],
  resolver_fn: fn opts ->
    {encrypted, rest} = Keyword.pop!(opts, :encrypted_key)
    {:ok, plain} = MyApp.Vault.decrypt(encrypted)
    Keyword.put(rest, :api_key, plain)
  end
)

{:ok, %{opts: opts}} = Registry.lookup("tenant-A")
# opts[:api_key] 已被解密替换;原 :encrypted_key 已剥离

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

启动 Registry GenServer。CMDC.Application supervisor 树会自动拉起, 不需要业务代码手动调用。

选项

  • :broadcaster — 覆盖默认 broadcaster module(默认从 Application env 读,再 fallback Broadcaster.PG

subscribe(name)

@spec subscribe(String.t()) :: :ok

当前进程订阅指定 profile 的变更通知。

收到的消息格式:

{:cmdc_registry, :profile_changed, name :: String.t(),
 new_profile_opts :: keyword() | nil}
  • new_profile_opts 是 register 时传入的 [provider:, opts:]
  • nil 表示该 profile 被 unregister

自动清理

当订阅进程退出时(exit / GC),Registry GenServer 通过 monitor 自动清理订阅。 无需手动 unsubscribe

unregister(name)

@spec unregister(String.t()) :: :ok

注销 Profile。订阅者会收到 {:cmdc_registry, :profile_changed, name, nil}, 跨节点 broadcaster 也会派发 {:unregister, name} 事件。

幂等:name 不存在时也返 :ok,不返错误。