--- name: a2a-im description: 向其他 Agent 发送消息、查询在线 Agent、检查 Agent 状态、接收来自其他 Agent 的消息。当你需要与其他 Agent 通信或发现 Gateway 系统中的可用 Agent 时使用。 allowed-tools: Bash(gateway-cli:*), check_messages --- # Agent 间即时通讯 通过 Gateway 进行 Agent 间通信的框架无关工具。使用 CLI 命令或 Python SDK 发送消息、发现 Agent、协调多 Agent 工作流。支持通过 Webhook 接收来自其他 Agent 的消息。 ## 前置条件 Gateway 必须正在运行(默认:http://localhost:8001)。设置自定义 URL: ```bash export GATEWAY_URL=http://your-gateway:8001 ``` ## 消息接收机制 ### 概述 Agent 通过 **Webhook** 方式接收来自 Gateway 的消息: ``` 其他 Agent → Gateway → HTTP POST → 本 Agent Webhook 端点 → 消息队列 → Context 提醒 → check_messages 工具 ``` ### 工作流程 1. **Gateway 推送消息** - 其他 Agent 发送消息到 Gateway - Gateway 通过 HTTP POST 推送到目标 Agent 的 webhook 端点 2. **消息进入队列** - Webhook 端点收到消息后放入 `A2AMessageQueue` - 不打断当前执行 3. **周期性提醒** - 每 10 轮 Agent 执行,自动注入消息提醒到 context - 格式:`💬 来自 xxx 的 N 条新消息(使用 check_messages 工具查看)` 4. **LLM 决策** - LLM 看到提醒后,可以选择: - 立即调用 `check_messages` 工具查看 - 完成当前子任务后再查看 - 根据任务优先级决定 5. **查看消息** - 调用 `check_messages` 工具获取完整消息内容 - 消息从队列中清空 ### 配置 Webhook 在 Agent 服务中添加 webhook 端点: ```python # api_server.py from agent.tools.builtin.a2a_im import A2AMessageQueue, create_a2a_context_hook # 创建消息队列 message_queue = A2AMessageQueue() # 创建 context hook(用于周期性提醒) a2a_hook = create_a2a_context_hook(message_queue) # 创建 Runner 时注入 hook runner = AgentRunner( llm_call=llm_call, trace_store=trace_store, context_hooks=[a2a_hook] ) # 添加 webhook 端点 @app.post("/webhook/a2a-messages") async def receive_a2a_message(message: dict): """接收来自 Gateway 的消息""" message_queue.push(message) return {"status": "received"} ``` ### check_messages 工具 ```python @tool(description="检查来自其他 Agent 的新消息") async def check_messages(ctx: ToolContext) -> ToolResult: """ 检查并获取来自其他 Agent 的新消息 当 context 中提示有新消息时,使用此工具查看详细内容。 """ # 从消息队列获取所有消息 messages = message_queue.pop_all() if not messages: return ToolResult(title="无新消息", output="") # 返回格式化的消息列表 return ToolResult( title=f"收到 {len(messages)} 条新消息", output=format_messages(messages) ) ``` ### 注入效果示例 Agent 执行过程中,每 10 轮自动注入: ```markdown ## Current Plan 1. [in_progress] 分析代码架构 1.1. [completed] 读取项目结构 1.2. [in_progress] 分析核心模块 ## Active Collaborators - researcher [agent, completed]: 已完成调研 ## Messages 💬 来自 code-reviewer 的 1 条新消息(使用 check_messages 工具查看) ``` LLM 看到提醒后可以调用 `check_messages` 工具: ``` 收到 1 条新消息: 1. 来自 code-reviewer 对话 ID: conv-abc-123 时间: 2026-03-05T10:30:00 内容: 代码审查完成,发现 3 处需要优化的地方... ``` ## 快速开始 ### CLI 命令 **发送消息:** ```bash gateway-cli send --from my-agent --to target-agent --message "你好!" ``` **列出在线 Agent:** ```bash gateway-cli list # 所有 Agent gateway-cli list --type analyst # 按类型过滤 ``` **检查 Agent 状态:** ```bash gateway-cli status target-agent ``` **继续对话:** ```bash gateway-cli send \ --from my-agent \ --to target-agent \ --message "继续之前的话题" \ --conversation-id conv-123 ``` ### Python SDK ```python from gateway.client.python import tools # 发送消息 result = await tools.send_message( gateway_url="http://localhost:8001", from_agent_id="my-agent", to_agent_id="target-agent", message="你好!" ) # 返回: {"conversation_id": "conv-xxx", "message_id": "msg-xxx", "status": "sent"} # 列出在线 Agent agents = await tools.list_online_agents( gateway_url="http://localhost:8001", agent_type="analyst" # 可选过滤 ) # 检查 Agent 状态 status = await tools.get_agent_status( gateway_url="http://localhost:8001", agent_id="target-agent" ) ``` ## 高级用法 ### 多模态消息 发送包含图片或其他媒体的文本: ```python result = await tools.send_multimodal_message( gateway_url="http://localhost:8001", from_agent_id="my-agent", to_agent_id="target-agent", content=[ {"type": "text", "text": "看这张图片"}, {"type": "image", "source": {"type": "url", "url": "https://..."}} ] ) ``` ### 注册 Agent(长连接) 对于需要保持在线并接收消息的 Agent: ```bash gateway-cli register \ --id my-agent \ --name "我的 Agent" \ --type analyst \ --capabilities search \ --capabilities analyze \ --description "我的分析 Agent" ``` 或使用 Python: ```python client = await tools.register_agent( gateway_url="http://localhost:8001", agent_id="my-agent", agent_name="我的 Agent", agent_type="analyst", capabilities=["search", "analyze"], description="我的分析 Agent" ) # 使用客户端进行操作 await client.send_message(...) # 完成后断开连接 await client.disconnect() ``` ## 常见模式 ### 发现并联系 Agent ```bash # 查找可用的 Agent gateway-cli list --type researcher # 检查特定 Agent 是否在线 gateway-cli status research-agent-1 # 发送任务请求 gateway-cli send \ --from coordinator \ --to research-agent-1 \ --message "请研究最新的 AI 论文" ``` ### 多 Agent 协调 ```python # 协调器 Agent 发现工作者 workers = await tools.list_online_agents( gateway_url=gateway_url, agent_type="worker" ) # 分发任务 for i, worker in enumerate(workers): await tools.send_message( gateway_url=gateway_url, from_agent_id="coordinator", to_agent_id=worker["id"], message=f"处理批次 {i}" ) ``` ### 对话线程 ```python # 开始对话 result = await tools.send_message( gateway_url=gateway_url, from_agent_id="agent-a", to_agent_id="agent-b", message="开始新任务" ) conv_id = result["conversation_id"] # 在同一线程中继续 await tools.send_message( gateway_url=gateway_url, from_agent_id="agent-a", to_agent_id="agent-b", message="这是后续消息", conversation_id=conv_id ) ``` ## API 参考 ### tools.send_message(gateway_url, from_agent_id, to_agent_id, message, conversation_id=None, metadata=None) 发送文本消息。返回包含 conversation_id、message_id、status 的字典。 ### tools.send_multimodal_message(gateway_url, from_agent_id, to_agent_id, content, conversation_id=None, metadata=None) 发送多模态消息(文本、图片等)。content 遵循 MAMP 格式。 ### tools.list_online_agents(gateway_url, agent_type=None) 查询在线 Agent。返回 Agent 信息字典列表。 ### tools.get_agent_status(gateway_url, agent_id) 检查 Agent 是否在线。返回状态信息。 ### tools.register_agent(gateway_url, agent_id, agent_name, agent_type=None, capabilities=None, description=None, metadata=None) 向 Gateway 注册 Agent(长连接)。返回 GatewayClient 实例。 ## 安装 ### 方式 1:从源码安装(开发模式) ```bash cd gateway pip install -e . ``` ### 方式 2:直接使用(无需安装) ```bash # 设置 PYTHONPATH export PYTHONPATH=/path/to/gateway:$PYTHONPATH # 使用 Python 模块 python -m gateway.client.python.cli --help ``` 安装后会提供: - Python SDK:`from gateway.client.python import tools` - CLI 工具:`gateway-cli` ## 环境变量 - `GATEWAY_URL`: Gateway 地址(默认: http://localhost:8001) ## 集成到 Agent 框架 如果你使用特定的 Agent 框架,可以将通用函数包装成框架工具。 **示例:包装为框架工具** ```python from gateway.client.python import tools # 假设你的框架有 @tool 装饰器 @tool(description="发送消息到其他 Agent") async def send_to_agent( target_agent: str, message: str, gateway_url: str = "http://localhost:8001", from_agent_id: str = None ) -> dict: """ 发送消息到指定的 Agent Args: target_agent: 目标 Agent ID message: 消息内容 gateway_url: Gateway 地址 from_agent_id: 发送方 Agent ID(通常从上下文获取) Returns: 包含 conversation_id 和 message_id 的字典 """ result = await tools.send_message( gateway_url=gateway_url, from_agent_id=from_agent_id, to_agent_id=target_agent, message=message ) return result @tool(description="查询在线 Agent") async def list_online_agents( agent_type: str = None, gateway_url: str = "http://localhost:8001" ) -> list: """ 查询当前在线的 Agent 列表 Args: agent_type: 可选,按类型过滤 gateway_url: Gateway 地址 Returns: Agent 信息列表 """ agents = await tools.list_online_agents( gateway_url=gateway_url, agent_type=agent_type ) return agents @tool(description="检查 Agent 状态") async def check_agent_status( agent_id: str, gateway_url: str = "http://localhost:8001" ) -> dict: """ 检查指定 Agent 的在线状态 Args: agent_id: Agent ID gateway_url: Gateway 地址 Returns: 状态信息字典 """ status = await tools.get_agent_status( gateway_url=gateway_url, agent_id=agent_id ) return status ``` 然后在你的框架中注册这些工具: ```python # 注册工具到框架 tool_registry.register(send_to_agent) tool_registry.register(list_online_agents) tool_registry.register(check_agent_status) ``` ## MAMP 协议 Gateway 使用 MAMP(Minimal Agent Message Protocol)进行消息传输。 ### 消息格式 ```python { "from_agent_id": "sender-id", "to_agent_id": "receiver-id", "conversation_id": "conv-123", # 可选,用于线程化对话 "content": [ {"type": "text", "text": "消息内容"}, {"type": "image", "source": {"type": "url", "url": "https://..."}} ], "metadata": { # 可选 "priority": "high", "task_type": "analysis" } } ``` ### Agent 卡片 注册时提供的 Agent 信息: ```python { "agent_id": "my-agent-001", "agent_name": "我的 Agent", "agent_type": "analyst", # 可选:analyst, researcher, worker 等 "capabilities": ["search", "analyze", "summarize"], "description": "专门用于数据分析的 Agent", "metadata": { # 可选 "version": "1.0.0", "owner": "team-a" } } ``` ## 故障排查 ### Gateway 连接失败 ```bash # 检查 Gateway 是否运行 curl http://localhost:8001/health # 检查环境变量 echo $GATEWAY_URL ``` ### 消息发送失败 ```bash # 检查目标 Agent 是否在线 gateway-cli status target-agent # 查看所有在线 Agent gateway-cli list ``` ### CLI 命令不可用 ```bash # 重新安装 cd gateway pip install -e . # 或直接使用 Python 模块 python -m gateway.client.python.cli --help ``` ## 完整示例 ### 示例 1:简单的任务分发 ```python from gateway.client.python import tools async def distribute_tasks(): gateway_url = "http://localhost:8001" # 1. 查找所有工作者 workers = await tools.list_online_agents( gateway_url=gateway_url, agent_type="worker" ) print(f"找到 {len(workers)} 个工作者") # 2. 分发任务 tasks = ["任务A", "任务B", "任务C"] for i, task in enumerate(tasks): worker = workers[i % len(workers)] result = await tools.send_message( gateway_url=gateway_url, from_agent_id="coordinator", to_agent_id=worker["id"], message=f"请处理:{task}" ) print(f"已发送任务到 {worker['name']}: {result['conversation_id']}") ``` ### 示例 2:对话式协作 ```python from gateway.client.python import tools async def collaborative_analysis(): gateway_url = "http://localhost:8001" # 1. 开始对话 result = await tools.send_message( gateway_url=gateway_url, from_agent_id="analyst-1", to_agent_id="analyst-2", message="我发现了一个有趣的数据模式,你能帮我验证吗?" ) conv_id = result["conversation_id"] # 2. 发送详细信息(同一对话) await tools.send_multimodal_message( gateway_url=gateway_url, from_agent_id="analyst-1", to_agent_id="analyst-2", conversation_id=conv_id, content=[ {"type": "text", "text": "这是数据图表:"}, {"type": "image", "source": {"type": "url", "url": "https://example.com/chart.png"}} ] ) print(f"对话 ID: {conv_id}") ``` ### 示例 3:使用 CLI 进行快速测试 ```bash # 终端 1:启动一个 Agent(模拟) gateway-cli register \ --id worker-1 \ --name "工作者 1" \ --type worker \ --capabilities process \ --description "数据处理工作者" # 终端 2:发送消息 gateway-cli send \ --from coordinator \ --to worker-1 \ --message "处理数据集 A" # 终端 3:检查状态 gateway-cli status worker-1 gateway-cli list ```