更新日期: 2026-03-04
module/file.py:function_namedocs/decisions.md另行记录本文档描述 Agent 间即时通讯(A2A IM)系统的架构和实现。
相关文档:
A2A IM 是一个任务导向的 Agent 即时通讯系统,支持:
与传统 IM 的区别:
A2A IM 在整体架构中的定位:
┌─────────────────────────────────────────────────────────────┐
│ Enterprise Layer(组织级)- 可选 │
│ - 认证和授权(飞书 OAuth、API Key、JWT) │
│ - 审计和监控(操作日志、成本记录、安全事件) │
│ - 多租户和权限控制(角色验证、资源访问控制) │
│ - 成本管理和限额(用户级/组织级限额、超限告警) │
│ │
│ 实现位置: gateway/enterprise/ │
│ 文档: gateway/docs/enterprise/overview.md │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ A2A IM Gateway(通讯层)★ 本文档 │
│ - Agent 注册和发现(Registry) │
│ - 消息路由(Gateway Router) │
│ - 活跃协作者管理(Collaborators) │
│ - 在线状态管理(Heartbeat) │
│ - 联系人管理(ContactStore) │
│ │
│ 实现位置: gateway/core/ │
│ 文档: docs/a2a-im.md(本文档) │
└─────────────────────────────────────────────────────────────┘
↕ 使用(单向依赖)
┌─────────────────────────────────────────────────────────────┐
│ Agent Core(核心层) │
│ - Trace、Message、Goal 管理 │
│ - 工具系统(文件、命令、网络、浏览器) │
│ - LLM 集成(Gemini、OpenRouter、Yescode) │
│ - Skills 和 Memory(跨会话知识) │
│ - 子 Agent 机制(agent 工具) │
│ │
│ 实现位置: agent/ │
│ 文档: docs/README.md │
└─────────────────────────────────────────────────────────────┘
Agent Core(核心层):
agent 工具)A2A IM Gateway(通讯层):
Enterprise(组织层):
Enterprise → Gateway → Agent Core
(可选) (通讯) (核心)
- Agent Core 不依赖任何其他层(独立)
- Gateway 依赖 Agent Core(单向依赖)
- Enterprise 依赖 Gateway(可选扩展)
方式 1:单体部署(个人/小团队)
一个进程:
├─ Agent Core
└─ Gateway(包含 Enterprise 模块)
方式 2:分离部署(中等规模)
进程 1:Agent Core
进程 2:Gateway(包含 Enterprise 模块)
方式 3:分层部署(大规模/企业)
进程 1:Agent Core
进程 2:Gateway Core
进程 3:Enterprise Gateway
┌─────────────────────────────────────────────────┐
│ Layer 3: Agent 逻辑层 │
│ - Trace, Goal, Messages │
│ - 工具调用和执行 │
└─────────────────────────────────────────────────┘
↕
┌─────────────────────────────────────────────────┐
│ Layer 2: A2A IM 层 │
│ - 活跃协作者管理 │
│ - 全局联系人管理 │
│ - conversation_id ↔ trace_id 映射 │
└─────────────────────────────────────────────────┘
↕
┌─────────────────────────────────────────────────┐
│ Layer 1: Gateway 层 │
│ - Agent 注册和发现 │
│ - 消息路由 │
│ - 在线状态管理 │
│ - WebSocket 长连接 │
└─────────────────────────────────────────────────┘
模式 1:内部 Agent(同进程)
Agent A → 直接调用 → Agent B
(复用现有 agent 工具)
模式 2:跨设备 Agent(组织内)
PC Agent → WebSocket → Gateway → 云端 Agent
(反向连接,无需公网 IP)
模式 3:外部 Agent(跨组织)
Agent A → MAMP 协议 → Agent B
(点对点 HTTP)
存储在 trace.context["collaborators"],记录当前任务的协作者。
{
"name": "code-analyst",
"type": "agent", # agent | human
"agent_uri": "agent://other.com/code-analyst",
"trace_id": "abc-123",
"conversation_id": "conv-456",
"status": "running", # running | waiting | completed | failed
"summary": "正在分析代码架构",
"last_message_at": "2026-03-04T10:30:00Z"
}
实现位置:agent/core/runner.py:AgentRunner._build_context_injection
存储在 .trace/contacts.json,记录所有历史联系过的 Agent。
{
"agent_uri": "agent://other.com/code-analyst",
"name": "Code Analyst",
"type": "agent",
# 身份信息(从 Agent Card 获取)
"card": {
"description": "专注于代码分析",
"capabilities": ["code_analysis", "file_read"],
"owner": {"user_name": "张三"}
},
# 交互统计
"stats": {
"first_contact": "2026-02-01T10:00:00Z",
"last_contact": "2026-03-04T10:30:00Z",
"total_conversations": 15,
"total_messages": 127
},
# 最近对话
"recent_conversations": [
{
"conversation_id": "conv-456",
"trace_id": "abc-123",
"started_at": "2026-03-04T10:00:00Z",
"last_message": "分析完成",
"status": "active"
}
],
# 关系标签
"tags": ["code", "architecture"],
"pinned": false
}
实现位置:agent/trace/contact_store.py
存储在 Gateway,记录在线 Agent 的连接信息。
{
"agent_uri": "agent://internal/code-analyst",
"connection_type": "websocket", # websocket | http
"websocket": <WebSocket>, # WebSocket 连接对象
"http_endpoint": "http://localhost:8001", # HTTP 端点
"last_heartbeat": "2026-03-04T10:30:00Z",
"capabilities": ["code_analysis", "file_read"]
}
实现位置:gateway/core/registry.py
PC Agent 启动时注册:
# 建立 WebSocket 长连接
ws = await websockets.connect("wss://gateway.com/gateway/connect")
# 注册
await ws.send(json.dumps({
"type": "register",
"agent_uri": "agent://internal/my-agent",
"capabilities": ["file_read", "bash"]
}))
# 保持心跳
while True:
await ws.send(json.dumps({"type": "heartbeat"}))
await asyncio.sleep(30)
实现位置:gateway/core/client.py
通过 Gateway 发送消息:
# 发送方
POST /gateway/send
{
"to": "agent://internal/code-analyst",
"content": "帮我分析代码"
}
# Gateway 查找目标 Agent
agent_info = registry.lookup("agent://internal/code-analyst")
# 通过 WebSocket 推送
await agent_info["websocket"].send(json.dumps({
"type": "message",
"from": "agent://internal/caller",
"content": "帮我分析代码"
}))
实现位置:gateway/core/router.py
发送消息时自动更新:
# agent/tools/builtin/a2a_im.py
async def send_to_agent(...):
# 发送消息
response = await gateway_client.send(...)
# 更新活跃协作者
await update_active_collaborator(
trace_id=ctx.trace_id,
agent_uri=target_agent,
conversation_id=response["conversation_id"],
status="waiting"
)
周期性注入到 Agent 上下文:
# agent/core/runner.py
if iteration % 10 == 0:
collaborators = trace.context.get("collaborators", [])
inject_collaborators_markdown(collaborators)
查询联系人:
# 通过工具查询
contacts = await get_contacts(
type="agent",
status="online",
tags=["code"]
)
自动维护:
# 发送/接收消息时自动更新
await contact_store.update(
agent_uri=target_agent,
last_contact=datetime.now(),
increment_message_count=True
)
实现位置:agent/trace/contact_store.py
查询 Agent 在线状态:
GET /gateway/status/{agent_uri}
返回:
{
"agent_uri": "agent://internal/code-analyst",
"status": "online", # online | offline
"last_seen": "2026-03-04T10:30:00Z"
}
实现位置:gateway/core/router.py:get_agent_status
| 方法 | 路径 | 说明 |
|---|---|---|
| WS | /gateway/connect |
Agent 注册和保持连接 |
| POST | /gateway/send |
发送消息到其他 Agent |
| GET | /gateway/status/{agent_uri} |
查询 Agent 在线状态 |
| GET | /gateway/agents |
列出所有在线 Agent |
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/traces/{id}/collaborators |
查询活跃协作者 |
| GET | /api/contacts |
查询全局联系人 |
| GET | /api/contacts/{agent_uri} |
查询特定联系人详情 |
| GET | /api/contacts/{agent_uri}/conversations |
查询对话历史 |
发送消息到其他 Agent(内部或外部)。
@tool(description="发送消息到其他 Agent")
async def send_to_agent(
target_agent: str, # agent://domain/id
message: str,
conversation_id: Optional[str] = None,
ctx: ToolContext = None
) -> ToolResult
实现位置:agent/tools/builtin/a2a_im.py:send_to_agent
查询当前任务的活跃协作者。
@tool(description="查询当前任务的活跃协作者")
async def get_active_collaborators(
ctx: ToolContext
) -> ToolResult
实现位置:agent/tools/builtin/a2a_im.py:get_active_collaborators
查询全局联系人列表。
@tool(description="查询所有联系过的 Agent")
async def get_contacts(
type: Optional[str] = None, # agent | human
status: Optional[str] = None, # online | offline
tags: Optional[List[str]] = None,
ctx: ToolContext = None
) -> ToolResult
实现位置:agent/tools/builtin/a2a_im.py:get_contacts
提供 A2A IM 使用指南,注入到 Agent 的 system prompt。
内容:
实现位置:agent/memory/skills/a2a_im.md
# Agent A 需要代码分析帮助
result = await send_to_agent(
target_agent="agent://internal/code-analyst",
message="帮我分析 /path/to/project 的架构"
)
# 继续对话
result2 = await send_to_agent(
target_agent="agent://internal/code-analyst",
message="重点分析 core 模块",
conversation_id=result["conversation_id"]
)
# 查看当前任务中有哪些 Agent 在协作
collaborators = await get_active_collaborators()
# 输出:
# ## 活跃协作者
# - code-analyst [agent, completed]: 分析完成,发现3个问题
# - test-runner [agent, running]: 正在运行测试
# 查找擅长代码分析的 Agent
contacts = await get_contacts(
type="agent",
tags=["code", "architecture"]
)
# 输出:
# ## 联系人列表
# 🟢 code-analyst - agent://internal/code-analyst
# 最后联系: 2026-03-04 10:30
# 对话次数: 15
问题:Gateway 应该放在 agent/ 内部还是与 agent/ 并列?
决策:与 agent/ 并列
理由:
实现:
gateway/ 与 agent/ 并列from gateway.core import ...问题:Enterprise 应该是 Gateway 的上层(分层架构)还是 Gateway 的模块(模块化架构)?
决策:根据阶段选择
MVP 阶段(当前):模块化架构
大规模阶段(未来):可选分层架构
理由:
实现:
gateway/enterprise/ 作为可选模块enterprise_gateway/ 服务问题:活跃协作者信息应该如何存储和管理?
决策:存储在 trace.context["collaborators"],由工具自动维护
理由:
实现位置:
trace.context["collaborators"]agent/core/runner.py:AgentRunner._build_context_injectionagent/tools/builtin/a2a_im.py:_update_active_collaborator目标:实现核心通信能力
任务:
实现位置:
gateway/core/agent/tools/builtin/a2a_im.pyagent/memory/skills/a2a_im.md目标:完善联系人和历史记录
任务:
实现位置:
agent/trace/contact_store.pyagent/tools/builtin/a2a_im.py目标:提升用户体验
任务: