CMDC.EventBus (cmdc v0.6.1)

Copy Markdown View Source

基于 Registry 的会话事件发布/订阅系统,附带可选的 ring buffer 用于断线重连补帧。

基础订阅

CMDC.EventBus.subscribe("session-abc123")

receive do
  {:cmdc_event, "session-abc123", event} -> handle_event(event)
end

CMDC.EventBus.broadcast("session-abc123", {:stream_chunk, "session-abc123", "hello"})

CMDC.EventBus.subscribe_all()

群体订阅(v0.6+)

当多个 Agent 协作一个上层"群"(多人房间 / 协作小组 / 联合调查 task force)时, 用 subscribe_group/2 一次性订阅 group 内所有 session 的事件, 替代手动维护 [session_id] 列表 + 多次 subscribe/1 调用。

group_id = CMDC.EventBus.create_group()

{:ok, _} = CMDC.create_agent(model: "...", group_id: group_id)
{:ok, _} = CMDC.create_agent(model: "...", group_id: group_id)

CMDC.EventBus.subscribe_group(group_id)

receive do
  {:cmdc_event, session_id, event} ->
    # 收到 group 内任一 session 的事件,session_id 标识来源
    handle_event(session_id, event)
end

Agent 创建时不传 :group_id 行为完全等同 v0.5(不入任何 group,不影响订阅)。

Ring Buffer

适用场景:WebSocket / Channel 短暂断线重连时不丢事件。

Agent 启动时通过 CMDC.Options.new!(event_buffer_size: 100) 开启该会话的内存 ring buffer。Buffer 满后自动丢弃最早事件(FIFO)。默认 0(关闭,零内存开销)。

重连后用 :since 选项 replay:

{:ok, _pid} = CMDC.EventBus.subscribe("session-abc123", since: last_index)

receive do
  {:cmdc_event, _sid, event} -> # ... 包含 replay + 新事件
end

典型 buffer size 建议 50 ~ 200,对应 1-3 秒内的 stream_chunk 风暴。 Buffer 由 ETS 实现,session 终止时通过 disable_buffer/1 清理(CMDC.Agent 在 terminate/3 自动调用)。

Summary

Functions

广播事件到指定会话所有订阅者,并写入 ring buffer(若已开启)。

广播事件到 session 订阅者 + group 订阅者(v0.6+)。

生成一个全局唯一的 group_id 字符串(v0.6+)。

关闭 ring buffer 并清理该会话所有缓存事件(CMDC.Agent.terminate 自动调用)。

开启指定会话的 ring buffer(CMDC.Agent.init 自动调用)。

返回该会话最新事件 index(未开启 buffer 时为 nil)。

订阅当前进程对指定会话 ID 的事件。

订阅当前进程对所有会话的事件(监控/调试用)。

订阅当前进程对指定 group 的所有 session 事件(v0.6+)。

取消当前进程对指定会话 ID 的订阅。

取消当前进程的通配符订阅。

取消当前进程对指定 group 的订阅(v0.6+)。

Functions

broadcast(session_id, event)

@spec broadcast(String.t(), term()) :: :ok | {:ok, pos_integer()}

广播事件到指定会话所有订阅者,并写入 ring buffer(若已开启)。

返回事件分配到的单调递增 index(开启 buffer 时)或 :ok(未开启)。 Registry 未启动时(如部分测试环境)静默返回 :ok

v0.6+ 三参数版本:同时派发到 group-level 订阅者(group_id 非 nil 时)。 二参数版本保留兼容,等同于 broadcast(session_id, nil, event)

broadcast(session_id, group_id, event)

@spec broadcast(String.t(), String.t() | nil, term()) :: :ok | {:ok, pos_integer()}

广播事件到 session 订阅者 + group 订阅者(v0.6+)。

group_idnil 时行为等同二参数版本(仅 session 派发,零额外开销)。 当 group_id 非 nil 时额外派发一份到 {:group, group_id} 的订阅者, 消息格式与单 session 订阅一致:{:cmdc_event, session_id, event}

create_group()

@spec create_group() :: String.t()

