AgentOps 集成契约

Copy Markdown View Source

本指南面向企业 Phoenix AgentOps 平台接入 cmdc_orchestrator 0.6。目标是把 Workflow Designer、Run Console、Approval Center、Trace Viewer、Skill Registry 接到稳定 runtime contract 上,而不是把 Phoenix / Ecto / Oban / RBAC 实现下沉到 通用包。

边界

cmdc_orchestrator 负责:

  • JSON-safe WorkflowSpec、节点/边校验、dry run。
  • Run / NodeRun / RunEvent 账本结构与 RunStore behaviour。
  • 可恢复 RunExecutor、pause/resume/retry/rerun/cancel。
  • :fork / :join、retry/timeout/fallback/on_error policy。
  • :human_task 等待、决策聚合、timeout tick 与 resume signal。

企业 Phoenix 平台负责:

  • Workflow / Version / Run / Event / HumanTask 的 Ecto schema、索引与租户字段。
  • Workflow Designer UI、审批中心、通知、RBAC/ABAC、业务审计表。
  • Oban 定时触发、timeout tick、重试任务、通知任务。
  • Gateway/SSE/WebSocket endpoint 与 Trace Viewer 展示。
  • Skill / Tool / Agent / RAG 业务注册表。

推荐 Ecto 表

这些表是建议形状,不是库内 schema。字段名可按平台规范调整,但语义建议保持一致。

关键字段说明
agent_workflowsid, tenant_id, slug, name, status, current_version_idWorkflow 业务入口。
agent_workflow_versionsid, workflow_id, version, spec, ui_metadata, published_by, published_at保存 JSON DSL 与画布布局。
agent_workflow_runsid, tenant_id, workflow_id, workflow_version_id, status, trigger_source, trigger_ref, current_node_id, context_data, completed, pruned, signal_history, resume_cursor, retry_counters, branch_states, human_tasks, lock_version, claim_owner, claim_expires_at, started_at, finished_at映射 %CMDCOrchestrator.Run{}
agent_workflow_node_runsid, run_id, node_id, node_type, status, signal, attempts, input_snapshot, output_data, actor_ref, external_ref, started_at, finished_at映射 %CMDCOrchestrator.NodeRun{}
agent_workflow_run_eventsid, run_id, seq, type, node_id, trace_id, span_id, parent_id, payload, timestamp映射 %CMDCOrchestrator.RunEvent{};(run_id, seq) 唯一。
agent_workflow_idempotency_keysscope, key, value, inserted_atHTTP 重试、重复点击、审批幂等。

推荐索引:

  • agent_workflows(tenant_id, slug) unique。
  • agent_workflow_versions(workflow_id, version) unique。
  • agent_workflow_runs(tenant_id, workflow_id, status, started_at desc)
  • agent_workflow_runs(claim_owner, claim_expires_at)
  • agent_workflow_node_runs(run_id, node_id) unique。
  • agent_workflow_run_events(run_id, seq) unique。
  • agent_workflow_run_events(run_id, type, node_id, seq)
  • agent_workflow_idempotency_keys(scope, key) unique。

Ecto RunStore 注意事项

Ecto backend 应实现 CMDCOrchestrator.RunStore behaviour。关键点:

  • compare_and_update_run/4 必须用 lock_version 或数据库条件更新做 CAS。
  • claim_run/4 应在事务里锁定 run 行,等价于 SELECT ... FOR UPDATE
  • claim_expires_at 过期后允许其它 executor 接管。
  • put_idempotency/4 使用 unique index;冲突时返回已有 value。
  • append_event/3 在同一事务内递增 run-local seq
  • list_events/2 支持 :limit,:after_id:after_seq,给 Trace Viewer 做分页。
  • payload 保存前继续依赖 RunEvent.sanitize/1,不要把 prompt/chunk/raw result 入库。
  • Phoenix 业务审计不要写进 RunEvent 原始 payload;用平台审计表关联 run_id / event_id

Workflow Designer JSON DSL

Designer 保存 WorkflowSpec 原文,画布信息放在 metadata.ui 或平台自己的 ui_metadata 字段。节点 ID 必须稳定,节点名称和 label 只作展示。

{
  "workflow_id": "contract_review",
  "version": "2026.06.001",
  "mode": "async",
  "nodes": [
    {
      "id": "risk_check",
      "type": "condition",
      "label": "风险判断",
      "config": {"left": "{{amount}}", "operator": "gte", "right": 100000}
    },
    {
      "id": "legal_review",
      "type": "human_task",
      "label": "法务审批",
      "config": {
        "title": "合同法务审批",
        "assignee_refs": ["role:legal"],
        "approval_mode": "any_of",
        "timeout_ms": 86400000,
        "on_timeout": "fail"
      }
    }
  ],
  "edges": [
    {"from": "risk_check", "to": "legal_review", "signal": "true"}
  ],
  "metadata": {
    "ui": {
      "nodes": {
        "risk_check": {"x": 160, "y": 80},
        "legal_review": {"x": 420, "y": 80}
      }
    }
  }
}

保存前调用:

{:ok, spec, warnings} = CMDCOrchestrator.validate_workflow(params["spec"])
{:ok, preview} = CMDCOrchestrator.dry_run(spec, context_data: params["context_data"])

dry_run/2 不触发 Tool / Agent / human_task 副作用,适合画布校验和路径解释。

RunService API 映射

