a2a-cross-device.md 14 KB

Agent2Agent 跨设备通信方案

更新日期: 2026-03-03

场景分类

场景1:云端Agent ↔ 云端Agent

示例: 通用助理 → 爬虫运维Agent

  • 部署: 同一服务器/进程
  • 通信: 现有agent工具(内存调用)
  • 不需要HTTP接口

场景2:云端Agent ↔ 终端Agent ⭐

示例:

  • 云端通用助理 → 用户笔记本上的代码分析Agent
  • 用户终端Agent → 云端知识库Agent

需求:

  • 云端Agent需要调用终端Agent的能力
    • 访问用户本地文件
    • 执行本地命令
    • 使用本地工具(IDE、浏览器等)
  • 终端Agent需要调用云端Agent
    • 访问组织知识库
    • 查询共享资源
    • 协作任务

挑战:

  • 网络连接(终端可能在NAT后)
  • 认证和授权
  • 数据安全

需要HTTP接口!

场景3:终端Agent ↔ 终端Agent

示例: 团队成员的Agent互相协作

  • 可能性: 较小,但可能存在
  • 通信: 通过云端中转或P2P

架构方案

方案A:基于现有API封装(推荐)

架构图

云端Agent                    终端Agent
    |                            |
    | HTTP POST /api/a2a/call    |
    |--------------------------->|
    |                            |
    |    创建Trace + 执行任务     |
    |                            |
    | WebSocket /api/a2a/watch   |
    |<---------------------------|
    |    实时进度推送              |
    |                            |
    | HTTP GET /api/a2a/result   |
    |--------------------------->|
    |    返回最终结果              |
    |<---------------------------|

核心设计

1. 简化的A2A端点

# agent/api/a2a.py

@app.post("/api/a2a/call")
async def a2a_call(request: A2ACallRequest):
    """
    简化的Agent调用接口

    请求:
    {
        "task": "分析这个项目的架构",
        "agent_type": "explore",  # 可选
        "context": {              # 可选
            "files": [...],
            "previous_results": {...}
        },
        "callback_url": "https://..."  # 可选,完成后回调
    }

    响应:
    {
        "call_id": "a2a-xxx",
        "status": "started",
        "watch_url": "ws://host/api/a2a/watch/a2a-xxx"
    }
    """
    # 1. 认证和授权检查
    # 2. 转换为内部格式
    messages = [{"role": "user", "content": request.task}]
    if request.context:
        messages[0]["content"] += f"\n\n上下文:{json.dumps(request.context)}"

    # 3. 调用现有runner(复用所有逻辑)
    config = RunConfig(
        agent_type=request.agent_type or "default",
        trace_id=None  # 新建
    )

    # 4. 后台执行
    task_id = await start_background_task(runner.run(messages, config))

    # 5. 返回call_id(映射到trace_id)
    return {
        "call_id": f"a2a-{task_id}",
        "status": "started",
        "watch_url": f"ws://{host}/api/a2a/watch/{task_id}"
    }


@app.websocket("/api/a2a/watch/{call_id}")
async def a2a_watch(websocket: WebSocket, call_id: str):
    """
    实时监听执行进度(复用现有WebSocket)

    推送消息:
    {
        "type": "progress",
        "data": {
            "goal": "正在分析文件结构",
            "progress": 0.3
        }
    }

    {
        "type": "completed",
        "data": {
            "result": "...",
            "stats": {...}
        }
    }
    """
    # 复用现有的 /api/traces/{id}/watch 逻辑
    trace_id = call_id.replace("a2a-", "")
    await watch_trace(websocket, trace_id)


@app.get("/api/a2a/result/{call_id}")
async def a2a_result(call_id: str):
    """
    获取执行结果

    响应:
    {
        "status": "completed",
        "result": {
            "summary": "...",
            "details": {...}
        },
        "stats": {
            "duration_ms": 5000,
            "tokens": 1500,
            "cost": 0.05
        }
    }
    """
    trace_id = call_id.replace("a2a-", "")
    trace = await store.get_trace(trace_id)
    messages = await store.get_main_path_messages(trace_id, trace.head_sequence)

    # 提取最后的assistant消息作为结果
    result = extract_final_result(messages)

    return {
        "status": trace.status,
        "result": result,
        "stats": {
            "duration_ms": trace.total_duration_ms,
            "tokens": trace.total_tokens,
            "cost": trace.total_cost
        }
    }

2. 客户端SDK(终端Agent使用)

# agent/client/a2a_client.py

