"""
Trace control API — create / run / stop / reflect.

Provides POST endpoints that trigger and control Agent execution. An
AgentRunner instance must be injected via set_runner() before use.

Execution happens asynchronously in the background; clients follow live
updates over WebSocket (/api/traces/{trace_id}/watch).

Endpoints:
    POST /api/traces               — create a Trace and execute it
    POST /api/traces/{id}/run      — run (unified continue + rewind)
    POST /api/traces/{id}/stop     — stop a running Trace
    POST /api/traces/{id}/reflect  — reflect: append a reflection prompt at
                                     the end of the trace, run it, and append
                                     the result to the experiences file
    GET  /api/traces/running       — list running Traces
    GET  /api/experiences          — read the experiences file content
"""

import asyncio
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/traces", tags=["run"])

# The experiences API uses its own prefix.
experiences_router = APIRouter(prefix="/api", tags=["experiences"])

# Fallback location of the experiences file when the runner does not define
# an ``experiences_path`` attribute.
_DEFAULT_EXPERIENCES_PATH = "./.cache/experiences.md"


# ===== Global runner (injected by api_server.py) =====

_runner = None


def set_runner(runner):
    """Inject the AgentRunner instance used by every endpoint in this module."""
    global _runner
    _runner = runner


def _get_runner():
    """Return the injected runner.

    Raises:
        HTTPException: 503 when no runner has been injected via set_runner().
    """
    if _runner is None:
        raise HTTPException(
            status_code=503,
            detail="AgentRunner not configured. Server is in read-only mode.",
        )
    return _runner


# ===== Request / Response models =====


class CreateRequest(BaseModel):
    """Request body for creating and executing a new Trace."""

    messages: List[Dict[str, Any]] = Field(
        ...,
        description="OpenAI SDK 格式的输入消息。可包含 system + user 消息;若无 system 消息则从 skills 自动构建",
    )
    model: str = Field("gpt-4o", description="模型名称")
    temperature: float = Field(0.3)
    max_iterations: int = Field(200)
    tools: Optional[List[str]] = Field(None, description="工具白名单(None = 全部)")
    name: Optional[str] = Field(None, description="任务名称(None = 自动生成)")
    uid: Optional[str] = Field(None)


class TraceRunRequest(BaseModel):
    """Request body for running an existing Trace (unified continue + rewind)."""

    messages: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="追加的新消息(可为空,用于重新生成场景)",
    )
    after_message_id: Optional[str] = Field(
        None,
        description="从哪条消息后续跑。None = 从末尾续跑,message_id = 从该消息后运行(自动判断续跑/回溯)",
    )


class ReflectRequest(BaseModel):
    """Request body for a reflection run."""

    focus: Optional[str] = Field(None, description="反思重点(可选)")


class RunResponse(BaseModel):
    """Operation response (returned immediately; execution continues in background)."""

    trace_id: str
    status: str = "started"
    message: str = ""


class StopResponse(BaseModel):
    """Response of the stop endpoint."""

    trace_id: str
    status: str  # "stopping" | "not_running"


class ReflectResponse(BaseModel):
    """Response of the reflect endpoint."""

    trace_id: str
    reflection: str


# ===== Background execution =====

# trace_id -> asyncio task currently executing that trace.
_running_tasks: Dict[str, asyncio.Task] = {}


async def _run_in_background(trace_id: str, messages: List[Dict], config):
    """Run the agent in the background, draining everything run() yields.

    The yielded items are discarded here: WebSocket broadcasting is driven by
    the runner's internal store events. The entry for ``trace_id`` is always
    removed from ``_running_tasks`` when the run ends, however it ends.
    """
    try:
        # Resolved inside the try so that a failure here still reaches the
        # cleanup in ``finally``.
        runner = _get_runner()
        async for _item in runner.run(messages=messages, config=config):
            pass
    except Exception as e:
        # logger.exception keeps the same message and adds the traceback.
        logger.exception("Background run failed for %s: %s", trace_id, e)
    finally:
        _running_tasks.pop(trace_id, None)


