architecture.md 7.8 KB

Gateway 架构设计

更新日期: 2026-03-05

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现
  2. 文档分层,链接代码 - 关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录最重要的、与代码准确对应的或明确的已完成的设计

架构概述

Gateway 采用两层架构:

┌─────────────────────────────────────────────────┐
│ Router 层(路由和 API)                          │
│ - HTTP API 端点                                  │
│ - 消息路由(Webhook 推送)                       │
└─────────────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────────────┐
│ Registry 层(注册和状态)                         │
│ - Agent 注册信息(含 webhook_url)               │
│ - 在线状态管理                                    │
│ - 心跳超时检测                                    │
└─────────────────────────────────────────────────┘

核心组件

AgentRegistry

管理在线 Agent 的注册信息。

实现位置gateway/core/registry.py:AgentRegistry

职责

  • Agent 注册和注销(含 webhook_url)
  • 在线状态跟踪
  • 心跳超时检测
  • 自动清理过期连接

关键方法

  • register(agent_id, webhook_url, ...) - 注册 Agent
  • unregister(agent_id) - 注销 Agent
  • heartbeat(agent_id) - 更新心跳
  • is_online(agent_id) - 检查在线状态
  • list_agents() - 列出 Agent

GatewayRouter

处理消息路由和 HTTP API 端点。

实现位置gateway/core/router.py:GatewayRouter

职责

  • Agent 注册/注销/心跳端点
  • 消息接收,Webhook 推送到目标 Agent
  • 在线状态查询
  • Agent 列表查询

API 端点

  • POST /gateway/register - Agent 注册
  • POST /gateway/heartbeat - 心跳
  • POST /gateway/unregister - 注销
  • POST /gateway/send - 发送消息
  • GET /gateway/status/{agent_id} - 查询状态
  • GET /gateway/agents - 列出 Agent
  • GET /gateway/health - Gateway 状态

详见 API 文档

GatewayClient(SDK)

Agent 端用于与 Gateway 交互的客户端。

实现位置gateway/client/python/client.py:GatewayClient

职责

  • HTTP 注册/注销/心跳
  • 发送消息
  • 查询 Agent 列表和状态

通信流程

Agent 注册流程

Agent                       Gateway
   │                           │
   │  POST /gateway/register   │
   │  {agent_id, webhook_url,  │
   │   capabilities, ...}      │
   ├──────────────────────────>│
   │                           │  Registry.register()
   │                           │  保存 webhook_url
   │                           │
   │  {"status": "registered"} │
   │<──────────────────────────┤
   │                           │
   │  POST /gateway/heartbeat  │  (每 30 秒)
   ├──────────────────────────>│
   │                           │  Registry.heartbeat()
   │<──────────────────────────┤

消息路由流程

Agent A                 Gateway                 Agent B
   │                       │                       │
   │  POST /gateway/send   │                       │
   │  {from, to, content}  │                       │
   ├──────────────────────>│                       │
   │                       │  Registry.lookup()    │
   │                       │  获取 webhook_url     │
   │                       │                       │
   │                       │  POST {webhook_url}   │
   │                       ├──────────────────────>│
   │                       │                       │  message_queue.push()
   │                       │  200 OK               │
   │                       │<──────────────────────┤
   │                       │                       │
   │  {"status": "sent"}   │                       │
   │<──────────────────────┤                       │

消息通知到 Agent/LLM

设计方案:Context Injection Hook

借鉴 Agent 框架中的 Active Collaborators 设计,通过 context injection hook 机制周期性提醒 LLM 检查消息。

Webhook → A2AMessageQueue → Context Hook → Runner 每 10 轮注入 → LLM
                ↓                                                    ↓
          消息队列                                         调用 check_messages 工具

组件

A2AMessageQueue - 消息缓冲队列

实现位置agent/tools/builtin/a2a_im.py:A2AMessageQueue(待实现)

create_a2a_context_hook - 创建注入钩子,有消息时返回提醒文本

实现位置agent/tools/builtin/a2a_im.py:create_a2a_context_hook(待实现)

check_messages 工具 - LLM 调用后返回完整消息内容,并清空队列

实现位置agent/tools/builtin/a2a_im.py:check_messages(待实现)

Runner context_hooks 参数 - 每 10 轮注入时调用所有 hooks

实现位置agent/core/runner.py:AgentRunner._build_context_injection(待实现)

注入效果

## Current Plan
1. [in_progress] 分析代码架构

## Active Collaborators
- researcher [agent, completed]: 已完成调研

## Messages
💬 来自 code-reviewer 的 1 条新消息(使用 check_messages 工具查看)

LLM 自主决定何时调用 check_messages 工具查看详细内容。

详见 Agent 架构文档:Context Injection Hooks


设计决策

决策 1:消息接收采用 Webhook

问题:Agent 如何接收来自 Gateway 的消息?

决策:Webhook 推送,Agent 注册时提供 webhook_url

理由

  • 任务协作场景,秒级延迟完全可接受
  • 无状态 HTTP,无需管理长连接和重连
  • Agent 已运行 FastAPI 服务,添加端点简单
  • 标准 HTTP,易调试和监控

详见 设计决策文档

决策 2:心跳维持在线状态

问题:如何判断 Agent 是否在线?

决策:Agent 每 30 秒 HTTP POST 心跳,60 秒未收到则标记离线

理由

  • 无状态 HTTP,与 Webhook 方案一致
  • 参数选择:30s 间隔平衡实时性和网络开销,60s 超时允许一次心跳丢失

实现位置gateway/core/registry.py:AgentRegistry._cleanup_loop


扩展点

Enterprise 集成

Enterprise 功能通过中间件模式扩展:

router = GatewayRouter(
    registry=registry,
    middlewares=[EnterpriseAuth(), EnterpriseAudit()]
)

详见 Enterprise 层文档


相关文档