CMDC.Agent.State (cmdc v0.4.0)

Copy Markdown View Source

CMDC.Agent gen_statem 的内部运行时状态。

字段分组

  • Identitysession_idmodelworking_dirconfigstatus
  • Conversationblueprint_system_promptmessages(反序存储,最新在前)
  • Plugin Pipelinepluginsplugin_states
  • Tool registrytoolsdisabled_tools
  • Tool executionpending_tool_taskstool_results
  • Streaming accumulatorcurrent_textcurrent_tool_callscurrent_thinking(每 turn 重置)
  • Stream transportstreaming_respstream_task_pid
  • Stream healthlast_chunk_atstream_erroredstall_count
  • Token & cost trackingtoken_usageturn_counttool_call_countcost_usd
  • Loop detectiontool_call_hashes(供 ToolRunner 内建循环检测)
  • Context relaysandboxsubagentstodosmemory_contentsuser_data(透传给 Context)
  • Resilienceretry_countmax_retries
  • Interactionpending_messages
  • Steeringsteering_queue(中段软中断 queue,详见 steering-design.md

与旧项目的差异

  • 新增 cost_usd — 内建成本追踪(吸收 CostTracker Plugin)
  • 新增 tool_call_hashes — 内建循环检测(吸收 LoopDetector Plugin)
  • 新增 sandboxsubagentstodosmemory_contents — 透传给 CMDC.Context
  • config 类型改为 CMDC.Config.t() | map()(渐进迁移)

Summary

Types

Steering queue 中的单条记录。

t()

Functions

向消息历史头部追加一条消息(反序存储:最新在前)。

向消息历史批量追加多条消息。

消费 marker,返回 {after_turn_payload | nil, state_with_marker_cleared}

一次性取出 :steering_queue 中的全部记录并清空。

:steering_queue 末尾追加一条 steering 记录。

增加 tool_call_count 计数。

增加 turn_count 计数。

返回当前 state 的 interrupt_immune_tools 白名单(缺省即 Options 默认值)。

在 prompt cycle 起始处(idle → running)记录快照。

返回循环检测 hash 历史的最大保留数。

返回当前 state 的 max_steering_queue 配置(无配置时回落到默认值 3)。

从 keyword list 构建 State struct。

记录一次工具调用 hash,用于 ToolRunner 循环检测。

重置每个 turn 的流式累积字段(在新 turn 开始时调用)。

映射 status 字段为 gen_statem 状态名称。

返回 :steering_queue 已满(达到 max_steering_queue 上限)。

从 Plugin.Pipeline 的执行结果中同步 plugin_states 回 State。

构建当前 State 对应的 Context 快照(只读,传给 Plugin/Tool)。

更新累计成本(USD)。

更新 token_usage,累加新的 usage 数据。

Types

status()

@type status() :: :idle | :running | :streaming | :executing_tools

steering_entry()

@type steering_entry() :: %{ref: reference(), text: String.t(), queued_at: integer()}

Steering queue 中的单条记录。

t()

@type t() :: %CMDC.Agent.State{
  blueprint_system_prompt: String.t(),
  cached_tokens: non_neg_integer() | nil,
  config: CMDC.Config.t() | map(),
  cost_usd: float(),
  current_text: String.t(),
  current_thinking: String.t() | nil,
  current_tool_calls: [map()],
  disabled_tools: [String.t()],
  dynamic_context_sections: %{required(atom()) => String.t()},
  exit_reason: term() | nil,
  last_chunk_at: integer() | nil,
  max_retries: pos_integer(),
  memory_contents: %{required(String.t()) => String.t()},
  message_started: boolean(),
  messages: [CMDC.Message.t()],
  model: String.t(),
  monitors: %{required(reference()) => pid()},
  overflow_detected: boolean(),
  pending_messages: [String.t()],
  pending_tool_tasks: %{required(reference()) => {Task.t(), map(), integer()}},
  plugin_states: %{required(module()) => term()},
  plugins: [{module(), term()}],
  prompt_mode: CMDC.Options.prompt_mode(),
  retry_base_delay_ms: pos_integer(),
  retry_count: non_neg_integer(),
  retry_max_delay_ms: pos_integer(),
  sandbox: module() | nil,
  session_id: String.t(),
  stall_count: non_neg_integer(),
  started_at: integer(),
  status: status(),
  steering_queue: [%{ref: reference(), text: String.t(), queued_at: integer()}],
  stream_errored: false | term(),
  stream_task_pid: pid() | nil,
  streaming_resp: term() | nil,
  subagents: [map()],
  tag_buffers: %{required(atom()) => String.t()},
  todos: [map()],
  token_usage: %{
    prompt_tokens: non_neg_integer(),
    completion_tokens: non_neg_integer(),
    total_tokens: non_neg_integer(),
    context_window: non_neg_integer(),
    current_context_tokens: non_neg_integer()
  },
  tool_call_arg_buffers: %{required(non_neg_integer()) => iodata()},
  tool_call_count: non_neg_integer(),
  tool_call_hashes: [binary()],
  tool_results: [{map(), term()}],
  tools: [module()],
  turn_count: non_neg_integer(),
  turn_start_marker:
    %{
      messages_offset: non_neg_integer(),
      token_usage_snapshot: map(),
      cost_usd_snapshot: float(),
      started_at_ms: integer()
    }
    | nil,
  user_data: map(),
  working_dir: String.t()
}

Functions

append_message(state, msg)

@spec append_message(t(), CMDC.Message.t()) :: t()

向消息历史头部追加一条消息(反序存储:最新在前)。

append_messages(state, msgs)

@spec append_messages(t(), [CMDC.Message.t()]) :: t()

向消息历史批量追加多条消息。

consume_turn_marker(state, outcome, abort_reason)

@spec consume_turn_marker(t(), :finished | :aborted, term() | nil) ::
  {map() | nil, t()}

消费 marker,返回 {after_turn_payload | nil, state_with_marker_cleared}

  • outcome :: :finished | :aborted

  • abort_reason 仅在 :aborted 时非 nil

payload 字段:

  • :outcome:finished | :aborted

  • :abort_reasonterm() | nil

  • :messages_diff — 本轮 prompt cycle 内新增的消息(按时间顺序)
  • :token_usage_diff%CMDC.TokenUsage{} 本轮增量
  • :duration_ms — 本轮总耗时
  • :started_at_ms / :ended_at_ms — 起止时间戳

marker 为 nil 时返回 {nil, state}(Plugin 不会触发 :after_turn)。

drain_steering(state)

@spec drain_steering(t()) :: {[steering_entry()], t()}

一次性取出 :steering_queue 中的全部记录并清空。

返回 {entries, state}entries 为按时间顺序排列的 list(最早在前)。 Steering 注入下一 turn 时由 ToolRunner.inject_steering 调用。

enqueue_steering(state, ref, text)

@spec enqueue_steering(t(), reference(), String.t()) :: t()

:steering_queue 末尾追加一条 steering 记录。

Queue 上限由 state.config[:max_steering_queue] 控制(缺省 3)。 溢出时丢弃最旧一条(FIFO 截断),保证总长不超 limit。

increment_tool_calls(state, delta \\ 1)

@spec increment_tool_calls(t(), non_neg_integer()) :: t()

增加 tool_call_count 计数。

increment_turn(state)

@spec increment_turn(t()) :: t()

增加 turn_count 计数。

interrupt_immune_tools(state)

@spec interrupt_immune_tools(t()) :: [String.t()]

返回当前 state 的 interrupt_immune_tools 白名单(缺省即 Options 默认值)。

mark_turn_start(state)

@spec mark_turn_start(t()) :: t()

在 prompt cycle 起始处(idle → running)记录快照。

如果当前已存在 marker(理论上不应发生,例如未消费的 cycle),保留旧 marker 不覆盖,避免漏算上一次 prompt cycle 的 diff。

max_hash_history()

@spec max_hash_history() :: pos_integer()

返回循环检测 hash 历史的最大保留数。

max_steering_queue(state)

@spec max_steering_queue(t()) :: pos_integer()

返回当前 state 的 max_steering_queue 配置(无配置时回落到默认值 3)。

new(opts)

@spec new(keyword()) :: t()

从 keyword list 构建 State struct。

供 Agent.init/1 使用,自动设置 started_at 时间戳。

示例

state = State.new(session_id: "abc", model: "anthropic:claude-sonnet-4-5", working_dir: ".")

record_tool_call_hash(state, hash)

@spec record_tool_call_hash(t(), binary()) :: t()

记录一次工具调用 hash,用于 ToolRunner 循环检测。

保留最近 20 条记录,超出时截断最旧的。

reset_stream_fields(state)

@spec reset_stream_fields(t()) :: t()

重置每个 turn 的流式累积字段(在新 turn 开始时调用)。

state_name(state)

@spec state_name(t()) :: status()

映射 status 字段为 gen_statem 状态名称。

steering_queue_full?(state)

@spec steering_queue_full?(t()) :: boolean()

返回 :steering_queue 已满(达到 max_steering_queue 上限)。

调用方在 enqueue 之前用此判断决定是否拒绝新 steer。

sync_plugin_states(state, pipeline_result)

@spec sync_plugin_states(t(), map()) :: t()

从 Plugin.Pipeline 的执行结果中同步 plugin_states 回 State。

to_context(state)

@spec to_context(t()) :: CMDC.Context.t()

构建当前 State 对应的 Context 快照(只读,传给 Plugin/Tool)。

将 State 内部字段映射到 Context 的公共接口:

  • turn_countturn
  • token_usage.total_tokenstotal_tokens
  • cost_usdcost_usd
  • sandboxsubagentstodosmemory_contentsuser_data 直接透传

update_cost(state, delta_usd)

@spec update_cost(t(), float()) :: t()

更新累计成本(USD)。

在每轮 stream_done 后由 Agent 调用,基于 token 使用量和模型定价计算。

update_token_usage(state, new_usage)

@spec update_token_usage(t(), map() | CMDC.TokenUsage.t()) :: t()

更新 token_usage,累加新的 usage 数据。

new_usage 接受 plain map 或 %CMDC.TokenUsage{} struct(自动取共有字段)。 Anthropic 的 cache_read_input_tokens 通过 :cached_tokens 字段累加到顶层 state.cached_tokens(独立于 token_usage map,保持向后兼容)。