async def _run_with_trace_signal(
    messages: List[Dict],
    config,
    trace_id_future: asyncio.Future,
):
    """Run the agent in the background and hand the trace_id back via a Future.

    The first ``Trace`` object yielded by the runner resolves
    ``trace_id_future`` with its ``trace_id``. The future is guaranteed to be
    resolved by the time this coroutine exits — with the trace_id, with the
    exception that aborted the run, or with a RuntimeError when the run ended
    (or was cancelled) before any Trace was produced — so the waiting endpoint
    can never hang.
    """
    trace_id: Optional[str] = None
    try:
        # Import and runner lookup live inside the try so that their failures
        # are propagated to the waiting endpoint through the future.
        from agent.trace.models import Trace

        runner = _get_runner()
        async for item in runner.run(messages=messages, config=config):
            if isinstance(item, Trace) and not trace_id_future.done():
                trace_id = item.trace_id
                trace_id_future.set_result(trace_id)
    except Exception as e:
        if not trace_id_future.done():
            trace_id_future.set_exception(e)
        logger.exception("Background run failed: %s", e)
    finally:
        # Covers a run that finished without yielding a Trace and task
        # cancellation (CancelledError is not an ``Exception`` subclass).
        if not trace_id_future.done():
            trace_id_future.set_exception(
                RuntimeError("Agent run ended before a Trace was created")
            )
        if trace_id:
            _running_tasks.pop(trace_id, None)


# ===== Routes =====


@router.post("", response_model=RunResponse)
async def create_and_run(req: CreateRequest):
    """
    Create a new Trace and start executing it.

    Returns the trace_id immediately; execution continues asynchronously in
    the background. Watch live updates via WebSocket
    /api/traces/{trace_id}/watch.
    """
    from agent.core.runner import RunConfig

    _get_runner()  # Fail fast with 503 when no runner is configured.

    config = RunConfig(
        model=req.model,
        temperature=req.temperature,
        max_iterations=req.max_iterations,
        tools=req.tools,
        name=req.name,
        uid=req.uid,
    )

    # Start the background run and wait only until the trace_id is known.
    trace_id_future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(
        _run_with_trace_signal(req.messages, config, trace_id_future)
    )

    trace_id = await trace_id_future

    # If the run already finished, its own cleanup has run; registering it now
    # would leave a stale entry behind.
    if not task.done():
        _running_tasks[trace_id] = task

    return RunResponse(
        trace_id=trace_id,
        status="started",
        message=f"Execution started. Watch via WebSocket: /api/traces/{trace_id}/watch",
    )


async def _cleanup_incomplete_tool_calls(store, trace_id: str, after_sequence: int) -> int:
    """
    Find a safe insertion point so that a new message is never inserted in the
    middle of an incomplete tool-call sequence.

    Scenarios:
    1. after_sequence is an assistant message carrying tool_calls whose tool
       responses are partially or entirely missing → fall back to before that
       assistant message.
    2. after_sequence is a tool response, but other responses of the same
       tool_calls batch are missing → fall back to before that assistant
       message.

    Core logic: starting from after_sequence, locate the assistant message
    that owns it and check whether every one of its tool_calls has a matching
    tool response. If not, move the cut point to before the assistant message
    (its parent_sequence).

    Args:
        store: TraceStore
        trace_id: Trace ID
        after_sequence: insertion position requested by the user

    Returns:
        The adjusted, safe cut point (<= after_sequence).
    """
    all_messages = await store.get_trace_messages(trace_id)
    if not all_messages:
        return after_sequence

    by_seq = {msg.sequence: msg for msg in all_messages}
    target = by_seq.get(after_sequence)
    if target is None:
        return after_sequence

    # Locate the owning assistant message:
    # - target is an assistant message → that is the owner
    # - target is a tool message → walk parent_sequence upwards to the assistant
    assistant_msg = None
    if target.role == "assistant":
        assistant_msg = target
    elif target.role == "tool":
        cur = target
        while cur and cur.role == "tool":
            parent_seq = cur.parent_sequence
            cur = by_seq.get(parent_seq) if parent_seq is not None else None
        if cur and cur.role == "assistant":
            assistant_msg = cur

    if assistant_msg is None:
        return after_sequence

    # Does this assistant message carry tool_calls at all?
    content = assistant_msg.content
    if not isinstance(content, dict) or not content.get("tool_calls"):
        return after_sequence

    # Collect every expected tool_call id.
    expected_ids = {
        tc["id"]
        for tc in content["tool_calls"]
        if isinstance(tc, dict) and tc.get("id")
    }
    if not expected_ids:
        return after_sequence

    # Collect the ids that already have a tool response.
    # NOTE(review): this scans the whole trace, so responses stored *after*
    # after_sequence also count as present. Whether those survive a rewind
    # depends on runner semantics not visible here — confirm.
    found_ids = {
        msg.tool_call_id
        for msg in all_messages
        if msg.role == "tool" and msg.tool_call_id in expected_ids
    }

    missing = expected_ids - found_ids
    if not missing:
        # Every tool response exists: the sequence is complete.
        return after_sequence

    # Incomplete → fall back to before the assistant message.
    safe = assistant_msg.parent_sequence
    if safe is None:
        # The assistant message has no parent; use the preceding sequence number.
        safe = assistant_msg.sequence - 1

    logger.info(
        "检测到不完整的工具调用 (assistant seq=%d, 缺少 %d/%d tool responses),"
        "自动回退插入点:%d -> %d",
        assistant_msg.sequence,
        len(missing),
        len(expected_ids),
        after_sequence,
        safe,
    )
    return safe


