a2a_im.md 14 KB


name: a2a-im description: 向其他 Agent 发送消息、查询在线 Agent、检查 Agent 状态、接收来自其他 Agent 的消息。当你需要与其他 Agent 通信或发现 Gateway 系统中的可用 Agent 时使用。

allowed-tools: Bash(gateway-cli:*), check_messages

Agent 间即时通讯

通过 Gateway 进行 Agent 间通信的框架无关工具。使用 CLI 命令或 Python SDK 发送消息、发现 Agent、协调多 Agent 工作流。支持通过 Webhook 接收来自其他 Agent 的消息。

前置条件

Gateway 必须正在运行(默认:http://localhost:8001)。设置自定义 URL:

export GATEWAY_URL=http://your-gateway:8001

消息接收机制

概述

Agent 通过 Webhook 方式接收来自 Gateway 的消息:

其他 Agent → Gateway → HTTP POST → 本 Agent Webhook 端点 → 消息队列 → Context 提醒 → check_messages 工具

工作流程

  1. Gateway 推送消息

    • 其他 Agent 发送消息到 Gateway
    • Gateway 通过 HTTP POST 推送到目标 Agent 的 webhook 端点
  2. 消息进入队列

    • Webhook 端点收到消息后放入 A2AMessageQueue
    • 不打断当前执行
  3. 周期性提醒

    • 每 10 轮 Agent 执行,自动注入消息提醒到 context
    • 格式:💬 来自 xxx 的 N 条新消息(使用 check_messages 工具查看)
  4. LLM 决策

    • LLM 看到提醒后,可以选择:
      • 立即调用 check_messages 工具查看
      • 完成当前子任务后再查看
      • 根据任务优先级决定
  5. 查看消息

    • 调用 check_messages 工具获取完整消息内容
    • 消息从队列中清空

配置 Webhook

在 Agent 服务中添加 webhook 端点:

# api_server.py

from agent.tools.builtin.a2a_im import A2AMessageQueue, create_a2a_context_hook

# 创建消息队列
message_queue = A2AMessageQueue()

# 创建 context hook(用于周期性提醒)
a2a_hook = create_a2a_context_hook(message_queue)

# 创建 Runner 时注入 hook
runner = AgentRunner(
    llm_call=llm_call,
    trace_store=trace_store,
    context_hooks=[a2a_hook]
)

# 添加 webhook 端点
@app.post("/webhook/a2a-messages")
async def receive_a2a_message(message: dict):
    """接收来自 Gateway 的消息"""
    message_queue.push(message)
    return {"status": "received"}

check_messages 工具

@tool(description="检查来自其他 Agent 的新消息")
async def check_messages(ctx: ToolContext) -> ToolResult:
    """
    检查并获取来自其他 Agent 的新消息

    当 context 中提示有新消息时,使用此工具查看详细内容。
    """
    # 从消息队列获取所有消息
    messages = message_queue.pop_all()

    if not messages:
        return ToolResult(title="无新消息", output="")

    # 返回格式化的消息列表
    return ToolResult(
        title=f"收到 {len(messages)} 条新消息",
        output=format_messages(messages)
    )

注入效果示例

Agent 执行过程中,每 10 轮自动注入:

## Current Plan
1. [in_progress] 分析代码架构
   1.1. [completed] 读取项目结构
   1.2. [in_progress] 分析核心模块

## Active Collaborators
- researcher [agent, completed]: 已完成调研

## Messages
💬 来自 code-reviewer 的 1 条新消息(使用 check_messages 工具查看)

LLM 看到提醒后可以调用 check_messages 工具:

收到 1 条新消息:

1. 来自 code-reviewer
   对话 ID: conv-abc-123
   时间: 2026-03-05T10:30:00
   内容: 代码审查完成,发现 3 处需要优化的地方...

快速开始

CLI 命令

发送消息:

gateway-cli send --from my-agent --to target-agent --message "你好!"

列出在线 Agent:

gateway-cli list                    # 所有 Agent
gateway-cli list --type analyst     # 按类型过滤

检查 Agent 状态:

gateway-cli status target-agent

继续对话:

gateway-cli send \
  --from my-agent \
  --to target-agent \
  --message "继续之前的话题" \
  --conversation-id conv-123

Python SDK

from gateway.client.python import tools

# 发送消息
result = await tools.send_message(
    gateway_url="http://localhost:8001",
    from_agent_id="my-agent",
    to_agent_id="target-agent",
    message="你好!"
)
# 返回: {"conversation_id": "conv-xxx", "message_id": "msg-xxx", "status": "sent"}

# 列出在线 Agent
agents = await tools.list_online_agents(
    gateway_url="http://localhost:8001",
    agent_type="analyst"  # 可选过滤
)

# 检查 Agent 状态
status = await tools.get_agent_status(
    gateway_url="http://localhost:8001",
    agent_id="target-agent"
)

高级用法

多模态消息

发送包含图片或其他媒体的文本:

result = await tools.send_multimodal_message(
    gateway_url="http://localhost:8001",
    from_agent_id="my-agent",
    to_agent_id="target-agent",
    content=[
        {"type": "text", "text": "看这张图片"},
        {"type": "image", "source": {"type": "url", "url": "https://..."}}
    ]
)

注册 Agent(长连接)

对于需要保持在线并接收消息的 Agent:

gateway-cli register \
  --id my-agent \
  --name "我的 Agent" \
  --type analyst \
  --capabilities search \
  --capabilities analyze \
  --description "我的分析 Agent"

或使用 Python:

client = await tools.register_agent(
    gateway_url="http://localhost:8001",
    agent_id="my-agent",
    agent_name="我的 Agent",
    agent_type="analyst",
    capabilities=["search", "analyze"],
    description="我的分析 Agent"
)

# 使用客户端进行操作
await client.send_message(...)

# 完成后断开连接
await client.disconnect()

常见模式

发现并联系 Agent

# 查找可用的 Agent
gateway-cli list --type researcher

# 检查特定 Agent 是否在线
gateway-cli status research-agent-1

# 发送任务请求
gateway-cli send \
  --from coordinator \
  --to research-agent-1 \
  --message "请研究最新的 AI 论文"

多 Agent 协调

# 协调器 Agent 发现工作者
workers = await tools.list_online_agents(
    gateway_url=gateway_url,
    agent_type="worker"
)

# 分发任务
for i, worker in enumerate(workers):
    await tools.send_message(
        gateway_url=gateway_url,
        from_agent_id="coordinator",
        to_agent_id=worker["id"],
        message=f"处理批次 {i}"
    )

对话线程

# 开始对话
result = await tools.send_message(
    gateway_url=gateway_url,
    from_agent_id="agent-a",
    to_agent_id="agent-b",
    message="开始新任务"
)

conv_id = result["conversation_id"]

# 在同一线程中继续
await tools.send_message(
    gateway_url=gateway_url,
    from_agent_id="agent-a",
    to_agent_id="agent-b",
    message="这是后续消息",
    conversation_id=conv_id
)

API 参考

tools.send_message(gateway_url, from_agent_id, to_agent_id, message, conversation_id=None, metadata=None)

发送文本消息。返回包含 conversation_id、message_id、status 的字典。

tools.send_multimodal_message(gateway_url, from_agent_id, to_agent_id, content, conversation_id=None, metadata=None)

发送多模态消息(文本、图片等)。content 遵循 MAMP 格式。

tools.list_online_agents(gateway_url, agent_type=None)

查询在线 Agent。返回 Agent 信息字典列表。

tools.get_agent_status(gateway_url, agent_id)

检查 Agent 是否在线。返回状态信息。

tools.register_agent(gateway_url, agent_id, agent_name, agent_type=None, capabilities=None, description=None, metadata=None)

向 Gateway 注册 Agent(长连接)。返回 GatewayClient 实例。

安装

方式 1:从源码安装(开发模式)

cd gateway
pip install -e .

方式 2:直接使用(无需安装)

# 设置 PYTHONPATH
export PYTHONPATH=/path/to/gateway:$PYTHONPATH

# 使用 Python 模块
python -m gateway.client.python.cli --help

安装后会提供:

  • Python SDK:from gateway.client.python import tools
  • CLI 工具:gateway-cli

环境变量

  • GATEWAY_URL: Gateway 地址(默认: http://localhost:8001)

集成到 Agent 框架

如果你使用特定的 Agent 框架,可以将通用函数包装成框架工具。

示例:包装为框架工具

from gateway.client.python import tools

# 假设你的框架有 @tool 装饰器
@tool(description="发送消息到其他 Agent")
async def send_to_agent(
    target_agent: str,
    message: str,
    gateway_url: str = "http://localhost:8001",
    from_agent_id: str = None
) -> dict:
    """
    发送消息到指定的 Agent

    Args:
        target_agent: 目标 Agent ID
        message: 消息内容
        gateway_url: Gateway 地址
        from_agent_id: 发送方 Agent ID(通常从上下文获取)

    Returns:
        包含 conversation_id 和 message_id 的字典
    """
    result = await tools.send_message(
        gateway_url=gateway_url,
        from_agent_id=from_agent_id,
        to_agent_id=target_agent,
        message=message
    )
    return result

@tool(description="查询在线 Agent")
async def list_online_agents(
    agent_type: str = None,
    gateway_url: str = "http://localhost:8001"
) -> list:
    """
    查询当前在线的 Agent 列表

    Args:
        agent_type: 可选,按类型过滤
        gateway_url: Gateway 地址

    Returns:
        Agent 信息列表
    """
    agents = await tools.list_online_agents(
        gateway_url=gateway_url,
        agent_type=agent_type
    )
    return agents

@tool(description="检查 Agent 状态")
async def check_agent_status(
    agent_id: str,
    gateway_url: str = "http://localhost:8001"
) -> dict:
    """
    检查指定 Agent 的在线状态

    Args:
        agent_id: Agent ID
        gateway_url: Gateway 地址

    Returns:
        状态信息字典
    """
    status = await tools.get_agent_status(
        gateway_url=gateway_url,
        agent_id=agent_id
    )
    return status

然后在你的框架中注册这些工具:

# 注册工具到框架
tool_registry.register(send_to_agent)
tool_registry.register(list_online_agents)
tool_registry.register(check_agent_status)

MAMP 协议

Gateway 使用 MAMP(Minimal Agent Message Protocol)进行消息传输。

消息格式

{
    "from_agent_id": "sender-id",
    "to_agent_id": "receiver-id",
    "conversation_id": "conv-123",  # 可选,用于线程化对话
    "content": [
        {"type": "text", "text": "消息内容"},
        {"type": "image", "source": {"type": "url", "url": "https://..."}}
    ],
    "metadata": {  # 可选
        "priority": "high",
        "task_type": "analysis"
    }
}

Agent 卡片

注册时提供的 Agent 信息:

{
    "agent_id": "my-agent-001",
    "agent_name": "我的 Agent",
    "agent_type": "analyst",  # 可选:analyst, researcher, worker 等
    "capabilities": ["search", "analyze", "summarize"],
    "description": "专门用于数据分析的 Agent",
    "metadata": {  # 可选
        "version": "1.0.0",
        "owner": "team-a"
    }
}

故障排查

Gateway 连接失败

# 检查 Gateway 是否运行
curl http://localhost:8001/health

# 检查环境变量
echo $GATEWAY_URL

消息发送失败

# 检查目标 Agent 是否在线
gateway-cli status target-agent

# 查看所有在线 Agent
gateway-cli list

CLI 命令不可用

# 重新安装
cd gateway
pip install -e .

# 或直接使用 Python 模块
python -m gateway.client.python.cli --help

完整示例

示例 1:简单的任务分发

from gateway.client.python import tools

async def distribute_tasks():
    gateway_url = "http://localhost:8001"

    # 1. 查找所有工作者
    workers = await tools.list_online_agents(
        gateway_url=gateway_url,
        agent_type="worker"
    )

    print(f"找到 {len(workers)} 个工作者")

    # 2. 分发任务
    tasks = ["任务A", "任务B", "任务C"]
    for i, task in enumerate(tasks):
        worker = workers[i % len(workers)]
        result = await tools.send_message(
            gateway_url=gateway_url,
            from_agent_id="coordinator",
            to_agent_id=worker["id"],
            message=f"请处理:{task}"
        )
        print(f"已发送任务到 {worker['name']}: {result['conversation_id']}")

示例 2:对话式协作

from gateway.client.python import tools

async def collaborative_analysis():
    gateway_url = "http://localhost:8001"

    # 1. 开始对话
    result = await tools.send_message(
        gateway_url=gateway_url,
        from_agent_id="analyst-1",
        to_agent_id="analyst-2",
        message="我发现了一个有趣的数据模式,你能帮我验证吗?"
    )

    conv_id = result["conversation_id"]

    # 2. 发送详细信息(同一对话)
    await tools.send_multimodal_message(
        gateway_url=gateway_url,
        from_agent_id="analyst-1",
        to_agent_id="analyst-2",
        conversation_id=conv_id,
        content=[
            {"type": "text", "text": "这是数据图表:"},
            {"type": "image", "source": {"type": "url", "url": "https://example.com/chart.png"}}
        ]
    )

    print(f"对话 ID: {conv_id}")

示例 3:使用 CLI 进行快速测试

# 终端 1:启动一个 Agent(模拟)
gateway-cli register \
  --id worker-1 \
  --name "工作者 1" \
  --type worker \
  --capabilities process \
  --description "数据处理工作者"

# 终端 2:发送消息
gateway-cli send \
  --from coordinator \
  --to worker-1 \
  --message "处理数据集 A"

# 终端 3:检查状态
gateway-cli status worker-1
gateway-cli list