更新日期: 2026-03-03
示例: 通用助理 → 爬虫运维Agent
agent工具(内存调用)示例:
需求:
挑战:
需要HTTP接口!
示例: 团队成员的Agent互相协作
云端Agent 终端Agent
| |
| HTTP POST /api/a2a/call |
|--------------------------->|
| |
| 创建Trace + 执行任务 |
| |
| WebSocket /api/a2a/watch |
|<---------------------------|
| 实时进度推送 |
| |
| HTTP GET /api/a2a/result |
|--------------------------->|
| 返回最终结果 |
|<---------------------------|
1. 简化的A2A端点
# agent/api/a2a.py
@app.post("/api/a2a/call")
async def a2a_call(request: A2ACallRequest):
"""
简化的Agent调用接口
请求:
{
"task": "分析这个项目的架构",
"agent_type": "explore", # 可选
"context": { # 可选
"files": [...],
"previous_results": {...}
},
"callback_url": "https://..." # 可选,完成后回调
}
响应:
{
"call_id": "a2a-xxx",
"status": "started",
"watch_url": "ws://host/api/a2a/watch/a2a-xxx"
}
"""
# 1. 认证和授权检查
# 2. 转换为内部格式
messages = [{"role": "user", "content": request.task}]
if request.context:
messages[0]["content"] += f"\n\n上下文:{json.dumps(request.context)}"
# 3. 调用现有runner(复用所有逻辑)
config = RunConfig(
agent_type=request.agent_type or "default",
trace_id=None # 新建
)
# 4. 后台执行
task_id = await start_background_task(runner.run(messages, config))
# 5. 返回call_id(映射到trace_id)
return {
"call_id": f"a2a-{task_id}",
"status": "started",
"watch_url": f"ws://{host}/api/a2a/watch/{task_id}"
}
@app.websocket("/api/a2a/watch/{call_id}")
async def a2a_watch(websocket: WebSocket, call_id: str):
"""
实时监听执行进度(复用现有WebSocket)
推送消息:
{
"type": "progress",
"data": {
"goal": "正在分析文件结构",
"progress": 0.3
}
}
{
"type": "completed",
"data": {
"result": "...",
"stats": {...}
}
}
"""
# 复用现有的 /api/traces/{id}/watch 逻辑
trace_id = call_id.replace("a2a-", "")
await watch_trace(websocket, trace_id)
@app.get("/api/a2a/result/{call_id}")
async def a2a_result(call_id: str):
"""
获取执行结果
响应:
{
"status": "completed",
"result": {
"summary": "...",
"details": {...}
},
"stats": {
"duration_ms": 5000,
"tokens": 1500,
"cost": 0.05
}
}
"""
trace_id = call_id.replace("a2a-", "")
trace = await store.get_trace(trace_id)
messages = await store.get_main_path_messages(trace_id, trace.head_sequence)
# 提取最后的assistant消息作为结果
result = extract_final_result(messages)
return {
"status": trace.status,
"result": result,
"stats": {
"duration_ms": trace.total_duration_ms,
"tokens": trace.total_tokens,
"cost": trace.total_cost
}
}
2. 客户端SDK(终端Agent使用)
# agent/client/a2a_client.py
class A2AClient:
"""A2A客户端,用于调用远程Agent"""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
async def call(
self,
task: str,
agent_type: Optional[str] = None,
context: Optional[dict] = None,
wait: bool = True
) -> Dict[str, Any]:
"""
调用远程Agent
Args:
task: 任务描述
agent_type: Agent类型
context: 上下文信息
wait: 是否等待完成(False则立即返回call_id)
"""
# 1. 发起调用
response = await self._post("/api/a2a/call", {
"task": task,
"agent_type": agent_type,
"context": context
})
call_id = response["call_id"]
if not wait:
return {"call_id": call_id, "status": "started"}
# 2. 等待完成(通过WebSocket或轮询)
result = await self._wait_for_completion(call_id)
return result
async def _wait_for_completion(self, call_id: str):
"""通过WebSocket监听完成"""
async with websockets.connect(
f"{self.ws_url}/api/a2a/watch/{call_id}",
extra_headers={"Authorization": f"Bearer {self.api_key}"}
) as ws:
async for message in ws:
data = json.loads(message)
if data["type"] == "completed":
return data["data"]
elif data["type"] == "failed":
raise A2AError(data["data"]["error"])
async def get_result(self, call_id: str) -> Dict[str, Any]:
"""获取执行结果(轮询方式)"""
return await self._get(f"/api/a2a/result/{call_id}")
3. 作为工具集成到Agent
# agent/tools/builtin/remote_agent.py
@tool(description="调用远程Agent执行任务")
async def remote_agent(
task: str,
agent_url: str,
agent_type: Optional[str] = None,
context: Optional[dict] = None,
ctx: ToolContext = None
) -> ToolResult:
"""
调用远程Agent(云端或其他终端)
Args:
task: 任务描述
agent_url: 远程Agent的URL
agent_type: Agent类型
context: 上下文信息
"""
# 1. 创建客户端
client = A2AClient(
base_url=agent_url,
api_key=ctx.config.get("remote_agent_api_key")
)
# 2. 调用远程Agent
result = await client.call(
task=task,
agent_type=agent_type,
context=context,
wait=True
)
# 3. 返回结果
return ToolResult(
title=f"远程Agent完成: {task[:50]}",
output=result["result"]["summary"],
long_term_memory=f"调用远程Agent完成任务,耗时{result['stats']['duration_ms']}ms"
)
# agent/api/a2a_standard.py
@app.post("/api/a2a/v1/tasks")
async def create_task(request: A2ATaskRequest):
"""
符合Google A2A协议的端点
请求格式(A2A标准):
{
"header": {
"message_id": "msg_001",
"timestamp": "2026-03-03T10:30:00Z"
},
"task": {
"description": "分析项目架构",
"capabilities_required": ["file_read", "code_analysis"]
},
"context": {...}
}
"""
# 转换A2A格式到内部格式
# 调用runner
# 转换结果为A2A格式
云端Gateway
|
+-------+-------+
| | |
通用 爬虫 成本
助理 运维 统计
|
+-- 调用终端Agent(HTTP)
|
用户终端Agent
特点:
用户终端Agent
|
| WebSocket长连接
|
云端Gateway
|
+-- 通过连接推送任务
特点:
云端Agent <--HTTP--> 云端Agent(内存调用)
|
| WebSocket双向
|
终端Agent <--HTTP--> 终端Agent(如果需要)
特点:
# 终端Agent启动时注册
POST /api/a2a/register
{
"agent_id": "user123-laptop",
"capabilities": ["file_read", "bash", "browser"],
"device_info": {...}
}
# 返回API Key
{
"api_key": "ak_xxx",
"agent_id": "user123-laptop"
}
# 后续调用携带API Key
Authorization: Bearer ak_xxx
# config/a2a_permissions.yaml
agents:
user123-laptop:
can_access:
- conversations/user123/*
- resources/public/*
cannot_access:
- conversations/other_users/*
- agents/*/memory/*
目标: 云端Agent ↔ 终端Agent基础通信
实现:
/api/a2a/call - 简化调用接口/api/a2a/watch - WebSocket监听/api/a2a/result - 获取结果A2AClient - 客户端SDKremote_agent - 工具集成时间: 1-2周
目标: 完善A2A能力
实现:
时间: 2-3周
目标: 兼容A2A标准协议
实现:
时间: 3-4周
1. 用户在飞书问: "帮我分析一下我笔记本上的项目架构"
2. 云端通用助理:
# 识别需要访问用户终端
result = await remote_agent(
task="分析 /Users/sunlit/Code/Agent 的项目架构",
agent_url="https://user123-laptop.agent.local",
agent_type="explore"
)
3. 终端Agent:
4. 云端助理:
1. 用户在终端运行:
agent-cli ask "公司的爬虫部署规范是什么?"
2. 终端Agent:
# 识别需要查询组织知识库
result = await remote_agent(
task="查询爬虫部署规范",
agent_url="https://org.agent.cloud",
agent_type="knowledge_query"
)
3. 云端知识库Agent:
4. 终端Agent:
方案A:终端主动连接(推荐)
# 终端Agent启动时建立WebSocket长连接
ws = await websockets.connect("wss://org.agent.cloud/api/a2a/connect")
# 云端通过连接推送任务
await ws.send(json.dumps({
"type": "task",
"task_id": "xxx",
"data": {...}
}))
# 终端执行并返回结果
result = await execute_task(task)
await ws.send(json.dumps({
"type": "result",
"task_id": "xxx",
"data": result
}))
方案B:使用ngrok等隧道服务
# 简化格式(内部使用)
{
"task": "string",
"context": {...}
}
# 标准A2A格式(外部互操作)
{
"header": {...},
"task": {...},
"capabilities": [...]
}
# 自动转换
def to_a2a_format(internal_msg):
return {
"header": generate_header(),
"task": {"description": internal_msg["task"]},
"context": internal_msg.get("context", {})
}
# 支持流式返回中间结果
@app.websocket("/api/a2a/stream/{call_id}")
async def a2a_stream(websocket: WebSocket, call_id: str):
async for event in runner.run(...):
if isinstance(event, Message):
await websocket.send_json({
"type": "message",
"data": event.to_dict()
})
# 每次A2A调用记录成本
{
"call_id": "a2a-xxx",
"caller": "user123-laptop",
"callee": "org-cloud-agent",
"tokens": 1500,
"cost": 0.05,
"duration_ms": 5000
}
# 限额检查
if user_cost_today > user_limit:
raise CostLimitExceeded()
Phase 1(MVP): 方案A - 基于现有API封装
Phase 3+: 可选实现标准A2A协议