a2a-im.md 19 KB

A2A IM:Agent 即时通讯系统

更新日期: 2026-03-04

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现;除非改动较小、不被文档涵盖
  2. 文档分层,链接代码 - 重要或复杂设计可以另有详细文档;关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录最重要的、与代码准确对应的或者明确的已完成的设计的信息,避免推测、建议,或大量代码;决策依据或修改日志若有必要,可在docs/decisions.md另行记录

文档说明

本文档描述 Agent 间即时通讯(A2A IM)系统的架构和实现。

相关文档


系统概述

A2A IM 是一个任务导向的 Agent 即时通讯系统,支持:

  • Agent 间消息传递(点对点、通过 Gateway)
  • 活跃协作者管理(当前任务)
  • 全局联系人管理(历史记录)
  • 在线状态查询
  • 对话历史追溯

与传统 IM 的区别

  • 任务导向(非纯聊天)
  • 长时间处理(分钟到小时)
  • 工具调用和执行记录
  • 完整的 Trace 追溯

架构层次关系

A2A IM 在整体架构中的定位:

┌─────────────────────────────────────────────────────────────┐
│ Enterprise Layer(组织级)- 可选                              │
│ - 认证和授权(飞书 OAuth、API Key、JWT)                      │
│ - 审计和监控(操作日志、成本记录、安全事件)                    │
│ - 多租户和权限控制(角色验证、资源访问控制)                    │
│ - 成本管理和限额(用户级/组织级限额、超限告警)                 │
│                                                              │
│ 实现位置: gateway/enterprise/                                │
│ 文档: gateway/docs/enterprise/overview.md                   │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ A2A IM Gateway(通讯层)★ 本文档                             │
│ - Agent 注册和发现(Registry)                                │
│ - 消息路由(Gateway Router)                                  │
│ - 活跃协作者管理(Collaborators)                             │
│ - 在线状态管理(Heartbeat)                                   │
│ - 联系人管理(ContactStore)                                  │
│                                                              │
│ 实现位置: gateway/core/                                       │
│ 文档: docs/a2a-im.md(本文档)                                │
└─────────────────────────────────────────────────────────────┘
         ↕ 使用(单向依赖)
┌─────────────────────────────────────────────────────────────┐
│ Agent Core(核心层)                                          │
│ - Trace、Message、Goal 管理                                  │
│ - 工具系统(文件、命令、网络、浏览器)                          │
│ - LLM 集成(Gemini、OpenRouter、Yescode)                    │
│ - Skills 和 Memory(跨会话知识)                              │
│ - 子 Agent 机制(agent 工具)                                 │
│                                                              │
│ 实现位置: agent/                                              │
│ 文档: docs/README.md                                         │
└─────────────────────────────────────────────────────────────┘

层次说明

Agent Core(核心层)

  • 提供单个 Agent 的执行能力
  • 管理 Trace、Message、Goal
  • 提供工具系统和 LLM 集成
  • 支持子 Agent 创建(通过 agent 工具)
  • 独立部署:可以不依赖 Gateway 运行

A2A IM Gateway(通讯层)

  • 与 Agent Core 并列,独立的系统
  • 提供 Agent 间通讯能力
  • 管理 Agent 注册和在线状态
  • 路由消息到目标 Agent
  • 维护活跃协作者和联系人
  • 依赖 Agent Core:使用 ToolContext、TraceStore 等组件
  • 独立部署:可以作为独立服务部署

Enterprise(组织层)

  • 可选的企业功能扩展
  • 提供企业级管理和控制
  • 认证、授权、审计
  • 多租户和成本管理
  • 可以集成到 Gateway:作为 Gateway 的扩展模块
  • 也可以独立部署:作为独立的 Enterprise Gateway 服务

依赖关系

Enterprise → Gateway → Agent Core
(可选)    (通讯)    (核心)

- Agent Core 不依赖任何其他层(独立)
- Gateway 依赖 Agent Core(单向依赖)
- Enterprise 依赖 Gateway(可选扩展)

部署方式

方式 1:单体部署(个人/小团队)

