Agent2Agent (A2A) 协议 — Task RPC 实现(ADP 第 15 章 / Phase 11C)。
本模块实现 A2A 协议中两个核心 Task 端点:
tasks/send— 同步 Task:单次 Prompt → 等待 Agent 完成 → 返回完整 Task 结果tasks/sendSubscribe— 流式 Task:单次 Prompt → 推送TaskStatusUpdateEvent与TaskArtifactUpdateEvent直至完成
与
/v1/sessions/:id/prompt区别:A2A Task 协议是外部 Agent 调用我方 Agent 的 单次任务接口,每次 send 创建独立 Session(task-scoped),不复用对话历史。 适合"Agent 调用 Agent"的 RPC 场景。
JSON-RPC 2.0 形态
请求体:
{
"jsonrpc": "2.0",
"id": "req-123",
"method": "tasks/send",
"params": {
"id": "task-abc", # taskId,幂等键
"sessionId": "...", # 可选;不传则按 taskId 自动生成
"message": {
"role": "user",
"parts": [{ "type": "text", "text": "Hello agent" }]
},
"agent_config": { # 可选;新 session 用
"model": "anthropic:claude-sonnet-4-5",
"tools": [],
"plugins": [],
"blueprint": "base"
}
}
}响应(成功):
{
"jsonrpc": "2.0",
"id": "req-123",
"result": {
"id": "task-abc",
"status": { "state": "completed", "timestamp": "..." },
"artifacts": [
{ "name": "reply", "parts": [{ "type": "text", "text": "..." }] }
]
}
}错误(标准 JSON-RPC error object):
{
"jsonrpc": "2.0",
"id": "req-123",
"error": { "code": -32602, "message": "Invalid params" }
}
Summary
Functions
同步执行一个 A2A Task。
流式 Task:返回 SSE chunk 序列。
Webhook 模式:立即返回 accepted,后台 Agent 异步运行,状态转变时 POST 给 callbackUrl。
把 CMDC EventBus 事件翻译为 A2A SSE chunk 的 JSON payload。
Types
@type session_id() :: String.t()
@type task_id() :: String.t()
@type task_result() :: %{id: task_id(), status: task_status(), artifacts: [map()]}
Functions
@spec handle_send(params :: map(), api_key :: String.t(), tenant_id :: String.t()) :: {:ok, task_result()} | {:error, map()}
同步执行一个 A2A Task。
返回 {:ok, task_result} | {:error, %{code, message}}。
taskId重复(已存在同 ID Session)→ 幂等返回原 Task 结果(如果还活着)timeout_ms默认 60_000;超时返回{:error, :timeout}
@spec start_subscribe(params :: map(), api_key :: String.t(), tenant_id :: String.t()) :: {:ok, %{task_id: task_id(), session_id: session_id(), session_pid: pid()}} | {:error, map()}
流式 Task:返回 SSE chunk 序列。
调用方:SSEHandler 复用,把 EventBus 事件翻译为 A2A TaskStatusUpdateEvent /
TaskArtifactUpdateEvent 格式。本函数仅负责 Session 创建 + prompt 启动 +
返回 session_id 让 caller 接管 SSE 流。
返回 {:ok, %{task_id, session_id, session_pid}} 或 {:error, _}。
@spec start_with_webhook( params :: map(), api_key :: String.t(), tenant_id :: String.t() ) :: {:ok, %{task_id: task_id(), status: String.t()}} | {:error, map()}
Webhook 模式:立即返回 accepted,后台 Agent 异步运行,状态转变时 POST 给 callbackUrl。
对标 ADP Ch.15 A2A 4 种交互机制之一 — 给 >5min 长任务用, 避免 SSE 被 NAT/家庭路由器超时杀连接。
必填 params
id:taskIdcallbackUrl:CMDC POST 回调的目标 URLmessage:用户消息(同 sendSubscribe)
可选 params
webhookSecret:HMAC-SHA256 共享密钥(CMDC 用它在X-CMDC-Signatureheader 带签名,回调方验签防伪造)sessionId:复用现有 session(默认按 taskId 创建新 session)
返回 {:ok, %{task_id: ..., status: "accepted"}} 或 {:error, _}。
异步流程
- 同步:创建 session + EventBus subscribe + 派发
task.acceptedwebhook - 异步 (Task.start_link):
- 监听 EventBus 事件 → 翻译为 webhook payload → POST 给 callbackUrl
- 收到
:agent_end→ 派发task.completedwebhook + 退出 - 收到
:agent_abort/ timeout → 派发task.failedwebhook + 退出
- 派发失败:3 次指数退避重试,最终失败仅 log(v0.4.0 不持久化)
把 CMDC EventBus 事件翻译为 A2A SSE chunk 的 JSON payload。
返回 nil 表示该事件不映射到 A2A 协议(如内部循环检测),调用方应跳过。
返回 {:ok, map} 时是合法 A2A TaskStatusUpdateEvent / TaskArtifactUpdateEvent。