CMDC.AuditEvent (cmdc v0.6.1)

Copy Markdown View Source

审计事件标准 struct + 转换 helper(收敛版,v0.6 落地)。

目的:让集成方一行代码把 CMDC Agent 运行事件落到 audit_logs 表, 不必为众多 EventBus 事件 + telemetry 事件各写一段 case 翻译。

字段约定

借鉴 AWS CloudTrail / GitHub Audit Log 等行业稳定 schema:

字段类型含义
actor_typeatom谁做了这件事::agent / :user / :plugin / :tool / :system
actor_idbinary | nilactor 标识(session_id / account_id / plugin_name / tool_name)
actionbinary动作命名空间字符串("agent.session.start" / "tool.exec.stop" ...)
target_typebinary | nil目标对象类型("agent_session" / "tool" / "approval" ...)
target_idbinary | nil目标对象标识
occurred_atDateTime.t()事件发生时刻(UTC)
beforeterm | nil变更前状态(可选,变更类事件填充)
afterterm | nil变更后状态(可选)
metadatamap附加上下文(duration_ms、token usage、message preview...)
tags[String.t()]自定义标签(默认 [],resolver 可填充用于分类聚合)
schema_versionpos_integer当前 schema 版本,v0.6 起为 2(v1 加 :tags 字段)

推荐使用方式

方式 1:订阅 :after_turn Plugin hook(高价值快路径)

defmodule MyApp.Plugins.AuditSink do
  @behaviour CMDC.Plugin

  @impl true
  def init(opts), do: {:ok, opts}
  @impl true
  def priority, do: 900

  @impl true
  def handle_event({:after_turn, payload}, state, ctx) do
    audit = CMDC.AuditEvent.from_turn(payload, ctx)
    MyApp.AuditLogs.insert!(audit)
    {:continue, state}
  end
  def handle_event(_, state, _), do: {:continue, state}
end

一行写一条 audit_log,带本轮 outcome / messages_diff / token_usage_diff / duration。

方式 2:订阅 EventBus 事件(v0.6+ from_event/3)

CMDC.subscribe(session)

receive do
  {:cmdc_event, sid, event} ->
    case CMDC.AuditEvent.from_event(event, %{session_id: sid}) do
      nil -> :ok
      audit -> MyApp.AuditLogs.insert!(audit)
    end
end

v0.6 收敛版覆盖核心 EventBus 事件,并包含推理事件投影:

  • 会话生命周期::agent_start / :agent_end / :agent_abort
  • 审批::approval_required / :approval_resolved
  • 子代理::subagent_start / :subagent_end
  • 工具::tool_execution_start / :tool_execution_end / :tool_blocked
  • 系统::compact_start / :compact_end / :model_switched
  • 推理::reasoning_thought / :reasoning_branch / :reasoning_score / :reasoning_prune / :reasoning_done
  • 错误::stream_error / :error

剩余流式 / 业务事件(:message_delta / :thinking_delta / :status_update 等)按集成方反馈在 v0.6.x patch 滚动加;调用方可用 actor_resolver / target_resolver 自扩展。

方式 3:订阅细粒度 telemetry 事件(细粒度审计)

:telemetry.attach_many(
  "audit-sink",
  CMDC.Telemetry.all_events(),
  fn name, meas, meta, _ ->
    case CMDC.AuditEvent.from_telemetry(name, meas, meta) do
      nil -> :ok          # 不在当前覆盖集
      event -> MyApp.AuditLogs.insert!(event)
    end
  end,
  nil
)

v0.6 收敛版覆盖 18 个 telemetry 事件:

  • turn:agent.turn.start/stop
  • llm:llm.request.start/stop
  • tool:tool.exec.start/stop
  • plugin:plugin.pipeline.start/stop / plugin.crash
  • compactor:compactor.run.start/stop
  • checkpoint:checkpoint.save / checkpoint.load
  • subagent:subagent.start/stop
  • hibernate:agent.hibernate.configured
  • provider registry:provider.registry.lookup / provider.registry.register

剩余 telemetry 事件按反馈滚动加。

resolver 注入扩展(v0.6+)

from_event/3 / from_telemetry/4 的 opts 传入:

  • :actor_resolver :: (ctx, {default_actor_type, default_actor_id}) -> {atom, String.t()} | nil 返回 nil 走默认;返新 tuple 覆盖

  • :target_resolver :: (event, ctx, {default_target_type, default_target_id}) -> {String.t(), String.t()} | nil

  • :tags_resolver :: (event, ctx) -> [String.t()] | nil — 自定义 tags

resolver 抛异常 fail-safe(自动 rescue 后用默认值)。

audit = CMDC.AuditEvent.from_event(event, ctx,
  actor_resolver: fn ctx, _default -> {:user, ctx.user_data[:account_id]} end,
  tags_resolver: fn _event, ctx -> ["tenant:" <> ctx.user_data[:tenant]] end
)

序列化

@derive Jason.Encoder 直接 Jason.encode!(audit) 落 JSON;反序列化用 Jason.decode!(json, keys: :atoms!)(请配套使用 :atoms! 避免 atom table 泄漏)。