生成一个全局唯一的 group_id 字符串(v0.6+)。

实现走 :crypto.strong_rand_bytes/1 16 字节 → base32 lower 编码 → 前缀 "grp_",约 28 字符。集成方完全可用任意自有字符串(如 "study-room-<room_id>" / "task-force-<uuid>"),本 helper 只是 便利函数,不强制使用。

示例

iex> id = CMDC.EventBus.create_group()
iex> String.starts_with?(id, "grp_")
true

disable_buffer(session_id)

@spec disable_buffer(String.t()) :: :ok

关闭 ring buffer 并清理该会话所有缓存事件(CMDC.Agent.terminate 自动调用)。

enable_buffer(session_id, size)

@spec enable_buffer(String.t(), non_neg_integer()) :: :ok

开启指定会话的 ring buffer(CMDC.Agent.init 自动调用)。

buffer_size 必须为正整数;<= 0 等同 no-op。

last_index(session_id)

@spec last_index(String.t()) :: non_neg_integer() | nil

返回该会话最新事件 index(未开启 buffer 时为 nil)。

常用于断线重连前记录"上次看到的 index",重连时传 subscribe(sid, since: idx)

subscribe(session_id, opts \\ [])

@spec subscribe(
  String.t(),
  keyword()
) :: {:ok, pid()} | {:error, term()}

订阅当前进程对指定会话 ID 的事件。

接收 {:cmdc_event, session_id, event} 消息。

选项

  • :since :: non_neg_integer() — 从该 index 之后的事件开始 replay。要求该 会话已开启 ring buffer(Options.event_buffer_size > 0),否则该选项被忽略。 如 since 早于 buffer 起点(已被丢弃),仅 replay 还在 buffer 内的部分。
  • :types :: [atom()] — 只 replay 这些 type 的事件; 适用于 UI 重连补帧只想要 :stream_chunk + :agent_end 不想要 :tool_started 等内部事件的场景。事件 type 来自事件第一个元素(如 {:agent_end, _, _} 的 type 是 :agent_end,裸 atom 事件如 :agent_start type 就是它自己)。 此选项仅影响 replay 阶段;实时订阅不过滤,避免漏事件。

返回 {:ok, pid},replay 事件在订阅注册后按时序异步投递到当前进程。

subscribe_all()

@spec subscribe_all() :: {:ok, pid()} | {:error, {:already_registered, pid()}}

订阅当前进程对所有会话的事件(监控/调试用)。

subscribe_group(group_id)

@spec subscribe_group(String.t()) :: {:ok, pid()} | {:error, term()}

订阅当前进程对指定 group 的所有 session 事件(v0.6+)。

接收 {:cmdc_event, session_id, event} 消息,session_id 标识事件来源 group 内具体哪个 Agent。group_id 由集成方在 Agent 创建时通过 Options.group_id 指定(可用 create_group/0 生成或自定义字符串)。

示例

group_id = "study-room-1"
{:ok, _} = CMDC.create_agent(model: "...", group_id: group_id)
{:ok, _} = CMDC.create_agent(model: "...", group_id: group_id)

{:ok, _pid} = CMDC.EventBus.subscribe_group(group_id)

receive do
  {:cmdc_event, sid, event} -> # group 内任一 session 的事件
    IO.inspect({sid, event})
end

与 subscribe/1 的关系

二者互补——同一进程可同时订阅 session-level + group-level, group-level 是聚合视图,不替代单 session 订阅。Agent 不在任何 group 时, group-level 订阅者不会收到该 Agent 事件(行为与 v0.5 完全兼容)。

返回 {:ok, pid}{:error, {:already_registered, pid}}(同一进程 对同一 group 重复订阅)。

unsubscribe(session_id)

@spec unsubscribe(String.t()) :: :ok

取消当前进程对指定会话 ID 的订阅。

unsubscribe_all()

@spec unsubscribe_all() :: :ok

取消当前进程的通配符订阅。

unsubscribe_group(group_id)

@spec unsubscribe_group(String.t()) :: :ok

取消当前进程对指定 group 的订阅(v0.6+)。