a2a-trace-storage.md 12 KB

A2A跨设备Trace存储方案

问题: 现有Trace存储在本地文件系统,跨设备时如何访问?

场景分析

场景1:云端Agent调用终端Agent

云端Agent (云端存储)
    ↓ 调用
终端Agent (终端存储)
    ↓ 返回 sub_trace_id
云端Agent 想 continue_from sub_trace_id
    ↓ 问题:sub_trace在终端,云端访问不到!

场景2:终端Agent调用云端Agent

终端Agent (终端存储)
    ↓ 调用
云端Agent (云端存储)
    ↓ 返回 sub_trace_id
终端Agent 想 continue_from sub_trace_id
    ↓ 问题:sub_trace在云端,终端访问不到!

方案对比

方案1:远程Trace访问(推荐)

核心思想

Trace ID包含位置信息,通过HTTP API访问远程Trace

Trace ID格式

本地Trace: abc-123
远程Trace: agent://terminal-agent-456/abc-123
          ^^^^^^^^ ^^^^^^^^^^^^^^^^^ ^^^^^^^
          协议      Agent地址          本地ID

实现

# agent/trace/remote_store.py

class RemoteTraceStore:
    """远程Trace存储代理"""

    def __init__(self, agent_url: str, api_key: str):
        self.agent_url = agent_url
        self.api_key = api_key

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """通过HTTP API获取远程Trace"""
        response = await self._get(f"/api/traces/{trace_id}")
        if response:
            return Trace(**response)
        return None

    async def get_main_path_messages(
        self, trace_id: str, head_sequence: int
    ) -> List[Message]:
        """获取远程Trace的消息"""
        response = await self._get(
            f"/api/traces/{trace_id}/messages",
            params={"mode": "main_path", "head_sequence": head_sequence}
        )
        return [Message(**m) for m in response["messages"]]

    async def add_message(self, message: Message) -> str:
        """向远程Trace追加消息(续跑)"""
        response = await self._post(
            f"/api/traces/{message.trace_id}/messages",
            data=message.to_dict()
        )
        return response["message_id"]


# agent/trace/hybrid_store.py

class HybridTraceStore:
    """混合存储:本地 + 远程"""

    def __init__(self, local_store: FileSystemTraceStore):
        self.local_store = local_store
        self.remote_stores = {}  # agent_url -> RemoteTraceStore

    def _parse_trace_id(self, trace_id: str) -> tuple[str, str]:
        """
        解析Trace ID

        返回: (location, local_id)
        - location: "local" 或 agent_url
        - local_id: 本地Trace ID
        """
        if trace_id.startswith("agent://"):
            # agent://terminal-agent-456/abc-123
            parts = trace_id[8:].split("/", 1)
            return parts[0], parts[1]
        else:
            return "local", trace_id

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """获取Trace(自动路由到本地或远程)"""
        location, local_id = self._parse_trace_id(trace_id)

        if location == "local":
            return await self.local_store.get_trace(local_id)
        else:
            # 远程Trace
            if location not in self.remote_stores:
                # 创建远程存储代理
                self.remote_stores[location] = RemoteTraceStore(
                    agent_url=f"https://{location}",
                    api_key=self._get_api_key(location)
                )
            return await self.remote_stores[location].get_trace(local_id)

    async def add_message(self, message: Message) -> str:
        """添加消息(自动路由)"""
        location, local_id = self._parse_trace_id(message.trace_id)

        if location == "local":
            return await self.local_store.add_message(message)
        else:
            # 向远程Trace追加消息
            return await self.remote_stores[location].add_message(message)

agent工具修改

