CMDC.Agent.Stream (cmdc v0.4.0)

Copy Markdown View Source

StreamChunk 事件处理与状态更新。

本模块位于 CMDC.Provider.StreamBridge 与 Agent 状态机之间, 处理 ReqLLM.StreamChunk 结构体。

StreamChunk 类型处理

chunk.type处理逻辑
:content提取内联 XML 标签(<status>/<title>),追加到 current_text,广播 delta
:thinking追加到 current_thinking,广播 thinking delta
:tool_call将工具调用 upsert 进 current_tool_calls(整合已解析的 name+arguments)
:meta提取 usage 信息和 tool_call_args 分片,更新 token_usage

XML 标签提取

LLM 响应可能包含内联 XML 标签(如 <status>…</status><title>…</title>), 需要从用户可见文本中剔除并作为独立事件广播。 用 tag_buffers 缓冲跨 chunk 的不完整匹配。

Summary

Functions

从流式 delta 中提取 <status>…</status> 标签,广播 {:status_update, text}

从流式 delta 中提取 <title>…</title> 标签,广播 {:title_generated, title}(限 60 字符)。

通用 XML 标签提取器,处理跨 chunk 的流式 delta。

处理单个 ReqLLM.StreamChunk,更新 Agent State。

返回 text 结尾匹配 <status> 前缀的字符数。

返回 text 结尾匹配 <title> 前缀的字符数。

Functions

extract_status_tags(delta, state)

@spec extract_status_tags(String.t(), CMDC.Agent.State.t()) ::
  {String.t(), CMDC.Agent.State.t()}

从流式 delta 中提取 <status>…</status> 标签,广播 {:status_update, text}

extract_title_tag(delta, state)

@spec extract_title_tag(String.t(), CMDC.Agent.State.t()) ::
  {String.t(), CMDC.Agent.State.t()}

从流式 delta 中提取 <title>…</title> 标签,广播 {:title_generated, title}(限 60 字符)。

extract_xml_tag(delta, tag_name, state, on_match)

通用 XML 标签提取器,处理跨 chunk 的流式 delta。

delta 中剔除 <tag_name>…</tag_name>,发现完整标签时调用 on_match。 不完整的标签缓冲到 state.tag_buffers[tag_name],等待下一个 chunk。

handle_chunk(chunk, state)

@spec handle_chunk(term(), CMDC.Agent.State.t()) :: CMDC.Agent.State.t()

处理单个 ReqLLM.StreamChunk,更新 Agent State。

是 gen_statem streaming 状态收到 {:cmdc_stream_chunk, chunk} 消息后的核心处理函数。 通过 chunk.type 分发到不同的处理逻辑。

partial_tag_length(text)

@spec partial_tag_length(String.t()) :: non_neg_integer()

返回 text 结尾匹配 <status> 前缀的字符数。

partial_title_tag_length(text)

@spec partial_title_tag_length(String.t()) :: non_neg_integer()

返回 text 结尾匹配 <title> 前缀的字符数。