一个进程:
├─ Agent Core
└─ Gateway(包含 Enterprise 模块)

方式 2:分离部署(中等规模)

进程 1:Agent Core
进程 2:Gateway(包含 Enterprise 模块)

方式 3:分层部署(大规模/企业)

进程 1:Agent Core
进程 2:Gateway Core
进程 3:Enterprise Gateway

架构设计

三层架构

┌─────────────────────────────────────────────────┐
│ Layer 3: Agent 逻辑层                            │
│ - Trace, Goal, Messages                         │
│ - 工具调用和执行                                  │
└─────────────────────────────────────────────────┘
                    ↕
┌─────────────────────────────────────────────────┐
│ Layer 2: A2A IM 层                               │
│ - 活跃协作者管理                                  │
│ - 全局联系人管理                                  │
│ - conversation_id ↔ trace_id 映射                │
└─────────────────────────────────────────────────┘
                    ↕
┌─────────────────────────────────────────────────┐
│ Layer 1: Gateway 层                              │
│ - Agent 注册和发现                                │
│ - 消息路由                                        │
│ - 在线状态管理                                    │
│ - WebSocket 长连接                                │
└─────────────────────────────────────────────────┘

通信模式

模式 1:内部 Agent(同进程)

Agent A → 直接调用 → Agent B
(复用现有 agent 工具)

模式 2:跨设备 Agent(组织内)

PC Agent → WebSocket → Gateway → 云端 Agent
(反向连接,无需公网 IP)

模式 3:外部 Agent(跨组织)

Agent A → MAMP 协议 → Agent B
(点对点 HTTP)

数据模型

活跃协作者(Layer 2)

存储在 trace.context["collaborators"],记录当前任务的协作者。

{
    "name": "code-analyst",
    "type": "agent",  # agent | human
    "agent_uri": "agent://other.com/code-analyst",
    "trace_id": "abc-123",
    "conversation_id": "conv-456",
    "status": "running",  # running | waiting | completed | failed
    "summary": "正在分析代码架构",
    "last_message_at": "2026-03-04T10:30:00Z"
}

实现位置agent/core/runner.py:AgentRunner._build_context_injection

全局联系人(Layer 2)

存储在 .trace/contacts.json,记录所有历史联系过的 Agent。

{
    "agent_uri": "agent://other.com/code-analyst",
    "name": "Code Analyst",
    "type": "agent",

    # 身份信息(从 Agent Card 获取)
    "card": {
        "description": "专注于代码分析",
        "capabilities": ["code_analysis", "file_read"],
        "owner": {"user_name": "张三"}
    },

    # 交互统计
    "stats": {
        "first_contact": "2026-02-01T10:00:00Z",
        "last_contact": "2026-03-04T10:30:00Z",
        "total_conversations": 15,
        "total_messages": 127
    },

    # 最近对话
    "recent_conversations": [
        {
            "conversation_id": "conv-456",
            "trace_id": "abc-123",
            "started_at": "2026-03-04T10:00:00Z",
            "last_message": "分析完成",
            "status": "active"
        }
    ],

    # 关系标签
    "tags": ["code", "architecture"],
    "pinned": false
}

实现位置agent/trace/contact_store.py

Agent 注册信息(Layer 1)

存储在 Gateway,记录在线 Agent 的连接信息。

{
    "agent_uri": "agent://internal/code-analyst",
    "connection_type": "websocket",  # websocket | http
    "websocket": <WebSocket>,  # WebSocket 连接对象
    "http_endpoint": "http://localhost:8001",  # HTTP 端点
    "last_heartbeat": "2026-03-04T10:30:00Z",
    "capabilities": ["code_analysis", "file_read"]
}

实现位置gateway/core/registry.py


核心功能

1. Agent 注册和发现

PC Agent 启动时注册

# 建立 WebSocket 长连接
ws = await websockets.connect("wss://gateway.com/gateway/connect")

# 注册
await ws.send(json.dumps({
    "type": "register",
    "agent_uri": "agent://internal/my-agent",
    "capabilities": ["file_read", "bash"]
}))

