architecture.md 7.3 KB

Gateway 架构设计

更新日期: 2026-03-04

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现
  2. 文档分层,链接代码 - 关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录最重要的、与代码准确对应的或明确的已完成的设计

架构概述

Gateway 采用三层架构:

┌─────────────────────────────────────────────────┐
│ Router 层(路由和 API)                          │
│ - WebSocket 连接处理                             │
│ - HTTP API 端点                                  │
│ - 消息路由逻辑                                    │
└─────────────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────────────┐
│ Registry 层(注册和状态)                         │
│ - Agent 连接信息                                 │
│ - 在线状态管理                                    │
│ - 心跳超时检测                                    │
└─────────────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────────────┐
│ Client 层(客户端)                               │
│ - WebSocket 客户端                               │
│ - 自动心跳                                        │
│ - 消息接收处理                                    │
└─────────────────────────────────────────────────┘

核心组件

AgentRegistry

管理在线 Agent 的注册信息和连接。

实现位置gateway/core/registry.py:AgentRegistry

职责

  • Agent 注册和注销
  • 在线状态跟踪
  • 心跳超时检测
  • 自动清理过期连接

关键方法

  • register() - 注册 Agent
  • unregister() - 注销 Agent
  • heartbeat() - 更新心跳
  • is_online() - 检查在线状态
  • list_agents() - 列出 Agent

GatewayRouter

处理消息路由和 API 端点。

实现位置gateway/core/router.py:GatewayRouter

职责

  • WebSocket 连接处理
  • 消息路由到目标 Agent
  • 在线状态查询
  • Agent 列表查询

API 端点

  • WS /gateway/connect - Agent 注册
  • POST /gateway/send - 发送消息
  • GET /gateway/status/{agent_uri} - 查询状态
  • GET /gateway/agents - 列出 Agent

GatewayClient

PC Agent 用于连接 Gateway 的客户端。

实现位置gateway/core/client.py:GatewayClient

职责

  • 建立 WebSocket 连接
  • 自动心跳保持
  • 接收和处理消息
  • 发送任务结果

通信流程

Agent 注册流程

PC Agent                    Gateway
   │                           │
   │  1. WebSocket 连接        │
   ├──────────────────────────>│
   │                           │
   │  2. 发送注册消息          │
   │  {type: "register",       │
   │   agent_uri: "...",       │
   │   capabilities: [...]}    │
   ├──────────────────────────>│
   │                           │  Registry.register()
   │                           │
   │  3. 返回注册确认          │
   │  {type: "registered"}     │
   │<──────────────────────────┤
   │                           │
   │  4. 开始心跳循环          │
   │  (每 30 秒)               │
   │                           │

消息路由流程

Agent A                 Gateway                 Agent B
   │                       │                       │
   │  1. 发送消息          │                       │
   │  POST /gateway/send   │                       │
   ├──────────────────────>│                       │
   │                       │  2. 查找目标 Agent    │
   │                       │  Registry.lookup()    │
   │                       │                       │
   │                       │  3. 通过 WebSocket    │
   │                       │     转发消息          │
   │                       ├──────────────────────>│
   │                       │                       │
   │  4. 返回成功          │                       │
   │<──────────────────────┤                       │

设计决策

决策 1:反向连接模式

问题:PC Agent 在 NAT 后面,如何被其他 Agent 访问?

决策:PC Agent 主动连接 Gateway(反向连接)

理由

  • 无需公网 IP
  • 无需配置防火墙
  • 安全(只有出站连接)
  • 类似飞书 Bot 模式

实现位置gateway/core/client.py:GatewayClient.connect

决策 2:心跳机制

问题:如何判断 Agent 是否在线?

决策:每 30 秒发送心跳,60 秒超时

理由

  • 及时检测离线
  • 避免频繁心跳消耗资源
  • 自动清理过期连接

实现位置gateway/core/registry.py:AgentRegistry._cleanup_loop

决策 3:WebSocket vs HTTP

问题:使用 WebSocket 还是 HTTP 轮询?

决策:WebSocket 长连接

理由

  • 实时性好(无需轮询)
  • 资源消耗低
  • 支持双向通信
  • 适合 IM 场景

实现位置gateway/core/router.py:GatewayRouter.handle_websocket


扩展点

中间件机制

可以添加中间件来扩展功能:

class AuthMiddleware:
    async def process(self, msg):
        # 认证检查
        if not await self.auth.verify(msg):
            raise Unauthorized()
        return msg

router = GatewayRouter(
    registry=registry,
    middlewares=[AuthMiddleware(), AuditMiddleware()]
)

Enterprise 集成

Enterprise 功能可以作为中间件集成:

from gateway.enterprise import EnterpriseAuth, EnterpriseAudit

router = GatewayRouter(
    registry=registry,
    middlewares=[
        EnterpriseAuth(),
        EnterpriseAudit(),
        CostMiddleware()
    ]
)

性能考虑

连接数限制

  • 单个 Gateway 实例:建议 < 10000 个连接
  • 超过限制:部署多个 Gateway 实例

消息吞吐量

  • 单个 Gateway 实例:约 1000 msg/s
  • 瓶颈:WebSocket 连接数和消息序列化

扩展方案

  • 水平扩展:部署多个 Gateway 实例
  • 负载均衡:使用 Nginx/HAProxy
  • 服务发现:使用 Consul/etcd

相关文档