# Agent2Agent 跨设备通信方案 **更新日期:** 2026-03-03 ## 场景分类 ### 场景1:云端Agent ↔ 云端Agent **示例:** 通用助理 → 爬虫运维Agent - **部署:** 同一服务器/进程 - **通信:** 现有`agent`工具(内存调用) - **不需要HTTP接口** ### 场景2:云端Agent ↔ 终端Agent ⭐ **示例:** - 云端通用助理 → 用户笔记本上的代码分析Agent - 用户终端Agent → 云端知识库Agent **需求:** - 云端Agent需要调用终端Agent的能力 - 访问用户本地文件 - 执行本地命令 - 使用本地工具(IDE、浏览器等) - 终端Agent需要调用云端Agent - 访问组织知识库 - 查询共享资源 - 协作任务 **挑战:** - 网络连接(终端可能在NAT后) - 认证和授权 - 数据安全 **需要HTTP接口!** ### 场景3:终端Agent ↔ 终端Agent **示例:** 团队成员的Agent互相协作 - **可能性:** 较小,但可能存在 - **通信:** 通过云端中转或P2P ## 架构方案 ### 方案A:基于现有API封装(推荐) #### 架构图 ``` 云端Agent 终端Agent | | | HTTP POST /api/a2a/call | |--------------------------->| | | | 创建Trace + 执行任务 | | | | WebSocket /api/a2a/watch | |<---------------------------| | 实时进度推送 | | | | HTTP GET /api/a2a/result | |--------------------------->| | 返回最终结果 | |<---------------------------| ``` #### 核心设计 **1. 简化的A2A端点** ```python # agent/api/a2a.py @app.post("/api/a2a/call") async def a2a_call(request: A2ACallRequest): """ 简化的Agent调用接口 请求: { "task": "分析这个项目的架构", "agent_type": "explore", # 可选 "context": { # 可选 "files": [...], "previous_results": {...} }, "callback_url": "https://..." # 可选,完成后回调 } 响应: { "call_id": "a2a-xxx", "status": "started", "watch_url": "ws://host/api/a2a/watch/a2a-xxx" } """ # 1. 认证和授权检查 # 2. 转换为内部格式 messages = [{"role": "user", "content": request.task}] if request.context: messages[0]["content"] += f"\n\n上下文:{json.dumps(request.context)}" # 3. 调用现有runner(复用所有逻辑) config = RunConfig( agent_type=request.agent_type or "default", trace_id=None # 新建 ) # 4. 后台执行 task_id = await start_background_task(runner.run(messages, config)) # 5. 返回call_id(映射到trace_id) return { "call_id": f"a2a-{task_id}", "status": "started", "watch_url": f"ws://{host}/api/a2a/watch/{task_id}" } @app.websocket("/api/a2a/watch/{call_id}") async def a2a_watch(websocket: WebSocket, call_id: str): """ 实时监听执行进度(复用现有WebSocket) 推送消息: { "type": "progress", "data": { "goal": "正在分析文件结构", "progress": 0.3 } } { "type": "completed", "data": { "result": "...", "stats": {...} } } """ # 复用现有的 /api/traces/{id}/watch 逻辑 trace_id = call_id.replace("a2a-", "") await watch_trace(websocket, trace_id) @app.get("/api/a2a/result/{call_id}") async def a2a_result(call_id: str): """ 获取执行结果 响应: { "status": "completed", "result": { "summary": "...", "details": {...} }, "stats": { "duration_ms": 5000, "tokens": 1500, "cost": 0.05 } } """ trace_id = call_id.replace("a2a-", "") trace = await store.get_trace(trace_id) messages = await store.get_main_path_messages(trace_id, trace.head_sequence) # 提取最后的assistant消息作为结果 result = extract_final_result(messages) return { "status": trace.status, "result": result, "stats": { "duration_ms": trace.total_duration_ms, "tokens": trace.total_tokens, "cost": trace.total_cost } } ``` **2. 客户端SDK(终端Agent使用)** ```python # agent/client/a2a_client.py class A2AClient: """A2A客户端,用于调用远程Agent""" def __init__(self, base_url: str, api_key: str): self.base_url = base_url self.api_key = api_key async def call( self, task: str, agent_type: Optional[str] = None, context: Optional[dict] = None, wait: bool = True ) -> Dict[str, Any]: """ 调用远程Agent Args: task: 任务描述 agent_type: Agent类型 context: 上下文信息 wait: 是否等待完成(False则立即返回call_id) """ # 1. 发起调用 response = await self._post("/api/a2a/call", { "task": task, "agent_type": agent_type, "context": context }) call_id = response["call_id"] if not wait: return {"call_id": call_id, "status": "started"} # 2. 等待完成(通过WebSocket或轮询) result = await self._wait_for_completion(call_id) return result async def _wait_for_completion(self, call_id: str): """通过WebSocket监听完成""" async with websockets.connect( f"{self.ws_url}/api/a2a/watch/{call_id}", extra_headers={"Authorization": f"Bearer {self.api_key}"} ) as ws: async for message in ws: data = json.loads(message) if data["type"] == "completed": return data["data"] elif data["type"] == "failed": raise A2AError(data["data"]["error"]) async def get_result(self, call_id: str) -> Dict[str, Any]: """获取执行结果(轮询方式)""" return await self._get(f"/api/a2a/result/{call_id}") ``` **3. 作为工具集成到Agent** ```python # agent/tools/builtin/remote_agent.py @tool(description="调用远程Agent执行任务") async def remote_agent( task: str, agent_url: str, agent_type: Optional[str] = None, context: Optional[dict] = None, ctx: ToolContext = None ) -> ToolResult: """ 调用远程Agent(云端或其他终端) Args: task: 任务描述 agent_url: 远程Agent的URL agent_type: Agent类型 context: 上下文信息 """ # 1. 创建客户端 client = A2AClient( base_url=agent_url, api_key=ctx.config.get("remote_agent_api_key") ) # 2. 调用远程Agent result = await client.call( task=task, agent_type=agent_type, context=context, wait=True ) # 3. 返回结果 return ToolResult( title=f"远程Agent完成: {task[:50]}", output=result["result"]["summary"], long_term_memory=f"调用远程Agent完成任务,耗时{result['stats']['duration_ms']}ms" ) ``` #### 优势 1. **复用现有逻辑** - 所有Trace、Message、Goal管理都复用 2. **简单易用** - 外部只需要提供task,不需要理解Trace概念 3. **完整功能** - 继承所有现有能力(压缩、续跑、回溯等) 4. **渐进式** - 可以先实现基础版本,逐步增强 ### 方案B:实现标准A2A协议 #### 架构 ```python # agent/api/a2a_standard.py @app.post("/api/a2a/v1/tasks") async def create_task(request: A2ATaskRequest): """ 符合Google A2A协议的端点 请求格式(A2A标准): { "header": { "message_id": "msg_001", "timestamp": "2026-03-03T10:30:00Z" }, "task": { "description": "分析项目架构", "capabilities_required": ["file_read", "code_analysis"] }, "context": {...} } """ # 转换A2A格式到内部格式 # 调用runner # 转换结果为A2A格式 ``` #### 优势 1. **标准化** - 符合行业标准 2. **互操作性** - 可以与其他A2A兼容的Agent通信 3. **未来兼容** - 跟随行业发展 #### 劣势 1. **复杂度高** - 需要实现完整的A2A协议 2. **过度设计** - MVP阶段可能不需要 3. **标准未稳定** - A2A协议还在演进中 ## 网络拓扑 ### 拓扑1:云端中心化 ``` 云端Gateway | +-------+-------+ | | | 通用 爬虫 成本 助理 运维 统计 | +-- 调用终端Agent(HTTP) | 用户终端Agent ``` **特点:** - 云端Agent作为中心 - 终端Agent需要暴露HTTP端点 - 需要处理NAT穿透 ### 拓扑2:终端主动连接 ``` 用户终端Agent | | WebSocket长连接 | 云端Gateway | +-- 通过连接推送任务 ``` **特点:** - 终端Agent主动连接云端 - 云端通过WebSocket推送任务 - 无需NAT穿透 - 类似飞书Bot的模式 ### 拓扑3:混合模式(推荐) ``` 云端Agent <--HTTP--> 云端Agent(内存调用) | | WebSocket双向 | 终端Agent <--HTTP--> 终端Agent(如果需要) ``` **特点:** - 云端Agent间用内存调用 - 云端↔终端用WebSocket - 终端间可选HTTP(通过云端中转) ## 认证和授权 ### 1. API Key认证 ```python # 终端Agent启动时注册 POST /api/a2a/register { "agent_id": "user123-laptop", "capabilities": ["file_read", "bash", "browser"], "device_info": {...} } # 返回API Key { "api_key": "ak_xxx", "agent_id": "user123-laptop" } # 后续调用携带API Key Authorization: Bearer ak_xxx ``` ### 2. 权限控制 ```yaml # config/a2a_permissions.yaml agents: user123-laptop: can_access: - conversations/user123/* - resources/public/* cannot_access: - conversations/other_users/* - agents/*/memory/* ``` ### 3. 数据隔离 - 终端Agent只能访问自己用户的数据 - 云端Agent可以访问组织共享数据 - 通过Gateway强制执行 ## 实现路线图 ### Phase 1:基础A2A接口(MVP) **目标:** 云端Agent ↔ 终端Agent基础通信 **实现:** 1. `/api/a2a/call` - 简化调用接口 2. `/api/a2a/watch` - WebSocket监听 3. `/api/a2a/result` - 获取结果 4. `A2AClient` - 客户端SDK 5. `remote_agent` - 工具集成 **时间:** 1-2周 ### Phase 2:增强功能 **目标:** 完善A2A能力 **实现:** 1. 认证和授权 2. 数据隔离 3. 成本控制 4. 审计日志 5. 错误处理和重试 **时间:** 2-3周 ### Phase 3:标准化(可选) **目标:** 兼容A2A标准协议 **实现:** 1. 实现Google A2A协议 2. 能力协商机制 3. 与其他A2A Agent互操作 **时间:** 3-4周 ## 示例场景 ### 场景:云端助理调用终端Agent分析代码 **1. 用户在飞书问:** "帮我分析一下我笔记本上的项目架构" **2. 云端通用助理:** ```python # 识别需要访问用户终端 result = await remote_agent( task="分析 /Users/sunlit/Code/Agent 的项目架构", agent_url="https://user123-laptop.agent.local", agent_type="explore" ) ``` **3. 终端Agent:** - 接收任务 - 创建本地Trace - 使用本地工具(read, glob, grep) - 分析代码结构 - 返回结果 **4. 云端助理:** - 接收终端Agent结果 - 整合到回复中 - 通过飞书返回给用户 ### 场景:终端Agent查询云端知识库 **1. 用户在终端运行:** ```bash agent-cli ask "公司的爬虫部署规范是什么?" ``` **2. 终端Agent:** ```python # 识别需要查询组织知识库 result = await remote_agent( task="查询爬虫部署规范", agent_url="https://org.agent.cloud", agent_type="knowledge_query" ) ``` **3. 云端知识库Agent:** - 查询resources/docs/ - 查询experiences数据库 - 返回相关文档 **4. 终端Agent:** - 接收结果 - 展示给用户 ## 技术细节 ### 1. NAT穿透方案 **方案A:终端主动连接(推荐)** ```python # 终端Agent启动时建立WebSocket长连接 ws = await websockets.connect("wss://org.agent.cloud/api/a2a/connect") # 云端通过连接推送任务 await ws.send(json.dumps({ "type": "task", "task_id": "xxx", "data": {...} })) # 终端执行并返回结果 result = await execute_task(task) await ws.send(json.dumps({ "type": "result", "task_id": "xxx", "data": result })) ``` **方案B:使用ngrok等隧道服务** - 终端Agent启动时创建隧道 - 注册公网URL到云端 - 云端通过公网URL调用 ### 2. 消息序列化 ```python # 简化格式(内部使用) { "task": "string", "context": {...} } # 标准A2A格式(外部互操作) { "header": {...}, "task": {...}, "capabilities": [...] } # 自动转换 def to_a2a_format(internal_msg): return { "header": generate_header(), "task": {"description": internal_msg["task"]}, "context": internal_msg.get("context", {}) } ``` ### 3. 流式响应 ```python # 支持流式返回中间结果 @app.websocket("/api/a2a/stream/{call_id}") async def a2a_stream(websocket: WebSocket, call_id: str): async for event in runner.run(...): if isinstance(event, Message): await websocket.send_json({ "type": "message", "data": event.to_dict() }) ``` ## 安全考虑 1. **认证:** API Key + JWT 2. **授权:** 基于角色的访问控制 3. **加密:** HTTPS/WSS强制 4. **限流:** 防止滥用 5. **审计:** 所有A2A调用记录 6. **隔离:** 数据访问严格隔离 ## 成本控制 ```python # 每次A2A调用记录成本 { "call_id": "a2a-xxx", "caller": "user123-laptop", "callee": "org-cloud-agent", "tokens": 1500, "cost": 0.05, "duration_ms": 5000 } # 限额检查 if user_cost_today > user_limit: raise CostLimitExceeded() ``` ## 总结 ### 推荐方案 **Phase 1(MVP):** 方案A - 基于现有API封装 - 简单、快速 - 复用所有现有逻辑 - 满足跨设备通信需求 **Phase 3+:** 可选实现标准A2A协议 - 如果需要与外部系统互操作 - 跟随行业标准发展 ### 关键优势 1. **复用现有能力** - Trace、Message、Goal、压缩等 2. **渐进式实现** - 先简单后复杂 3. **灵活扩展** - 可以逐步增强功能 4. **标准兼容** - 未来可以支持A2A标准