# 保持心跳
while True:
    await ws.send(json.dumps({"type": "heartbeat"}))
    await asyncio.sleep(30)

实现位置gateway/core/client.py

2. 消息路由

通过 Gateway 发送消息

# 发送方
POST /gateway/send
{
    "to": "agent://internal/code-analyst",
    "content": "帮我分析代码"
}

# Gateway 查找目标 Agent
agent_info = registry.lookup("agent://internal/code-analyst")

# 通过 WebSocket 推送
await agent_info["websocket"].send(json.dumps({
    "type": "message",
    "from": "agent://internal/caller",
    "content": "帮我分析代码"
}))

实现位置gateway/core/router.py

3. 活跃协作者管理

发送消息时自动更新

# agent/tools/builtin/a2a_im.py

async def send_to_agent(...):
    # 发送消息
    response = await gateway_client.send(...)

    # 更新活跃协作者
    await update_active_collaborator(
        trace_id=ctx.trace_id,
        agent_uri=target_agent,
        conversation_id=response["conversation_id"],
        status="waiting"
    )

周期性注入到 Agent 上下文

# agent/core/runner.py

if iteration % 10 == 0:
    collaborators = trace.context.get("collaborators", [])
    inject_collaborators_markdown(collaborators)

4. 全局联系人管理

查询联系人

# 通过工具查询
contacts = await get_contacts(
    type="agent",
    status="online",
    tags=["code"]
)

自动维护

# 发送/接收消息时自动更新
await contact_store.update(
    agent_uri=target_agent,
    last_contact=datetime.now(),
    increment_message_count=True
)

实现位置agent/trace/contact_store.py

5. 在线状态查询

查询 Agent 在线状态

GET /gateway/status/{agent_uri}

返回:
{
    "agent_uri": "agent://internal/code-analyst",
    "status": "online",  # online | offline
    "last_seen": "2026-03-04T10:30:00Z"
}

实现位置gateway/core/router.py:get_agent_status


API 端点

Gateway API

方法 路径 说明
WS /gateway/connect Agent 注册和保持连接
POST /gateway/send 发送消息到其他 Agent
GET /gateway/status/{agent_uri} 查询 Agent 在线状态
GET /gateway/agents 列出所有在线 Agent

A2A IM API

方法 路径 说明
GET /api/traces/{id}/collaborators 查询活跃协作者
GET /api/contacts 查询全局联系人
GET /api/contacts/{agent_uri} 查询特定联系人详情
GET /api/contacts/{agent_uri}/conversations 查询对话历史

工具系统

send_to_agent 工具

发送消息到其他 Agent(内部或外部)。

@tool(description="发送消息到其他 Agent")
async def send_to_agent(
    target_agent: str,  # agent://domain/id
    message: str,
    conversation_id: Optional[str] = None,
    ctx: ToolContext = None
) -> ToolResult

实现位置agent/tools/builtin/a2a_im.py:send_to_agent

get_active_collaborators 工具

查询当前任务的活跃协作者。

@tool(description="查询当前任务的活跃协作者")
async def get_active_collaborators(
    ctx: ToolContext
) -> ToolResult

实现位置agent/tools/builtin/a2a_im.py:get_active_collaborators

get_contacts 工具

查询全局联系人列表。

@tool(description="查询所有联系过的 Agent")
async def get_contacts(
    type: Optional[str] = None,  # agent | human
    status: Optional[str] = None,  # online | offline
    tags: Optional[List[str]] = None,
    ctx: ToolContext = None
) -> ToolResult

实现位置agent/tools/builtin/a2a_im.py:get_contacts


Skill 系统

a2a_im.md Skill

提供 A2A IM 使用指南,注入到 Agent 的 system prompt。

内容

  • 如何发送消息到其他 Agent
  • 如何查询活跃协作者
  • 如何查询联系人
  • 最佳实践

实现位置agent/memory/skills/a2a_im.md


使用示例

场景 1:调用其他 Agent 协作

# Agent A 需要代码分析帮助
result = await send_to_agent(
    target_agent="agent://internal/code-analyst",
    message="帮我分析 /path/to/project 的架构"
)

