审计事件标准 struct + 转换 helper(收敛版,v0.6 落地)。
目的:让集成方一行代码把 CMDC Agent 运行事件落到 audit_logs 表, 不必为众多 EventBus 事件 + telemetry 事件各写一段 case 翻译。
字段约定
借鉴 AWS CloudTrail / GitHub Audit Log 等行业稳定 schema:
| 字段 | 类型 | 含义 |
|---|---|---|
actor_type | atom | 谁做了这件事::agent / :user / :plugin / :tool / :system |
actor_id | binary | nil | actor 标识(session_id / account_id / plugin_name / tool_name) |
action | binary | 动作命名空间字符串("agent.session.start" / "tool.exec.stop" ...) |
target_type | binary | nil | 目标对象类型("agent_session" / "tool" / "approval" ...) |
target_id | binary | nil | 目标对象标识 |
occurred_at | DateTime.t() | 事件发生时刻(UTC) |
before | term | nil | 变更前状态(可选,变更类事件填充) |
after | term | nil | 变更后状态(可选) |
metadata | map | 附加上下文(duration_ms、token usage、message preview...) |
tags | [String.t()] | 自定义标签(默认 [],resolver 可填充用于分类聚合) |
schema_version | pos_integer | 当前 schema 版本,v0.6 起为 2(v1 加 :tags 字段) |
推荐使用方式
方式 1:订阅 :after_turn Plugin hook(高价值快路径)
defmodule MyApp.Plugins.AuditSink do
@behaviour CMDC.Plugin
@impl true
def init(opts), do: {:ok, opts}
@impl true
def priority, do: 900
@impl true
def handle_event({:after_turn, payload}, state, ctx) do
audit = CMDC.AuditEvent.from_turn(payload, ctx)
MyApp.AuditLogs.insert!(audit)
{:continue, state}
end
def handle_event(_, state, _), do: {:continue, state}
end一行写一条 audit_log,带本轮 outcome / messages_diff / token_usage_diff / duration。
方式 2:订阅 EventBus 事件(v0.6+ from_event/3)
CMDC.subscribe(session)
receive do
{:cmdc_event, sid, event} ->
case CMDC.AuditEvent.from_event(event, %{session_id: sid}) do
nil -> :ok
audit -> MyApp.AuditLogs.insert!(audit)
end
endv0.6 收敛版覆盖核心 EventBus 事件,并包含推理事件投影:
- 会话生命周期:
:agent_start/:agent_end/:agent_abort - 审批:
:approval_required/:approval_resolved - 子代理:
:subagent_start/:subagent_end - 工具:
:tool_execution_start/:tool_execution_end/:tool_blocked - 系统:
:compact_start/:compact_end/:model_switched - 推理:
:reasoning_thought/:reasoning_branch/:reasoning_score/:reasoning_prune/:reasoning_done - 错误:
:stream_error/:error
剩余流式 / 业务事件(:message_delta / :thinking_delta /
:status_update 等)按集成方反馈在 v0.6.x patch 滚动加;调用方可用
actor_resolver / target_resolver 自扩展。
方式 3:订阅细粒度 telemetry 事件(细粒度审计)
:telemetry.attach_many(
"audit-sink",
CMDC.Telemetry.all_events(),
fn name, meas, meta, _ ->
case CMDC.AuditEvent.from_telemetry(name, meas, meta) do
nil -> :ok # 不在当前覆盖集
event -> MyApp.AuditLogs.insert!(event)
end
end,
nil
)v0.6 收敛版覆盖 18 个 telemetry 事件:
- turn:
agent.turn.start/stop - llm:
llm.request.start/stop - tool:
tool.exec.start/stop - plugin:
plugin.pipeline.start/stop/plugin.crash - compactor:
compactor.run.start/stop - checkpoint:
checkpoint.save/checkpoint.load - subagent:
subagent.start/stop - hibernate:
agent.hibernate.configured - provider registry:
provider.registry.lookup/provider.registry.register
剩余 telemetry 事件按反馈滚动加。
resolver 注入扩展(v0.6+)
在 from_event/3 / from_telemetry/4 的 opts 传入:
:actor_resolver :: (ctx, {default_actor_type, default_actor_id}) -> {atom, String.t()} | nil返回 nil 走默认;返新 tuple 覆盖:target_resolver :: (event, ctx, {default_target_type, default_target_id}) -> {String.t(), String.t()} | nil:tags_resolver :: (event, ctx) -> [String.t()] | nil— 自定义 tags
resolver 抛异常 fail-safe(自动 rescue 后用默认值)。
audit = CMDC.AuditEvent.from_event(event, ctx,
actor_resolver: fn ctx, _default -> {:user, ctx.user_data[:account_id]} end,
tags_resolver: fn _event, ctx -> ["tenant:" <> ctx.user_data[:tenant]] end
)序列化
@derive Jason.Encoder 直接 Jason.encode!(audit) 落 JSON;反序列化用
Jason.decode!(json, keys: :atoms!)(请配套使用 :atoms! 避免 atom table 泄漏)。
v0.5 → v0.6 兼容性
schema_version从1升到2,加:tags字段(默认[])- 老 v1 消费者读 v2 兼容(忽略
:tags字段即可) from_turn/2API 完全不变,命名"agent.turn.finished/aborted"保留from_telemetry/3老 6 事件 API 完全不变;新增 12 事件 + 4 元 opts 形式
Summary
Functions
从 EventBus 事件 + Context 构造 AuditEvent。v0.6+ 新增。
从 telemetry 事件构造 AuditEvent。
从 Plugin :after_turn payload + Context 构造 AuditEvent。
返回当前 schema 版本号。
Types
@type actor_type() :: :agent | :user | :plugin | :tool | :system
@type resolver_opts() :: [ actor_resolver: (CMDC.Context.t() | map(), {atom(), String.t() | nil} -> {atom(), String.t() | nil} | nil), target_resolver: (term(), CMDC.Context.t() | map(), {String.t() | nil, String.t() | nil} -> {String.t() | nil, String.t() | nil} | nil), tags_resolver: (term(), CMDC.Context.t() | map() -> [String.t()] | nil) ]
@type t() :: %CMDC.AuditEvent{ action: String.t(), actor_id: String.t() | nil, actor_type: actor_type(), after: term() | nil, before: term() | nil, metadata: map(), occurred_at: DateTime.t(), schema_version: pos_integer(), tags: [String.t()], target_id: String.t() | nil, target_type: String.t() | nil }
Functions
@spec from_event(term(), CMDC.Context.t() | map(), resolver_opts()) :: t() | nil
从 EventBus 事件 + Context 构造 AuditEvent。v0.6+ 新增。
覆盖 15 个核心 EventBus 事件,完整列表见模块 moduledoc。
未覆盖事件返回 nil,调用方按需降级或用 :actor_resolver / :target_resolver
自扩展。
参数
event— CMDC.Event,如:agent_start/{:tool_execution_end, name, id, result}ctx—CMDC.Context.t()或与之 shape 一致的 mapopts— 可选 resolver 注入
示例
CMDC.subscribe(session)
receive do
{:cmdc_event, sid, event} ->
case CMDC.AuditEvent.from_event(event, %{session_id: sid}) do
nil -> :ok
audit -> MyApp.AuditLogs.insert!(audit)
end
end
@spec from_telemetry([atom()], map(), map(), resolver_opts()) :: t() | nil
从 telemetry 事件构造 AuditEvent。
v0.6 收敛版覆盖 18 个 telemetry 事件,完整列表见模块 moduledoc。
未覆盖事件返 nil。
参数
event_name— telemetry 事件名 list,如[:cmdc, :tool, :exec, :stop]measurements— telemetry measurements map(duration_ms/tokens_in等)metadata— telemetry metadata map(session_id/tool/model等)opts— 可选 resolver 注入
返回
t()— 已识别事件,返回构造好的 AuditEventnil— 未覆盖事件,调用方按需降级
@spec from_turn(map(), CMDC.Context.t() | map(), resolver_opts()) :: t()
从 Plugin :after_turn payload + Context 构造 AuditEvent。
这是高价值快路径:一轮对话(从 prompt 到 finish / abort)的完整审计, payload 含 outcome / messages_diff / token_usage_diff / duration_ms, 足以重建本轮关键上下文。
参数
payload— Plugin hook 收到的{:after_turn, payload}中的 payload map, 字段见CMDC.Pluginmoduledocafter_turn_payload类型ctx—CMDC.Context.t()或与之 shape 一致的 map(测试场景常用)opts— 可选 resolver 注入,见模块 moduledoc「resolver 注入扩展」段
示例
def handle_event({:after_turn, payload}, state, ctx) do
audit = CMDC.AuditEvent.from_turn(payload, ctx)
MyApp.AuditLogs.insert!(audit)
{:continue, state}
endmetadata 字段
:duration_ms— 本轮耗时:turn— 本轮序号(从 ctx):abort_reason— 若 outcome=:aborted 时的中止原因:message_count_diff— 本轮新增消息数:first_message_preview/:last_message_preview— 首/末消息文本前 200 字符:token_usage—CMDC.TokenUsage.t()(若 payload 含)
命名稳定性
v0.5.4 from_turn/2 返回 action 为 "agent.turn.finished" / "agent.turn.aborted",
v0.6 保持不变(确保 v0.5 → v0.6 升级零代码改动)。注意与 telemetry
[:cmdc, :agent, :turn, :stop] 通过 from_telemetry/3 得到的
"agent.turn.stop" 命名不同 — 两者反映不同来源,集成方可按需统一处理。
@spec schema_version() :: pos_integer()
返回当前 schema 版本号。
v0.5 为 1,v0.6 起为 2(加入 :tags 字段)。
字段集变化时会升版。