class A2AClient:
    """A2A客户端,用于调用远程Agent"""

    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key

    async def call(
        self,
        task: str,
        agent_type: Optional[str] = None,
        context: Optional[dict] = None,
        wait: bool = True
    ) -> Dict[str, Any]:
        """
        调用远程Agent

        Args:
            task: 任务描述
            agent_type: Agent类型
            context: 上下文信息
            wait: 是否等待完成(False则立即返回call_id)
        """
        # 1. 发起调用
        response = await self._post("/api/a2a/call", {
            "task": task,
            "agent_type": agent_type,
            "context": context
        })

        call_id = response["call_id"]

        if not wait:
            return {"call_id": call_id, "status": "started"}

        # 2. 等待完成(通过WebSocket或轮询)
        result = await self._wait_for_completion(call_id)
        return result

    async def _wait_for_completion(self, call_id: str):
        """通过WebSocket监听完成"""
        async with websockets.connect(
            f"{self.ws_url}/api/a2a/watch/{call_id}",
            extra_headers={"Authorization": f"Bearer {self.api_key}"}
        ) as ws:
            async for message in ws:
                data = json.loads(message)
                if data["type"] == "completed":
                    return data["data"]
                elif data["type"] == "failed":
                    raise A2AError(data["data"]["error"])

    async def get_result(self, call_id: str) -> Dict[str, Any]:
        """获取执行结果(轮询方式)"""
        return await self._get(f"/api/a2a/result/{call_id}")

3. 作为工具集成到Agent

# agent/tools/builtin/remote_agent.py

@tool(description="调用远程Agent执行任务")
async def remote_agent(
    task: str,
    agent_url: str,
    agent_type: Optional[str] = None,
    context: Optional[dict] = None,
    ctx: ToolContext = None
) -> ToolResult:
    """
    调用远程Agent(云端或其他终端)

    Args:
        task: 任务描述
        agent_url: 远程Agent的URL
        agent_type: Agent类型
        context: 上下文信息
    """
    # 1. 创建客户端
    client = A2AClient(
        base_url=agent_url,
        api_key=ctx.config.get("remote_agent_api_key")
    )

    # 2. 调用远程Agent
    result = await client.call(
        task=task,
        agent_type=agent_type,
        context=context,
        wait=True
    )

    # 3. 返回结果
    return ToolResult(
        title=f"远程Agent完成: {task[:50]}",
        output=result["result"]["summary"],
        long_term_memory=f"调用远程Agent完成任务,耗时{result['stats']['duration_ms']}ms"
    )

优势

  1. 复用现有逻辑 - 所有Trace、Message、Goal管理都复用
  2. 简单易用 - 外部只需要提供task,不需要理解Trace概念
  3. 完整功能 - 继承所有现有能力(压缩、续跑、回溯等)
  4. 渐进式 - 可以先实现基础版本,逐步增强

方案B:实现标准A2A协议

架构

# agent/api/a2a_standard.py

@app.post("/api/a2a/v1/tasks")
async def create_task(request: A2ATaskRequest):
    """
    符合Google A2A协议的端点

    请求格式(A2A标准):
    {
        "header": {
            "message_id": "msg_001",
            "timestamp": "2026-03-03T10:30:00Z"
        },
        "task": {
            "description": "分析项目架构",
            "capabilities_required": ["file_read", "code_analysis"]
        },
        "context": {...}
    }
    """
    # 转换A2A格式到内部格式
    # 调用runner
    # 转换结果为A2A格式

优势

  1. 标准化 - 符合行业标准
  2. 互操作性 - 可以与其他A2A兼容的Agent通信
  3. 未来兼容 - 跟随行业发展

劣势

  1. 复杂度高 - 需要实现完整的A2A协议
  2. 过度设计 - MVP阶段可能不需要
  3. 标准未稳定 - A2A协议还在演进中

网络拓扑

拓扑1:云端中心化

        云端Gateway
            |
    +-------+-------+
    |       |       |
  通用    爬虫    成本
  助理    运维    统计
    |
    +-- 调用终端Agent(HTTP)
            |
        用户终端Agent

特点:

  • 云端Agent作为中心
  • 终端Agent需要暴露HTTP端点
  • 需要处理NAT穿透

拓扑2:终端主动连接

用户终端Agent
    |
    | WebSocket长连接
    |
云端Gateway
    |
    +-- 通过连接推送任务

特点:

  • 终端Agent主动连接云端
  • 云端通过WebSocket推送任务
  • 无需NAT穿透
  • 类似飞书Bot的模式

拓扑3:混合模式(推荐)

云端Agent <--HTTP--> 云端Agent(内存调用)
    |
    | WebSocket双向
    |
终端Agent <--HTTP--> 终端Agent(如果需要)

特点:

  • 云端Agent间用内存调用
  • 云端↔终端用WebSocket
  • 终端间可选HTTP(通过云端中转)

认证和授权

1. API Key认证

# 终端Agent启动时注册
POST /api/a2a/register
{
    "agent_id": "user123-laptop",
    "capabilities": ["file_read", "bash", "browser"],
    "device_info": {...}
}

# 返回API Key
{
    "api_key": "ak_xxx",
    "agent_id": "user123-laptop"
}

# 后续调用携带API Key
Authorization: Bearer ak_xxx

2. 权限控制