@tool(description="创建 Agent 执行任务")
async def agent(
    task: Union[str, List[str]],
    continue_from: Optional[str] = None,  # 支持远程Trace ID
    agent_url: Optional[str] = None,      # 新增:远程Agent地址
    context: Optional[dict] = None,
) -> Dict[str, Any]:
    """
    创建 Agent 执行任务

    Args:
        task: 任务描述
        continue_from: 继续已有trace(支持远程Trace ID)
        agent_url: 远程Agent地址(如果调用远程Agent)
        context: 框架注入的上下文
    """
    store = context.get("store")  # HybridTraceStore

    if agent_url:
        # 调用远程Agent
        result = await _call_remote_agent(agent_url, task, continue_from)
        # 返回远程Trace ID
        remote_trace_id = f"agent://{agent_url}/{result['sub_trace_id']}"
        return {
            **result,
            "sub_trace_id": remote_trace_id,
            "remote": True
        }
    else:
        # 本地执行(现有逻辑)
        if continue_from:
            # 可能是远程Trace,HybridStore会自动处理
            existing = await store.get_trace(continue_from)
            if not existing:
                return {"status": "failed", "error": "Trace not found"}

        # ... 现有逻辑

使用示例

# 云端Agent调用终端Agent
result1 = await agent(
    task="分析本地项目",
    agent_url="terminal-agent-456.local"
)
# 返回: {"sub_trace_id": "agent://terminal-agent-456.local/abc-123"}

# 继续对话(自动访问远程Trace)
result2 = await agent(
    task="重点分析core模块",
    continue_from=result1["sub_trace_id"],  # 远程Trace ID
    agent_url="terminal-agent-456.local"
)
# HybridStore自动通过HTTP API访问远程Trace

优势

  1. 透明 - Agent不需要关心Trace在哪里
  2. 灵活 - 支持本地和远程Trace
  3. 简单 - 只需要在Trace ID中编码位置信息
  4. 兼容 - 现有本地Trace不受影响

劣势

  1. 网络延迟 - 访问远程Trace需要HTTP请求
  2. 依赖网络 - 远程Agent必须在线

方案2:中心化Trace存储

核心思想

所有Agent共享同一个Trace存储(数据库)

架构

云端Agent ──┐
            ├──> 中心化Trace存储(PostgreSQL/MongoDB)
终端Agent ──┘

实现

# agent/trace/db_store.py

class DatabaseTraceStore:
    """数据库Trace存储"""

    def __init__(self, db_url: str):
        self.db = connect(db_url)

    async def create_trace(self, trace: Trace) -> str:
        await self.db.traces.insert_one(trace.to_dict())
        return trace.trace_id

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        doc = await self.db.traces.find_one({"trace_id": trace_id})
        if doc:
            return Trace(**doc)
        return None

    # ... 其他方法

配置

# config/storage.yaml
trace_store:
  type: database
  url: postgresql://user:pass@db.cloud/traces
  # 或
  url: mongodb://db.cloud/traces

优势

  1. 简单 - 所有Agent访问同一个存储,无需特殊处理
  2. 一致性 - 数据强一致性
  3. 查询能力 - 可以跨Trace查询和分析

劣势

  1. 依赖中心 - 需要中心化数据库
  2. 网络依赖 - 终端Agent必须能访问数据库
  3. 隐私问题 - 终端数据存储在云端
  4. 迁移成本 - 需要从文件系统迁移到数据库

方案3:Trace同步/缓存

核心思想

按需同步远程Trace到本地

实现

class CachedRemoteStore:
    """带缓存的远程存储"""

    def __init__(self, local_store, remote_url):
        self.local_store = local_store
        self.remote_url = remote_url
        self.cache = {}  # trace_id -> Trace

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        # 1. 检查本地
        trace = await self.local_store.get_trace(trace_id)
        if trace:
            return trace

        # 2. 检查缓存
        if trace_id in self.cache:
            return self.cache[trace_id]

        # 3. 从远程获取并缓存
        trace = await self._fetch_remote(trace_id)
        if trace:
            self.cache[trace_id] = trace
            # 可选:持久化到本地
            await self.local_store.create_trace(trace)
        return trace

    async def add_message(self, message: Message) -> str:
        # 同时写入本地和远程
        local_id = await self.local_store.add_message(message)
        await self._sync_to_remote(message)
        return local_id