# 继续对话
result2 = await send_to_agent(
    target_agent="agent://internal/code-analyst",
    message="重点分析 core 模块",
    conversation_id=result["conversation_id"]
)

场景 2:查询活跃协作者

# 查看当前任务中有哪些 Agent 在协作
collaborators = await get_active_collaborators()

# 输出:
# ## 活跃协作者
# - code-analyst [agent, completed]: 分析完成,发现3个问题
# - test-runner [agent, running]: 正在运行测试

场景 3:查询联系人

# 查找擅长代码分析的 Agent
contacts = await get_contacts(
    type="agent",
    tags=["code", "architecture"]
)

# 输出:
# ## 联系人列表
# 🟢 code-analyst - agent://internal/code-analyst
#    最后联系: 2026-03-04 10:30
#    对话次数: 15

架构决策

决策 1:Gateway 与 Agent 并列而非包含

问题:Gateway 应该放在 agent/ 内部还是与 agent/ 并列?

决策:与 agent/ 并列

理由

  1. 解耦:Gateway 和 Agent Core 是两个独立的系统
  2. 独立部署:Gateway 可以独立部署和扩展
  3. 职责清晰:Agent Core 负责单 Agent 执行,Gateway 负责 Agent 间通讯
  4. 依赖关系:Gateway 依赖 Agent Core(单向),但 Agent Core 不依赖 Gateway

实现

  • 目录结构:gateway/agent/ 并列
  • Import 路径:from gateway.core import ...

决策 2:Enterprise 与 Gateway 的关系

问题:Enterprise 应该是 Gateway 的上层(分层架构)还是 Gateway 的模块(模块化架构)?

决策:根据阶段选择

MVP 阶段(当前):模块化架构

  • Enterprise 作为 Gateway 的可选模块
  • 部署简单,快速迭代
  • 适合中小规模

大规模阶段(未来):可选分层架构

  • Enterprise 作为独立的 Gateway 层
  • 可独立扩容,团队协作
  • 适合大规模部署

理由

  1. 灵活性:两种架构都可以实现可选部署
  2. 演进路径:从模块化开始,需要时重构为分层
  3. 规模决定:小规模用模块化,大规模用分层

实现

  • 当前:gateway/enterprise/ 作为可选模块
  • 未来:可重构为独立的 enterprise_gateway/ 服务

决策 3:活跃协作者的管理方式

问题:活跃协作者信息应该如何存储和管理?

决策:存储在 trace.context["collaborators"],由工具自动维护

理由

  1. 复用现有机制:Agent Core 已有 context 机制
  2. 自动注入:Runner 周期性注入到 Agent 上下文(每 10 轮)
  3. 工具维护:send_to_agent 等工具自动更新
  4. 与 Goal 一致:与 GoalTree 一同注入,保持一致性

实现位置

  • 存储:trace.context["collaborators"]
  • 注入:agent/core/runner.py:AgentRunner._build_context_injection
  • 更新:agent/tools/builtin/a2a_im.py:_update_active_collaborator

实现路线图

Phase 1:基础功能(1-2 周)

目标:实现核心通信能力

任务

  1. 实现 Gateway(注册、路由、WebSocket)
  2. 实现 send_to_agent 工具
  3. 实现活跃协作者自动更新
  4. 实现 a2a_im.md Skill

实现位置

  • gateway/core/
  • agent/tools/builtin/a2a_im.py
  • agent/memory/skills/a2a_im.md

Phase 2:联系人管理(1 周)

目标:完善联系人和历史记录

任务

  1. 实现 ContactStore
  2. 实现 get_contacts 工具
  3. 实现对话历史查询
  4. 实现在线状态查询

实现位置

  • agent/trace/contact_store.py
  • agent/tools/builtin/a2a_im.py

Phase 3:增强功能(可选)

目标:提升用户体验

任务

  1. 实现消息队列(异步处理)
  2. 实现 Agent 发现和推荐
  3. 实现关系标签和分组
  4. 实现 UI 界面

相关文档