""" Trace 控制 API — 新建 / 运行 / 停止 / 反思 提供 POST 端点触发 Agent 执行和控制。需要通过 set_runner() 注入 AgentRunner 实例。 执行在后台异步进行,客户端通过 WebSocket (/api/traces/{trace_id}/watch) 监听实时更新。 端点: POST /api/traces — 新建 Trace 并执行 POST /api/traces/{id}/run — 运行(统一续跑 + 回溯) POST /api/traces/{id}/stop — 停止运行中的 Trace POST /api/traces/{id}/reflect — 反思,在 trace 末尾追加反思 prompt 运行,结果追加到 experiences 文件 GET /api/traces/running — 列出正在运行的 Trace GET /api/experiences — 读取经验文件内容 """ import asyncio import logging import re import uuid import os from datetime import datetime from typing import Any, Dict, List, Optional from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/traces", tags=["run"]) # 经验 API 使用独立 prefix experiences_router = APIRouter(prefix="/api", tags=["experiences"]) # ===== 全局 Runner(由 api_server.py 注入)===== _runner = None def set_runner(runner): """注入 AgentRunner 实例""" global _runner _runner = runner def _get_runner(): if _runner is None: raise HTTPException( status_code=503, detail="AgentRunner not configured. Server is in read-only mode.", ) return _runner # ===== Request / Response 模型 ===== class CreateRequest(BaseModel): """新建执行""" messages: List[Dict[str, Any]] = Field( ..., description="OpenAI SDK 格式的输入消息。可包含 system + user 消息;若无 system 消息则从 skills 自动构建", ) model: str = Field("gpt-4o", description="模型名称") temperature: float = Field(0.3) max_iterations: int = Field(200) tools: Optional[List[str]] = Field(None, description="工具白名单(None = 全部)") name: Optional[str] = Field(None, description="任务名称(None = 自动生成)") uid: Optional[str] = Field(None) project_name: Optional[str] = Field(None, description="示例项目名称,若提供则动态加载其执行环境") class TraceRunRequest(BaseModel): """运行(统一续跑 + 回溯)""" messages: List[Dict[str, Any]] = Field( default_factory=list, description="追加的新消息(可为空,用于重新生成场景)", ) after_message_id: Optional[str] = Field( None, description="从哪条消息后续跑。None = 从末尾续跑,message_id = 从该消息后运行(自动判断续跑/回溯)", ) class ReflectRequest(BaseModel): """反思请求""" focus: Optional[str] = Field(None, description="反思重点(可选)") class RunResponse(BaseModel): """操作响应(立即返回,后台执行)""" trace_id: str status: str = "started" message: str = "" class StopResponse(BaseModel): """停止响应""" trace_id: str status: str # "stopping" | "not_running" class ReflectResponse(BaseModel): """反思响应""" trace_id: str reflection: str class CompactResponse(BaseModel): """压缩响应""" trace_id: str previous_count: int new_count: int message: str = "" # ===== 后台执行 ===== _running_tasks: Dict[str, asyncio.Task] = {} async def _run_in_background(trace_id: str, messages: List[Dict], config, runner_instance=None): """后台执行 agent,消费 run() 的所有 yield""" runner = runner_instance or _get_runner() try: async for _item in runner.run(messages=messages, config=config): pass # WebSocket 广播由 runner 内部的 store 事件驱动 except Exception as e: logger.error(f"Background run failed for {trace_id}: {e}") finally: _running_tasks.pop(trace_id, None) async def _run_with_trace_signal( messages: List[Dict], config, trace_id_future: asyncio.Future, runner_instance=None ): """后台执行 agent,通过 Future 将 trace_id 传回给等待的 endpoint""" from agent.trace.models import Trace runner = runner_instance or _get_runner() trace_id: Optional[str] = None try: async for item in runner.run(messages=messages, config=config): if isinstance(item, Trace) and not trace_id_future.done(): trace_id = item.trace_id trace_id_future.set_result(trace_id) except Exception as e: if not trace_id_future.done(): trace_id_future.set_exception(e) logger.error(f"Background run failed: {e}") finally: if trace_id: _running_tasks.pop(trace_id, None) # ===== 路由 ===== @router.post("", response_model=RunResponse) async def create_and_run(req: CreateRequest): """ 新建 Trace 并开始执行 立即返回 trace_id,后台异步执行。 通过 WebSocket /api/traces/{trace_id}/watch 监听实时更新。 """ import importlib from agent.core.runner import RunConfig runner = None config = None messages = req.messages if req.project_name: try: # 动态加载对应 example 的 run.py module_name = f"examples.{req.project_name}.run" example_module = importlib.import_module(module_name) if hasattr(example_module, "init_project_env"): # 获取该 example 专属的 runner, 带上下文 messages, 以及默认 config runner, example_messages, default_config = await example_module.init_project_env(req.messages) messages = example_messages # 合并请求配置和 example 默认配置 config = RunConfig( model=req.model or default_config.model, temperature=req.temperature if req.temperature is not None else default_config.temperature, max_iterations=req.max_iterations or default_config.max_iterations, tools=req.tools or default_config.tools, name=req.name or default_config.name, uid=req.uid or default_config.uid, enable_research_flow=default_config.enable_research_flow, context={"project_name": req.project_name} ) except ImportError as e: if getattr(e, "name", None) == module_name: logger.warning(f"Project '{req.project_name}' has no custom run.py, falling back to default.") else: import traceback logger.error(f"Error INSIDE {module_name}:\n{traceback.format_exc()}") except Exception as e: import traceback logger.error(f"Unexpected error loading project environment for {req.project_name}:\n{traceback.format_exc()}") if not runner: _get_runner() # 验证全局默认 Runner 已配置 config = RunConfig( model=req.model, temperature=req.temperature, max_iterations=req.max_iterations, tools=req.tools, name=req.name, uid=req.uid, context={"project_name": req.project_name} if req.project_name else {} ) # 启动后台执行,通过 Future 等待 trace_id(Phase 1 完成后即返回) trace_id_future: asyncio.Future[str] = asyncio.get_running_loop().create_future() task = asyncio.create_task( _run_with_trace_signal(messages, config, trace_id_future, runner_instance=runner) ) trace_id = await trace_id_future _running_tasks[trace_id] = task return RunResponse( trace_id=trace_id, status="started", message=f"Execution started. Watch via WebSocket: /api/traces/{trace_id}/watch", ) async def _cleanup_incomplete_tool_calls(store, trace_id: str, after_sequence: int) -> int: """ 找到安全的插入点,保证不会把新消息插在一个不完整的工具调用序列中间。 场景: 1. after_sequence 刚好是一条带 tool_calls 的 assistant 消息, 但其部分/全部 tool response 还没生成 → 回退到该 assistant 之前。 2. after_sequence 是某条 tool response,但同一批 tool_calls 中 还有其他 response 未生成 → 回退到该 assistant 之前。 核心逻辑:从 after_sequence 往前找,定位到包含它的那条 assistant 消息, 检查该 assistant 的所有 tool_calls 是否都有对应的 tool response。 如果不完整,就把截断点回退到该 assistant 消息之前(即其 parent_sequence)。 Args: store: TraceStore trace_id: Trace ID after_sequence: 用户指定的插入位置 Returns: 调整后的安全截断点(<= after_sequence) """ all_messages = await store.get_trace_messages(trace_id) if not all_messages: return after_sequence by_seq = {msg.sequence: msg for msg in all_messages} target = by_seq.get(after_sequence) if target is None: return after_sequence # 找到"所属的 assistant 消息": # - 如果 target 本身是 assistant → 就是它 # - 如果 target 是 tool → 沿 parent_sequence 往上找 assistant assistant_msg = None if target.role == "assistant": assistant_msg = target elif target.role == "tool": cur = target while cur and cur.role == "tool": parent_seq = cur.parent_sequence cur = by_seq.get(parent_seq) if parent_seq is not None else None if cur and cur.role == "assistant": assistant_msg = cur if assistant_msg is None: return after_sequence # 该 assistant 是否带 tool_calls? content = assistant_msg.content if not isinstance(content, dict) or not content.get("tool_calls"): return after_sequence # 收集所有 tool_call_ids expected_ids = set() for tc in content["tool_calls"]: if isinstance(tc, dict) and tc.get("id"): expected_ids.add(tc["id"]) if not expected_ids: return after_sequence # 查找已有的 tool responses found_ids = set() for msg in all_messages: if msg.role == "tool" and msg.tool_call_id in expected_ids: found_ids.add(msg.tool_call_id) missing = expected_ids - found_ids if not missing: # 全部 tool response 都在,这是一个完整的序列 return after_sequence # 不完整 → 回退到 assistant 之前 safe = assistant_msg.parent_sequence if safe is None: # assistant 已经是第一条消息,没有更早的位置 safe = assistant_msg.sequence - 1 logger.info( "检测到不完整的工具调用 (assistant seq=%d, 缺少 %d/%d tool responses)," "自动回退插入点:%d -> %d", assistant_msg.sequence, len(missing), len(expected_ids), after_sequence, safe, ) return safe def _parse_sequence_from_message_id(message_id: str) -> int: """从 message_id 末尾解析 sequence 整数(格式:{trace_id}-{sequence:04d})""" try: return int(message_id.rsplit("-", 1)[-1]) except (ValueError, IndexError): raise HTTPException( status_code=422, detail=f"Invalid after_message_id format: {message_id!r}", ) @router.post("/{trace_id}/run", response_model=RunResponse) async def run_trace(trace_id: str, req: TraceRunRequest): """ 运行已有 Trace(统一续跑 + 回溯) - after_message_id 为 null(或省略):从末尾续跑 - after_message_id 为 message_id 字符串:从该消息后运行(Runner 自动判断续跑/回溯) - messages 为空 + after_message_id 有值:重新生成(从该位置重跑,不插入新消息) **自动清理不完整工具调用**: 如果人工插入 message 的位置打断了一个工具调用过程(assistant 消息有 tool_calls 但缺少对应的 tool responses),框架会自动检测并调整插入位置,确保不会产生不一致的状态。 """ from agent.core.runner import RunConfig import importlib runner = _get_runner() # 将 message_id 转换为内部使用的 sequence 整数 after_sequence: Optional[int] = None if req.after_message_id is not None: after_sequence = _parse_sequence_from_message_id(req.after_message_id) # 验证 trace 存在 if runner.trace_store: trace = await runner.trace_store.get_trace(trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}") # 自动检查并清理不完整的工具调用 if after_sequence is not None and req.messages: adjusted_seq = await _cleanup_incomplete_tool_calls( runner.trace_store, trace_id, after_sequence ) if adjusted_seq != after_sequence: logger.info( f"已自动调整插入位置:{after_sequence} -> {adjusted_seq}" ) after_sequence = adjusted_seq # 检查是否已在运行 if trace_id in _running_tasks and not _running_tasks[trace_id].done(): # 竞态窗口修复:task 还没退出,但 store 里 trace 已经是 stopped/failed # 这发生在 cancel_event 触发后 → store 更新 → task finally 块还未执行之间 # 此时可以安全地强制清除旧 task,允许续跑 store_trace = None if runner.trace_store: store_trace = await runner.trace_store.get_trace(trace_id) if store_trace and store_trace.status in ("stopped", "failed", "completed"): logger.info( f"run_trace: task for {trace_id} not done yet but store status={store_trace.status!r}, " "forcing cleanup to allow resume" ) _running_tasks[trace_id].cancel() _running_tasks.pop(trace_id, None) else: raise HTTPException(status_code=409, detail="Trace is already running") # 检测 trace 中是否包含 project_name 环境定义 trace_context = trace.context or {} project_name = trace_context.get("project_name") if project_name: try: module_name = f"examples.{project_name}.run" example_module = importlib.import_module(module_name) if hasattr(example_module, "init_project_env"): logger.info(f"Trace {trace_id} 绑定了项目 {project_name},动态加载执行环境...") project_runner, project_msgs, default_config = await example_module.init_project_env() runner = project_runner # 发生替换 except ImportError as e: if getattr(e, "name", None) == module_name: logger.warning(f"Project '{project_name}' has no custom run.py, keeping default runner.") else: import traceback logger.error(f"Error INSIDE {module_name} during resume:\n{traceback.format_exc()}") except Exception as e: import traceback logger.error(f"Unexpected error loading run.py environment for project {project_name} in trace {trace_id}:\n{traceback.format_exc()}") config = RunConfig(trace_id=trace_id, after_sequence=after_sequence) # 恢复运行时,将状态从 stopped 改回 running,并广播状态变化 if runner.trace_store and trace_id: current_trace = await runner.trace_store.get_trace(trace_id) if current_trace and current_trace.status == "stopped": await runner.trace_store.update_trace(trace_id, status="running") # 广播状态变化给前端 from agent.trace.websocket import broadcast_trace_status_changed await broadcast_trace_status_changed(trace_id, "running") task = asyncio.create_task(_run_in_background(trace_id, req.messages, config, runner_instance=runner)) _running_tasks[trace_id] = task mode = "rewind" if after_sequence is not None else "continue" return RunResponse( trace_id=trace_id, status="started", message=f"Run ({mode}) started. Watch via WebSocket: /api/traces/{trace_id}/watch", ) @router.post("/{trace_id}/stop", response_model=StopResponse) async def stop_trace(trace_id: str): """ 停止运行中的 Trace 设置取消信号,agent loop 在下一个 LLM 调用前检查并退出。 Trace 状态置为 "stopped"。 """ runner = _get_runner() # 通过 runner 的 stop 方法设置取消信号 stopped = await runner.stop(trace_id) if not stopped: # 检查是否在 _running_tasks 但 runner 不知道(可能已完成) if trace_id in _running_tasks: task = _running_tasks[trace_id] if not task.done(): task.cancel() _running_tasks.pop(trace_id, None) return StopResponse(trace_id=trace_id, status="stopping") return StopResponse(trace_id=trace_id, status="not_running") return StopResponse(trace_id=trace_id, status="stopping") @router.post("/{trace_id}/reflect", response_model=ReflectResponse) async def reflect_trace(trace_id: str, req: ReflectRequest): """ 触发反思 通过 force_side_branch="reflection" 触发侧分支多轮 agent 模式, LLM 可以调用工具(如 knowledge_search, knowledge_save)进行多轮推理。 反思消息标记为侧分支(branch_type="reflection"),不在主路径上。 """ from agent.core.runner import RunConfig runner = _get_runner() if not runner.trace_store: raise HTTPException(status_code=503, detail="TraceStore not configured") # 验证 trace 存在 trace = await runner.trace_store.get_trace(trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}") # 检查是否仍在运行 if trace_id in _running_tasks and not _running_tasks[trace_id].done(): raise HTTPException(status_code=409, detail="Cannot reflect on a running trace. Stop it first.") # 使用 force_side_branch 触发反思侧分支 config = RunConfig( trace_id=trace_id, model=trace.model or "gpt-4o", force_side_branch=["reflection"], # 使用列表格式 max_iterations=20, # 给侧分支足够的轮次 enable_prompt_caching=True, ) # 如果有 focus,可以通过追加消息传递(可选) messages = [] if req.focus: messages = [{"role": "user", "content": f"反思重点:{req.focus}"}] # 启动反思任务(后台执行) task = asyncio.create_task(_run_trace_background(runner, messages, config)) _running_tasks[trace_id] = task return ReflectResponse( trace_id=trace_id, reflection="反思任务已启动,通过 WebSocket 监听实时更新", ) @router.post("/{trace_id}/compact", response_model=CompactResponse) async def compact_trace(trace_id: str): """ 压缩 Trace 的上下文 (Compact) 通过 force_side_branch="compression" 触发侧分支多轮 agent 模式, LLM 可以调用工具(如 goal)进行多轮推理。 压缩消息标记为侧分支(branch_type="compression"),不在主路径上。 """ from agent.core.runner import RunConfig runner = _get_runner() if not runner.trace_store: raise HTTPException(status_code=503, detail="TraceStore not configured") # 验证 trace 存在 trace = await runner.trace_store.get_trace(trace_id) if not trace: raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}") # 检查是否仍在运行 if trace_id in _running_tasks and not _running_tasks[trace_id].done(): raise HTTPException(status_code=409, detail="Cannot compact a running trace. Stop it first.") # 使用 force_side_branch 触发压缩侧分支 config = RunConfig( trace_id=trace_id, model=trace.model or "gpt-4o", force_side_branch=["compression"], # 使用列表格式 max_iterations=20, # 给侧分支足够的轮次 enable_prompt_caching=True, ) # 启动压缩任务(后台执行) task = asyncio.create_task(_run_trace_background(runner, [], config)) _running_tasks[trace_id] = task return CompactResponse( trace_id=trace_id, previous_count=0, # 无法立即获取,需通过 WebSocket 监听 new_count=0, message="压缩任务已启动,通过 WebSocket 监听实时更新", ) @router.get("/running", tags=["run"]) async def list_running(): """列出正在运行的 Trace(包含活跃状态判断)""" from datetime import datetime, timedelta runner = _get_runner() running = [] for tid, task in list(_running_tasks.items()): if task.done(): _running_tasks.pop(tid, None) else: # 获取trace详情,检查最后活动时间 trace_info = {"trace_id": tid, "is_active": True} if runner.trace_store: try: trace = await runner.trace_store.get_trace(tid) if trace: # 判断是否真正活跃:最后活动时间在30秒内 if hasattr(trace, 'last_activity_at') and trace.last_activity_at: time_since_activity = (datetime.now() - trace.last_activity_at).total_seconds() trace_info["is_active"] = time_since_activity < 30 trace_info["seconds_since_activity"] = int(time_since_activity) trace_info["status"] = trace.status except Exception: pass running.append(trace_info) return {"running": running} async def reconcile_traces(): """ 状态对齐:启动时清理残留的 running 状态。 当服务异常停止或重启后,磁盘上的 trace 状态可能仍显示为 running, 但对应的内存任务已不存在。本函数将其强制标记为 stopped。 """ runner = _get_runner() if not runner or not runner.trace_store: logger.warning("[Reconciliation] Runner or TraceStore not initialized, skipping.") return try: # 获取所有 running 状态的 trace running_traces = await runner.trace_store.list_traces(status="running", limit=1000) if not running_traces: return count = 0 for trace in running_traces: tid = trace.trace_id # 如果不在活跃任务字典中(服务初次启动时此字典为空),则视为异常残留 if tid not in _running_tasks: logger.info(f"[Reconciliation] Fixing trace {tid}: running -> stopped") await runner.trace_store.update_trace( tid, status="stopped", result_summary="[Reconciliation] 任务由于服务重启或异常中断已自动停止。" ) count += 1 if count > 0: logger.info(f"[Reconciliation] Successfully reconciled {count} traces.") except Exception as e: logger.error(f"[Reconciliation] Failed to reconcile traces: {e}") # ===== 经验 API ===== @experiences_router.get("/experiences") async def list_experiences(): """读取经验文件内容""" runner = _get_runner() experiences_path = getattr(runner, "experiences_path", "./.cache/experiences.md") if not experiences_path or not os.path.exists(experiences_path): return {"content": "", "path": experiences_path} with open(experiences_path, "r", encoding="utf-8") as f: content = f.read() return {"content": content, "path": experiences_path}