优势

  1. 性能 - 本地缓存减少网络请求
  2. 离线能力 - 缓存后可以离线访问

劣势

  1. 一致性 - 缓存可能过期
  2. 复杂度 - 需要处理同步和冲突
  3. 存储开销 - 需要额外的本地存储

方案4:混合模式(推荐)

核心思想

结合方案1和方案2的优点

  • 组织内部:使用中心化存储(数据库)
  • 跨组织:使用远程访问(HTTP API)

架构

组织内部:
云端Agent ──┐
            ├──> 组织数据库
职能Agent ──┘

跨组织:
组织A Agent <──HTTP API──> 组织B Agent

配置

# config/storage.yaml
trace_store:
  # 组织内部使用数据库
  internal:
    type: database
    url: postgresql://org-db/traces

  # 跨组织使用远程访问
  external:
    type: remote
    agents:
      - url: https://partner-org.com
        api_key: xxx

优势

  1. 灵活 - 根据场景选择最优方案
  2. 性能 - 组织内部低延迟
  3. 隐私 - 跨组织数据不共享存储

推荐方案

MVP阶段(Phase 1-2):方案1 - 远程Trace访问

理由:

  1. 最小改动 - 只需要添加RemoteTraceStore和HybridStore
  2. 灵活 - 支持任意拓扑
  3. 隐私友好 - 数据不离开设备
  4. 渐进式 - 可以逐步迁移到方案4

实现步骤:

  1. 实现RemoteTraceStore(通过HTTP API访问)
  2. 实现HybridTraceStore(路由到本地或远程)
  3. 修改agent工具支持agent_url参数
  4. 在Trace ID中编码位置信息

长期(Phase 3+):方案4 - 混合模式

理由:

  1. 组织内部高效 - 数据库存储,低延迟
  2. 跨组织灵活 - HTTP API,保护隐私
  3. 可扩展 - 支持复杂场景

实现细节

1. Trace ID格式

# 本地Trace
"abc-123"

# 远程Trace(完整格式)
"agent://terminal-agent-456.local:8000/abc-123"

# 远程Trace(简化格式,使用注册的agent_id)
"@terminal-agent-456/abc-123"

2. Agent注册表

# config/agents.yaml
agents:
  terminal-agent-456:
    url: https://terminal-agent-456.local:8000
    api_key: ak_xxx
    type: terminal

3. API端点

# 必需的API端点(用于远程访问)
GET  /api/traces/{trace_id}                    # 获取Trace
GET  /api/traces/{trace_id}/messages           # 获取消息
POST /api/traces/{trace_id}/run                # 续跑
POST /api/traces/{trace_id}/messages           # 追加消息

4. 认证和授权

# 每个Agent有自己的API Key
headers = {
    "Authorization": f"Bearer {api_key}"
}

# 权限检查
if trace.uid != request.user_id:
    raise Forbidden("Cannot access other user's trace")

5. 性能优化

# 批量获取消息
GET /api/traces/{trace_id}/messages?limit=100&offset=0

# 增量同步
GET /api/traces/{trace_id}/messages?since_sequence=50

# 压缩传输
headers = {"Accept-Encoding": "gzip"}

总结

推荐路线:

  1. Phase 1(MVP) - 实现方案1(远程Trace访问)

    • 最小改动
    • 快速验证跨设备A2A
    • 2-3周
  2. Phase 2 - 优化和增强

    • 添加缓存
    • 批量API
    • 性能优化
    • 2-3周
  3. Phase 3(可选) - 迁移到方案4(混合模式)

    • 组织内部使用数据库
    • 跨组织使用远程访问
    • 4-6周

关键优势:

  • 渐进式实现
  • 最小化风险
  • 保持灵活性