a2a-continuous-dialogue.md 20 KB

Agent2Agent 持续对话方案

更新日期: 2026-03-03

问题定义

单次任务 vs 持续对话

单次任务(之前的方案):

云端Agent: 请分析本地项目
    ↓
终端Agent: [执行分析] → 返回结果
    ↓
结束

持续对话(新需求):

云端Agent: 请分析本地项目
    ↓
终端Agent: 我看到有3个模块,你想重点分析哪个?
    ↓
云端Agent: 重点分析core模块
    ↓
终端Agent: core模块使用了X架构,需要我详细说明吗?
    ↓
云端Agent: 是的,请详细说明
    ↓
终端Agent: [详细分析] → 返回结果
    ↓
结束(或继续)

核心挑战

  1. 上下文延续 - 如何维护多轮对话的上下文?
  2. 状态管理 - 对话进行到哪一步?谁在等待谁?
  3. 消息路由 - 如何确保消息发送到正确的Agent?
  4. 会话生命周期 - 何时开始?何时结束?
  5. 异步通信 - Agent可能不在线,如何处理?

方案对比

方案1:基于Trace的持续对话(推荐)

核心思想

利用现有的Trace机制作为对话容器

  • 每个A2A对话创建一个共享的Trace
  • 双方Agent都可以向这个Trace追加消息
  • 通过WebSocket实时同步消息
  • 利用现有的续跑机制(continue_from

架构设计

云端Agent                共享Trace                终端Agent
    |                       |                         |
    | 创建对话              |                         |
    |--------------------->|                         |
    |                      |                         |
    | 发送消息1            |                         |
    |--------------------->|                         |
    |                      |----WebSocket推送------->|
    |                      |                         |
    |                      |<----追加消息2-----------|
    |<--WebSocket推送------|                         |
    |                      |                         |
    | 发送消息3            |                         |
    |--------------------->|                         |
    |                      |----WebSocket推送------->|
    |                      |                         |
    ...持续对话...

API设计

# 1. 创建对话会话
POST /api/a2a/sessions
{
    "participants": ["cloud-agent-123", "terminal-agent-456"],
    "initial_message": "请分析本地项目",
    "context": {...}
}

响应:
{
    "session_id": "sess-xxx",
    "trace_id": "trace-yyy",  # 底层使用Trace
    "ws_url": "wss://host/api/a2a/sessions/sess-xxx/stream"
}

# 2. 发送消息(追加到Trace)
POST /api/a2a/sessions/{session_id}/messages
{
    "from": "cloud-agent-123",
    "content": "重点分析core模块",
    "wait_for_response": true  # 是否等待对方回复
}

响应:
{
    "message_id": "msg-xxx",
    "status": "sent"
}

# 3. WebSocket监听(实时接收消息)
WS /api/a2a/sessions/{session_id}/stream
{
    "type": "message",
    "from": "terminal-agent-456",
    "content": "我看到有3个模块,你想重点分析哪个?",
    "message_id": "msg-yyy"
}

# 4. 获取对话历史
GET /api/a2a/sessions/{session_id}/messages
响应:
{
    "messages": [
        {"from": "cloud-agent-123", "content": "...", "timestamp": "..."},
        {"from": "terminal-agent-456", "content": "...", "timestamp": "..."},
        ...
    ]
}

# 5. 结束对话
POST /api/a2a/sessions/{session_id}/close
{
    "reason": "completed"
}

实现细节

# agent/api/a2a_session.py

class A2ASession:
    """A2A对话会话,基于Trace实现"""

    def __init__(self, session_id: str, trace_id: str, participants: List[str]):
        self.session_id = session_id
        self.trace_id = trace_id
        self.participants = participants
        self.ws_connections = {}  # agent_id -> WebSocket

    async def send_message(
        self,
        from_agent: str,
        content: str,
        wait_for_response: bool = False
    ) -> Dict[str, Any]:
        """发送消息到对话"""
        # 1. 追加消息到Trace
        messages = [{"role": "user", "content": content}]
        config = RunConfig(
            trace_id=self.trace_id,
            after_sequence=None,  # 续跑模式
            uid=from_agent
        )

        # 2. 执行(可能触发对方Agent的响应)
        async for event in runner.run(messages, config):
            if isinstance(event, Message):
                # 3. 通过WebSocket推送给其他参与者
                await self._broadcast_message(event, exclude=from_agent)

                if wait_for_response and event.role == "assistant":
                    # 等待对方回复
                    return {"message_id": event.message_id, "status": "sent"}

        return {"status": "completed"}

    async def _broadcast_message(self, message: Message, exclude: str = None):
        """广播消息给所有参与者(除了发送者)"""
        for agent_id, ws in self.ws_connections.items():
            if agent_id != exclude:
                await ws.send_json({
                    "type": "message",
                    "from": exclude,
                    "content": message.content,
                    "message_id": message.message_id,
                    "timestamp": message.created_at.isoformat()
                })


@app.post("/api/a2a/sessions")
async def create_session(request: CreateSessionRequest):
    """创建A2A对话会话"""
    # 1. 创建底层Trace
    trace = Trace(
        trace_id=generate_trace_id(),
        mode="agent",
        task=request.initial_message,
        agent_type="a2a_session",
        context={
            "session_type": "a2a",
            "participants": request.participants
        }
    )
    await store.create_trace(trace)

    # 2. 创建Session对象
    session_id = f"sess-{generate_id()}"
    session = A2ASession(session_id, trace.trace_id, request.participants)

    # 3. 存储Session(内存或Redis)
    sessions[session_id] = session

    # 4. 发送初始消息
    if request.initial_message:
        await session.send_message(
            from_agent=request.participants[0],
            content=request.initial_message
        )

    return {
        "session_id": session_id,
        "trace_id": trace.trace_id,
        "ws_url": f"wss://{host}/api/a2a/sessions/{session_id}/stream"
    }


@app.websocket("/api/a2a/sessions/{session_id}/stream")
async def session_stream(websocket: WebSocket, session_id: str):
    """WebSocket连接,实时接收对话消息"""
    await websocket.accept()

    # 1. 获取Session
    session = sessions.get(session_id)
    if not session:
        await websocket.close(code=404)
        return

    # 2. 识别连接的Agent
    agent_id = await authenticate_websocket(websocket)

    # 3. 注册WebSocket连接
    session.ws_connections[agent_id] = websocket

    try:
        # 4. 保持连接,接收消息
        async for message in websocket:
            data = json.loads(message)
            if data["type"] == "message":
                # 发送消息到对话
                await session.send_message(
                    from_agent=agent_id,
                    content=data["content"]
                )
    finally:
        # 5. 清理连接
        del session.ws_connections[agent_id]


@app.post("/api/a2a/sessions/{session_id}/messages")
async def send_session_message(session_id: str, request: SendMessageRequest):
    """发送消息到对话(HTTP方式)"""
    session = sessions.get(session_id)
    if not session:
        raise HTTPException(404, "Session not found")

    result = await session.send_message(
        from_agent=request.from_agent,
        content=request.content,
        wait_for_response=request.wait_for_response
    )

    return result

客户端SDK

# agent/client/a2a_session_client.py

class A2ASessionClient:
    """A2A持续对话客户端"""

    def __init__(self, base_url: str, agent_id: str, api_key: str):
        self.base_url = base_url
        self.agent_id = agent_id
        self.api_key = api_key
        self.ws = None
        self.message_handlers = []

    async def create_session(
        self,
        other_agent: str,
        initial_message: str
    ) -> str:
        """创建对话会话"""
        response = await self._post("/api/a2a/sessions", {
            "participants": [self.agent_id, other_agent],
            "initial_message": initial_message
        })

        session_id = response["session_id"]

        # 自动连接WebSocket
        await self._connect_websocket(session_id)

        return session_id

    async def _connect_websocket(self, session_id: str):
        """连接WebSocket接收消息"""
        ws_url = f"{self.ws_url}/api/a2a/sessions/{session_id}/stream"
        self.ws = await websockets.connect(
            ws_url,
            extra_headers={"Authorization": f"Bearer {self.api_key}"}
        )

        # 启动消息接收循环
        asyncio.create_task(self._receive_messages())

    async def _receive_messages(self):
        """接收WebSocket消息"""
        async for message in self.ws:
            data = json.loads(message)
            if data["type"] == "message":
                # 调用注册的消息处理器
                for handler in self.message_handlers:
                    await handler(data)

    async def send_message(self, session_id: str, content: str):
        """发送消息"""
        if self.ws:
            # 通过WebSocket发送(实时)
            await self.ws.send(json.dumps({
                "type": "message",
                "content": content
            }))
        else:
            # 通过HTTP发送(备用)
            await self._post(f"/api/a2a/sessions/{session_id}/messages", {
                "from_agent": self.agent_id,
                "content": content
            })

    def on_message(self, handler):
        """注册消息处理器"""
        self.message_handlers.append(handler)

    async def close_session(self, session_id: str):
        """关闭对话"""
        await self._post(f"/api/a2a/sessions/{session_id}/close", {})
        if self.ws:
            await self.ws.close()

使用示例

# 云端Agent使用
client = A2ASessionClient(
    base_url="https://org.agent.cloud",
    agent_id="cloud-agent-123",
    api_key="ak_xxx"
)

# 创建对话
session_id = await client.create_session(
    other_agent="terminal-agent-456",
    initial_message="请分析本地项目"
)

# 注册消息处理器
@client.on_message
async def handle_message(message):
    print(f"收到消息: {message['content']}")

    # 根据消息内容决定如何回复
    if "哪个模块" in message['content']:
        await client.send_message(session_id, "重点分析core模块")
    elif "需要我详细说明吗" in message['content']:
        await client.send_message(session_id, "是的,请详细说明")

# 等待对话完成
await asyncio.sleep(60)  # 或其他结束条件

# 关闭对话
await client.close_session(session_id)

优势

  1. 复用Trace机制 - 所有消息管理、压缩、存储都复用
  2. 完整历史 - 对话历史自动保存在Trace中
  3. 实时通信 - WebSocket保证低延迟
  4. 状态追踪 - 利用Trace的状态管理
  5. 可回溯 - 可以查看完整的对话历史

劣势

  1. Trace概念泄露 - 外部需要理解session_id和trace_id的关系
  2. 复杂度 - 需要管理WebSocket连接

方案2:独立的对话管理器

核心思想

创建独立的对话管理系统,不依赖Trace

class Conversation:
    """独立的对话对象"""
    conversation_id: str
    participants: List[str]
    messages: List[ConversationMessage]
    status: str  # active, waiting, completed
    created_at: datetime

class ConversationMessage:
    """对话消息"""
    message_id: str
    from_agent: str
    to_agent: Optional[str]  # None表示广播
    content: str
    timestamp: datetime
    metadata: Dict

优势

  1. 概念清晰 - 对话就是对话,不混淆Trace
  2. 轻量级 - 不需要Trace的重量级机制
  3. 灵活 - 可以自定义对话逻辑

劣势

  1. 重复实现 - 需要重新实现消息管理、存储、压缩
  2. 不一致 - 与现有Trace机制不一致
  3. 维护成本 - 需要维护两套系统

方案3:混合模式(推荐)

核心思想

对话层(Session)+ 执行层(Trace)分离

对话层(Session)
    - 管理对话状态
    - 路由消息
    - WebSocket连接
    |
    | 每条消息触发
    ↓
执行层(Trace)
    - 执行具体任务
    - 调用工具
    - 管理上下文

架构

class A2ASession:
    """对话会话(轻量级)"""
    session_id: str
    participants: List[str]
    current_speaker: str
    waiting_for: Optional[str]
    context: Dict  # 共享上下文
    message_queue: List[Message]

    async def send_message(self, from_agent: str, content: str):
        """发送消息"""
        # 1. 添加到消息队列
        self.message_queue.append(Message(from_agent, content))

        # 2. 如果需要执行(不是简单问答),创建Trace
        if self._needs_execution(content):
            trace_id = await self._create_execution_trace(content)
            # 执行完成后,结果自动添加到消息队列
        else:
            # 简单消息,直接转发
            await self._forward_message(content, to=self._get_other_agent(from_agent))

    def _needs_execution(self, content: str) -> bool:
        """判断是否需要创建Trace执行"""
        # 例如:包含工具调用、复杂任务等
        return "分析" in content or "执行" in content or "查询" in content

优势

  1. 分层清晰 - 对话管理和任务执行分离
  2. 灵活 - 简单消息不需要Trace,复杂任务才创建
  3. 高效 - 避免为每条消息创建Trace
  4. 复用 - 复杂任务仍然复用Trace机制

实现示例

@app.post("/api/a2a/sessions/{session_id}/messages")
async def send_message(session_id: str, request: SendMessageRequest):
    session = sessions[session_id]

    # 1. 判断消息类型
    if request.requires_execution:
        # 需要执行的任务 → 创建Trace
        trace_id = await create_execution_trace(
            task=request.content,
            parent_session=session_id,
            agent_id=request.from_agent
        )

        # 执行完成后,结果自动推送到Session
        result = await runner.run_result(
            messages=[{"role": "user", "content": request.content}],
            config=RunConfig(trace_id=trace_id)
        )

        # 将结果作为消息发送给对方
        await session.send_message(
            from_agent=request.from_agent,
            content=result["summary"]
        )
    else:
        # 简单消息 → 直接转发
        await session.send_message(
            from_agent=request.from_agent,
            content=request.content
        )

    return {"status": "sent"}

关键设计决策

1. 消息类型分类

类型 示例 处理方式
简单问答 "你好"、"收到"、"明白了" 直接转发,不创建Trace
信息查询 "当前进度如何?" 查询Session状态,返回
任务请求 "分析core模块" 创建Trace执行
工具调用 "读取文件X" 创建Trace执行

2. 上下文管理

Session级上下文(轻量):

session.context = {
    "current_topic": "项目分析",
    "focus_module": "core",
    "previous_results": {...}
}

Trace级上下文(完整):

  • 完整的消息历史
  • 工具调用记录
  • Goal树

3. 生命周期管理

# Session生命周期
created → active → waiting → active → ... → completed/timeout

# Trace生命周期(每个任务)
created → running → completed

4. 超时和重连

class A2ASession:
    timeout: int = 300  # 5分钟无活动则超时
    last_activity: datetime

    async def check_timeout(self):
        if datetime.now() - self.last_activity > timedelta(seconds=self.timeout):
            await self.close(reason="timeout")

    async def reconnect(self, agent_id: str, ws: WebSocket):
        """Agent重连"""
        self.ws_connections[agent_id] = ws
        # 发送未读消息
        await self._send_unread_messages(agent_id)

实现路线图

Phase 1:基础对话能力(2-3周)

  1. Session管理

    • 创建/关闭Session
    • 消息路由
    • WebSocket连接管理
  2. 简单消息转发

    • 不涉及Trace
    • 纯消息传递
  3. 客户端SDK

    • A2ASessionClient
    • 消息处理器

Phase 2:集成Trace执行(2-3周)

  1. 任务识别

    • 判断消息是否需要执行
    • 自动创建Trace
  2. 结果集成

    • Trace结果转换为消息
    • 自动推送给对方
  3. 上下文共享

    • Session上下文传递给Trace
    • Trace结果更新Session上下文

Phase 3:高级功能(3-4周)

  1. 多方对话

    • 支持3个以上Agent
    • 群聊模式
  2. 对话分支

    • 子对话
    • 并行对话
  3. 持久化和恢复

    • Session持久化
    • 断线重连

示例场景

场景:云端助理与终端Agent的持续对话

# 云端助理
client = A2ASessionClient("https://cloud", "cloud-agent", "ak_xxx")

# 1. 创建对话
session_id = await client.create_session(
    other_agent="terminal-agent-456",
    initial_message="请分析本地项目"
)

# 2. 注册消息处理器(自动响应)
@client.on_message
async def handle_message(msg):
    content = msg['content']

    if "哪个模块" in content:
        # 简单回复,不需要执行
        await client.send_message(session_id, "重点分析core模块")

    elif "详细说明" in content:
        # 需要进一步分析,触发执行
        await client.send_message(
            session_id,
            "是的,请详细说明架构设计和关键组件",
            requires_execution=True  # 标记需要执行
        )

# 3. 等待对话完成
await client.wait_for_completion(session_id)
# 终端Agent
client = A2ASessionClient("https://terminal", "terminal-agent-456", "ak_yyy")

# 1. 监听新对话
@client.on_new_session
async def handle_new_session(session_id, initial_message):
    # 分析项目
    modules = await analyze_project()

    # 询问用户
    await client.send_message(
        session_id,
        f"我看到有{len(modules)}个模块:{', '.join(modules)},你想重点分析哪个?"
    )

# 2. 处理后续消息
@client.on_message
async def handle_message(msg):
    if "core模块" in msg['content']:
        # 执行分析
        result = await analyze_module("core")

        # 返回结果并询问
        await client.send_message(
            msg['session_id'],
            f"core模块使用了{result['architecture']}架构,需要我详细说明吗?"
        )

    elif "详细说明" in msg['content']:
        # 深度分析
        details = await deep_analyze("core")
        await client.send_message(
            msg['session_id'],
            f"详细架构:\n{details}"
        )

总结

推荐方案:混合模式

对话层(Session):

  • 轻量级消息路由
  • WebSocket连接管理
  • 简单问答直接转发

执行层(Trace):

  • 复杂任务创建Trace
  • 复用所有现有能力
  • 结果自动集成到对话

关键优势

  1. 灵活 - 简单消息不需要Trace开销
  2. 强大 - 复杂任务复用Trace能力
  3. 清晰 - 对话和执行分层
  4. 高效 - 避免不必要的资源消耗

实现优先级

  1. Phase 1 - 基础Session + 简单消息(MVP)
  2. Phase 2 - 集成Trace执行(核心能力)
  3. Phase 3 - 高级功能(按需)