# A2A IM:Agent 即时通讯系统 **更新日期:** 2026-03-04 ## 文档维护规范 0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现;除非改动较小、不被文档涵盖 1. **文档分层,链接代码** - 重要或复杂设计可以另有详细文档;关键实现需标注代码文件路径;格式:`module/file.py:function_name` 2. **简洁快照,日志分离** - 只记录最重要的、与代码准确对应的或者明确的已完成的设计的信息,避免推测、建议,或大量代码;决策依据或修改日志若有必要,可在`docs/decisions.md`另行记录 --- ## 文档说明 本文档描述 Agent 间即时通讯(A2A IM)系统的架构和实现。 **相关文档**: - [MAMP 协议](./research/a2a-mamp-protocol.md):消息格式和传输协议 - [A2A 跨设备通信](./research/a2a-cross-device.md):内部 Agent 通信方案 - [Agent 框架](./README.md):核心 Agent 能力 - [Enterprise 层](../gateway/docs/enterprise/overview.md):组织级功能 --- ## 系统概述 A2A IM 是一个**任务导向的 Agent 即时通讯系统**,支持: - Agent 间消息传递(点对点、通过 Gateway) - 活跃协作者管理(当前任务) - 全局联系人管理(历史记录) - 在线状态查询 - 对话历史追溯 **与传统 IM 的区别**: - 任务导向(非纯聊天) - 长时间处理(分钟到小时) - 工具调用和执行记录 - 完整的 Trace 追溯 --- ## 架构层次关系 A2A IM 在整体架构中的定位: ``` ┌─────────────────────────────────────────────────────────────┐ │ Enterprise Layer(组织级)- 可选 │ │ - 认证和授权(飞书 OAuth、API Key、JWT) │ │ - 审计和监控(操作日志、成本记录、安全事件) │ │ - 多租户和权限控制(角色验证、资源访问控制) │ │ - 成本管理和限额(用户级/组织级限额、超限告警) │ │ │ │ 实现位置: gateway/enterprise/ │ │ 文档: gateway/docs/enterprise/overview.md │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ A2A IM Gateway(通讯层)★ 本文档 │ │ - Agent 注册和发现(Registry) │ │ - 消息路由(Gateway Router) │ │ - 活跃协作者管理(Collaborators) │ │ - 在线状态管理(Heartbeat) │ │ - 联系人管理(ContactStore) │ │ │ │ 实现位置: gateway/core/ │ │ 文档: docs/a2a-im.md(本文档) │ └─────────────────────────────────────────────────────────────┘ ↕ 使用(单向依赖) ┌─────────────────────────────────────────────────────────────┐ │ Agent Core(核心层) │ │ - Trace、Message、Goal 管理 │ │ - 工具系统(文件、命令、网络、浏览器) │ │ - LLM 集成(Gemini、OpenRouter、Yescode) │ │ - Skills(领域知识注入) │ │ - 子 Agent 机制(agent 工具) │ │ │ │ 实现位置: agent/ │ │ 文档: docs/README.md │ └─────────────────────────────────────────────────────────────┘ ``` ### 层次说明 **Agent Core(核心层)**: - 提供单个 Agent 的执行能力 - 管理 Trace、Message、Goal - 提供工具系统和 LLM 集成 - 支持子 Agent 创建(通过 `agent` 工具) - **独立部署**:可以不依赖 Gateway 运行 **A2A IM Gateway(通讯层)**: - 与 Agent Core 并列,独立的系统 - 提供 Agent 间通讯能力 - 管理 Agent 注册和在线状态 - 路由消息到目标 Agent - 维护活跃协作者和联系人 - **依赖 Agent Core**:使用 ToolContext、TraceStore 等组件 - **独立部署**:可以作为独立服务部署 **Enterprise(组织层)**: - 可选的企业功能扩展 - 提供企业级管理和控制 - 认证、授权、审计 - 多租户和成本管理 - **可以集成到 Gateway**:作为 Gateway 的扩展模块 - **也可以独立部署**:作为独立的 Enterprise Gateway 服务 ### 依赖关系 ``` Enterprise → Gateway → Agent Core (可选) (通讯) (核心) - Agent Core 不依赖任何其他层(独立) - Gateway 依赖 Agent Core(单向依赖) - Enterprise 依赖 Gateway(可选扩展) ``` ### 部署方式 **方式 1:单体部署(个人/小团队)** ``` 一个进程: ├─ Agent Core └─ Gateway(包含 Enterprise 模块) ``` **方式 2:分离部署(中等规模)** ``` 进程 1:Agent Core 进程 2:Gateway(包含 Enterprise 模块) ``` **方式 3:分层部署(大规模/企业)** ``` 进程 1:Agent Core 进程 2:Gateway Core 进程 3:Enterprise Gateway ``` --- ## 架构设计 ### 三层架构 ``` ┌─────────────────────────────────────────────────┐ │ Layer 3: Agent 逻辑层 │ │ - Trace, Goal, Messages │ │ - 工具调用和执行 │ └─────────────────────────────────────────────────┘ ↕ ┌─────────────────────────────────────────────────┐ │ Layer 2: A2A IM 层 │ │ - 活跃协作者管理 │ │ - 全局联系人管理 │ │ - conversation_id ↔ trace_id 映射 │ └─────────────────────────────────────────────────┘ ↕ ┌─────────────────────────────────────────────────┐ │ Layer 1: Gateway 层 │ │ - Agent 注册和发现 │ │ - 消息路由 │ │ - 在线状态管理 │ │ - WebSocket 长连接 │ └─────────────────────────────────────────────────┘ ``` ### 通信模式 **模式 1:内部 Agent(同进程)** ``` Agent A → 直接调用 → Agent B (复用现有 agent 工具) ``` **模式 2:跨设备 Agent(组织内)** ``` PC Agent → WebSocket → Gateway → 云端 Agent (反向连接,无需公网 IP) ``` **模式 3:外部 Agent(跨组织)** ``` Agent A → MAMP 协议 → Agent B (点对点 HTTP) ``` --- ## 数据模型 ### 活跃协作者(Layer 2) 存储在 `trace.context["collaborators"]`,记录当前任务的协作者。 ```python { "name": "code-analyst", "type": "agent", # agent | human "agent_uri": "agent://other.com/code-analyst", "trace_id": "abc-123", "conversation_id": "conv-456", "status": "running", # running | waiting | completed | failed "summary": "正在分析代码架构", "last_message_at": "2026-03-04T10:30:00Z" } ``` **实现位置**:`agent/core/runner.py:AgentRunner._build_context_injection` ### 全局联系人(Layer 2) 存储在 `.trace/contacts.json`,记录所有历史联系过的 Agent。 ```python { "agent_uri": "agent://other.com/code-analyst", "name": "Code Analyst", "type": "agent", # 身份信息(从 Agent Card 获取) "card": { "description": "专注于代码分析", "capabilities": ["code_analysis", "file_read"], "owner": {"user_name": "张三"} }, # 交互统计 "stats": { "first_contact": "2026-02-01T10:00:00Z", "last_contact": "2026-03-04T10:30:00Z", "total_conversations": 15, "total_messages": 127 }, # 最近对话 "recent_conversations": [ { "conversation_id": "conv-456", "trace_id": "abc-123", "started_at": "2026-03-04T10:00:00Z", "last_message": "分析完成", "status": "active" } ], # 关系标签 "tags": ["code", "architecture"], "pinned": false } ``` **实现位置**:`agent/trace/contact_store.py` ### Agent 注册信息(Layer 1) 存储在 Gateway,记录在线 Agent 的连接信息。 ```python { "agent_uri": "agent://internal/code-analyst", "connection_type": "websocket", # websocket | http "websocket": , # WebSocket 连接对象 "http_endpoint": "http://localhost:8001", # HTTP 端点 "last_heartbeat": "2026-03-04T10:30:00Z", "capabilities": ["code_analysis", "file_read"] } ``` **实现位置**:`gateway/core/registry.py` --- ## 核心功能 ### 1. Agent 注册和发现 **PC Agent 启动时注册**: ```python # 建立 WebSocket 长连接 ws = await websockets.connect("wss://gateway.com/gateway/connect") # 注册 await ws.send(json.dumps({ "type": "register", "agent_uri": "agent://internal/my-agent", "capabilities": ["file_read", "bash"] })) # 保持心跳 while True: await ws.send(json.dumps({"type": "heartbeat"})) await asyncio.sleep(30) ``` **实现位置**:`gateway/core/client.py` ### 2. 消息路由 **通过 Gateway 发送消息**: ```python # 发送方 POST /gateway/send { "to": "agent://internal/code-analyst", "content": "帮我分析代码" } # Gateway 查找目标 Agent agent_info = registry.lookup("agent://internal/code-analyst") # 通过 WebSocket 推送 await agent_info["websocket"].send(json.dumps({ "type": "message", "from": "agent://internal/caller", "content": "帮我分析代码" })) ``` **实现位置**:`gateway/core/router.py` ### 3. 活跃协作者管理 **发送消息时自动更新**: ```python # agent/tools/builtin/a2a_im.py async def send_to_agent(...): # 发送消息 response = await gateway_client.send(...) # 更新活跃协作者 await update_active_collaborator( trace_id=ctx.trace_id, agent_uri=target_agent, conversation_id=response["conversation_id"], status="waiting" ) ``` **周期性注入到 Agent 上下文**: ```python # agent/core/runner.py if iteration % 10 == 0: collaborators = trace.context.get("collaborators", []) inject_collaborators_markdown(collaborators) ``` ### 4. 全局联系人管理 **查询联系人**: ```python # 通过工具查询 contacts = await get_contacts( type="agent", status="online", tags=["code"] ) ``` **自动维护**: ```python # 发送/接收消息时自动更新 await contact_store.update( agent_uri=target_agent, last_contact=datetime.now(), increment_message_count=True ) ``` **实现位置**:`agent/trace/contact_store.py` ### 5. 在线状态查询 **查询 Agent 在线状态**: ```python GET /gateway/status/{agent_uri} 返回: { "agent_uri": "agent://internal/code-analyst", "status": "online", # online | offline "last_seen": "2026-03-04T10:30:00Z" } ``` **实现位置**:`gateway/core/router.py:get_agent_status` --- ## API 端点 ### Gateway API | 方法 | 路径 | 说明 | |------|------|------| | WS | `/gateway/connect` | Agent 注册和保持连接 | | POST | `/gateway/send` | 发送消息到其他 Agent | | GET | `/gateway/status/{agent_uri}` | 查询 Agent 在线状态 | | GET | `/gateway/agents` | 列出所有在线 Agent | ### A2A IM API | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/traces/{id}/collaborators` | 查询活跃协作者 | | GET | `/api/contacts` | 查询全局联系人 | | GET | `/api/contacts/{agent_uri}` | 查询特定联系人详情 | | GET | `/api/contacts/{agent_uri}/conversations` | 查询对话历史 | --- ## 工具系统 ### send_to_agent 工具 发送消息到其他 Agent(内部或外部)。 ```python @tool(description="发送消息到其他 Agent") async def send_to_agent( target_agent: str, # agent://domain/id message: str, conversation_id: Optional[str] = None, ctx: ToolContext = None ) -> ToolResult ``` **实现位置**:`agent/tools/builtin/a2a_im.py:send_to_agent` ### get_active_collaborators 工具 查询当前任务的活跃协作者。 ```python @tool(description="查询当前任务的活跃协作者") async def get_active_collaborators( ctx: ToolContext ) -> ToolResult ``` **实现位置**:`agent/tools/builtin/a2a_im.py:get_active_collaborators` ### get_contacts 工具 查询全局联系人列表。 ```python @tool(description="查询所有联系过的 Agent") async def get_contacts( type: Optional[str] = None, # agent | human status: Optional[str] = None, # online | offline tags: Optional[List[str]] = None, ctx: ToolContext = None ) -> ToolResult ``` **实现位置**:`agent/tools/builtin/a2a_im.py:get_contacts` --- ## Skill 系统 ### a2a_im.md Skill 提供 A2A IM 使用指南,注入到 Agent 的 system prompt。 **内容**: - 如何发送消息到其他 Agent - 如何查询活跃协作者 - 如何查询联系人 - 最佳实践 **实现位置**:`agent/skill/skills/a2a_im.md` --- ## 使用示例 ### 场景 1:调用其他 Agent 协作 ```python # Agent A 需要代码分析帮助 result = await send_to_agent( target_agent="agent://internal/code-analyst", message="帮我分析 /path/to/project 的架构" ) # 继续对话 result2 = await send_to_agent( target_agent="agent://internal/code-analyst", message="重点分析 core 模块", conversation_id=result["conversation_id"] ) ``` ### 场景 2:查询活跃协作者 ```python # 查看当前任务中有哪些 Agent 在协作 collaborators = await get_active_collaborators() # 输出: # ## 活跃协作者 # - code-analyst [agent, completed]: 分析完成,发现3个问题 # - test-runner [agent, running]: 正在运行测试 ``` ### 场景 3:查询联系人 ```python # 查找擅长代码分析的 Agent contacts = await get_contacts( type="agent", tags=["code", "architecture"] ) # 输出: # ## 联系人列表 # 🟢 code-analyst - agent://internal/code-analyst # 最后联系: 2026-03-04 10:30 # 对话次数: 15 ``` --- ## 架构决策 ### 决策 1:Gateway 与 Agent 并列而非包含 **问题**:Gateway 应该放在 agent/ 内部还是与 agent/ 并列? **决策**:与 agent/ 并列 **理由**: 1. **解耦**:Gateway 和 Agent Core 是两个独立的系统 2. **独立部署**:Gateway 可以独立部署和扩展 3. **职责清晰**:Agent Core 负责单 Agent 执行,Gateway 负责 Agent 间通讯 4. **依赖关系**:Gateway 依赖 Agent Core(单向),但 Agent Core 不依赖 Gateway **实现**: - 目录结构:`gateway/` 与 `agent/` 并列 - Import 路径:`from gateway.core import ...` ### 决策 2:Enterprise 与 Gateway 的关系 **问题**:Enterprise 应该是 Gateway 的上层(分层架构)还是 Gateway 的模块(模块化架构)? **决策**:根据阶段选择 **MVP 阶段(当前)**:模块化架构 - Enterprise 作为 Gateway 的可选模块 - 部署简单,快速迭代 - 适合中小规模 **大规模阶段(未来)**:可选分层架构 - Enterprise 作为独立的 Gateway 层 - 可独立扩容,团队协作 - 适合大规模部署 **理由**: 1. **灵活性**:两种架构都可以实现可选部署 2. **演进路径**:从模块化开始,需要时重构为分层 3. **规模决定**:小规模用模块化,大规模用分层 **实现**: - 当前:`gateway/enterprise/` 作为可选模块 - 未来:可重构为独立的 `enterprise_gateway/` 服务 ### 决策 3:活跃协作者的管理方式 **问题**:活跃协作者信息应该如何存储和管理? **决策**:存储在 `trace.context["collaborators"]`,由工具自动维护 **理由**: 1. **复用现有机制**:Agent Core 已有 context 机制 2. **自动注入**:Runner 周期性注入到 Agent 上下文(每 10 轮) 3. **工具维护**:send_to_agent 等工具自动更新 4. **与 Goal 一致**:与 GoalTree 一同注入,保持一致性 **实现位置**: - 存储:`trace.context["collaborators"]` - 注入:`agent/core/runner.py:AgentRunner._build_context_injection` - 更新:`agent/tools/builtin/a2a_im.py:_update_active_collaborator` --- ## 实现路线图 ### Phase 1:基础功能(1-2 周) **目标**:实现核心通信能力 **任务**: 1. 实现 Gateway(注册、路由、WebSocket) 2. 实现 send_to_agent 工具 3. 实现活跃协作者自动更新 4. 实现 a2a_im.md Skill **实现位置**: - `gateway/core/` - `agent/tools/builtin/a2a_im.py` - `agent/skill/skills/a2a_im.md` ### Phase 2:联系人管理(1 周) **目标**:完善联系人和历史记录 **任务**: 1. 实现 ContactStore 2. 实现 get_contacts 工具 3. 实现对话历史查询 4. 实现在线状态查询 **实现位置**: - `agent/trace/contact_store.py` - `agent/tools/builtin/a2a_im.py` ### Phase 3:增强功能(可选) **目标**:提升用户体验 **任务**: 1. 实现消息队列(异步处理) 2. 实现 Agent 发现和推荐 3. 实现关系标签和分组 4. 实现 UI 界面 --- ## 相关文档 - [MAMP 协议](./research/a2a-mamp-protocol.md):消息格式和传输协议 - [A2A 跨设备通信](./research/a2a-cross-device.md):内部 Agent 通信方案 - [工具系统](../agent/docs/tools.md):工具定义、注册 - [Skills 指南](../agent/docs/skills.md):Skill 分类、编写、加载 - [Agent 框架](./README.md):核心 Agent 能力 - [Gateway 架构](../gateway/docs/architecture.md):Gateway 三层架构 - [Gateway API](../gateway/docs/api.md):Gateway API 参考