CMDCGateway.A2A (cmdc_gateway v0.4.2)

Copy Markdown View Source

Agent2Agent (A2A) 协议 — Task RPC 实现。

本模块实现 A2A 协议中两个核心 Task 端点:

  • tasks/send — 同步 Task:单次 Prompt → 等待 Agent 完成 → 返回完整 Task 结果
  • tasks/sendSubscribe — 流式 Task:单次 Prompt → 推送 TaskStatusUpdateEventTaskArtifactUpdateEvent 直至完成

/v1/sessions/:id/prompt 区别:A2A Task 协议是外部 Agent 调用我方 Agent 的 单次任务接口,每次 send 创建独立 Session(task-scoped),不复用对话历史。 适合"Agent 调用 Agent"的 RPC 场景。

JSON-RPC 2.0 形态

请求体:

{
  "jsonrpc": "2.0",
  "id": "req-123",
  "method": "tasks/send",
  "params": {
    "id": "task-abc",                # taskId,幂等键
    "sessionId": "...",              # 可选;不传则按 taskId 自动生成
    "message": {
      "role": "user",
      "parts": [{ "type": "text", "text": "Hello agent" }]
    },
    "agent_config": {                # 可选;新 session 用
      "model": "anthropic:claude-sonnet-4-5",
      "tools": [],
      "plugins": [],
      "blueprint": "base"
    }
  }
}

响应(成功):

{
  "jsonrpc": "2.0",
  "id": "req-123",
  "result": {
    "id": "task-abc",
    "status": { "state": "completed", "timestamp": "..." },
    "artifacts": [
      { "name": "reply", "parts": [{ "type": "text", "text": "..." }] }
    ]
  }
}

错误(标准 JSON-RPC error object):

{
  "jsonrpc": "2.0",
  "id": "req-123",
  "error": { "code": -32602, "message": "Invalid params" }
}

Summary

Functions

同步执行一个 A2A Task。

流式 Task:返回 SSE chunk 序列。

Webhook 模式:立即返回 accepted,后台 Agent 异步运行,状态转变时 POST 给 callbackUrl

把 CMDC EventBus 事件翻译为 A2A SSE chunk 的 JSON payload。

Types

message()

@type message() :: %{role: String.t(), parts: [%{type: String.t(), text: String.t()}]}

session_id()

@type session_id() :: String.t()

task_id()

@type task_id() :: String.t()

task_result()

@type task_result() :: %{id: task_id(), status: task_status(), artifacts: [map()]}

task_status()

@type task_status() :: %{state: String.t(), timestamp: String.t()}

Functions

handle_send(params, api_key, tenant_id)

@spec handle_send(params :: map(), api_key :: String.t(), tenant_id :: String.t()) ::
  {:ok, task_result()} | {:error, map()}

同步执行一个 A2A Task。

返回 {:ok, task_result} | {:error, %{code, message}}

  • taskId 重复(已存在同 ID Session)→ 幂等返回原 Task 结果(如果还活着)
  • timeout_ms 默认 60_000;超时返回 {:error, :timeout}

start_subscribe(params, api_key, tenant_id)

@spec start_subscribe(params :: map(), api_key :: String.t(), tenant_id :: String.t()) ::
  {:ok, %{task_id: task_id(), session_id: session_id(), session_pid: pid()}}
  | {:error, map()}

流式 Task:返回 SSE chunk 序列。

调用方:SSEHandler 复用,把 EventBus 事件翻译为 A2A TaskStatusUpdateEvent / TaskArtifactUpdateEvent 格式。本函数仅负责 Session 创建 + prompt 启动 + 返回 session_id 让 caller 接管 SSE 流。

返回 {:ok, %{task_id, session_id, session_pid}}{:error, _}

start_with_webhook(params, api_key, tenant_id)

@spec start_with_webhook(
  params :: map(),
  api_key :: String.t(),
  tenant_id :: String.t()
) ::
  {:ok, %{task_id: task_id(), status: String.t()}} | {:error, map()}

Webhook 模式:立即返回 accepted,后台 Agent 异步运行,状态转变时 POST 给 callbackUrl

A2A 协议长任务交互机制 — 给 >5min 长任务用, 避免 SSE 被 NAT/家庭路由器超时杀连接。

必填 params

  • id:taskId
  • callbackUrl:CMDC POST 回调的目标 URL
  • message:用户消息(同 sendSubscribe)

可选 params

  • webhookSecret:HMAC-SHA256 共享密钥(CMDC 用它在 X-CMDC-Signature header 带签名,回调方验签防伪造)
  • sessionId:复用现有 session(默认按 taskId 创建新 session)

返回 {:ok, %{task_id: ..., status: "accepted"}}{:error, _}

异步流程

  1. 同步:创建 session + EventBus subscribe + 派发 task.accepted webhook
  2. 异步 (Task.start_link):
    • 监听 EventBus 事件 → 翻译为 webhook payload → POST 给 callbackUrl
    • 收到 :agent_end → 派发 task.completed webhook + 退出
    • 收到 :agent_abort / timeout → 派发 task.failed webhook + 退出
  3. 派发失败:3 次指数退避重试,最终失败仅 log(v0.4.0 不持久化)

translate_event(task_id, event)

@spec translate_event(task_id(), term()) :: nil | {:ok, map()}

把 CMDC EventBus 事件翻译为 A2A SSE chunk 的 JSON payload。

返回 nil 表示该事件不映射到 A2A 协议(如内部循环检测),调用方应跳过。 返回 {:ok, map} 时是合法 A2A TaskStatusUpdateEvent / TaskArtifactUpdateEvent