|
|
4 днів тому | |
|---|---|---|
| .. | ||
| client | 4 днів тому | |
| core | 5 днів тому | |
| docs | 4 днів тому | |
| README.md | 4 днів тому | |
| __init__.py | 5 днів тому | |
| setup.py | 5 днів тому | |
框架无关的 Agent 间即时通讯网关,提供 Agent 注册、消息路由(Webhook 推送)、在线状态管理。
Gateway 是一个任务导向的 Agent 即时通讯系统,支持任何 Agent 框架使用。核心功能:
独立性:Gateway 与 Agent Core 并列,可以独立部署。
gateway/
├── core/ # Gateway 核心服务(框架无关)
│ ├── registry.py # Agent 注册和在线状态管理(含 webhook_url)
│ └── router.py # HTTP API 端点和消息路由
│
├── client/ # 通用客户端工具
│ ├── python/ # Python SDK
│ │ ├── tools.py # 通用工具函数
│ │ ├── client.py # GatewayClient
│ │ └── cli.py # CLI 工具
│ └── a2a_im.md # Agent Skill 文档
│
├── docs/ # 详细文档
│ ├── architecture.md
│ ├── deployment.md
│ ├── api.md
│ ├── decisions.md
│ └── enterprise/
│
└── enterprise/ # 企业功能(可选)
├── auth.py
├── audit.py
└── cost.py
examples/gateway_integration/ # 集成示例(项目根目录)
python gateway_server.py
服务器将在 http://localhost:8001 启动。
from gateway.client.python import GatewayClient, AgentCard
client = GatewayClient(
gateway_url="http://localhost:8001",
agent_card=AgentCard(
agent_id="my-agent-001",
agent_name="My Agent",
capabilities=["search", "analyze"]
),
webhook_url="http://my-agent.local:8080/webhook/a2a-messages"
)
await client.connect() # 注册 + 启动心跳
await client.send_message(
to_agent_id="target-agent",
content=[{"type": "text", "text": "Hello"}],
conversation_id="conv-123"
)
在 Agent 服务中添加 webhook 端点,Gateway 收到消息后会 POST 到此地址:
# api_server.py
@app.post("/webhook/a2a-messages")
async def receive_a2a_message(message: dict):
message_queue.push(message)
return {"status": "received"}
消息通过 Context Injection Hook 机制周期性提醒 LLM,由 LLM 调用 check_messages 工具查看详情。详见 消息通知机制。
gateway-cli send --from my-agent --to target-agent --message "Hello!"
gateway-cli list
gateway-cli status target-agent
详见 gateway/client/a2a_im.md(Agent Skill 文档)。
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /gateway/register |
注册 Agent(含 webhook_url) |
| POST | /gateway/heartbeat |
心跳(每 30 秒) |
| POST | /gateway/unregister |
注销 Agent |
| POST | /gateway/send |
发送消息 |
| GET | /gateway/status/{agent_id} |
查询 Agent 状态 |
| GET | /gateway/agents |
列出在线 Agent |
| GET | /gateway/health |
Gateway 状态 |
详见 API 文档。
module/file.py:function_namedocs/decisions.md 记录Gateway 是一个任务导向的 Agent 即时通讯系统,支持任何 Agent 框架使用。核心功能:
独立性:Gateway 与 Agent Core 并列,可以独立部署。
依赖关系:Gateway 依赖 Agent Core(使用 ToolContext、TraceStore 等组件)。
gateway/
├── core/ # Gateway 核心服务(框架无关)
│ ├── registry.py # Agent 注册和在线状态管理
│ ├── router.py # 消息路由
│ └── client.py # Gateway 客户端
│
├── client/ # 通用客户端工具
│ ├── python/ # Python SDK
│ │ ├── tools.py # 通用工具函数
│ │ ├── client.py # GatewayClient
│ │ └── cli.py # CLI 工具
│ └── a2a_im.md # Agent Skill 文档
│
├── docs/ # 详细文档
│ ├── architecture.md
│ ├── deployment.md
│ ├── api.md
│ ├── decisions.md
│ └── enterprise/
│
└── enterprise/ # 企业功能(可选)
├── auth.py # 认证和授权
├── audit.py # 审计和监控
└── cost.py # 成本管理
examples/gateway_integration/ # 集成示例(项目根目录)
├── README.md
├── python_client_example.py # 通用 SDK 使用示例
├── agent_tools_example.py # Agent 框架集成示例
└── agent_skill_example.md # Agent 框架 Skill 示例
python gateway_server.py
服务器将在 http://localhost:8001 启动(默认端口)。
# 发送消息
gateway-cli send --from my-agent --to target-agent --message "Hello!"
# 查询在线 Agent
gateway-cli list
# 查询 Agent 状态
gateway-cli status target-agent
详见 gateway/client/a2a_im.md(Agent Skill 文档)。
from gateway.client.python import tools
# 发送消息
result = await tools.send_message(
gateway_url="http://localhost:8001",
from_agent_id="my-agent",
to_agent_id="target-agent",
message="Hello!"
)
# 查询在线 Agent
agents = await tools.list_online_agents(
gateway_url="http://localhost:8001"
)
# 查询 Agent 状态
status = await tools.get_agent_status(
gateway_url="http://localhost:8001",
agent_id="target-agent"
)
参考 examples/gateway_integration/ 中的示例:
# 在 agent 启动时注册 Gateway 工具
from examples.gateway_integration.agent_tools_example import (
send_to_agent,
get_active_collaborators,
get_online_agents,
get_agent_status
)
# 注册工具
tool_registry.register(send_to_agent)
tool_registry.register(get_active_collaborators)
tool_registry.register(get_online_agents)
tool_registry.register(get_agent_status)
# 加载 skill
skill_loader.load("gateway/client/a2a_im.md")
PC Agent 启动时通过 WebSocket 连接到 Gateway 并注册。
实现位置:gateway/core/registry.py:AgentRegistry
from gateway.client.python import GatewayClient, AgentCard
# 创建客户端并注册
async with GatewayClient(
gateway_url="http://localhost:8001",
agent_card=AgentCard(
agent_id="my-agent-001",
agent_name="My Agent",
capabilities=["search", "analyze"]
)
) as client:
# 发送消息
await client.send_message(
to_agent_id="target-agent",
content=[{"type": "text", "text": "Hello"}],
conversation_id="conv-123"
)
Gateway 根据目标 Agent ID 路由消息。
实现位置:gateway/core/router.py:GatewayRouter
# 发送消息
POST /gateway/send
{
"from_agent_id": "my-agent",
"to_agent_id": "target-agent",
"content": [{"type": "text", "text": "帮我分析代码"}]
}
通过心跳机制跟踪 Agent 在线状态。
实现位置:gateway/core/registry.py:AgentRegistry.heartbeat
# 查询在线状态
GET /gateway/status/{agent_id}
| 方法 | 路径 | 说明 |
|---|---|---|
| WS | /gateway/connect |
Agent 注册和保持连接 |
| POST | /gateway/send |
发送消息到其他 Agent |
| GET | /gateway/status/{agent_id} |
查询 Agent 在线状态 |
| GET | /gateway/agents |
列出所有在线 Agent |
实现位置:gateway/core/router.py:GatewayRouter
详见 API 文档。
# 一个进程运行 Agent + Gateway
python main.py
# 两个独立进程
python api_server.py # Agent Core
python gateway_server.py # Gateway
# 三个独立进程
python api_server.py # Agent Core
python gateway_server.py # Gateway Core
python enterprise_gateway.py # Enterprise Gateway
详见 部署指南。
参考 examples/gateway_integration/ 中的示例创建你自己的集成:
gateway/client/python/ 的通用 SDK 或 CLI 工具详见 集成示例文档。
module/file.py:function_namedocs/decisions.md另行记录当前建议:暂时不拆分
理由:
未来拆分时机(满足以下条件时考虑):
现在的准备:
当需要拆分时,只需:
gateway/ 目录移到新仓库examples/gateway_integration/ 在 Agent 项目中作为集成示例MIT