# Agent2Agent 持续对话方案 **更新日期:** 2026-03-03 ## 问题定义 ### 单次任务 vs 持续对话 **单次任务(之前的方案):** ``` 云端Agent: 请分析本地项目 ↓ 终端Agent: [执行分析] → 返回结果 ↓ 结束 ``` **持续对话(新需求):** ``` 云端Agent: 请分析本地项目 ↓ 终端Agent: 我看到有3个模块,你想重点分析哪个? ↓ 云端Agent: 重点分析core模块 ↓ 终端Agent: core模块使用了X架构,需要我详细说明吗? ↓ 云端Agent: 是的,请详细说明 ↓ 终端Agent: [详细分析] → 返回结果 ↓ 结束(或继续) ``` ## 核心挑战 1. **上下文延续** - 如何维护多轮对话的上下文? 2. **状态管理** - 对话进行到哪一步?谁在等待谁? 3. **消息路由** - 如何确保消息发送到正确的Agent? 4. **会话生命周期** - 何时开始?何时结束? 5. **异步通信** - Agent可能不在线,如何处理? ## 方案对比 ### 方案1:基于Trace的持续对话(推荐) #### 核心思想 **利用现有的Trace机制作为对话容器** - 每个A2A对话创建一个共享的Trace - 双方Agent都可以向这个Trace追加消息 - 通过WebSocket实时同步消息 - 利用现有的续跑机制(`continue_from`) #### 架构设计 ``` 云端Agent 共享Trace 终端Agent | | | | 创建对话 | | |--------------------->| | | | | | 发送消息1 | | |--------------------->| | | |----WebSocket推送------->| | | | | |<----追加消息2-----------| |<--WebSocket推送------| | | | | | 发送消息3 | | |--------------------->| | | |----WebSocket推送------->| | | | ...持续对话... ``` #### API设计 ```python # 1. 创建对话会话 POST /api/a2a/sessions { "participants": ["cloud-agent-123", "terminal-agent-456"], "initial_message": "请分析本地项目", "context": {...} } 响应: { "session_id": "sess-xxx", "trace_id": "trace-yyy", # 底层使用Trace "ws_url": "wss://host/api/a2a/sessions/sess-xxx/stream" } # 2. 发送消息(追加到Trace) POST /api/a2a/sessions/{session_id}/messages { "from": "cloud-agent-123", "content": "重点分析core模块", "wait_for_response": true # 是否等待对方回复 } 响应: { "message_id": "msg-xxx", "status": "sent" } # 3. WebSocket监听(实时接收消息) WS /api/a2a/sessions/{session_id}/stream { "type": "message", "from": "terminal-agent-456", "content": "我看到有3个模块,你想重点分析哪个?", "message_id": "msg-yyy" } # 4. 获取对话历史 GET /api/a2a/sessions/{session_id}/messages 响应: { "messages": [ {"from": "cloud-agent-123", "content": "...", "timestamp": "..."}, {"from": "terminal-agent-456", "content": "...", "timestamp": "..."}, ... ] } # 5. 结束对话 POST /api/a2a/sessions/{session_id}/close { "reason": "completed" } ``` #### 实现细节 ```python # agent/api/a2a_session.py class A2ASession: """A2A对话会话,基于Trace实现""" def __init__(self, session_id: str, trace_id: str, participants: List[str]): self.session_id = session_id self.trace_id = trace_id self.participants = participants self.ws_connections = {} # agent_id -> WebSocket async def send_message( self, from_agent: str, content: str, wait_for_response: bool = False ) -> Dict[str, Any]: """发送消息到对话""" # 1. 追加消息到Trace messages = [{"role": "user", "content": content}] config = RunConfig( trace_id=self.trace_id, after_sequence=None, # 续跑模式 uid=from_agent ) # 2. 执行(可能触发对方Agent的响应) async for event in runner.run(messages, config): if isinstance(event, Message): # 3. 通过WebSocket推送给其他参与者 await self._broadcast_message(event, exclude=from_agent) if wait_for_response and event.role == "assistant": # 等待对方回复 return {"message_id": event.message_id, "status": "sent"} return {"status": "completed"} async def _broadcast_message(self, message: Message, exclude: str = None): """广播消息给所有参与者(除了发送者)""" for agent_id, ws in self.ws_connections.items(): if agent_id != exclude: await ws.send_json({ "type": "message", "from": exclude, "content": message.content, "message_id": message.message_id, "timestamp": message.created_at.isoformat() }) @app.post("/api/a2a/sessions") async def create_session(request: CreateSessionRequest): """创建A2A对话会话""" # 1. 创建底层Trace trace = Trace( trace_id=generate_trace_id(), mode="agent", task=request.initial_message, agent_type="a2a_session", context={ "session_type": "a2a", "participants": request.participants } ) await store.create_trace(trace) # 2. 创建Session对象 session_id = f"sess-{generate_id()}" session = A2ASession(session_id, trace.trace_id, request.participants) # 3. 存储Session(内存或Redis) sessions[session_id] = session # 4. 发送初始消息 if request.initial_message: await session.send_message( from_agent=request.participants[0], content=request.initial_message ) return { "session_id": session_id, "trace_id": trace.trace_id, "ws_url": f"wss://{host}/api/a2a/sessions/{session_id}/stream" } @app.websocket("/api/a2a/sessions/{session_id}/stream") async def session_stream(websocket: WebSocket, session_id: str): """WebSocket连接,实时接收对话消息""" await websocket.accept() # 1. 获取Session session = sessions.get(session_id) if not session: await websocket.close(code=404) return # 2. 识别连接的Agent agent_id = await authenticate_websocket(websocket) # 3. 注册WebSocket连接 session.ws_connections[agent_id] = websocket try: # 4. 保持连接,接收消息 async for message in websocket: data = json.loads(message) if data["type"] == "message": # 发送消息到对话 await session.send_message( from_agent=agent_id, content=data["content"] ) finally: # 5. 清理连接 del session.ws_connections[agent_id] @app.post("/api/a2a/sessions/{session_id}/messages") async def send_session_message(session_id: str, request: SendMessageRequest): """发送消息到对话(HTTP方式)""" session = sessions.get(session_id) if not session: raise HTTPException(404, "Session not found") result = await session.send_message( from_agent=request.from_agent, content=request.content, wait_for_response=request.wait_for_response ) return result ``` #### 客户端SDK ```python # agent/client/a2a_session_client.py class A2ASessionClient: """A2A持续对话客户端""" def __init__(self, base_url: str, agent_id: str, api_key: str): self.base_url = base_url self.agent_id = agent_id self.api_key = api_key self.ws = None self.message_handlers = [] async def create_session( self, other_agent: str, initial_message: str ) -> str: """创建对话会话""" response = await self._post("/api/a2a/sessions", { "participants": [self.agent_id, other_agent], "initial_message": initial_message }) session_id = response["session_id"] # 自动连接WebSocket await self._connect_websocket(session_id) return session_id async def _connect_websocket(self, session_id: str): """连接WebSocket接收消息""" ws_url = f"{self.ws_url}/api/a2a/sessions/{session_id}/stream" self.ws = await websockets.connect( ws_url, extra_headers={"Authorization": f"Bearer {self.api_key}"} ) # 启动消息接收循环 asyncio.create_task(self._receive_messages()) async def _receive_messages(self): """接收WebSocket消息""" async for message in self.ws: data = json.loads(message) if data["type"] == "message": # 调用注册的消息处理器 for handler in self.message_handlers: await handler(data) async def send_message(self, session_id: str, content: str): """发送消息""" if self.ws: # 通过WebSocket发送(实时) await self.ws.send(json.dumps({ "type": "message", "content": content })) else: # 通过HTTP发送(备用) await self._post(f"/api/a2a/sessions/{session_id}/messages", { "from_agent": self.agent_id, "content": content }) def on_message(self, handler): """注册消息处理器""" self.message_handlers.append(handler) async def close_session(self, session_id: str): """关闭对话""" await self._post(f"/api/a2a/sessions/{session_id}/close", {}) if self.ws: await self.ws.close() ``` #### 使用示例 ```python # 云端Agent使用 client = A2ASessionClient( base_url="https://org.agent.cloud", agent_id="cloud-agent-123", api_key="ak_xxx" ) # 创建对话 session_id = await client.create_session( other_agent="terminal-agent-456", initial_message="请分析本地项目" ) # 注册消息处理器 @client.on_message async def handle_message(message): print(f"收到消息: {message['content']}") # 根据消息内容决定如何回复 if "哪个模块" in message['content']: await client.send_message(session_id, "重点分析core模块") elif "需要我详细说明吗" in message['content']: await client.send_message(session_id, "是的,请详细说明") # 等待对话完成 await asyncio.sleep(60) # 或其他结束条件 # 关闭对话 await client.close_session(session_id) ``` #### 优势 1. **复用Trace机制** - 所有消息管理、压缩、存储都复用 2. **完整历史** - 对话历史自动保存在Trace中 3. **实时通信** - WebSocket保证低延迟 4. **状态追踪** - 利用Trace的状态管理 5. **可回溯** - 可以查看完整的对话历史 #### 劣势 1. **Trace概念泄露** - 外部需要理解session_id和trace_id的关系 2. **复杂度** - 需要管理WebSocket连接 ### 方案2:独立的对话管理器 #### 核心思想 **创建独立的对话管理系统,不依赖Trace** ```python class Conversation: """独立的对话对象""" conversation_id: str participants: List[str] messages: List[ConversationMessage] status: str # active, waiting, completed created_at: datetime class ConversationMessage: """对话消息""" message_id: str from_agent: str to_agent: Optional[str] # None表示广播 content: str timestamp: datetime metadata: Dict ``` #### 优势 1. **概念清晰** - 对话就是对话,不混淆Trace 2. **轻量级** - 不需要Trace的重量级机制 3. **灵活** - 可以自定义对话逻辑 #### 劣势 1. **重复实现** - 需要重新实现消息管理、存储、压缩 2. **不一致** - 与现有Trace机制不一致 3. **维护成本** - 需要维护两套系统 ### 方案3:混合模式(推荐) #### 核心思想 **对话层(Session)+ 执行层(Trace)分离** ``` 对话层(Session) - 管理对话状态 - 路由消息 - WebSocket连接 | | 每条消息触发 ↓ 执行层(Trace) - 执行具体任务 - 调用工具 - 管理上下文 ``` #### 架构 ```python class A2ASession: """对话会话(轻量级)""" session_id: str participants: List[str] current_speaker: str waiting_for: Optional[str] context: Dict # 共享上下文 message_queue: List[Message] async def send_message(self, from_agent: str, content: str): """发送消息""" # 1. 添加到消息队列 self.message_queue.append(Message(from_agent, content)) # 2. 如果需要执行(不是简单问答),创建Trace if self._needs_execution(content): trace_id = await self._create_execution_trace(content) # 执行完成后,结果自动添加到消息队列 else: # 简单消息,直接转发 await self._forward_message(content, to=self._get_other_agent(from_agent)) def _needs_execution(self, content: str) -> bool: """判断是否需要创建Trace执行""" # 例如:包含工具调用、复杂任务等 return "分析" in content or "执行" in content or "查询" in content ``` #### 优势 1. **分层清晰** - 对话管理和任务执行分离 2. **灵活** - 简单消息不需要Trace,复杂任务才创建 3. **高效** - 避免为每条消息创建Trace 4. **复用** - 复杂任务仍然复用Trace机制 #### 实现示例 ```python @app.post("/api/a2a/sessions/{session_id}/messages") async def send_message(session_id: str, request: SendMessageRequest): session = sessions[session_id] # 1. 判断消息类型 if request.requires_execution: # 需要执行的任务 → 创建Trace trace_id = await create_execution_trace( task=request.content, parent_session=session_id, agent_id=request.from_agent ) # 执行完成后,结果自动推送到Session result = await runner.run_result( messages=[{"role": "user", "content": request.content}], config=RunConfig(trace_id=trace_id) ) # 将结果作为消息发送给对方 await session.send_message( from_agent=request.from_agent, content=result["summary"] ) else: # 简单消息 → 直接转发 await session.send_message( from_agent=request.from_agent, content=request.content ) return {"status": "sent"} ``` ## 关键设计决策 ### 1. 消息类型分类 | 类型 | 示例 | 处理方式 | |------|------|----------| | **简单问答** | "你好"、"收到"、"明白了" | 直接转发,不创建Trace | | **信息查询** | "当前进度如何?" | 查询Session状态,返回 | | **任务请求** | "分析core模块" | 创建Trace执行 | | **工具调用** | "读取文件X" | 创建Trace执行 | ### 2. 上下文管理 **Session级上下文(轻量):** ```python session.context = { "current_topic": "项目分析", "focus_module": "core", "previous_results": {...} } ``` **Trace级上下文(完整):** - 完整的消息历史 - 工具调用记录 - Goal树 ### 3. 生命周期管理 ```python # Session生命周期 created → active → waiting → active → ... → completed/timeout # Trace生命周期(每个任务) created → running → completed ``` ### 4. 超时和重连 ```python class A2ASession: timeout: int = 300 # 5分钟无活动则超时 last_activity: datetime async def check_timeout(self): if datetime.now() - self.last_activity > timedelta(seconds=self.timeout): await self.close(reason="timeout") async def reconnect(self, agent_id: str, ws: WebSocket): """Agent重连""" self.ws_connections[agent_id] = ws # 发送未读消息 await self._send_unread_messages(agent_id) ``` ## 实现路线图 ### Phase 1:基础对话能力(2-3周) 1. **Session管理** - 创建/关闭Session - 消息路由 - WebSocket连接管理 2. **简单消息转发** - 不涉及Trace - 纯消息传递 3. **客户端SDK** - `A2ASessionClient` - 消息处理器 ### Phase 2:集成Trace执行(2-3周) 1. **任务识别** - 判断消息是否需要执行 - 自动创建Trace 2. **结果集成** - Trace结果转换为消息 - 自动推送给对方 3. **上下文共享** - Session上下文传递给Trace - Trace结果更新Session上下文 ### Phase 3:高级功能(3-4周) 1. **多方对话** - 支持3个以上Agent - 群聊模式 2. **对话分支** - 子对话 - 并行对话 3. **持久化和恢复** - Session持久化 - 断线重连 ## 示例场景 ### 场景:云端助理与终端Agent的持续对话 ```python # 云端助理 client = A2ASessionClient("https://cloud", "cloud-agent", "ak_xxx") # 1. 创建对话 session_id = await client.create_session( other_agent="terminal-agent-456", initial_message="请分析本地项目" ) # 2. 注册消息处理器(自动响应) @client.on_message async def handle_message(msg): content = msg['content'] if "哪个模块" in content: # 简单回复,不需要执行 await client.send_message(session_id, "重点分析core模块") elif "详细说明" in content: # 需要进一步分析,触发执行 await client.send_message( session_id, "是的,请详细说明架构设计和关键组件", requires_execution=True # 标记需要执行 ) # 3. 等待对话完成 await client.wait_for_completion(session_id) ``` ```python # 终端Agent client = A2ASessionClient("https://terminal", "terminal-agent-456", "ak_yyy") # 1. 监听新对话 @client.on_new_session async def handle_new_session(session_id, initial_message): # 分析项目 modules = await analyze_project() # 询问用户 await client.send_message( session_id, f"我看到有{len(modules)}个模块:{', '.join(modules)},你想重点分析哪个?" ) # 2. 处理后续消息 @client.on_message async def handle_message(msg): if "core模块" in msg['content']: # 执行分析 result = await analyze_module("core") # 返回结果并询问 await client.send_message( msg['session_id'], f"core模块使用了{result['architecture']}架构,需要我详细说明吗?" ) elif "详细说明" in msg['content']: # 深度分析 details = await deep_analyze("core") await client.send_message( msg['session_id'], f"详细架构:\n{details}" ) ``` ## 总结 ### 推荐方案:混合模式 **对话层(Session):** - 轻量级消息路由 - WebSocket连接管理 - 简单问答直接转发 **执行层(Trace):** - 复杂任务创建Trace - 复用所有现有能力 - 结果自动集成到对话 ### 关键优势 1. **灵活** - 简单消息不需要Trace开销 2. **强大** - 复杂任务复用Trace能力 3. **清晰** - 对话和执行分层 4. **高效** - 避免不必要的资源消耗 ### 实现优先级 1. **Phase 1** - 基础Session + 简单消息(MVP) 2. **Phase 2** - 集成Trace执行(核心能力) 3. **Phase 3** - 高级功能(按需)