name: a2a-im description: 向其他 Agent 发送消息、查询在线 Agent、检查 Agent 状态、接收来自其他 Agent 的消息。当你需要与其他 Agent 通信或发现 Gateway 系统中的可用 Agent 时使用。
通过 Gateway 进行 Agent 间通信的框架无关工具。使用 CLI 命令或 Python SDK 发送消息、发现 Agent、协调多 Agent 工作流。支持通过 Webhook 接收来自其他 Agent 的消息。
Gateway 必须正在运行(默认:http://localhost:8001)。设置自定义 URL:
export GATEWAY_URL=http://your-gateway:8001
Agent 通过 Webhook 方式接收来自 Gateway 的消息:
其他 Agent → Gateway → HTTP POST → 本 Agent Webhook 端点 → 消息队列 → Context 提醒 → check_messages 工具
Gateway 推送消息
消息进入队列
A2AMessageQueue周期性提醒
💬 来自 xxx 的 N 条新消息(使用 check_messages 工具查看)LLM 决策
check_messages 工具查看查看消息
check_messages 工具获取完整消息内容在 Agent 服务中添加 webhook 端点:
# api_server.py
from agent.tools.builtin.a2a_im import A2AMessageQueue, create_a2a_context_hook
# 创建消息队列
message_queue = A2AMessageQueue()
# 创建 context hook(用于周期性提醒)
a2a_hook = create_a2a_context_hook(message_queue)
# 创建 Runner 时注入 hook
runner = AgentRunner(
llm_call=llm_call,
trace_store=trace_store,
context_hooks=[a2a_hook]
)
# 添加 webhook 端点
@app.post("/webhook/a2a-messages")
async def receive_a2a_message(message: dict):
"""接收来自 Gateway 的消息"""
message_queue.push(message)
return {"status": "received"}
@tool(description="检查来自其他 Agent 的新消息")
async def check_messages(ctx: ToolContext) -> ToolResult:
"""
检查并获取来自其他 Agent 的新消息
当 context 中提示有新消息时,使用此工具查看详细内容。
"""
# 从消息队列获取所有消息
messages = message_queue.pop_all()
if not messages:
return ToolResult(title="无新消息", output="")
# 返回格式化的消息列表
return ToolResult(
title=f"收到 {len(messages)} 条新消息",
output=format_messages(messages)
)
Agent 执行过程中,每 10 轮自动注入:
## Current Plan
1. [in_progress] 分析代码架构
1.1. [completed] 读取项目结构
1.2. [in_progress] 分析核心模块
## Active Collaborators
- researcher [agent, completed]: 已完成调研
## Messages
💬 来自 code-reviewer 的 1 条新消息(使用 check_messages 工具查看)
LLM 看到提醒后可以调用 check_messages 工具:
收到 1 条新消息:
1. 来自 code-reviewer
对话 ID: conv-abc-123
时间: 2026-03-05T10:30:00
内容: 代码审查完成,发现 3 处需要优化的地方...
发送消息:
gateway-cli send --from my-agent --to target-agent --message "你好!"
列出在线 Agent:
gateway-cli list # 所有 Agent
gateway-cli list --type analyst # 按类型过滤
检查 Agent 状态:
gateway-cli status target-agent
继续对话:
gateway-cli send \
--from my-agent \
--to target-agent \
--message "继续之前的话题" \
--conversation-id conv-123
from gateway.client.python import tools
# 发送消息
result = await tools.send_message(
gateway_url="http://localhost:8001",
from_agent_id="my-agent",
to_agent_id="target-agent",
message="你好!"
)
# 返回: {"conversation_id": "conv-xxx", "message_id": "msg-xxx", "status": "sent"}
# 列出在线 Agent
agents = await tools.list_online_agents(
gateway_url="http://localhost:8001",
agent_type="analyst" # 可选过滤
)
# 检查 Agent 状态
status = await tools.get_agent_status(
gateway_url="http://localhost:8001",
agent_id="target-agent"
)
发送包含图片或其他媒体的文本:
result = await tools.send_multimodal_message(
gateway_url="http://localhost:8001",
from_agent_id="my-agent",
to_agent_id="target-agent",
content=[
{"type": "text", "text": "看这张图片"},
{"type": "image", "source": {"type": "url", "url": "https://..."}}
]
)
对于需要保持在线并接收消息的 Agent:
gateway-cli register \
--id my-agent \
--name "我的 Agent" \
--type analyst \
--capabilities search \
--capabilities analyze \
--description "我的分析 Agent"
或使用 Python:
client = await tools.register_agent(
gateway_url="http://localhost:8001",
agent_id="my-agent",
agent_name="我的 Agent",
agent_type="analyst",
capabilities=["search", "analyze"],
description="我的分析 Agent"
)
# 使用客户端进行操作
await client.send_message(...)
# 完成后断开连接
await client.disconnect()
# 查找可用的 Agent
gateway-cli list --type researcher
# 检查特定 Agent 是否在线
gateway-cli status research-agent-1
# 发送任务请求
gateway-cli send \
--from coordinator \
--to research-agent-1 \
--message "请研究最新的 AI 论文"
# 协调器 Agent 发现工作者
workers = await tools.list_online_agents(
gateway_url=gateway_url,
agent_type="worker"
)
# 分发任务
for i, worker in enumerate(workers):
await tools.send_message(
gateway_url=gateway_url,
from_agent_id="coordinator",
to_agent_id=worker["id"],
message=f"处理批次 {i}"
)
# 开始对话
result = await tools.send_message(
gateway_url=gateway_url,
from_agent_id="agent-a",
to_agent_id="agent-b",
message="开始新任务"
)
conv_id = result["conversation_id"]
# 在同一线程中继续
await tools.send_message(
gateway_url=gateway_url,
from_agent_id="agent-a",
to_agent_id="agent-b",
message="这是后续消息",
conversation_id=conv_id
)
发送文本消息。返回包含 conversation_id、message_id、status 的字典。
发送多模态消息(文本、图片等)。content 遵循 MAMP 格式。
查询在线 Agent。返回 Agent 信息字典列表。
检查 Agent 是否在线。返回状态信息。
向 Gateway 注册 Agent(长连接)。返回 GatewayClient 实例。
cd gateway
pip install -e .
# 设置 PYTHONPATH
export PYTHONPATH=/path/to/gateway:$PYTHONPATH
# 使用 Python 模块
python -m gateway.client.python.cli --help
安装后会提供:
from gateway.client.python import toolsgateway-cliGATEWAY_URL: Gateway 地址(默认: http://localhost:8001)如果你使用特定的 Agent 框架,可以将通用函数包装成框架工具。
示例:包装为框架工具
from gateway.client.python import tools
# 假设你的框架有 @tool 装饰器
@tool(description="发送消息到其他 Agent")
async def send_to_agent(
target_agent: str,
message: str,
gateway_url: str = "http://localhost:8001",
from_agent_id: str = None
) -> dict:
"""
发送消息到指定的 Agent
Args:
target_agent: 目标 Agent ID
message: 消息内容
gateway_url: Gateway 地址
from_agent_id: 发送方 Agent ID(通常从上下文获取)
Returns:
包含 conversation_id 和 message_id 的字典
"""
result = await tools.send_message(
gateway_url=gateway_url,
from_agent_id=from_agent_id,
to_agent_id=target_agent,
message=message
)
return result
@tool(description="查询在线 Agent")
async def list_online_agents(
agent_type: str = None,
gateway_url: str = "http://localhost:8001"
) -> list:
"""
查询当前在线的 Agent 列表
Args:
agent_type: 可选,按类型过滤
gateway_url: Gateway 地址
Returns:
Agent 信息列表
"""
agents = await tools.list_online_agents(
gateway_url=gateway_url,
agent_type=agent_type
)
return agents
@tool(description="检查 Agent 状态")
async def check_agent_status(
agent_id: str,
gateway_url: str = "http://localhost:8001"
) -> dict:
"""
检查指定 Agent 的在线状态
Args:
agent_id: Agent ID
gateway_url: Gateway 地址
Returns:
状态信息字典
"""
status = await tools.get_agent_status(
gateway_url=gateway_url,
agent_id=agent_id
)
return status
然后在你的框架中注册这些工具:
# 注册工具到框架
tool_registry.register(send_to_agent)
tool_registry.register(list_online_agents)
tool_registry.register(check_agent_status)
Gateway 使用 MAMP(Minimal Agent Message Protocol)进行消息传输。
{
"from_agent_id": "sender-id",
"to_agent_id": "receiver-id",
"conversation_id": "conv-123", # 可选,用于线程化对话
"content": [
{"type": "text", "text": "消息内容"},
{"type": "image", "source": {"type": "url", "url": "https://..."}}
],
"metadata": { # 可选
"priority": "high",
"task_type": "analysis"
}
}
注册时提供的 Agent 信息:
{
"agent_id": "my-agent-001",
"agent_name": "我的 Agent",
"agent_type": "analyst", # 可选:analyst, researcher, worker 等
"capabilities": ["search", "analyze", "summarize"],
"description": "专门用于数据分析的 Agent",
"metadata": { # 可选
"version": "1.0.0",
"owner": "team-a"
}
}
# 检查 Gateway 是否运行
curl http://localhost:8001/health
# 检查环境变量
echo $GATEWAY_URL
# 检查目标 Agent 是否在线
gateway-cli status target-agent
# 查看所有在线 Agent
gateway-cli list
# 重新安装
cd gateway
pip install -e .
# 或直接使用 Python 模块
python -m gateway.client.python.cli --help
from gateway.client.python import tools
async def distribute_tasks():
gateway_url = "http://localhost:8001"
# 1. 查找所有工作者
workers = await tools.list_online_agents(
gateway_url=gateway_url,
agent_type="worker"
)
print(f"找到 {len(workers)} 个工作者")
# 2. 分发任务
tasks = ["任务A", "任务B", "任务C"]
for i, task in enumerate(tasks):
worker = workers[i % len(workers)]
result = await tools.send_message(
gateway_url=gateway_url,
from_agent_id="coordinator",
to_agent_id=worker["id"],
message=f"请处理:{task}"
)
print(f"已发送任务到 {worker['name']}: {result['conversation_id']}")
from gateway.client.python import tools
async def collaborative_analysis():
gateway_url = "http://localhost:8001"
# 1. 开始对话
result = await tools.send_message(
gateway_url=gateway_url,
from_agent_id="analyst-1",
to_agent_id="analyst-2",
message="我发现了一个有趣的数据模式,你能帮我验证吗?"
)
conv_id = result["conversation_id"]
# 2. 发送详细信息(同一对话)
await tools.send_multimodal_message(
gateway_url=gateway_url,
from_agent_id="analyst-1",
to_agent_id="analyst-2",
conversation_id=conv_id,
content=[
{"type": "text", "text": "这是数据图表:"},
{"type": "image", "source": {"type": "url", "url": "https://example.com/chart.png"}}
]
)
print(f"对话 ID: {conv_id}")
# 终端 1:启动一个 Agent(模拟)
gateway-cli register \
--id worker-1 \
--name "工作者 1" \
--type worker \
--capabilities process \
--description "数据处理工作者"
# 终端 2:发送消息
gateway-cli send \
--from coordinator \
--to worker-1 \
--message "处理数据集 A"
# 终端 3:检查状态
gateway-cli status worker-1
gateway-cli list