def _parse_sequence_from_message_id(message_id: str) -> int:
    """Parse the trailing sequence integer of a message_id.

    Expected format: ``{trace_id}-{sequence:04d}``.

    Raises:
        HTTPException: 422 when the part after the last ``-`` is not an integer.
    """
    # rsplit always yields at least one element, so only ValueError can occur.
    suffix = message_id.rsplit("-", 1)[-1]
    try:
        return int(suffix)
    except ValueError:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid after_message_id format: {message_id!r}",
        ) from None


@router.post("/{trace_id}/run", response_model=RunResponse)
async def run_trace(trace_id: str, req: TraceRunRequest):
    """
    Run an existing Trace (unified continue + rewind).

    - after_message_id is null (or omitted): continue from the end
    - after_message_id is a message_id string: run from after that message
      (the Runner decides between continue and rewind)
    - messages empty + after_message_id set: regenerate (re-run from that
      position without inserting new messages)

    **Automatic clean-up of incomplete tool calls**: if the position where a
    message is inserted manually interrupts a tool-call exchange (an assistant
    message has tool_calls but lacks the matching tool responses), the
    insertion point is detected and adjusted automatically so that no
    inconsistent state is produced.
    """
    from agent.core.runner import RunConfig

    runner = _get_runner()

    # Convert the message_id into the sequence integer used internally.
    after_sequence: Optional[int] = None
    if req.after_message_id is not None:
        after_sequence = _parse_sequence_from_message_id(req.after_message_id)

    if runner.trace_store:
        # Verify that the trace exists.
        trace = await runner.trace_store.get_trace(trace_id)
        if not trace:
            raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")

        # Only relevant when new messages are inserted at an explicit position.
        if after_sequence is not None and req.messages:
            adjusted_seq = await _cleanup_incomplete_tool_calls(
                runner.trace_store, trace_id, after_sequence
            )
            if adjusted_seq != after_sequence:
                logger.info(
                    f"已自动调整插入位置:{after_sequence} -> {adjusted_seq}"
                )
                after_sequence = adjusted_seq

    # No await may sit between this check and the registration below,
    # otherwise two concurrent requests could both start a run.
    existing = _running_tasks.get(trace_id)
    if existing is not None and not existing.done():
        raise HTTPException(status_code=409, detail="Trace is already running")

    config = RunConfig(trace_id=trace_id, after_sequence=after_sequence)
    task = asyncio.create_task(_run_in_background(trace_id, req.messages, config))
    _running_tasks[trace_id] = task

    mode = "rewind" if after_sequence is not None else "continue"
    return RunResponse(
        trace_id=trace_id,
        status="started",
        message=f"Run ({mode}) started. Watch via WebSocket: /api/traces/{trace_id}/watch",
    )


