命名 Provider Profile 注册表 — 多租户场景下的运行时 provider/opts 寻址中心。
让 Agent 创建路径上 model: "registry:tenant-A-anthropic:claude-sonnet-4-5"
字符串一句话替代 200 µs 量级的 per-Agent provider_opts 拼装。
Registry 是
CMDC.Config.providers(启动期静态多路由)的运行时动态补集, 二者并存:model 字符串带"registry:"prefix 走 Registry,否则走 Config。
设计
- 每节点本地 ETS(
:set+read_concurrency: true)——lookup/1≤ 1 µs hot path - 写串行化 GenServer call —
register/2/unregister/1走 GenServer 保证原子 - 跨节点同步 由
CMDC.Provider.Registry.Broadcasterbehaviour 解耦 (默认Broadcaster.PGbest-effort,生产推荐 Phoenix.PubSub) - Local-first —— 节点重启 = 集成方自家持久层 init 时 re-register
5 公开 API
:ok = Registry.register("tenant-A-anthropic",
provider: "anthropic",
opts: [api_key: System.get_env("TENANT_A_KEY"),
base_url: "https://litellm.tenant-a.internal"])
{:ok, %{provider: "anthropic", opts: [...]}} = Registry.lookup("tenant-A-anthropic")
{:error, :not_found} = Registry.lookup("not-exist")
:ok = Registry.unregister("tenant-A-anthropic")
["tenant-A-anthropic", "tenant-B-openai"] = Registry.list()
:ok = Registry.subscribe("tenant-A-anthropic")
# 本进程 mailbox 收 {:cmdc_registry, :profile_changed, name, new_opts | nil}v0.6+ resolver_fn 选项
:ok = Registry.register("tenant-vault",
provider: "anthropic",
opts: [vault_path: "/secret/tenant-vault/anthropic"],
resolver_fn: fn opts ->
{path, rest} = Keyword.pop!(opts, :vault_path)
{:ok, key} = MyApp.Vault.read(path)
Keyword.put(rest, :api_key, key)
end)
# lookup 命中时 resolver_fn 自动调用,opts 已解析为运行态
{:ok, %{opts: opts}} = Registry.lookup("tenant-vault")
assert opts[:api_key] # 已是真实 key,vault_path 已剥离
# resolver_fn 失败分支(v0.6+ 新增)
{:error, {:resolver_failed, exception}} = Registry.lookup("vault-down")
{:error, :resolver_invalid_return} = Registry.lookup("bad-resolver")详见下方「Vault / 静态加密集成模式」段。
与 Agent / Checkpoint 的关系
- Agent 启动时若
options.model是"registry:profile:model_id",Agent.init/1一次性Registry.lookup/1写入state.config.provider_opts, 此后 profile 改动不再影响该 Agent(运行时一致性) - 想切到新 profile 走
CMDC.switch_model(sid, "registry:new-profile:model_id")显式触发,所有切换都有 EventBus:model_switched事件留痕 CMDC.Checkpoint.Snapshot.state.options.model可能含"registry:"prefix; resume 时该 profile 必须仍在 Registry,否则CMDC.resume_session!/2返{:error, {:registry_profile_missing, name}}
profile name 约束
- 必须是非空 binary
- 不能含
:字符(否则"registry:#{name}:#{model_id}"解析时 name 会被吃掉一段) 违反返
{:error, {:invalid_name, :contains_colon | :empty | :not_binary}}
Studio 现有 "tenant-#{tid}-#{provider}" 风格继续可用;UUID / 自定义 ID
含 : 时调用方需自行替换为 - 或 _。
Vault / 静态加密集成模式(v0.6+ :resolver_fn)
生产环境一般不在内存里直接存明文 api_key,而是把密文 / vault 引用存进
opts,运行时通过 resolver_fn 懒解密。cmdc 不内置 vault 协议 /
全局 resolver / LRU cache,所有策略推回应用层;:resolver_fn 只是
per-profile 函数注入点,~30 行实现。
模式 1 — 启动时全量解密注入(推荐 99% 场景)
集成方在节点启动 init 时一次性把密文解密成明文 opts,不传 :resolver_fn:
defmodule MyApp.Boot.RegistryWarmup do
def run do
for tenant <- MyApp.Repo.all(Tenant) do
{:ok, api_key} = MyApp.Vault.decrypt(tenant.encrypted_provider_key)
Registry.register("tenant-#{tenant.id}",
provider: tenant.provider,
opts: [api_key: api_key, base_url: tenant.base_url]
)
end
end
end优点:
- 简单 —— lookup hot path 零额外开销(~500 ns ETS read)
- 故障早暴露 —— vault 不可达时启动直接 fail,不会等到首个 Agent 创建才报错
- 与 Broadcaster 完全兼容 —— 明文 opts 直接跨节点同步
缺点:
- 明文驻留 BEAM 堆,被 BEAM crash dump 暴露的风险
- vault rotate 需要全节点 Registry.register 重灌
模式 2 — 懒解密 + 自管 cache(高安全 / 频繁 rotate 场景)
在 :resolver_fn 闭包里挂业务自定义解密 + cache(cmdc 不负责 cache 策略,
集成方按需要选 ETS / Cachex / persistent_term):
defmodule MyApp.RegistryWithVault do
@cache_table :my_app_provider_opts_cache
@ttl_ms 5 * 60 * 1000
def warmup do
:ets.new(@cache_table, [:set, :public, :named_table, read_concurrency: true])
for tenant <- MyApp.Repo.all(Tenant) do
Registry.register("tenant-#{tenant.id}",
provider: tenant.provider,
opts: [vault_path: tenant.vault_key_path, base_url: tenant.base_url],
resolver_fn: &resolve_opts/1
)
end
end
defp resolve_opts(opts) do
path = Keyword.fetch!(opts, :vault_path)
now = System.monotonic_time(:millisecond)
case :ets.lookup(@cache_table, path) do
[{^path, decrypted, expires_at}] when expires_at > now ->
Keyword.put(opts, :api_key, decrypted) |> Keyword.delete(:vault_path)
_ ->
{:ok, decrypted} = MyApp.Vault.read(path)
:ets.insert(@cache_table, {path, decrypted, now + @ttl_ms})
Keyword.put(opts, :api_key, decrypted) |> Keyword.delete(:vault_path)
end
end
end优点:
- 明文不长期驻留(cache TTL 控制)
- vault rotate 自动生效(旧 TTL 过期即重新读 vault)
- resolver 失败时 Agent 创建路径返
{:error, {:registry_resolver_failed, name, ...}}, 集成方可在 EventBus / monitor 捕获
缺点:
- lookup hot path 多一次函数调用 + cache 检查(首次约 50µs,后续约 1µs)
- resolver 闭包只在本地节点有效,多节点部署集成方需在每个节点 init 时各自传入相同的
&resolve_opts/1引用
与 Broadcaster 的交互
:resolver_fn 闭包在跨节点 Broadcaster 同步时会被自动剥离——cmdc 内部
发往 broadcaster 的 {:register, name, opts} payload 不含 :resolver_fn,
远端节点接收后 profile.resolver_fn = nil,按"无 resolver"路径处理。
集成方多节点部署务必在每个节点初始化代码里各自调用一次 Registry.register/2
传入本节点版本的 resolver_fn。
跨节点同步语义
默认 Broadcaster.PG(基于 OTP :pg 模块)是 best-effort send-only:
- 无重试 ——
send/2失败丢消息 - 无 ack —— 不知道远端是否收到
- 网络分区会丢更新 —— 集群恢复后不会自动重放
生产多节点推荐实现 Broadcaster behaviour 接 Phoenix.PubSub / Redis Streams,
详见 CMDC.Provider.Registry.Broadcaster moduledoc。
配置
# config/config.exs(可选,默认 Broadcaster.PG)
config :cmdc, CMDC.Provider.Registry,
broadcaster: MyApp.PhoenixPubSubBroadcasterTelemetry
| 事件名 | metadata |
|---|---|
[:cmdc, :provider, :registry, :lookup] | %{name, hit?} + measurements %{duration_us} |
[:cmdc, :provider, :registry, :register] | %{name, broadcaster_called?} |
详见 CMDC.Telemetry。
Summary
Types
lookup/1 失败原因(含 v0.6+ resolver 失败分支)。
已注册的 Provider Profile 内部表示。
register/2 接受的入参 keyword。
register/2 校验失败原因。
Profile 解析函数(v0.6+,可选)。
Functions
列出所有已注册的 profile name(无序)。
按 name 精确查找 Profile(hot path,不走 GenServer,直接读 ETS)。
注册或覆盖一个 Provider Profile。同名 register 直接覆盖。
启动 Registry GenServer。CMDC.Application supervisor 树会自动拉起,
不需要业务代码手动调用。
当前进程订阅指定 profile 的变更通知。
注销 Profile。订阅者会收到 {:cmdc_registry, :profile_changed, name, nil},
跨节点 broadcaster 也会派发 {:unregister, name} 事件。
Types
@type lookup_error() :: :not_found | {:resolver_failed, term()} | :resolver_invalid_return
lookup/1 失败原因(含 v0.6+ resolver 失败分支)。
@type profile() :: %{ provider: String.t(), opts: keyword(), resolver_fn: resolver_fn() | nil, registered_at: integer() }
已注册的 Provider Profile 内部表示。
@type profile_opts() :: [ provider: String.t(), opts: keyword(), resolver_fn: resolver_fn() | nil ]
register/2 接受的入参 keyword。
@type register_error() :: {:invalid_name, :contains_colon | :empty | :not_binary} | {:invalid_profile, atom()}
register/2 校验失败原因。
Profile 解析函数(v0.6+,可选)。
在 lookup/1 命中时被调用,把存储态 opts 转成运行态 opts。
典型场景:opts 内存的是密文 / vault 路径,运行时解密为真实 api_key。
函数签名 (stored_opts :: keyword()) -> resolved_opts :: keyword()。
注意:
- 闭包只存在 register 该 profile 的本地节点,不参与 Broadcaster 跨节点同步
- 集成方需在每个节点的初始化代码里各自
register/2时传同样语义的 resolver_fn
Functions
@spec list() :: [String.t()]
列出所有已注册的 profile name(无序)。
仅用于运维 / 调试场景,不要在 hot path 调用
(:ets.tab2list 是 O(N) 全表扫描)。
@spec lookup(String.t()) :: {:ok, profile()} | {:error, lookup_error()}
按 name 精确查找 Profile(hot path,不走 GenServer,直接读 ETS)。
返回
{:ok, profile()}——%{provider:, opts:, resolver_fn:, registered_at:}若 register 时传入:resolver_fn,lookup 命中后会立即调用 resolver_fn,opts字段已是解析后的运行态 opts;profile 内:resolver_fn仅作元信息保留{:error, :not_found}—— profile 未注册或已 unregister{:error, {:resolver_failed, exception | term()}}—— resolver_fn 抛异常{:error, :resolver_invalid_return}—— resolver_fn 返回非 keyword
性能
ETS :set read_concurrency: true —— 单次查询 ~500 ns,
10000 次并发不阻塞写。Agent 创建路径仅一次 lookup。
resolver_fn 调用开销由集成方控制(如 Cloak 解密 ~50 µs,建议在 resolver
内做自管 cache)。
@spec register(String.t(), profile_opts()) :: :ok | {:error, register_error()}
注册或覆盖一个 Provider Profile。同名 register 直接覆盖。
参数
name— profile 唯一标识(不能含:,否则字符串协议解析会出错)profile_opts::provider— 必填,req_llm provider 标识(如"anthropic"/"openai"):opts— 默认[],透传给CMDC.Provider.stream/4的选项 (api_key/base_url/receive_timeout/temperature...):resolver_fn—(v0.6+,可选)arity-1 函数(keyword() -> keyword()),lookup/1命中时被调用把存储态 opts 转成运行态 opts。典型用于 Vault / Cloak 等密钥懒解密;详见「Vault 集成模式」段。 resolver_fn 闭包只存本地节点 ETS,不参与 Broadcaster 跨节点同步; 多节点部署集成方需在每个节点 init 时各自传入。
返回
:ok— 注册成功{:error, {:invalid_name, reason}}— name 不合法{:error, {:invalid_profile, reason}}— profile_opts 不合法
Examples
iex> Registry.register("tenant-A-anthropic",
...> provider: "anthropic",
...> opts: [api_key: "sk-tenant-a", base_url: "https://litellm.a.internal"])
:ok
iex> Registry.register("invalid:name", provider: "openai", opts: [])
{:error, {:invalid_name, :contains_colon}}resolver_fn 示例
:ok = Registry.register("tenant-A",
provider: "anthropic",
opts: [encrypted_key: "AES.GCM.v1.xxx", base_url: "https://litellm.a.internal"],
resolver_fn: fn opts ->
{encrypted, rest} = Keyword.pop!(opts, :encrypted_key)
{:ok, plain} = MyApp.Vault.decrypt(encrypted)
Keyword.put(rest, :api_key, plain)
end
)
{:ok, %{opts: opts}} = Registry.lookup("tenant-A")
# opts[:api_key] 已被解密替换;原 :encrypted_key 已剥离
@spec start_link(keyword()) :: GenServer.on_start()
启动 Registry GenServer。CMDC.Application supervisor 树会自动拉起,
不需要业务代码手动调用。
选项
:broadcaster— 覆盖默认 broadcaster module(默认从 Application env 读,再 fallbackBroadcaster.PG)
@spec subscribe(String.t()) :: :ok
当前进程订阅指定 profile 的变更通知。
收到的消息格式:
{:cmdc_registry, :profile_changed, name :: String.t(),
new_profile_opts :: keyword() | nil}new_profile_opts是 register 时传入的[provider:, opts:]nil表示该 profile 被 unregister
自动清理
当订阅进程退出时(exit / GC),Registry GenServer 通过 monitor 自动清理订阅。
无需手动 unsubscribe。
@spec unregister(String.t()) :: :ok
注销 Profile。订阅者会收到 {:cmdc_registry, :profile_changed, name, nil},
跨节点 broadcaster 也会派发 {:unregister, name} 事件。
幂等:name 不存在时也返 :ok,不返错误。