基于 Registry 的会话事件发布/订阅系统,附带可选的 ring buffer 用于断线重连补帧。
基础订阅
CMDC.EventBus.subscribe("session-abc123")
receive do
{:cmdc_event, "session-abc123", event} -> handle_event(event)
end
CMDC.EventBus.broadcast("session-abc123", {:stream_chunk, "session-abc123", "hello"})
CMDC.EventBus.subscribe_all()群体订阅(v0.6+)
当多个 Agent 协作一个上层"群"(多人房间 / 协作小组 / 联合调查 task force)时,
用 subscribe_group/2 一次性订阅 group 内所有 session 的事件,
替代手动维护 [session_id] 列表 + 多次 subscribe/1 调用。
group_id = CMDC.EventBus.create_group()
{:ok, _} = CMDC.create_agent(model: "...", group_id: group_id)
{:ok, _} = CMDC.create_agent(model: "...", group_id: group_id)
CMDC.EventBus.subscribe_group(group_id)
receive do
{:cmdc_event, session_id, event} ->
# 收到 group 内任一 session 的事件,session_id 标识来源
handle_event(session_id, event)
endAgent 创建时不传 :group_id 行为完全等同 v0.5(不入任何 group,不影响订阅)。
Ring Buffer
适用场景:WebSocket / Channel 短暂断线重连时不丢事件。
Agent 启动时通过 CMDC.Options.new!(event_buffer_size: 100) 开启该会话的内存
ring buffer。Buffer 满后自动丢弃最早事件(FIFO)。默认 0(关闭,零内存开销)。
重连后用 :since 选项 replay:
{:ok, _pid} = CMDC.EventBus.subscribe("session-abc123", since: last_index)
receive do
{:cmdc_event, _sid, event} -> # ... 包含 replay + 新事件
end典型 buffer size 建议 50 ~ 200,对应 1-3 秒内的 stream_chunk 风暴。
Buffer 由 ETS 实现,session 终止时通过 disable_buffer/1 清理(CMDC.Agent
在 terminate/3 自动调用)。
Summary
Functions
广播事件到指定会话所有订阅者,并写入 ring buffer(若已开启)。
广播事件到 session 订阅者 + group 订阅者(v0.6+)。
生成一个全局唯一的 group_id 字符串(v0.6+)。
关闭 ring buffer 并清理该会话所有缓存事件(CMDC.Agent.terminate 自动调用)。
开启指定会话的 ring buffer(CMDC.Agent.init 自动调用)。
返回该会话最新事件 index(未开启 buffer 时为 nil)。
订阅当前进程对指定会话 ID 的事件。
订阅当前进程对所有会话的事件(监控/调试用)。
订阅当前进程对指定 group 的所有 session 事件(v0.6+)。
取消当前进程对指定会话 ID 的订阅。
取消当前进程的通配符订阅。
取消当前进程对指定 group 的订阅(v0.6+)。
Functions
@spec broadcast(String.t(), term()) :: :ok | {:ok, pos_integer()}
广播事件到指定会话所有订阅者,并写入 ring buffer(若已开启)。
返回事件分配到的单调递增 index(开启 buffer 时)或 :ok(未开启)。
Registry 未启动时(如部分测试环境)静默返回 :ok。
v0.6+ 三参数版本:同时派发到 group-level 订阅者(group_id 非 nil 时)。
二参数版本保留兼容,等同于 broadcast(session_id, nil, event)。
@spec broadcast(String.t(), String.t() | nil, term()) :: :ok | {:ok, pos_integer()}
广播事件到 session 订阅者 + group 订阅者(v0.6+)。
当 group_id 为 nil 时行为等同二参数版本(仅 session 派发,零额外开销)。
当 group_id 非 nil 时额外派发一份到 {:group, group_id} 的订阅者,
消息格式与单 session 订阅一致:{:cmdc_event, session_id, event}。
@spec create_group() :: String.t()
生成一个全局唯一的 group_id 字符串(v0.6+)。
实现走 :crypto.strong_rand_bytes/1 16 字节 → base32 lower 编码 →
前缀 "grp_",约 28 字符。集成方完全可用任意自有字符串(如
"study-room-<room_id>" / "task-force-<uuid>"),本 helper 只是
便利函数,不强制使用。
示例
iex> id = CMDC.EventBus.create_group()
iex> String.starts_with?(id, "grp_")
true
@spec disable_buffer(String.t()) :: :ok
关闭 ring buffer 并清理该会话所有缓存事件(CMDC.Agent.terminate 自动调用)。
@spec enable_buffer(String.t(), non_neg_integer()) :: :ok
开启指定会话的 ring buffer(CMDC.Agent.init 自动调用)。
buffer_size 必须为正整数;<= 0 等同 no-op。
@spec last_index(String.t()) :: non_neg_integer() | nil
返回该会话最新事件 index(未开启 buffer 时为 nil)。
常用于断线重连前记录"上次看到的 index",重连时传 subscribe(sid, since: idx)。
订阅当前进程对指定会话 ID 的事件。
接收 {:cmdc_event, session_id, event} 消息。
选项
:since :: non_neg_integer()— 从该 index 之后的事件开始 replay。要求该 会话已开启 ring buffer(Options.event_buffer_size > 0),否则该选项被忽略。 如since早于 buffer 起点(已被丢弃),仅 replay 还在 buffer 内的部分。:types :: [atom()]— 只 replay 这些 type 的事件; 适用于 UI 重连补帧只想要:stream_chunk+:agent_end不想要:tool_started等内部事件的场景。事件 type 来自事件第一个元素(如{:agent_end, _, _}的 type 是:agent_end,裸 atom 事件如:agent_starttype 就是它自己)。 此选项仅影响 replay 阶段;实时订阅不过滤,避免漏事件。
返回 {:ok, pid},replay 事件在订阅注册后按时序异步投递到当前进程。
订阅当前进程对所有会话的事件(监控/调试用)。
订阅当前进程对指定 group 的所有 session 事件(v0.6+)。
接收 {:cmdc_event, session_id, event} 消息,session_id 标识事件来源
group 内具体哪个 Agent。group_id 由集成方在 Agent 创建时通过
Options.group_id 指定(可用 create_group/0 生成或自定义字符串)。
示例
group_id = "study-room-1"
{:ok, _} = CMDC.create_agent(model: "...", group_id: group_id)
{:ok, _} = CMDC.create_agent(model: "...", group_id: group_id)
{:ok, _pid} = CMDC.EventBus.subscribe_group(group_id)
receive do
{:cmdc_event, sid, event} -> # group 内任一 session 的事件
IO.inspect({sid, event})
end与 subscribe/1 的关系
二者互补——同一进程可同时订阅 session-level + group-level, group-level 是聚合视图,不替代单 session 订阅。Agent 不在任何 group 时, group-level 订阅者不会收到该 Agent 事件(行为与 v0.5 完全兼容)。
返回 {:ok, pid} 或 {:error, {:already_registered, pid}}(同一进程
对同一 group 重复订阅)。
@spec unsubscribe(String.t()) :: :ok
取消当前进程对指定会话 ID 的订阅。
@spec unsubscribe_all() :: :ok
取消当前进程的通配符订阅。
@spec unsubscribe_group(String.t()) :: :ok
取消当前进程对指定 group 的订阅(v0.6+)。