# Gateway 架构设计 **更新日期:** 2026-03-04 ## 文档维护规范 0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现 1. **文档分层,链接代码** - 关键实现需标注代码文件路径;格式:`module/file.py:function_name` 2. **简洁快照,日志分离** - 只记录最重要的、与代码准确对应的或明确的已完成的设计 --- ## 架构概述 Gateway 采用三层架构: ``` ┌─────────────────────────────────────────────────┐ │ Router 层(路由和 API) │ │ - WebSocket 连接处理 │ │ - HTTP API 端点 │ │ - 消息路由逻辑 │ └─────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────┐ │ Registry 层(注册和状态) │ │ - Agent 连接信息 │ │ - 在线状态管理 │ │ - 心跳超时检测 │ └─────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────┐ │ Client 层(客户端) │ │ - WebSocket 客户端 │ │ - 自动心跳 │ │ - 消息接收处理 │ └─────────────────────────────────────────────────┘ ``` --- ## 核心组件 ### AgentRegistry 管理在线 Agent 的注册信息和连接。 **实现位置**:`gateway/core/registry.py:AgentRegistry` **职责**: - Agent 注册和注销 - 在线状态跟踪 - 心跳超时检测 - 自动清理过期连接 **关键方法**: - `register()` - 注册 Agent - `unregister()` - 注销 Agent - `heartbeat()` - 更新心跳 - `is_online()` - 检查在线状态 - `list_agents()` - 列出 Agent ### GatewayRouter 处理消息路由和 API 端点。 **实现位置**:`gateway/core/router.py:GatewayRouter` **职责**: - WebSocket 连接处理 - 消息路由到目标 Agent - 在线状态查询 - Agent 列表查询 **API 端点**: - `WS /gateway/connect` - Agent 注册 - `POST /gateway/send` - 发送消息 - `GET /gateway/status/{agent_uri}` - 查询状态 - `GET /gateway/agents` - 列出 Agent ### GatewayClient PC Agent 用于连接 Gateway 的客户端。 **实现位置**:`gateway/core/client.py:GatewayClient` **职责**: - 建立 WebSocket 连接 - 自动心跳保持 - 接收和处理消息 - 发送任务结果 --- ## 通信流程 ### Agent 注册流程 ``` PC Agent Gateway │ │ │ 1. WebSocket 连接 │ ├──────────────────────────>│ │ │ │ 2. 发送注册消息 │ │ {type: "register", │ │ agent_uri: "...", │ │ capabilities: [...]} │ ├──────────────────────────>│ │ │ Registry.register() │ │ │ 3. 返回注册确认 │ │ {type: "registered"} │ │<──────────────────────────┤ │ │ │ 4. 开始心跳循环 │ │ (每 30 秒) │ │ │ ``` ### 消息路由流程 ``` Agent A Gateway Agent B │ │ │ │ 1. 发送消息 │ │ │ POST /gateway/send │ │ ├──────────────────────>│ │ │ │ 2. 查找目标 Agent │ │ │ Registry.lookup() │ │ │ │ │ │ 3. 通过 WebSocket │ │ │ 转发消息 │ │ ├──────────────────────>│ │ │ │ │ 4. 返回成功 │ │ │<──────────────────────┤ │ ``` --- ## 设计决策 ### 决策 1:反向连接模式 **问题**:PC Agent 在 NAT 后面,如何被其他 Agent 访问? **决策**:PC Agent 主动连接 Gateway(反向连接) **理由**: - 无需公网 IP - 无需配置防火墙 - 安全(只有出站连接) - 类似飞书 Bot 模式 **实现位置**:`gateway/core/client.py:GatewayClient.connect` ### 决策 2:心跳机制 **问题**:如何判断 Agent 是否在线? **决策**:每 30 秒发送心跳,60 秒超时 **理由**: - 及时检测离线 - 避免频繁心跳消耗资源 - 自动清理过期连接 **实现位置**:`gateway/core/registry.py:AgentRegistry._cleanup_loop` ### 决策 3:WebSocket vs HTTP **问题**:使用 WebSocket 还是 HTTP 轮询? **决策**:WebSocket 长连接 **理由**: - 实时性好(无需轮询) - 资源消耗低 - 支持双向通信 - 适合 IM 场景 **实现位置**:`gateway/core/router.py:GatewayRouter.handle_websocket` --- ## 扩展点 ### 中间件机制 可以添加中间件来扩展功能: ```python class AuthMiddleware: async def process(self, msg): # 认证检查 if not await self.auth.verify(msg): raise Unauthorized() return msg router = GatewayRouter( registry=registry, middlewares=[AuthMiddleware(), AuditMiddleware()] ) ``` ### Enterprise 集成 Enterprise 功能可以作为中间件集成: ```python from gateway.enterprise import EnterpriseAuth, EnterpriseAudit router = GatewayRouter( registry=registry, middlewares=[ EnterpriseAuth(), EnterpriseAudit(), CostMiddleware() ] ) ``` --- ## 性能考虑 ### 连接数限制 - 单个 Gateway 实例:建议 < 10000 个连接 - 超过限制:部署多个 Gateway 实例 ### 消息吞吐量 - 单个 Gateway 实例:约 1000 msg/s - 瓶颈:WebSocket 连接数和消息序列化 ### 扩展方案 - 水平扩展:部署多个 Gateway 实例 - 负载均衡:使用 Nginx/HAProxy - 服务发现:使用 Consul/etcd --- ## 相关文档 - [部署指南](./deployment.md):部署方式和配置 - [API 文档](./api.md):完整的 API 参考 - [A2A IM 系统](../../docs/a2a-im.md):完整的 A2A IM 文档