@router.post("/{trace_id}/stop", response_model=StopResponse)
async def stop_trace(trace_id: str):
    """
    Stop a running Trace.

    Asks the runner to stop the trace via ``runner.stop``. When the runner
    reports nothing to stop but a background task is still alive for this
    trace, that task is cancelled directly.

    Returns status "stopping" when a stop/cancel was issued, otherwise
    "not_running".
    """
    runner = _get_runner()

    if await runner.stop(trace_id):
        return StopResponse(trace_id=trace_id, status="stopping")

    # The runner does not know the trace; a task may still be tracked here.
    # Popping also evicts a stale entry of an already finished task.
    task = _running_tasks.pop(trace_id, None)
    if task is not None and not task.done():
        task.cancel()
        return StopResponse(trace_id=trace_id, status="stopping")

    return StopResponse(trace_id=trace_id, status="not_running")


@router.post("/{trace_id}/reflect", response_model=ReflectResponse)
async def reflect_trace(trace_id: str, req: ReflectRequest):
    """
    Trigger a reflection.

    Appends a user message containing the reflection prompt at the end of the
    trace, performs a single run configured with max_iterations=1 and tools=[]
    and appends the result to the experiences file (default
    ./.cache/experiences.md).

    The reflection messages form a side branch: head_sequence is saved before
    the run and restored afterwards (guaranteed by try/finally).
    """
    from agent.core.runner import RunConfig
    from agent.trace.compaction import build_reflect_prompt

    runner = _get_runner()
    if not runner.trace_store:
        raise HTTPException(status_code=503, detail="TraceStore not configured")

    # Verify that the trace exists.
    trace = await runner.trace_store.get_trace(trace_id)
    if not trace:
        raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")

    # Refuse to reflect while the trace is still running.
    existing = _running_tasks.get(trace_id)
    if existing is not None and not existing.done():
        raise HTTPException(
            status_code=409,
            detail="Cannot reflect on a running trace. Stop it first.",
        )

    # Remember the current head so the reflection can be turned into a side branch.
    saved_head_sequence = trace.head_sequence

    # Build the reflection prompt.
    prompt = build_reflect_prompt()
    if req.focus:
        prompt += f"\n\n请特别关注:{req.focus}"

    # Run as a continuation: one iteration, no tools.
    config = RunConfig(trace_id=trace_id, max_iterations=1, tools=[])

    reflection_text = ""
    try:
        result = await runner.run_result(
            messages=[{"role": "user", "content": prompt}],
            config=config,
        )
        # A None summary would fail ReflectResponse validation; normalise to "".
        reflection_text = result.get("summary") or ""
    finally:
        # Restore head_sequence so the reflection does not affect the main path.
        await runner.trace_store.update_trace(trace_id, head_sequence=saved_head_sequence)

    # Append the reflection to the experiences file.
    if reflection_text:
        experiences_path = getattr(runner, "experiences_path", _DEFAULT_EXPERIENCES_PATH)
        if experiences_path:
            # dirname is "" for a bare file name, and os.makedirs("") raises.
            parent_dir = os.path.dirname(experiences_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            header = f"\n\n---\n\n## {trace_id} ({datetime.now().strftime('%Y-%m-%d %H:%M')})\n\n"
            with open(experiences_path, "a", encoding="utf-8") as f:
                f.write(header + reflection_text + "\n")
            logger.info(f"Reflection appended to {experiences_path}")

    return ReflectResponse(
        trace_id=trace_id,
        reflection=reflection_text,
    )


@router.get("/running", tags=["run"])
async def list_running():
    """List the trace ids whose background task is still running.

    Entries of finished tasks are evicted from the registry as a side effect.
    """
    running = []
    # Iterate over a snapshot because finished entries are removed in the loop.
    for tid, task in list(_running_tasks.items()):
        if task.done():
            _running_tasks.pop(tid, None)
        else:
            running.append(tid)
    return {"running": running}


# ===== Experiences API =====


@experiences_router.get("/experiences")
async def list_experiences():
    """Return the content of the experiences file.

    Yields empty content when no path is configured or the file does not exist.
    """
    runner = _get_runner()
    experiences_path = getattr(runner, "experiences_path", _DEFAULT_EXPERIENCES_PATH)
    if not experiences_path or not os.path.exists(experiences_path):
        return {"content": "", "path": experiences_path}
    with open(experiences_path, "r", encoding="utf-8") as f:
        content = f.read()
    return {"content": content, "path": experiences_path}