更新日期: 2026-03-03
单次任务(之前的方案):
云端Agent: 请分析本地项目
↓
终端Agent: [执行分析] → 返回结果
↓
结束
持续对话(新需求):
云端Agent: 请分析本地项目
↓
终端Agent: 我看到有3个模块,你想重点分析哪个?
↓
云端Agent: 重点分析core模块
↓
终端Agent: core模块使用了X架构,需要我详细说明吗?
↓
云端Agent: 是的,请详细说明
↓
终端Agent: [详细分析] → 返回结果
↓
结束(或继续)
利用现有的Trace机制作为对话容器
continue_from)云端Agent 共享Trace 终端Agent
| | |
| 创建对话 | |
|--------------------->| |
| | |
| 发送消息1 | |
|--------------------->| |
| |----WebSocket推送------->|
| | |
| |<----追加消息2-----------|
|<--WebSocket推送------| |
| | |
| 发送消息3 | |
|--------------------->| |
| |----WebSocket推送------->|
| | |
...持续对话...
# 1. 创建对话会话
POST /api/a2a/sessions
{
"participants": ["cloud-agent-123", "terminal-agent-456"],
"initial_message": "请分析本地项目",
"context": {...}
}
响应:
{
"session_id": "sess-xxx",
"trace_id": "trace-yyy", # 底层使用Trace
"ws_url": "wss://host/api/a2a/sessions/sess-xxx/stream"
}
# 2. 发送消息(追加到Trace)
POST /api/a2a/sessions/{session_id}/messages
{
"from": "cloud-agent-123",
"content": "重点分析core模块",
"wait_for_response": true # 是否等待对方回复
}
响应:
{
"message_id": "msg-xxx",
"status": "sent"
}
# 3. WebSocket监听(实时接收消息)
WS /api/a2a/sessions/{session_id}/stream
{
"type": "message",
"from": "terminal-agent-456",
"content": "我看到有3个模块,你想重点分析哪个?",
"message_id": "msg-yyy"
}
# 4. 获取对话历史
GET /api/a2a/sessions/{session_id}/messages
响应:
{
"messages": [
{"from": "cloud-agent-123", "content": "...", "timestamp": "..."},
{"from": "terminal-agent-456", "content": "...", "timestamp": "..."},
...
]
}
# 5. 结束对话
POST /api/a2a/sessions/{session_id}/close
{
"reason": "completed"
}
# agent/api/a2a_session.py
class A2ASession:
"""A2A对话会话,基于Trace实现"""
def __init__(self, session_id: str, trace_id: str, participants: List[str]):
self.session_id = session_id
self.trace_id = trace_id
self.participants = participants
self.ws_connections = {} # agent_id -> WebSocket
async def send_message(
self,
from_agent: str,
content: str,
wait_for_response: bool = False
) -> Dict[str, Any]:
"""发送消息到对话"""
# 1. 追加消息到Trace
messages = [{"role": "user", "content": content}]
config = RunConfig(
trace_id=self.trace_id,
after_sequence=None, # 续跑模式
uid=from_agent
)
# 2. 执行(可能触发对方Agent的响应)
async for event in runner.run(messages, config):
if isinstance(event, Message):
# 3. 通过WebSocket推送给其他参与者
await self._broadcast_message(event, exclude=from_agent)
if wait_for_response and event.role == "assistant":
# 等待对方回复
return {"message_id": event.message_id, "status": "sent"}
return {"status": "completed"}
async def _broadcast_message(self, message: Message, exclude: str = None):
"""广播消息给所有参与者(除了发送者)"""
for agent_id, ws in self.ws_connections.items():
if agent_id != exclude:
await ws.send_json({
"type": "message",
"from": exclude,
"content": message.content,
"message_id": message.message_id,
"timestamp": message.created_at.isoformat()
})
@app.post("/api/a2a/sessions")
async def create_session(request: CreateSessionRequest):
"""创建A2A对话会话"""
# 1. 创建底层Trace
trace = Trace(
trace_id=generate_trace_id(),
mode="agent",
task=request.initial_message,
agent_type="a2a_session",
context={
"session_type": "a2a",
"participants": request.participants
}
)
await store.create_trace(trace)
# 2. 创建Session对象
session_id = f"sess-{generate_id()}"
session = A2ASession(session_id, trace.trace_id, request.participants)
# 3. 存储Session(内存或Redis)
sessions[session_id] = session
# 4. 发送初始消息
if request.initial_message:
await session.send_message(
from_agent=request.participants[0],
content=request.initial_message
)
return {
"session_id": session_id,
"trace_id": trace.trace_id,
"ws_url": f"wss://{host}/api/a2a/sessions/{session_id}/stream"
}
@app.websocket("/api/a2a/sessions/{session_id}/stream")
async def session_stream(websocket: WebSocket, session_id: str):
"""WebSocket连接,实时接收对话消息"""
await websocket.accept()
# 1. 获取Session
session = sessions.get(session_id)
if not session:
await websocket.close(code=404)
return
# 2. 识别连接的Agent
agent_id = await authenticate_websocket(websocket)
# 3. 注册WebSocket连接
session.ws_connections[agent_id] = websocket
try:
# 4. 保持连接,接收消息
async for message in websocket:
data = json.loads(message)
if data["type"] == "message":
# 发送消息到对话
await session.send_message(
from_agent=agent_id,
content=data["content"]
)
finally:
# 5. 清理连接
del session.ws_connections[agent_id]
@app.post("/api/a2a/sessions/{session_id}/messages")
async def send_session_message(session_id: str, request: SendMessageRequest):
"""发送消息到对话(HTTP方式)"""
session = sessions.get(session_id)
if not session:
raise HTTPException(404, "Session not found")
result = await session.send_message(
from_agent=request.from_agent,
content=request.content,
wait_for_response=request.wait_for_response
)
return result
# agent/client/a2a_session_client.py
class A2ASessionClient:
"""A2A持续对话客户端"""
def __init__(self, base_url: str, agent_id: str, api_key: str):
self.base_url = base_url
self.agent_id = agent_id
self.api_key = api_key
self.ws = None
self.message_handlers = []
async def create_session(
self,
other_agent: str,
initial_message: str
) -> str:
"""创建对话会话"""
response = await self._post("/api/a2a/sessions", {
"participants": [self.agent_id, other_agent],
"initial_message": initial_message
})
session_id = response["session_id"]
# 自动连接WebSocket
await self._connect_websocket(session_id)
return session_id
async def _connect_websocket(self, session_id: str):
"""连接WebSocket接收消息"""
ws_url = f"{self.ws_url}/api/a2a/sessions/{session_id}/stream"
self.ws = await websockets.connect(
ws_url,
extra_headers={"Authorization": f"Bearer {self.api_key}"}
)
# 启动消息接收循环
asyncio.create_task(self._receive_messages())
async def _receive_messages(self):
"""接收WebSocket消息"""
async for message in self.ws:
data = json.loads(message)
if data["type"] == "message":
# 调用注册的消息处理器
for handler in self.message_handlers:
await handler(data)
async def send_message(self, session_id: str, content: str):
"""发送消息"""
if self.ws:
# 通过WebSocket发送(实时)
await self.ws.send(json.dumps({
"type": "message",
"content": content
}))
else:
# 通过HTTP发送(备用)
await self._post(f"/api/a2a/sessions/{session_id}/messages", {
"from_agent": self.agent_id,
"content": content
})
def on_message(self, handler):
"""注册消息处理器"""
self.message_handlers.append(handler)
async def close_session(self, session_id: str):
"""关闭对话"""
await self._post(f"/api/a2a/sessions/{session_id}/close", {})
if self.ws:
await self.ws.close()
# 云端Agent使用
client = A2ASessionClient(
base_url="https://org.agent.cloud",
agent_id="cloud-agent-123",
api_key="ak_xxx"
)
# 创建对话
session_id = await client.create_session(
other_agent="terminal-agent-456",
initial_message="请分析本地项目"
)
# 注册消息处理器
@client.on_message
async def handle_message(message):
print(f"收到消息: {message['content']}")
# 根据消息内容决定如何回复
if "哪个模块" in message['content']:
await client.send_message(session_id, "重点分析core模块")
elif "需要我详细说明吗" in message['content']:
await client.send_message(session_id, "是的,请详细说明")
# 等待对话完成
await asyncio.sleep(60) # 或其他结束条件
# 关闭对话
await client.close_session(session_id)
创建独立的对话管理系统,不依赖Trace
class Conversation:
"""独立的对话对象"""
conversation_id: str
participants: List[str]
messages: List[ConversationMessage]
status: str # active, waiting, completed
created_at: datetime
class ConversationMessage:
"""对话消息"""
message_id: str
from_agent: str
to_agent: Optional[str] # None表示广播
content: str
timestamp: datetime
metadata: Dict
对话层(Session)+ 执行层(Trace)分离
对话层(Session)
- 管理对话状态
- 路由消息
- WebSocket连接
|
| 每条消息触发
↓
执行层(Trace)
- 执行具体任务
- 调用工具
- 管理上下文
class A2ASession:
"""对话会话(轻量级)"""
session_id: str
participants: List[str]
current_speaker: str
waiting_for: Optional[str]
context: Dict # 共享上下文
message_queue: List[Message]
async def send_message(self, from_agent: str, content: str):
"""发送消息"""
# 1. 添加到消息队列
self.message_queue.append(Message(from_agent, content))
# 2. 如果需要执行(不是简单问答),创建Trace
if self._needs_execution(content):
trace_id = await self._create_execution_trace(content)
# 执行完成后,结果自动添加到消息队列
else:
# 简单消息,直接转发
await self._forward_message(content, to=self._get_other_agent(from_agent))
def _needs_execution(self, content: str) -> bool:
"""判断是否需要创建Trace执行"""
# 例如:包含工具调用、复杂任务等
return "分析" in content or "执行" in content or "查询" in content
@app.post("/api/a2a/sessions/{session_id}/messages")
async def send_message(session_id: str, request: SendMessageRequest):
session = sessions[session_id]
# 1. 判断消息类型
if request.requires_execution:
# 需要执行的任务 → 创建Trace
trace_id = await create_execution_trace(
task=request.content,
parent_session=session_id,
agent_id=request.from_agent
)
# 执行完成后,结果自动推送到Session
result = await runner.run_result(
messages=[{"role": "user", "content": request.content}],
config=RunConfig(trace_id=trace_id)
)
# 将结果作为消息发送给对方
await session.send_message(
from_agent=request.from_agent,
content=result["summary"]
)
else:
# 简单消息 → 直接转发
await session.send_message(
from_agent=request.from_agent,
content=request.content
)
return {"status": "sent"}
| 类型 | 示例 | 处理方式 |
|---|---|---|
| 简单问答 | "你好"、"收到"、"明白了" | 直接转发,不创建Trace |
| 信息查询 | "当前进度如何?" | 查询Session状态,返回 |
| 任务请求 | "分析core模块" | 创建Trace执行 |
| 工具调用 | "读取文件X" | 创建Trace执行 |
Session级上下文(轻量):
session.context = {
"current_topic": "项目分析",
"focus_module": "core",
"previous_results": {...}
}
Trace级上下文(完整):
# Session生命周期
created → active → waiting → active → ... → completed/timeout
# Trace生命周期(每个任务)
created → running → completed
class A2ASession:
timeout: int = 300 # 5分钟无活动则超时
last_activity: datetime
async def check_timeout(self):
if datetime.now() - self.last_activity > timedelta(seconds=self.timeout):
await self.close(reason="timeout")
async def reconnect(self, agent_id: str, ws: WebSocket):
"""Agent重连"""
self.ws_connections[agent_id] = ws
# 发送未读消息
await self._send_unread_messages(agent_id)
Session管理
简单消息转发
客户端SDK
A2ASessionClient任务识别
结果集成
上下文共享
多方对话
对话分支
持久化和恢复
# 云端助理
client = A2ASessionClient("https://cloud", "cloud-agent", "ak_xxx")
# 1. 创建对话
session_id = await client.create_session(
other_agent="terminal-agent-456",
initial_message="请分析本地项目"
)
# 2. 注册消息处理器(自动响应)
@client.on_message
async def handle_message(msg):
content = msg['content']
if "哪个模块" in content:
# 简单回复,不需要执行
await client.send_message(session_id, "重点分析core模块")
elif "详细说明" in content:
# 需要进一步分析,触发执行
await client.send_message(
session_id,
"是的,请详细说明架构设计和关键组件",
requires_execution=True # 标记需要执行
)
# 3. 等待对话完成
await client.wait_for_completion(session_id)
# 终端Agent
client = A2ASessionClient("https://terminal", "terminal-agent-456", "ak_yyy")
# 1. 监听新对话
@client.on_new_session
async def handle_new_session(session_id, initial_message):
# 分析项目
modules = await analyze_project()
# 询问用户
await client.send_message(
session_id,
f"我看到有{len(modules)}个模块:{', '.join(modules)},你想重点分析哪个?"
)
# 2. 处理后续消息
@client.on_message
async def handle_message(msg):
if "core模块" in msg['content']:
# 执行分析
result = await analyze_module("core")
# 返回结果并询问
await client.send_message(
msg['session_id'],
f"core模块使用了{result['architecture']}架构,需要我详细说明吗?"
)
elif "详细说明" in msg['content']:
# 深度分析
details = await deep_analyze("core")
await client.send_message(
msg['session_id'],
f"详细架构:\n{details}"
)
对话层(Session):
执行层(Trace):