# A2A跨设备Trace存储方案 **问题:** 现有Trace存储在本地文件系统,跨设备时如何访问? ## 场景分析 ### 场景1:云端Agent调用终端Agent ``` 云端Agent (云端存储) ↓ 调用 终端Agent (终端存储) ↓ 返回 sub_trace_id 云端Agent 想 continue_from sub_trace_id ↓ 问题:sub_trace在终端,云端访问不到! ``` ### 场景2:终端Agent调用云端Agent ``` 终端Agent (终端存储) ↓ 调用 云端Agent (云端存储) ↓ 返回 sub_trace_id 终端Agent 想 continue_from sub_trace_id ↓ 问题:sub_trace在云端,终端访问不到! ``` ## 方案对比 ### 方案1:远程Trace访问(推荐) #### 核心思想 **Trace ID包含位置信息,通过HTTP API访问远程Trace** #### Trace ID格式 ``` 本地Trace: abc-123 远程Trace: agent://terminal-agent-456/abc-123 ^^^^^^^^ ^^^^^^^^^^^^^^^^^ ^^^^^^^ 协议 Agent地址 本地ID ``` #### 实现 ```python # agent/trace/remote_store.py class RemoteTraceStore: """远程Trace存储代理""" def __init__(self, agent_url: str, api_key: str): self.agent_url = agent_url self.api_key = api_key async def get_trace(self, trace_id: str) -> Optional[Trace]: """通过HTTP API获取远程Trace""" response = await self._get(f"/api/traces/{trace_id}") if response: return Trace(**response) return None async def get_main_path_messages( self, trace_id: str, head_sequence: int ) -> List[Message]: """获取远程Trace的消息""" response = await self._get( f"/api/traces/{trace_id}/messages", params={"mode": "main_path", "head_sequence": head_sequence} ) return [Message(**m) for m in response["messages"]] async def add_message(self, message: Message) -> str: """向远程Trace追加消息(续跑)""" response = await self._post( f"/api/traces/{message.trace_id}/messages", data=message.to_dict() ) return response["message_id"] # agent/trace/hybrid_store.py class HybridTraceStore: """混合存储:本地 + 远程""" def __init__(self, local_store: FileSystemTraceStore): self.local_store = local_store self.remote_stores = {} # agent_url -> RemoteTraceStore def _parse_trace_id(self, trace_id: str) -> tuple[str, str]: """ 解析Trace ID 返回: (location, local_id) - location: "local" 或 agent_url - local_id: 本地Trace ID """ if trace_id.startswith("agent://"): # agent://terminal-agent-456/abc-123 parts = trace_id[8:].split("/", 1) return parts[0], parts[1] else: return "local", trace_id async def get_trace(self, trace_id: str) -> Optional[Trace]: """获取Trace(自动路由到本地或远程)""" location, local_id = self._parse_trace_id(trace_id) if location == "local": return await self.local_store.get_trace(local_id) else: # 远程Trace if location not in self.remote_stores: # 创建远程存储代理 self.remote_stores[location] = RemoteTraceStore( agent_url=f"https://{location}", api_key=self._get_api_key(location) ) return await self.remote_stores[location].get_trace(local_id) async def add_message(self, message: Message) -> str: """添加消息(自动路由)""" location, local_id = self._parse_trace_id(message.trace_id) if location == "local": return await self.local_store.add_message(message) else: # 向远程Trace追加消息 return await self.remote_stores[location].add_message(message) ``` #### agent工具修改 ```python @tool(description="创建 Agent 执行任务") async def agent( task: Union[str, List[str]], continue_from: Optional[str] = None, # 支持远程Trace ID agent_url: Optional[str] = None, # 新增:远程Agent地址 context: Optional[dict] = None, ) -> Dict[str, Any]: """ 创建 Agent 执行任务 Args: task: 任务描述 continue_from: 继续已有trace(支持远程Trace ID) agent_url: 远程Agent地址(如果调用远程Agent) context: 框架注入的上下文 """ store = context.get("store") # HybridTraceStore if agent_url: # 调用远程Agent result = await _call_remote_agent(agent_url, task, continue_from) # 返回远程Trace ID remote_trace_id = f"agent://{agent_url}/{result['sub_trace_id']}" return { **result, "sub_trace_id": remote_trace_id, "remote": True } else: # 本地执行(现有逻辑) if continue_from: # 可能是远程Trace,HybridStore会自动处理 existing = await store.get_trace(continue_from) if not existing: return {"status": "failed", "error": "Trace not found"} # ... 现有逻辑 ``` #### 使用示例 ```python # 云端Agent调用终端Agent result1 = await agent( task="分析本地项目", agent_url="terminal-agent-456.local" ) # 返回: {"sub_trace_id": "agent://terminal-agent-456.local/abc-123"} # 继续对话(自动访问远程Trace) result2 = await agent( task="重点分析core模块", continue_from=result1["sub_trace_id"], # 远程Trace ID agent_url="terminal-agent-456.local" ) # HybridStore自动通过HTTP API访问远程Trace ``` #### 优势 1. **透明** - Agent不需要关心Trace在哪里 2. **灵活** - 支持本地和远程Trace 3. **简单** - 只需要在Trace ID中编码位置信息 4. **兼容** - 现有本地Trace不受影响 #### 劣势 1. **网络延迟** - 访问远程Trace需要HTTP请求 2. **依赖网络** - 远程Agent必须在线 --- ### 方案2:中心化Trace存储 #### 核心思想 **所有Agent共享同一个Trace存储(数据库)** #### 架构 ``` 云端Agent ──┐ ├──> 中心化Trace存储(PostgreSQL/MongoDB) 终端Agent ──┘ ``` #### 实现 ```python # agent/trace/db_store.py class DatabaseTraceStore: """数据库Trace存储""" def __init__(self, db_url: str): self.db = connect(db_url) async def create_trace(self, trace: Trace) -> str: await self.db.traces.insert_one(trace.to_dict()) return trace.trace_id async def get_trace(self, trace_id: str) -> Optional[Trace]: doc = await self.db.traces.find_one({"trace_id": trace_id}) if doc: return Trace(**doc) return None # ... 其他方法 ``` #### 配置 ```yaml # config/storage.yaml trace_store: type: database url: postgresql://user:pass@db.cloud/traces # 或 url: mongodb://db.cloud/traces ``` #### 优势 1. **简单** - 所有Agent访问同一个存储,无需特殊处理 2. **一致性** - 数据强一致性 3. **查询能力** - 可以跨Trace查询和分析 #### 劣势 1. **依赖中心** - 需要中心化数据库 2. **网络依赖** - 终端Agent必须能访问数据库 3. **隐私问题** - 终端数据存储在云端 4. **迁移成本** - 需要从文件系统迁移到数据库 --- ### 方案3:Trace同步/缓存 #### 核心思想 **按需同步远程Trace到本地** #### 实现 ```python class CachedRemoteStore: """带缓存的远程存储""" def __init__(self, local_store, remote_url): self.local_store = local_store self.remote_url = remote_url self.cache = {} # trace_id -> Trace async def get_trace(self, trace_id: str) -> Optional[Trace]: # 1. 检查本地 trace = await self.local_store.get_trace(trace_id) if trace: return trace # 2. 检查缓存 if trace_id in self.cache: return self.cache[trace_id] # 3. 从远程获取并缓存 trace = await self._fetch_remote(trace_id) if trace: self.cache[trace_id] = trace # 可选:持久化到本地 await self.local_store.create_trace(trace) return trace async def add_message(self, message: Message) -> str: # 同时写入本地和远程 local_id = await self.local_store.add_message(message) await self._sync_to_remote(message) return local_id ``` #### 优势 1. **性能** - 本地缓存减少网络请求 2. **离线能力** - 缓存后可以离线访问 #### 劣势 1. **一致性** - 缓存可能过期 2. **复杂度** - 需要处理同步和冲突 3. **存储开销** - 需要额外的本地存储 --- ### 方案4:混合模式(推荐) #### 核心思想 **结合方案1和方案2的优点** - **组织内部**:使用中心化存储(数据库) - **跨组织**:使用远程访问(HTTP API) #### 架构 ``` 组织内部: 云端Agent ──┐ ├──> 组织数据库 职能Agent ──┘ 跨组织: 组织A Agent <──HTTP API──> 组织B Agent ``` #### 配置 ```yaml # config/storage.yaml trace_store: # 组织内部使用数据库 internal: type: database url: postgresql://org-db/traces # 跨组织使用远程访问 external: type: remote agents: - url: https://partner-org.com api_key: xxx ``` #### 优势 1. **灵活** - 根据场景选择最优方案 2. **性能** - 组织内部低延迟 3. **隐私** - 跨组织数据不共享存储 --- ## 推荐方案 ### MVP阶段(Phase 1-2):方案1 - 远程Trace访问 **理由:** 1. **最小改动** - 只需要添加RemoteTraceStore和HybridStore 2. **灵活** - 支持任意拓扑 3. **隐私友好** - 数据不离开设备 4. **渐进式** - 可以逐步迁移到方案4 **实现步骤:** 1. 实现RemoteTraceStore(通过HTTP API访问) 2. 实现HybridTraceStore(路由到本地或远程) 3. 修改agent工具支持agent_url参数 4. 在Trace ID中编码位置信息 ### 长期(Phase 3+):方案4 - 混合模式 **理由:** 1. **组织内部高效** - 数据库存储,低延迟 2. **跨组织灵活** - HTTP API,保护隐私 3. **可扩展** - 支持复杂场景 ## 实现细节 ### 1. Trace ID格式 ```python # 本地Trace "abc-123" # 远程Trace(完整格式) "agent://terminal-agent-456.local:8000/abc-123" # 远程Trace(简化格式,使用注册的agent_id) "@terminal-agent-456/abc-123" ``` ### 2. Agent注册表 ```yaml # config/agents.yaml agents: terminal-agent-456: url: https://terminal-agent-456.local:8000 api_key: ak_xxx type: terminal ``` ### 3. API端点 ```python # 必需的API端点(用于远程访问) GET /api/traces/{trace_id} # 获取Trace GET /api/traces/{trace_id}/messages # 获取消息 POST /api/traces/{trace_id}/run # 续跑 POST /api/traces/{trace_id}/messages # 追加消息 ``` ### 4. 认证和授权 ```python # 每个Agent有自己的API Key headers = { "Authorization": f"Bearer {api_key}" } # 权限检查 if trace.uid != request.user_id: raise Forbidden("Cannot access other user's trace") ``` ### 5. 性能优化 ```python # 批量获取消息 GET /api/traces/{trace_id}/messages?limit=100&offset=0 # 增量同步 GET /api/traces/{trace_id}/messages?since_sequence=50 # 压缩传输 headers = {"Accept-Encoding": "gzip"} ``` ## 总结 **推荐路线:** 1. **Phase 1(MVP)** - 实现方案1(远程Trace访问) - 最小改动 - 快速验证跨设备A2A - 2-3周 2. **Phase 2** - 优化和增强 - 添加缓存 - 批量API - 性能优化 - 2-3周 3. **Phase 3(可选)** - 迁移到方案4(混合模式) - 组织内部使用数据库 - 跨组织使用远程访问 - 4-6周 **关键优势:** - 渐进式实现 - 最小化风险 - 保持灵活性