本指南面向企业 Phoenix AgentOps 平台接入 cmdc_orchestrator 0.6。目标是把
Workflow Designer、Run Console、Approval Center、Trace Viewer、Skill Registry
接到稳定 runtime contract 上,而不是把 Phoenix / Ecto / Oban / RBAC 实现下沉到
通用包。
边界
cmdc_orchestrator 负责:
- JSON-safe
WorkflowSpec、节点/边校验、dry run。 - Run / NodeRun / RunEvent 账本结构与
RunStorebehaviour。 - 可恢复
RunExecutor、pause/resume/retry/rerun/cancel。 :fork/:join、retry/timeout/fallback/on_error policy。:human_task等待、决策聚合、timeout tick 与 resume signal。
企业 Phoenix 平台负责:
- Workflow / Version / Run / Event / HumanTask 的 Ecto schema、索引与租户字段。
- Workflow Designer UI、审批中心、通知、RBAC/ABAC、业务审计表。
- Oban 定时触发、timeout tick、重试任务、通知任务。
- Gateway/SSE/WebSocket endpoint 与 Trace Viewer 展示。
- Skill / Tool / Agent / RAG 业务注册表。
推荐 Ecto 表
这些表是建议形状,不是库内 schema。字段名可按平台规范调整,但语义建议保持一致。
| 表 | 关键字段 | 说明 |
|---|---|---|
agent_workflows | id, tenant_id, slug, name, status, current_version_id | Workflow 业务入口。 |
agent_workflow_versions | id, workflow_id, version, spec, ui_metadata, published_by, published_at | 保存 JSON DSL 与画布布局。 |
agent_workflow_runs | id, tenant_id, workflow_id, workflow_version_id, status, trigger_source, trigger_ref, current_node_id, context_data, completed, pruned, signal_history, resume_cursor, retry_counters, branch_states, human_tasks, lock_version, claim_owner, claim_expires_at, started_at, finished_at | 映射 %CMDCOrchestrator.Run{}。 |
agent_workflow_node_runs | id, run_id, node_id, node_type, status, signal, attempts, input_snapshot, output_data, actor_ref, external_ref, started_at, finished_at | 映射 %CMDCOrchestrator.NodeRun{}。 |
agent_workflow_run_events | id, run_id, seq, type, node_id, trace_id, span_id, parent_id, payload, timestamp | 映射 %CMDCOrchestrator.RunEvent{};(run_id, seq) 唯一。 |
agent_workflow_idempotency_keys | scope, key, value, inserted_at | HTTP 重试、重复点击、审批幂等。 |
推荐索引:
agent_workflows(tenant_id, slug)unique。agent_workflow_versions(workflow_id, version)unique。agent_workflow_runs(tenant_id, workflow_id, status, started_at desc)。agent_workflow_runs(claim_owner, claim_expires_at)。agent_workflow_node_runs(run_id, node_id)unique。agent_workflow_run_events(run_id, seq)unique。agent_workflow_run_events(run_id, type, node_id, seq)。agent_workflow_idempotency_keys(scope, key)unique。
Ecto RunStore 注意事项
Ecto backend 应实现 CMDCOrchestrator.RunStore behaviour。关键点:
compare_and_update_run/4必须用lock_version或数据库条件更新做 CAS。claim_run/4应在事务里锁定 run 行,等价于SELECT ... FOR UPDATE。claim_expires_at过期后允许其它 executor 接管。put_idempotency/4使用 unique index;冲突时返回已有 value。append_event/3在同一事务内递增 run-localseq。list_events/2支持:limit,:after_id或:after_seq,给 Trace Viewer 做分页。- payload 保存前继续依赖
RunEvent.sanitize/1,不要把 prompt/chunk/raw result 入库。 - Phoenix 业务审计不要写进
RunEvent原始 payload;用平台审计表关联run_id/event_id。
Workflow Designer JSON DSL
Designer 保存 WorkflowSpec 原文,画布信息放在 metadata.ui 或平台自己的
ui_metadata 字段。节点 ID 必须稳定,节点名称和 label 只作展示。
{
"workflow_id": "contract_review",
"version": "2026.06.001",
"mode": "async",
"nodes": [
{
"id": "risk_check",
"type": "condition",
"label": "风险判断",
"config": {"left": "{{amount}}", "operator": "gte", "right": 100000}
},
{
"id": "legal_review",
"type": "human_task",
"label": "法务审批",
"config": {
"title": "合同法务审批",
"assignee_refs": ["role:legal"],
"approval_mode": "any_of",
"timeout_ms": 86400000,
"on_timeout": "fail"
}
}
],
"edges": [
{"from": "risk_check", "to": "legal_review", "signal": "true"}
],
"metadata": {
"ui": {
"nodes": {
"risk_check": {"x": 160, "y": 80},
"legal_review": {"x": 420, "y": 80}
}
}
}
}保存前调用:
{:ok, spec, warnings} = CMDCOrchestrator.validate_workflow(params["spec"])
{:ok, preview} = CMDCOrchestrator.dry_run(spec, context_data: params["context_data"])dry_run/2 不触发 Tool / Agent / human_task 副作用,适合画布校验和路径解释。
RunService API 映射
| Phoenix API | Orchestrator 调用 | 说明 |
|---|---|---|
POST /workflows/:id/validate | validate_workflow/1 | 画布校验。 |
POST /workflows/:id/dry-run | dry_run/2 | 路径预演。 |
POST /workflows/:id/runs | start_run/2 | 传 input, trigger_source, idempotency_key。 |
GET /runs/:id | status/2 | 返回 run/node_runs/events 快照。 |
GET /runs/:id/events | events/2 | 支持 limit, after_id, type, node_id。 |
POST /runs/:id/pause | pause_run/3 | 操作员暂停。 |
POST /runs/:id/resume | resume_run/2 | 从 waiting/paused/failed 恢复。 |
POST /runs/:id/retry | retry_run/2 | 从失败恢复点继续。 |
POST /runs/:id/nodes/:node_id/retry | retry_node/3 | 清掉该节点及下游结果后重跑。 |
POST /runs/:id/rerun | rerun/2 | 基于旧 run 的 DAG snapshot 创建新 run。 |
POST /runs/:id/cancel | cancel/3 | 取消未完成 run。 |
Run 列表、统计、成功率、P95、租户成本建议由 Phoenix 按 Ecto 表聚合,不要放进通用包。
Approval Center 映射
human_task task_id 与 correlation_id 是审批中心的关联键。平台可以把
run.human_tasks 同步到自己的审批表,但最终决策应回写 runtime:
CMDCOrchestrator.submit_human_task_decision(run_id, task_id, %{
action: :approve,
actor_ref: "account:#{current_account.id}",
payload: %{"approval_id" => approval.id},
comment: params["comment"]
}, idempotency_key: params["request_id"])定时超时由平台调度:
CMDCOrchestrator.expire_waiting_tasks(run_id, now: DateTime.utc_now())不要在 cmdc_orchestrator 内查 Accounts/Roles/RBAC。AssigneeResolver 只约定
assignee_refs 解析边界,具体人员、通知和审计由 Phoenix 平台处理。
Trace Viewer / Gateway 事件映射
推荐 Gateway SSE/WebSocket 输出字段:
{
"event": "workflow.node.completed",
"run_id": "run_x",
"node_id": "risk_check",
"trace_id": "trace_x",
"span_id": "run_x:risk_check:orchestrator.node.completed",
"timestamp": "2026-06-01T12:00:00Z",
"payload": {"signal": "true"}
}| RunEvent.type | Gateway event | UI 用途 |
|---|---|---|
run.started | workflow.run.started | Timeline 起点。 |
run.resumed | workflow.run.resumed | 恢复标记。 |
run.paused | workflow.run.paused | 操作员暂停。 |
run.waiting | workflow.run.waiting | 等待人工或外部事件。 |
run.completed | workflow.run.completed | 成功终态。 |
run.failed | workflow.run.failed | 失败终态。 |
run.cancelled | workflow.run.cancelled | 取消终态。 |
orchestrator.node.started | workflow.node.started | 节点执行中。 |
orchestrator.node.completed | workflow.node.completed | 节点成功和 signal。 |
orchestrator.node.failed | workflow.node.failed | 节点失败原因。 |
orchestrator.node.skipped | workflow.node.skipped | 分支剪枝。 |
human_task.created | workflow.human_task.created | Approval Center 创建入口。 |
human_task.progress | workflow.human_task.progress | 会签/进度更新。 |
human_task.completed | workflow.human_task.completed | 审批终局。 |
human_task.timeout | workflow.human_task.timeout | 超时处理。 |
Gateway 只做事件翻译和敏感字段裁剪,不持有 workflow 业务状态。
Skill Registry 接入
:skill 节点建议作为 adapter 层实现,不要让 orchestrator 直接依赖
cmdc_skill_engine。节点配置只保存 stable ref:
{
"id": "write_report",
"type": "skill",
"config": {
"skill_ref": "contract.report_writer",
"version": "1.2.0",
"input": {"risk": "{{risk_review}}"}
}
}企业平台在执行上下文里注册自定义 node 或 Tool adapter,并在 Skill Registry 做版本、 质量分和发布门禁。
Oban / Temporal 边界
| 场景 | 建议 |
|---|---|
| 定时触发 workflow | Oban cron 扫描业务 workflow,调用 start_run/2。 |
| human_task timeout tick | Oban worker 调 expire_waiting_tasks/2。 |
| 发送审批通知/催办 | Oban worker,不在 runtime 内做。 |
| 数小时到数天的普通审批 | cmdc_orchestrator + Ecto RunStore + Oban tick 足够。 |
| 跨系统、跨周、强补偿长事务 | Temporal 可作为外层,内部 Agentic 编排仍交给 cmdc_orchestrator。 |
Hive 迁移对照
Hive 的 Workflow/Step/Run/StepRun、validate/dry_run、approval waiting/resume、 fork/join、retry/cancel 对产品语义有参考价值。迁移时只搬语义:
| Hive | AgentOps + cmdc_orchestrator |
|---|---|
Workflow | Phoenix agent_workflows + agent_workflow_versions.spec。 |
WorkflowStep | WorkflowSpec.nodes / edges。 |
WorkflowRun | %CMDCOrchestrator.Run{} + Phoenix run table。 |
WorkflowStepRun | %CMDCOrchestrator.NodeRun{} + node run table。 |
Engine.broadcast | RunEvent -> Gateway SSE/WebSocket。 |
Approval step | :human_task + Approval Center。 |
WorkflowScheduler | Oban worker 调 start_run/2。 |
WorkflowReminder | Oban worker 调 expire_waiting_tasks/2 + 平台通知。 |
不要复制 Hive controller、schema、channel 或 React Flow 代码到通用包。
发布接入检查表
- WorkflowSpec JSON 不含匿名函数、pid、reference、tuple。
- 所有 node id 稳定,改名只改 label/name。
- Ecto RunStore 覆盖 CAS、claim lease、idempotency、event seq。
- Trace Viewer 使用
events/2分页,不一次性加载完整长 run。 - Approval Center 所有按钮传
idempotency_key。 - Oban timeout tick 可重复执行且幂等。
- Gateway SSE 默认不输出 prompt、chunks、raw results。
cmdc_testfake runtime 覆盖 CI;cmdc_evalWorkflowEval gate 覆盖发布门禁。