# config/a2a_permissions.yaml
agents:
  user123-laptop:
    can_access:
      - conversations/user123/*
      - resources/public/*
    cannot_access:
      - conversations/other_users/*
      - agents/*/memory/*

3. 数据隔离

  • 终端Agent只能访问自己用户的数据
  • 云端Agent可以访问组织共享数据
  • 通过Gateway强制执行

实现路线图

Phase 1:基础A2A接口(MVP)

目标: 云端Agent ↔ 终端Agent基础通信

实现:

  1. /api/a2a/call - 简化调用接口
  2. /api/a2a/watch - WebSocket监听
  3. /api/a2a/result - 获取结果
  4. A2AClient - 客户端SDK
  5. remote_agent - 工具集成

时间: 1-2周

Phase 2:增强功能

目标: 完善A2A能力

实现:

  1. 认证和授权
  2. 数据隔离
  3. 成本控制
  4. 审计日志
  5. 错误处理和重试

时间: 2-3周

Phase 3:标准化(可选)

目标: 兼容A2A标准协议

实现:

  1. 实现Google A2A协议
  2. 能力协商机制
  3. 与其他A2A Agent互操作

时间: 3-4周

示例场景

场景:云端助理调用终端Agent分析代码

1. 用户在飞书问: "帮我分析一下我笔记本上的项目架构"

2. 云端通用助理:

# 识别需要访问用户终端
result = await remote_agent(
    task="分析 /Users/sunlit/Code/Agent 的项目架构",
    agent_url="https://user123-laptop.agent.local",
    agent_type="explore"
)

3. 终端Agent:

  • 接收任务
  • 创建本地Trace
  • 使用本地工具(read, glob, grep)
  • 分析代码结构
  • 返回结果

4. 云端助理:

  • 接收终端Agent结果
  • 整合到回复中
  • 通过飞书返回给用户

场景:终端Agent查询云端知识库

1. 用户在终端运行:

agent-cli ask "公司的爬虫部署规范是什么?"

2. 终端Agent:

# 识别需要查询组织知识库
result = await remote_agent(
    task="查询爬虫部署规范",
    agent_url="https://org.agent.cloud",
    agent_type="knowledge_query"
)

3. 云端知识库Agent:

  • 查询resources/docs/
  • 查询experiences数据库
  • 返回相关文档

4. 终端Agent:

  • 接收结果
  • 展示给用户

技术细节

1. NAT穿透方案

方案A:终端主动连接(推荐)

# 终端Agent启动时建立WebSocket长连接
ws = await websockets.connect("wss://org.agent.cloud/api/a2a/connect")

# 云端通过连接推送任务
await ws.send(json.dumps({
    "type": "task",
    "task_id": "xxx",
    "data": {...}
}))

# 终端执行并返回结果
result = await execute_task(task)
await ws.send(json.dumps({
    "type": "result",
    "task_id": "xxx",
    "data": result
}))

方案B:使用ngrok等隧道服务

  • 终端Agent启动时创建隧道
  • 注册公网URL到云端
  • 云端通过公网URL调用

2. 消息序列化

# 简化格式(内部使用)
{
    "task": "string",
    "context": {...}
}

# 标准A2A格式(外部互操作)
{
    "header": {...},
    "task": {...},
    "capabilities": [...]
}

# 自动转换
def to_a2a_format(internal_msg):
    return {
        "header": generate_header(),
        "task": {"description": internal_msg["task"]},
        "context": internal_msg.get("context", {})
    }

3. 流式响应

# 支持流式返回中间结果
@app.websocket("/api/a2a/stream/{call_id}")
async def a2a_stream(websocket: WebSocket, call_id: str):
    async for event in runner.run(...):
        if isinstance(event, Message):
            await websocket.send_json({
                "type": "message",
                "data": event.to_dict()
            })

安全考虑

  1. 认证: API Key + JWT
  2. 授权: 基于角色的访问控制
  3. 加密: HTTPS/WSS强制
  4. 限流: 防止滥用
  5. 审计: 所有A2A调用记录
  6. 隔离: 数据访问严格隔离

成本控制

# 每次A2A调用记录成本
{
    "call_id": "a2a-xxx",
    "caller": "user123-laptop",
    "callee": "org-cloud-agent",
    "tokens": 1500,
    "cost": 0.05,
    "duration_ms": 5000
}

# 限额检查
if user_cost_today > user_limit:
    raise CostLimitExceeded()

总结

推荐方案

Phase 1(MVP): 方案A - 基于现有API封装

  • 简单、快速
  • 复用所有现有逻辑
  • 满足跨设备通信需求

Phase 3+: 可选实现标准A2A协议

  • 如果需要与外部系统互操作
  • 跟随行业标准发展

关键优势

  1. 复用现有能力 - Trace、Message、Goal、压缩等
  2. 渐进式实现 - 先简单后复杂
  3. 灵活扩展 - 可以逐步增强功能
  4. 标准兼容 - 未来可以支持A2A标准