|
|
@@ -1,733 +0,0 @@
|
|
|
-# Agent2Agent 持续对话方案
|
|
|
-
|
|
|
-**更新日期:** 2026-03-03
|
|
|
-
|
|
|
-## 问题定义
|
|
|
-
|
|
|
-### 单次任务 vs 持续对话
|
|
|
-
|
|
|
-**单次任务(之前的方案):**
|
|
|
-```
|
|
|
-云端Agent: 请分析本地项目
|
|
|
- ↓
|
|
|
-终端Agent: [执行分析] → 返回结果
|
|
|
- ↓
|
|
|
-结束
|
|
|
-```
|
|
|
-
|
|
|
-**持续对话(新需求):**
|
|
|
-```
|
|
|
-云端Agent: 请分析本地项目
|
|
|
- ↓
|
|
|
-终端Agent: 我看到有3个模块,你想重点分析哪个?
|
|
|
- ↓
|
|
|
-云端Agent: 重点分析core模块
|
|
|
- ↓
|
|
|
-终端Agent: core模块使用了X架构,需要我详细说明吗?
|
|
|
- ↓
|
|
|
-云端Agent: 是的,请详细说明
|
|
|
- ↓
|
|
|
-终端Agent: [详细分析] → 返回结果
|
|
|
- ↓
|
|
|
-结束(或继续)
|
|
|
-```
|
|
|
-
|
|
|
-## 核心挑战
|
|
|
-
|
|
|
-1. **上下文延续** - 如何维护多轮对话的上下文?
|
|
|
-2. **状态管理** - 对话进行到哪一步?谁在等待谁?
|
|
|
-3. **消息路由** - 如何确保消息发送到正确的Agent?
|
|
|
-4. **会话生命周期** - 何时开始?何时结束?
|
|
|
-5. **异步通信** - Agent可能不在线,如何处理?
|
|
|
-
|
|
|
-## 方案对比
|
|
|
-
|
|
|
-### 方案1:基于Trace的持续对话(推荐)
|
|
|
-
|
|
|
-#### 核心思想
|
|
|
-
|
|
|
-**利用现有的Trace机制作为对话容器**
|
|
|
-
|
|
|
-- 每个A2A对话创建一个共享的Trace
|
|
|
-- 双方Agent都可以向这个Trace追加消息
|
|
|
-- 通过WebSocket实时同步消息
|
|
|
-- 利用现有的续跑机制(`continue_from`)
|
|
|
-
|
|
|
-#### 架构设计
|
|
|
-
|
|
|
-```
|
|
|
-云端Agent 共享Trace 终端Agent
|
|
|
- | | |
|
|
|
- | 创建对话 | |
|
|
|
- |--------------------->| |
|
|
|
- | | |
|
|
|
- | 发送消息1 | |
|
|
|
- |--------------------->| |
|
|
|
- | |----WebSocket推送------->|
|
|
|
- | | |
|
|
|
- | |<----追加消息2-----------|
|
|
|
- |<--WebSocket推送------| |
|
|
|
- | | |
|
|
|
- | 发送消息3 | |
|
|
|
- |--------------------->| |
|
|
|
- | |----WebSocket推送------->|
|
|
|
- | | |
|
|
|
- ...持续对话...
|
|
|
-```
|
|
|
-
|
|
|
-#### API设计
|
|
|
-
|
|
|
-```python
|
|
|
-# 1. 创建对话会话
|
|
|
-POST /api/a2a/sessions
|
|
|
-{
|
|
|
- "participants": ["cloud-agent-123", "terminal-agent-456"],
|
|
|
- "initial_message": "请分析本地项目",
|
|
|
- "context": {...}
|
|
|
-}
|
|
|
-
|
|
|
-响应:
|
|
|
-{
|
|
|
- "session_id": "sess-xxx",
|
|
|
- "trace_id": "trace-yyy", # 底层使用Trace
|
|
|
- "ws_url": "wss://host/api/a2a/sessions/sess-xxx/stream"
|
|
|
-}
|
|
|
-
|
|
|
-# 2. 发送消息(追加到Trace)
|
|
|
-POST /api/a2a/sessions/{session_id}/messages
|
|
|
-{
|
|
|
- "from": "cloud-agent-123",
|
|
|
- "content": "重点分析core模块",
|
|
|
- "wait_for_response": true # 是否等待对方回复
|
|
|
-}
|
|
|
-
|
|
|
-响应:
|
|
|
-{
|
|
|
- "message_id": "msg-xxx",
|
|
|
- "status": "sent"
|
|
|
-}
|
|
|
-
|
|
|
-# 3. WebSocket监听(实时接收消息)
|
|
|
-WS /api/a2a/sessions/{session_id}/stream
|
|
|
-{
|
|
|
- "type": "message",
|
|
|
- "from": "terminal-agent-456",
|
|
|
- "content": "我看到有3个模块,你想重点分析哪个?",
|
|
|
- "message_id": "msg-yyy"
|
|
|
-}
|
|
|
-
|
|
|
-# 4. 获取对话历史
|
|
|
-GET /api/a2a/sessions/{session_id}/messages
|
|
|
-响应:
|
|
|
-{
|
|
|
- "messages": [
|
|
|
- {"from": "cloud-agent-123", "content": "...", "timestamp": "..."},
|
|
|
- {"from": "terminal-agent-456", "content": "...", "timestamp": "..."},
|
|
|
- ...
|
|
|
- ]
|
|
|
-}
|
|
|
-
|
|
|
-# 5. 结束对话
|
|
|
-POST /api/a2a/sessions/{session_id}/close
|
|
|
-{
|
|
|
- "reason": "completed"
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-#### 实现细节
|
|
|
-
|
|
|
-```python
|
|
|
-# agent/api/a2a_session.py
|
|
|
-
|
|
|
-class A2ASession:
|
|
|
- """A2A对话会话,基于Trace实现"""
|
|
|
-
|
|
|
- def __init__(self, session_id: str, trace_id: str, participants: List[str]):
|
|
|
- self.session_id = session_id
|
|
|
- self.trace_id = trace_id
|
|
|
- self.participants = participants
|
|
|
- self.ws_connections = {} # agent_id -> WebSocket
|
|
|
-
|
|
|
- async def send_message(
|
|
|
- self,
|
|
|
- from_agent: str,
|
|
|
- content: str,
|
|
|
- wait_for_response: bool = False
|
|
|
- ) -> Dict[str, Any]:
|
|
|
- """发送消息到对话"""
|
|
|
- # 1. 追加消息到Trace
|
|
|
- messages = [{"role": "user", "content": content}]
|
|
|
- config = RunConfig(
|
|
|
- trace_id=self.trace_id,
|
|
|
- after_sequence=None, # 续跑模式
|
|
|
- uid=from_agent
|
|
|
- )
|
|
|
-
|
|
|
- # 2. 执行(可能触发对方Agent的响应)
|
|
|
- async for event in runner.run(messages, config):
|
|
|
- if isinstance(event, Message):
|
|
|
- # 3. 通过WebSocket推送给其他参与者
|
|
|
- await self._broadcast_message(event, exclude=from_agent)
|
|
|
-
|
|
|
- if wait_for_response and event.role == "assistant":
|
|
|
- # 等待对方回复
|
|
|
- return {"message_id": event.message_id, "status": "sent"}
|
|
|
-
|
|
|
- return {"status": "completed"}
|
|
|
-
|
|
|
- async def _broadcast_message(self, message: Message, exclude: str = None):
|
|
|
- """广播消息给所有参与者(除了发送者)"""
|
|
|
- for agent_id, ws in self.ws_connections.items():
|
|
|
- if agent_id != exclude:
|
|
|
- await ws.send_json({
|
|
|
- "type": "message",
|
|
|
- "from": exclude,
|
|
|
- "content": message.content,
|
|
|
- "message_id": message.message_id,
|
|
|
- "timestamp": message.created_at.isoformat()
|
|
|
- })
|
|
|
-
|
|
|
-
|
|
|
-@app.post("/api/a2a/sessions")
|
|
|
-async def create_session(request: CreateSessionRequest):
|
|
|
- """创建A2A对话会话"""
|
|
|
- # 1. 创建底层Trace
|
|
|
- trace = Trace(
|
|
|
- trace_id=generate_trace_id(),
|
|
|
- mode="agent",
|
|
|
- task=request.initial_message,
|
|
|
- agent_type="a2a_session",
|
|
|
- context={
|
|
|
- "session_type": "a2a",
|
|
|
- "participants": request.participants
|
|
|
- }
|
|
|
- )
|
|
|
- await store.create_trace(trace)
|
|
|
-
|
|
|
- # 2. 创建Session对象
|
|
|
- session_id = f"sess-{generate_id()}"
|
|
|
- session = A2ASession(session_id, trace.trace_id, request.participants)
|
|
|
-
|
|
|
- # 3. 存储Session(内存或Redis)
|
|
|
- sessions[session_id] = session
|
|
|
-
|
|
|
- # 4. 发送初始消息
|
|
|
- if request.initial_message:
|
|
|
- await session.send_message(
|
|
|
- from_agent=request.participants[0],
|
|
|
- content=request.initial_message
|
|
|
- )
|
|
|
-
|
|
|
- return {
|
|
|
- "session_id": session_id,
|
|
|
- "trace_id": trace.trace_id,
|
|
|
- "ws_url": f"wss://{host}/api/a2a/sessions/{session_id}/stream"
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-@app.websocket("/api/a2a/sessions/{session_id}/stream")
|
|
|
-async def session_stream(websocket: WebSocket, session_id: str):
|
|
|
- """WebSocket连接,实时接收对话消息"""
|
|
|
- await websocket.accept()
|
|
|
-
|
|
|
- # 1. 获取Session
|
|
|
- session = sessions.get(session_id)
|
|
|
- if not session:
|
|
|
- await websocket.close(code=404)
|
|
|
- return
|
|
|
-
|
|
|
- # 2. 识别连接的Agent
|
|
|
- agent_id = await authenticate_websocket(websocket)
|
|
|
-
|
|
|
- # 3. 注册WebSocket连接
|
|
|
- session.ws_connections[agent_id] = websocket
|
|
|
-
|
|
|
- try:
|
|
|
- # 4. 保持连接,接收消息
|
|
|
- async for message in websocket:
|
|
|
- data = json.loads(message)
|
|
|
- if data["type"] == "message":
|
|
|
- # 发送消息到对话
|
|
|
- await session.send_message(
|
|
|
- from_agent=agent_id,
|
|
|
- content=data["content"]
|
|
|
- )
|
|
|
- finally:
|
|
|
- # 5. 清理连接
|
|
|
- del session.ws_connections[agent_id]
|
|
|
-
|
|
|
-
|
|
|
-@app.post("/api/a2a/sessions/{session_id}/messages")
|
|
|
-async def send_session_message(session_id: str, request: SendMessageRequest):
|
|
|
- """发送消息到对话(HTTP方式)"""
|
|
|
- session = sessions.get(session_id)
|
|
|
- if not session:
|
|
|
- raise HTTPException(404, "Session not found")
|
|
|
-
|
|
|
- result = await session.send_message(
|
|
|
- from_agent=request.from_agent,
|
|
|
- content=request.content,
|
|
|
- wait_for_response=request.wait_for_response
|
|
|
- )
|
|
|
-
|
|
|
- return result
|
|
|
-```
|
|
|
-
|
|
|
-#### 客户端SDK
|
|
|
-
|
|
|
-```python
|
|
|
-# agent/client/a2a_session_client.py
|
|
|
-
|
|
|
-class A2ASessionClient:
|
|
|
- """A2A持续对话客户端"""
|
|
|
-
|
|
|
- def __init__(self, base_url: str, agent_id: str, api_key: str):
|
|
|
- self.base_url = base_url
|
|
|
- self.agent_id = agent_id
|
|
|
- self.api_key = api_key
|
|
|
- self.ws = None
|
|
|
- self.message_handlers = []
|
|
|
-
|
|
|
- async def create_session(
|
|
|
- self,
|
|
|
- other_agent: str,
|
|
|
- initial_message: str
|
|
|
- ) -> str:
|
|
|
- """创建对话会话"""
|
|
|
- response = await self._post("/api/a2a/sessions", {
|
|
|
- "participants": [self.agent_id, other_agent],
|
|
|
- "initial_message": initial_message
|
|
|
- })
|
|
|
-
|
|
|
- session_id = response["session_id"]
|
|
|
-
|
|
|
- # 自动连接WebSocket
|
|
|
- await self._connect_websocket(session_id)
|
|
|
-
|
|
|
- return session_id
|
|
|
-
|
|
|
- async def _connect_websocket(self, session_id: str):
|
|
|
- """连接WebSocket接收消息"""
|
|
|
- ws_url = f"{self.ws_url}/api/a2a/sessions/{session_id}/stream"
|
|
|
- self.ws = await websockets.connect(
|
|
|
- ws_url,
|
|
|
- extra_headers={"Authorization": f"Bearer {self.api_key}"}
|
|
|
- )
|
|
|
-
|
|
|
- # 启动消息接收循环
|
|
|
- asyncio.create_task(self._receive_messages())
|
|
|
-
|
|
|
- async def _receive_messages(self):
|
|
|
- """接收WebSocket消息"""
|
|
|
- async for message in self.ws:
|
|
|
- data = json.loads(message)
|
|
|
- if data["type"] == "message":
|
|
|
- # 调用注册的消息处理器
|
|
|
- for handler in self.message_handlers:
|
|
|
- await handler(data)
|
|
|
-
|
|
|
- async def send_message(self, session_id: str, content: str):
|
|
|
- """发送消息"""
|
|
|
- if self.ws:
|
|
|
- # 通过WebSocket发送(实时)
|
|
|
- await self.ws.send(json.dumps({
|
|
|
- "type": "message",
|
|
|
- "content": content
|
|
|
- }))
|
|
|
- else:
|
|
|
- # 通过HTTP发送(备用)
|
|
|
- await self._post(f"/api/a2a/sessions/{session_id}/messages", {
|
|
|
- "from_agent": self.agent_id,
|
|
|
- "content": content
|
|
|
- })
|
|
|
-
|
|
|
- def on_message(self, handler):
|
|
|
- """注册消息处理器"""
|
|
|
- self.message_handlers.append(handler)
|
|
|
-
|
|
|
- async def close_session(self, session_id: str):
|
|
|
- """关闭对话"""
|
|
|
- await self._post(f"/api/a2a/sessions/{session_id}/close", {})
|
|
|
- if self.ws:
|
|
|
- await self.ws.close()
|
|
|
-```
|
|
|
-
|
|
|
-#### 使用示例
|
|
|
-
|
|
|
-```python
|
|
|
-# 云端Agent使用
|
|
|
-client = A2ASessionClient(
|
|
|
- base_url="https://org.agent.cloud",
|
|
|
- agent_id="cloud-agent-123",
|
|
|
- api_key="ak_xxx"
|
|
|
-)
|
|
|
-
|
|
|
-# 创建对话
|
|
|
-session_id = await client.create_session(
|
|
|
- other_agent="terminal-agent-456",
|
|
|
- initial_message="请分析本地项目"
|
|
|
-)
|
|
|
-
|
|
|
-# 注册消息处理器
|
|
|
-@client.on_message
|
|
|
-async def handle_message(message):
|
|
|
- print(f"收到消息: {message['content']}")
|
|
|
-
|
|
|
- # 根据消息内容决定如何回复
|
|
|
- if "哪个模块" in message['content']:
|
|
|
- await client.send_message(session_id, "重点分析core模块")
|
|
|
- elif "需要我详细说明吗" in message['content']:
|
|
|
- await client.send_message(session_id, "是的,请详细说明")
|
|
|
-
|
|
|
-# 等待对话完成
|
|
|
-await asyncio.sleep(60) # 或其他结束条件
|
|
|
-
|
|
|
-# 关闭对话
|
|
|
-await client.close_session(session_id)
|
|
|
-```
|
|
|
-
|
|
|
-#### 优势
|
|
|
-
|
|
|
-1. **复用Trace机制** - 所有消息管理、压缩、存储都复用
|
|
|
-2. **完整历史** - 对话历史自动保存在Trace中
|
|
|
-3. **实时通信** - WebSocket保证低延迟
|
|
|
-4. **状态追踪** - 利用Trace的状态管理
|
|
|
-5. **可回溯** - 可以查看完整的对话历史
|
|
|
-
|
|
|
-#### 劣势
|
|
|
-
|
|
|
-1. **Trace概念泄露** - 外部需要理解session_id和trace_id的关系
|
|
|
-2. **复杂度** - 需要管理WebSocket连接
|
|
|
-
|
|
|
-### 方案2:独立的对话管理器
|
|
|
-
|
|
|
-#### 核心思想
|
|
|
-
|
|
|
-**创建独立的对话管理系统,不依赖Trace**
|
|
|
-
|
|
|
-```python
|
|
|
-class Conversation:
|
|
|
- """独立的对话对象"""
|
|
|
- conversation_id: str
|
|
|
- participants: List[str]
|
|
|
- messages: List[ConversationMessage]
|
|
|
- status: str # active, waiting, completed
|
|
|
- created_at: datetime
|
|
|
-
|
|
|
-class ConversationMessage:
|
|
|
- """对话消息"""
|
|
|
- message_id: str
|
|
|
- from_agent: str
|
|
|
- to_agent: Optional[str] # None表示广播
|
|
|
- content: str
|
|
|
- timestamp: datetime
|
|
|
- metadata: Dict
|
|
|
-```
|
|
|
-
|
|
|
-#### 优势
|
|
|
-
|
|
|
-1. **概念清晰** - 对话就是对话,不混淆Trace
|
|
|
-2. **轻量级** - 不需要Trace的重量级机制
|
|
|
-3. **灵活** - 可以自定义对话逻辑
|
|
|
-
|
|
|
-#### 劣势
|
|
|
-
|
|
|
-1. **重复实现** - 需要重新实现消息管理、存储、压缩
|
|
|
-2. **不一致** - 与现有Trace机制不一致
|
|
|
-3. **维护成本** - 需要维护两套系统
|
|
|
-
|
|
|
-### 方案3:混合模式(推荐)
|
|
|
-
|
|
|
-#### 核心思想
|
|
|
-
|
|
|
-**对话层(Session)+ 执行层(Trace)分离**
|
|
|
-
|
|
|
-```
|
|
|
-对话层(Session)
|
|
|
- - 管理对话状态
|
|
|
- - 路由消息
|
|
|
- - WebSocket连接
|
|
|
- |
|
|
|
- | 每条消息触发
|
|
|
- ↓
|
|
|
-执行层(Trace)
|
|
|
- - 执行具体任务
|
|
|
- - 调用工具
|
|
|
- - 管理上下文
|
|
|
-```
|
|
|
-
|
|
|
-#### 架构
|
|
|
-
|
|
|
-```python
|
|
|
-class A2ASession:
|
|
|
- """对话会话(轻量级)"""
|
|
|
- session_id: str
|
|
|
- participants: List[str]
|
|
|
- current_speaker: str
|
|
|
- waiting_for: Optional[str]
|
|
|
- context: Dict # 共享上下文
|
|
|
- message_queue: List[Message]
|
|
|
-
|
|
|
- async def send_message(self, from_agent: str, content: str):
|
|
|
- """发送消息"""
|
|
|
- # 1. 添加到消息队列
|
|
|
- self.message_queue.append(Message(from_agent, content))
|
|
|
-
|
|
|
- # 2. 如果需要执行(不是简单问答),创建Trace
|
|
|
- if self._needs_execution(content):
|
|
|
- trace_id = await self._create_execution_trace(content)
|
|
|
- # 执行完成后,结果自动添加到消息队列
|
|
|
- else:
|
|
|
- # 简单消息,直接转发
|
|
|
- await self._forward_message(content, to=self._get_other_agent(from_agent))
|
|
|
-
|
|
|
- def _needs_execution(self, content: str) -> bool:
|
|
|
- """判断是否需要创建Trace执行"""
|
|
|
- # 例如:包含工具调用、复杂任务等
|
|
|
- return "分析" in content or "执行" in content or "查询" in content
|
|
|
-```
|
|
|
-
|
|
|
-#### 优势
|
|
|
-
|
|
|
-1. **分层清晰** - 对话管理和任务执行分离
|
|
|
-2. **灵活** - 简单消息不需要Trace,复杂任务才创建
|
|
|
-3. **高效** - 避免为每条消息创建Trace
|
|
|
-4. **复用** - 复杂任务仍然复用Trace机制
|
|
|
-
|
|
|
-#### 实现示例
|
|
|
-
|
|
|
-```python
|
|
|
-@app.post("/api/a2a/sessions/{session_id}/messages")
|
|
|
-async def send_message(session_id: str, request: SendMessageRequest):
|
|
|
- session = sessions[session_id]
|
|
|
-
|
|
|
- # 1. 判断消息类型
|
|
|
- if request.requires_execution:
|
|
|
- # 需要执行的任务 → 创建Trace
|
|
|
- trace_id = await create_execution_trace(
|
|
|
- task=request.content,
|
|
|
- parent_session=session_id,
|
|
|
- agent_id=request.from_agent
|
|
|
- )
|
|
|
-
|
|
|
- # 执行完成后,结果自动推送到Session
|
|
|
- result = await runner.run_result(
|
|
|
- messages=[{"role": "user", "content": request.content}],
|
|
|
- config=RunConfig(trace_id=trace_id)
|
|
|
- )
|
|
|
-
|
|
|
- # 将结果作为消息发送给对方
|
|
|
- await session.send_message(
|
|
|
- from_agent=request.from_agent,
|
|
|
- content=result["summary"]
|
|
|
- )
|
|
|
- else:
|
|
|
- # 简单消息 → 直接转发
|
|
|
- await session.send_message(
|
|
|
- from_agent=request.from_agent,
|
|
|
- content=request.content
|
|
|
- )
|
|
|
-
|
|
|
- return {"status": "sent"}
|
|
|
-```
|
|
|
-
|
|
|
-## 关键设计决策
|
|
|
-
|
|
|
-### 1. 消息类型分类
|
|
|
-
|
|
|
-| 类型 | 示例 | 处理方式 |
|
|
|
-|------|------|----------|
|
|
|
-| **简单问答** | "你好"、"收到"、"明白了" | 直接转发,不创建Trace |
|
|
|
-| **信息查询** | "当前进度如何?" | 查询Session状态,返回 |
|
|
|
-| **任务请求** | "分析core模块" | 创建Trace执行 |
|
|
|
-| **工具调用** | "读取文件X" | 创建Trace执行 |
|
|
|
-
|
|
|
-### 2. 上下文管理
|
|
|
-
|
|
|
-**Session级上下文(轻量):**
|
|
|
-```python
|
|
|
-session.context = {
|
|
|
- "current_topic": "项目分析",
|
|
|
- "focus_module": "core",
|
|
|
- "previous_results": {...}
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-**Trace级上下文(完整):**
|
|
|
-- 完整的消息历史
|
|
|
-- 工具调用记录
|
|
|
-- Goal树
|
|
|
-
|
|
|
-### 3. 生命周期管理
|
|
|
-
|
|
|
-```python
|
|
|
-# Session生命周期
|
|
|
-created → active → waiting → active → ... → completed/timeout
|
|
|
-
|
|
|
-# Trace生命周期(每个任务)
|
|
|
-created → running → completed
|
|
|
-```
|
|
|
-
|
|
|
-### 4. 超时和重连
|
|
|
-
|
|
|
-```python
|
|
|
-class A2ASession:
|
|
|
- timeout: int = 300 # 5分钟无活动则超时
|
|
|
- last_activity: datetime
|
|
|
-
|
|
|
- async def check_timeout(self):
|
|
|
- if datetime.now() - self.last_activity > timedelta(seconds=self.timeout):
|
|
|
- await self.close(reason="timeout")
|
|
|
-
|
|
|
- async def reconnect(self, agent_id: str, ws: WebSocket):
|
|
|
- """Agent重连"""
|
|
|
- self.ws_connections[agent_id] = ws
|
|
|
- # 发送未读消息
|
|
|
- await self._send_unread_messages(agent_id)
|
|
|
-```
|
|
|
-
|
|
|
-## 实现路线图
|
|
|
-
|
|
|
-### Phase 1:基础对话能力(2-3周)
|
|
|
-
|
|
|
-1. **Session管理**
|
|
|
- - 创建/关闭Session
|
|
|
- - 消息路由
|
|
|
- - WebSocket连接管理
|
|
|
-
|
|
|
-2. **简单消息转发**
|
|
|
- - 不涉及Trace
|
|
|
- - 纯消息传递
|
|
|
-
|
|
|
-3. **客户端SDK**
|
|
|
- - `A2ASessionClient`
|
|
|
- - 消息处理器
|
|
|
-
|
|
|
-### Phase 2:集成Trace执行(2-3周)
|
|
|
-
|
|
|
-1. **任务识别**
|
|
|
- - 判断消息是否需要执行
|
|
|
- - 自动创建Trace
|
|
|
-
|
|
|
-2. **结果集成**
|
|
|
- - Trace结果转换为消息
|
|
|
- - 自动推送给对方
|
|
|
-
|
|
|
-3. **上下文共享**
|
|
|
- - Session上下文传递给Trace
|
|
|
- - Trace结果更新Session上下文
|
|
|
-
|
|
|
-### Phase 3:高级功能(3-4周)
|
|
|
-
|
|
|
-1. **多方对话**
|
|
|
- - 支持3个以上Agent
|
|
|
- - 群聊模式
|
|
|
-
|
|
|
-2. **对话分支**
|
|
|
- - 子对话
|
|
|
- - 并行对话
|
|
|
-
|
|
|
-3. **持久化和恢复**
|
|
|
- - Session持久化
|
|
|
- - 断线重连
|
|
|
-
|
|
|
-## 示例场景
|
|
|
-
|
|
|
-### 场景:云端助理与终端Agent的持续对话
|
|
|
-
|
|
|
-```python
|
|
|
-# 云端助理
|
|
|
-client = A2ASessionClient("https://cloud", "cloud-agent", "ak_xxx")
|
|
|
-
|
|
|
-# 1. 创建对话
|
|
|
-session_id = await client.create_session(
|
|
|
- other_agent="terminal-agent-456",
|
|
|
- initial_message="请分析本地项目"
|
|
|
-)
|
|
|
-
|
|
|
-# 2. 注册消息处理器(自动响应)
|
|
|
-@client.on_message
|
|
|
-async def handle_message(msg):
|
|
|
- content = msg['content']
|
|
|
-
|
|
|
- if "哪个模块" in content:
|
|
|
- # 简单回复,不需要执行
|
|
|
- await client.send_message(session_id, "重点分析core模块")
|
|
|
-
|
|
|
- elif "详细说明" in content:
|
|
|
- # 需要进一步分析,触发执行
|
|
|
- await client.send_message(
|
|
|
- session_id,
|
|
|
- "是的,请详细说明架构设计和关键组件",
|
|
|
- requires_execution=True # 标记需要执行
|
|
|
- )
|
|
|
-
|
|
|
-# 3. 等待对话完成
|
|
|
-await client.wait_for_completion(session_id)
|
|
|
-```
|
|
|
-
|
|
|
-```python
|
|
|
-# 终端Agent
|
|
|
-client = A2ASessionClient("https://terminal", "terminal-agent-456", "ak_yyy")
|
|
|
-
|
|
|
-# 1. 监听新对话
|
|
|
-@client.on_new_session
|
|
|
-async def handle_new_session(session_id, initial_message):
|
|
|
- # 分析项目
|
|
|
- modules = await analyze_project()
|
|
|
-
|
|
|
- # 询问用户
|
|
|
- await client.send_message(
|
|
|
- session_id,
|
|
|
- f"我看到有{len(modules)}个模块:{', '.join(modules)},你想重点分析哪个?"
|
|
|
- )
|
|
|
-
|
|
|
-# 2. 处理后续消息
|
|
|
-@client.on_message
|
|
|
-async def handle_message(msg):
|
|
|
- if "core模块" in msg['content']:
|
|
|
- # 执行分析
|
|
|
- result = await analyze_module("core")
|
|
|
-
|
|
|
- # 返回结果并询问
|
|
|
- await client.send_message(
|
|
|
- msg['session_id'],
|
|
|
- f"core模块使用了{result['architecture']}架构,需要我详细说明吗?"
|
|
|
- )
|
|
|
-
|
|
|
- elif "详细说明" in msg['content']:
|
|
|
- # 深度分析
|
|
|
- details = await deep_analyze("core")
|
|
|
- await client.send_message(
|
|
|
- msg['session_id'],
|
|
|
- f"详细架构:\n{details}"
|
|
|
- )
|
|
|
-```
|
|
|
-
|
|
|
-## 总结
|
|
|
-
|
|
|
-### 推荐方案:混合模式
|
|
|
-
|
|
|
-**对话层(Session):**
|
|
|
-- 轻量级消息路由
|
|
|
-- WebSocket连接管理
|
|
|
-- 简单问答直接转发
|
|
|
-
|
|
|
-**执行层(Trace):**
|
|
|
-- 复杂任务创建Trace
|
|
|
-- 复用所有现有能力
|
|
|
-- 结果自动集成到对话
|
|
|
-
|
|
|
-### 关键优势
|
|
|
-
|
|
|
-1. **灵活** - 简单消息不需要Trace开销
|
|
|
-2. **强大** - 复杂任务复用Trace能力
|
|
|
-3. **清晰** - 对话和执行分层
|
|
|
-4. **高效** - 避免不必要的资源消耗
|
|
|
-
|
|
|
-### 实现优先级
|
|
|
-
|
|
|
-1. **Phase 1** - 基础Session + 简单消息(MVP)
|
|
|
-2. **Phase 2** - 集成Trace执行(核心能力)
|
|
|
-3. **Phase 3** - 高级功能(按需)
|
|
|
-
|