StreamChunk 事件处理与状态更新。
本模块位于 CMDC.Provider.StreamBridge 与 Agent 状态机之间,
处理 ReqLLM.StreamChunk 结构体。
StreamChunk 类型处理
| chunk.type | 处理逻辑 |
|---|---|
:content | 提取内联 XML 标签(<status>/<title>),追加到 current_text,广播 delta |
:thinking | 追加到 current_thinking,广播 thinking delta |
:tool_call | 将工具调用 upsert 进 current_tool_calls(整合已解析的 name+arguments) |
:meta | 提取 usage 信息和 tool_call_args 分片,更新 token_usage |
XML 标签提取
LLM 响应可能包含内联 XML 标签(如 <status>…</status>、<title>…</title>),
需要从用户可见文本中剔除并作为独立事件广播。
用 tag_buffers 缓冲跨 chunk 的不完整匹配。
Summary
Functions
从流式 delta 中提取 <status>…</status> 标签,广播 {:status_update, text}。
从流式 delta 中提取 <title>…</title> 标签,广播 {:title_generated, title}(限 60 字符)。
通用 XML 标签提取器,处理跨 chunk 的流式 delta。
处理单个 ReqLLM.StreamChunk,更新 Agent State。
返回 text 结尾匹配 <status> 前缀的字符数。
返回 text 结尾匹配 <title> 前缀的字符数。
Functions
@spec extract_status_tags(String.t(), CMDC.Agent.State.t()) :: {String.t(), CMDC.Agent.State.t()}
从流式 delta 中提取 <status>…</status> 标签,广播 {:status_update, text}。
@spec extract_title_tag(String.t(), CMDC.Agent.State.t()) :: {String.t(), CMDC.Agent.State.t()}
从流式 delta 中提取 <title>…</title> 标签,广播 {:title_generated, title}(限 60 字符)。
@spec extract_xml_tag(String.t(), atom(), CMDC.Agent.State.t(), (String.t(), CMDC.Agent.State.t() -> CMDC.Agent.State.t())) :: {String.t(), CMDC.Agent.State.t()}
通用 XML 标签提取器,处理跨 chunk 的流式 delta。
从 delta 中剔除 <tag_name>…</tag_name>,发现完整标签时调用 on_match。
不完整的标签缓冲到 state.tag_buffers[tag_name],等待下一个 chunk。
@spec handle_chunk(term(), CMDC.Agent.State.t()) :: CMDC.Agent.State.t()
处理单个 ReqLLM.StreamChunk,更新 Agent State。
是 gen_statem streaming 状态收到 {:cmdc_stream_chunk, chunk} 消息后的核心处理函数。
通过 chunk.type 分发到不同的处理逻辑。
@spec partial_tag_length(String.t()) :: non_neg_integer()
返回 text 结尾匹配 <status> 前缀的字符数。
@spec partial_title_tag_length(String.t()) :: non_neg_integer()
返回 text 结尾匹配 <title> 前缀的字符数。