Phoenix APIOrchestrator 调用说明
POST /workflows/:id/validatevalidate_workflow/1画布校验。
POST /workflows/:id/dry-rundry_run/2路径预演。
POST /workflows/:id/runsstart_run/2input, trigger_source, idempotency_key
GET /runs/:idstatus/2返回 run/node_runs/events 快照。
GET /runs/:id/eventsevents/2支持 limit, after_id, type, node_id
POST /runs/:id/pausepause_run/3操作员暂停。
POST /runs/:id/resumeresume_run/2从 waiting/paused/failed 恢复。
POST /runs/:id/retryretry_run/2从失败恢复点继续。
POST /runs/:id/nodes/:node_id/retryretry_node/3清掉该节点及下游结果后重跑。
POST /runs/:id/rerunrerun/2基于旧 run 的 DAG snapshot 创建新 run。
POST /runs/:id/cancelcancel/3取消未完成 run。

Run 列表、统计、成功率、P95、租户成本建议由 Phoenix 按 Ecto 表聚合,不要放进通用包。

Approval Center 映射

human_task task_id 与 correlation_id 是审批中心的关联键。平台可以把 run.human_tasks 同步到自己的审批表,但最终决策应回写 runtime:

CMDCOrchestrator.submit_human_task_decision(run_id, task_id, %{
  action: :approve,
  actor_ref: "account:#{current_account.id}",
  payload: %{"approval_id" => approval.id},
  comment: params["comment"]
}, idempotency_key: params["request_id"])

定时超时由平台调度:

CMDCOrchestrator.expire_waiting_tasks(run_id, now: DateTime.utc_now())

不要在 cmdc_orchestrator 内查 Accounts/Roles/RBAC。AssigneeResolver 只约定 assignee_refs 解析边界,具体人员、通知和审计由 Phoenix 平台处理。

Trace Viewer / Gateway 事件映射

推荐 Gateway SSE/WebSocket 输出字段:

{
  "event": "workflow.node.completed",
  "run_id": "run_x",
  "node_id": "risk_check",
  "trace_id": "trace_x",
  "span_id": "run_x:risk_check:orchestrator.node.completed",
  "timestamp": "2026-06-01T12:00:00Z",
  "payload": {"signal": "true"}
}
RunEvent.typeGateway eventUI 用途
run.startedworkflow.run.startedTimeline 起点。
run.resumedworkflow.run.resumed恢复标记。
run.pausedworkflow.run.paused操作员暂停。
run.waitingworkflow.run.waiting等待人工或外部事件。
run.completedworkflow.run.completed成功终态。
run.failedworkflow.run.failed失败终态。
run.cancelledworkflow.run.cancelled取消终态。
orchestrator.node.startedworkflow.node.started节点执行中。
orchestrator.node.completedworkflow.node.completed节点成功和 signal。
orchestrator.node.failedworkflow.node.failed节点失败原因。
orchestrator.node.skippedworkflow.node.skipped分支剪枝。
human_task.createdworkflow.human_task.createdApproval Center 创建入口。
human_task.progressworkflow.human_task.progress会签/进度更新。
human_task.completedworkflow.human_task.completed审批终局。
human_task.timeoutworkflow.human_task.timeout超时处理。

Gateway 只做事件翻译和敏感字段裁剪,不持有 workflow 业务状态。

Skill Registry 接入

:skill 节点建议作为 adapter 层实现,不要让 orchestrator 直接依赖 cmdc_skill_engine。节点配置只保存 stable ref:

{
  "id": "write_report",
  "type": "skill",
  "config": {
    "skill_ref": "contract.report_writer",
    "version": "1.2.0",
    "input": {"risk": "{{risk_review}}"}
  }
}

企业平台在执行上下文里注册自定义 node 或 Tool adapter,并在 Skill Registry 做版本、 质量分和发布门禁。

Oban / Temporal 边界

场景建议
定时触发 workflowOban cron 扫描业务 workflow,调用 start_run/2
human_task timeout tickOban worker 调 expire_waiting_tasks/2
发送审批通知/催办Oban worker,不在 runtime 内做。
数小时到数天的普通审批cmdc_orchestrator + Ecto RunStore + Oban tick 足够。
跨系统、跨周、强补偿长事务Temporal 可作为外层,内部 Agentic 编排仍交给 cmdc_orchestrator

Hive 迁移对照

Hive 的 Workflow/Step/Run/StepRun、validate/dry_run、approval waiting/resume、 fork/join、retry/cancel 对产品语义有参考价值。迁移时只搬语义:

HiveAgentOps + cmdc_orchestrator
WorkflowPhoenix agent_workflows + agent_workflow_versions.spec
WorkflowStepWorkflowSpec.nodes / edges
WorkflowRun%CMDCOrchestrator.Run{} + Phoenix run table。
WorkflowStepRun%CMDCOrchestrator.NodeRun{} + node run table。
Engine.broadcastRunEvent -> Gateway SSE/WebSocket。
Approval step:human_task + Approval Center。
WorkflowSchedulerOban worker 调 start_run/2
WorkflowReminderOban worker 调 expire_waiting_tasks/2 + 平台通知。

不要复制 Hive controller、schema、channel 或 React Flow 代码到通用包。

发布接入检查表

  • WorkflowSpec JSON 不含匿名函数、pid、reference、tuple。
  • 所有 node id 稳定,改名只改 label/name。
  • Ecto RunStore 覆盖 CAS、claim lease、idempotency、event seq。
  • Trace Viewer 使用 events/2 分页,不一次性加载完整长 run。
  • Approval Center 所有按钮传 idempotency_key
  • Oban timeout tick 可重复执行且幂等。
  • Gateway SSE 默认不输出 prompt、chunks、raw results。
  • cmdc_test fake runtime 覆盖 CI;cmdc_eval WorkflowEval gate 覆盖发布门禁。