v0.5 → v0.6 兼容性

  • schema_version1 升到 2,加 :tags 字段(默认 [])
  • 老 v1 消费者读 v2 兼容(忽略 :tags 字段即可)
  • from_turn/2 API 完全不变,命名 "agent.turn.finished/aborted" 保留
  • from_telemetry/3 老 6 事件 API 完全不变;新增 12 事件 + 4 元 opts 形式

Summary

Functions

从 EventBus 事件 + Context 构造 AuditEvent。v0.6+ 新增。

从 telemetry 事件构造 AuditEvent。

从 Plugin :after_turn payload + Context 构造 AuditEvent。

返回当前 schema 版本号。

Types

actor_type()

@type actor_type() :: :agent | :user | :plugin | :tool | :system

resolver_opts()

@type resolver_opts() :: [
  actor_resolver: (CMDC.Context.t() | map(), {atom(), String.t() | nil} ->
                     {atom(), String.t() | nil} | nil),
  target_resolver: (term(),
                    CMDC.Context.t()
                    | map(),
                    {String.t() | nil, String.t() | nil} ->
                      {String.t() | nil, String.t() | nil} | nil),
  tags_resolver: (term(), CMDC.Context.t() | map() -> [String.t()] | nil)
]

t()

@type t() :: %CMDC.AuditEvent{
  action: String.t(),
  actor_id: String.t() | nil,
  actor_type: actor_type(),
  after: term() | nil,
  before: term() | nil,
  metadata: map(),
  occurred_at: DateTime.t(),
  schema_version: pos_integer(),
  tags: [String.t()],
  target_id: String.t() | nil,
  target_type: String.t() | nil
}

Functions

from_event(event, ctx, opts \\ [])

@spec from_event(term(), CMDC.Context.t() | map(), resolver_opts()) :: t() | nil

从 EventBus 事件 + Context 构造 AuditEvent。v0.6+ 新增。

覆盖 15 个核心 EventBus 事件,完整列表见模块 moduledoc。 未覆盖事件返回 nil,调用方按需降级或用 :actor_resolver / :target_resolver 自扩展。

参数

  • event — CMDC.Event,如 :agent_start / {:tool_execution_end, name, id, result}
  • ctxCMDC.Context.t() 或与之 shape 一致的 map
  • opts — 可选 resolver 注入

示例

CMDC.subscribe(session)

receive do
  {:cmdc_event, sid, event} ->
    case CMDC.AuditEvent.from_event(event, %{session_id: sid}) do
      nil -> :ok
      audit -> MyApp.AuditLogs.insert!(audit)
    end
end

from_telemetry(event_name, measurements, metadata, opts \\ [])

@spec from_telemetry([atom()], map(), map(), resolver_opts()) :: t() | nil

从 telemetry 事件构造 AuditEvent。

v0.6 收敛版覆盖 18 个 telemetry 事件,完整列表见模块 moduledoc。 未覆盖事件返 nil

参数

  • event_name — telemetry 事件名 list,如 [:cmdc, :tool, :exec, :stop]
  • measurements — telemetry measurements map(duration_ms / tokens_in 等)
  • metadata — telemetry metadata map(session_id / tool / model 等)
  • opts — 可选 resolver 注入

返回

  • t() — 已识别事件,返回构造好的 AuditEvent
  • nil — 未覆盖事件,调用方按需降级

from_turn(payload, ctx, opts \\ [])

@spec from_turn(map(), CMDC.Context.t() | map(), resolver_opts()) :: t()

从 Plugin :after_turn payload + Context 构造 AuditEvent。

这是高价值快路径:一轮对话(从 prompt 到 finish / abort)的完整审计, payload 含 outcome / messages_diff / token_usage_diff / duration_ms, 足以重建本轮关键上下文。

参数

  • payload — Plugin hook 收到的 {:after_turn, payload} 中的 payload map, 字段见 CMDC.Plugin moduledoc after_turn_payload 类型
  • ctxCMDC.Context.t() 或与之 shape 一致的 map(测试场景常用)
  • opts — 可选 resolver 注入,见模块 moduledoc「resolver 注入扩展」段

示例

def handle_event({:after_turn, payload}, state, ctx) do
  audit = CMDC.AuditEvent.from_turn(payload, ctx)
  MyApp.AuditLogs.insert!(audit)
  {:continue, state}
end

metadata 字段

  • :duration_ms — 本轮耗时
  • :turn — 本轮序号(从 ctx)
  • :abort_reason — 若 outcome=:aborted 时的中止原因
  • :message_count_diff — 本轮新增消息数
  • :first_message_preview / :last_message_preview — 首/末消息文本前 200 字符
  • :token_usageCMDC.TokenUsage.t()(若 payload 含)

命名稳定性

v0.5.4 from_turn/2 返回 action 为 "agent.turn.finished" / "agent.turn.aborted", v0.6 保持不变(确保 v0.5 → v0.6 升级零代码改动)。注意与 telemetry [:cmdc, :agent, :turn, :stop] 通过 from_telemetry/3 得到的 "agent.turn.stop" 命名不同 — 两者反映不同来源,集成方可按需统一处理。

schema_version()

@spec schema_version() :: pos_integer()

返回当前 schema 版本号。

v0.5 为 1,v0.6 起为 2(加入 :tags 字段)。 字段集变化时会升版。