| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640 |
- """
Trace 控制 API — 新建 / 运行 / 停止 / 反思 / 压缩

提供 POST 端点触发 Agent 执行和控制。需要通过 set_runner() 注入 AgentRunner 实例。
执行在后台异步进行,客户端通过 WebSocket (/api/traces/{trace_id}/watch) 监听实时更新。

端点:
    POST /api/traces                 — 新建 Trace 并执行
    POST /api/traces/{id}/run        — 运行(统一续跑 + 回溯)
    POST /api/traces/{id}/stop       — 停止运行中的 Trace
    POST /api/traces/{id}/reflect    — 反思,在 trace 末尾追加反思 prompt 运行,结果追加到 experiences 文件
    POST /api/traces/{id}/compact    — 压缩 Trace 上下文(以侧分支方式在后台运行)
    GET  /api/traces/running         — 列出正在运行的 Trace
    GET  /api/experiences            — 读取经验文件内容
- """
- import asyncio
- import logging
- import re
- import uuid
- import os
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- from fastapi import APIRouter, HTTPException
- from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
# Router for the trace control endpoints (create / run / stop / reflect / compact / running).
router = APIRouter(prefix="/api/traces", tags=["run"])
# The experiences API uses its own prefix (/api), hence a separate router.
experiences_router = APIRouter(prefix="/api", tags=["experiences"])
# ===== Global runner (injected by api_server.py) =====
# The AgentRunner shared by every endpoint; None means "not configured",
# in which case the endpoints answer 503 (read-only mode).
_runner = None


def set_runner(runner):
    """Install *runner* as the AgentRunner used by every endpoint in this module."""
    global _runner
    _runner = runner


def _get_runner():
    """Return the injected AgentRunner.

    Raises:
        HTTPException: 503 when ``set_runner`` has not provided a runner yet.
    """
    runner = _runner
    if runner is not None:
        return runner
    raise HTTPException(
        status_code=503,
        detail="AgentRunner not configured. Server is in read-only mode.",
    )
- # ===== Request / Response 模型 =====
class CreateRequest(BaseModel):
    """Body of ``POST /api/traces``: describes a brand-new run."""

    # Input messages in OpenAI SDK format (required).
    messages: List[Dict[str, Any]] = Field(
        default=...,
        description="OpenAI SDK 格式的输入消息。可包含 system + user 消息;若无 system 消息则从 skills 自动构建",
    )
    model: str = Field(default="gpt-4o", description="模型名称")
    temperature: float = Field(default=0.3)
    max_iterations: int = Field(default=200)
    # Tool whitelist; None enables every tool.
    tools: Optional[List[str]] = Field(default=None, description="工具白名单(None = 全部)")
    # Task name; None lets the runner generate one.
    name: Optional[str] = Field(default=None, description="任务名称(None = 自动生成)")
    uid: Optional[str] = Field(default=None)
    # Example project whose run environment should be loaded dynamically.
    project_name: Optional[str] = Field(default=None, description="示例项目名称,若提供则动态加载其执行环境")
class TraceRunRequest(BaseModel):
    """Body of ``POST /api/traces/{id}/run``: resume or rewind an existing trace."""

    # New messages to append; may be empty (regeneration).
    messages: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="追加的新消息(可为空,用于重新生成场景)",
    )
    # message_id to run after; None continues from the end of the trace.
    after_message_id: Optional[str] = Field(
        default=None,
        description="从哪条消息后续跑。None = 从末尾续跑,message_id = 从该消息后运行(自动判断续跑/回溯)",
    )
class ReflectRequest(BaseModel):
    """Body of ``POST /api/traces/{id}/reflect``."""

    # Optional focus for the reflection.
    focus: Optional[str] = Field(default=None, description="反思重点(可选)")
class RunResponse(BaseModel):
    """Immediate acknowledgement of a run; execution continues in the background."""

    trace_id: str
    status: str = Field(default="started")
    message: str = Field(default="")
class StopResponse(BaseModel):
    """Result of a stop request."""

    trace_id: str
    # Either "stopping" or "not_running".
    status: str
class ReflectResponse(BaseModel):
    """Acknowledgement returned when a reflection run is started."""

    trace_id: str
    # Human-readable status text for the started reflection.
    reflection: str
class CompactResponse(BaseModel):
    """Acknowledgement returned when a compaction run is started."""

    trace_id: str
    # Message counts before/after compaction.
    previous_count: int
    new_count: int
    message: str = Field(default="")
# ===== Background execution =====
# trace_id -> asyncio.Task currently executing that trace. Endpoints register a
# task right after spawning it; the task removes its own entry when it ends.
_running_tasks: Dict[str, asyncio.Task] = {}


async def _run_in_background(trace_id: str, messages: List[Dict], config, runner_instance=None):
    """Drive ``runner.run()`` to completion for an existing trace.

    Every item yielded by the runner is discarded: WebSocket broadcasts are
    driven by the runner's internal store events, not by this consumer.
    Failures are logged with their traceback and not re-raised.

    Args:
        trace_id: Trace being executed; used for logging and for clearing
            this task's ``_running_tasks`` entry when the run ends.
        messages: Messages handed to ``runner.run``.
        config: RunConfig forwarded unchanged to ``runner.run``.
        runner_instance: Runner to use; defaults to the injected global runner.
    """
    runner = runner_instance or _get_runner()
    try:
        async for _item in runner.run(messages=messages, config=config):
            pass  # WebSocket broadcasting is driven by the runner's store events
    except Exception:
        logger.exception("Background run failed for %s", trace_id)
    finally:
        # Only remove our own registration: if a newer run of the same trace
        # replaced this task in the registry, that entry is not ours to drop.
        if _running_tasks.get(trace_id) is asyncio.current_task():
            _running_tasks.pop(trace_id, None)
async def _run_with_trace_signal(
    messages: List[Dict], config, trace_id_future: asyncio.Future, runner_instance=None
):
    """Run the agent in the background and report the new trace_id via a Future.

    The first ``Trace`` object yielded by ``runner.run()`` resolves
    *trace_id_future*, letting the waiting endpoint respond while the run
    continues here.  If anything fails before that (including the runner
    lookup), the future receives the exception.  If the run is cancelled or
    ends without ever yielding a ``Trace``, the future receives a
    ``RuntimeError``.  Either way the waiting endpoint is always released.

    Args:
        messages: Input messages for the new run.
        config: RunConfig forwarded unchanged to ``runner.run``.
        trace_id_future: Future resolved with the new trace_id (or failed).
        runner_instance: Runner to use; defaults to the injected global runner.
    """
    trace_id: Optional[str] = None
    try:
        from agent.trace.models import Trace

        runner = runner_instance or _get_runner()
        async for item in runner.run(messages=messages, config=config):
            if isinstance(item, Trace) and not trace_id_future.done():
                trace_id = item.trace_id
                trace_id_future.set_result(trace_id)
    except Exception as e:
        if not trace_id_future.done():
            trace_id_future.set_exception(e)
        logger.exception("Background run failed for %s", trace_id or "<new trace>")
    finally:
        # Never leave the endpoint awaiting forever (cancellation, or a run
        # that finished without producing a Trace).
        if not trace_id_future.done():
            trace_id_future.set_exception(
                RuntimeError("Run ended before a trace was created")
            )
        # Only remove our own registration, never a newer task's entry.
        if trace_id and _running_tasks.get(trace_id) is asyncio.current_task():
            _running_tasks.pop(trace_id, None)
- # ===== 路由 =====
def _explicitly_set_fields(model) -> set:
    """Return the names of the fields the client actually sent (pydantic v2 or v1)."""
    fields_set = getattr(model, "model_fields_set", None)
    if fields_set is None:  # pydantic v1
        fields_set = getattr(model, "__fields_set__", set())
    return set(fields_set)


def _prefer_explicit(req, default_config, field: str, explicit: set):
    """Pick the client's value for *field* if it was sent, else the project default.

    When the project default is missing/None, the request value (i.e. the
    request model's own default) is kept so a required setting is never blanked.
    """
    requested = getattr(req, field)
    if field in explicit and requested is not None:
        return requested
    project_default = getattr(default_config, field, None)
    return requested if project_default is None else project_default


@router.post("", response_model=RunResponse)
async def create_and_run(req: CreateRequest):
    """
    Create a new trace and start executing it.

    Returns as soon as the runner yields the new Trace (so its trace_id is
    known); execution continues in a background task.  Watch progress via the
    WebSocket /api/traces/{trace_id}/watch.

    If ``project_name`` is given, ``examples.<project_name>.run`` is imported.
    When it defines ``init_project_env``, that coroutine supplies the runner,
    the messages to run and a default RunConfig.  Request fields the client
    actually sent override that default config; unsent ones fall back to it.
    If loading the project fails at any step, the global runner and the
    request's own settings are used instead (the trace context still records
    ``project_name``).

    Raises:
        HTTPException: 503 when no global runner is configured and none was
            loaded from the project; any other exception raised by the run
            before the trace is created propagates.
    """
    import importlib
    from agent.core.runner import RunConfig

    runner = None
    config = None
    messages = req.messages

    if req.project_name:
        module_name = f"examples.{req.project_name}.run"
        try:
            # Dynamically load the example's run.py.
            example_module = importlib.import_module(module_name)
            if hasattr(example_module, "init_project_env"):
                # Example-specific runner, context-augmented messages, default config.
                project_runner, example_messages, default_config = (
                    await example_module.init_project_env(req.messages)
                )
                explicit = _explicitly_set_fields(req)
                project_config = RunConfig(
                    model=_prefer_explicit(req, default_config, "model", explicit),
                    temperature=_prefer_explicit(req, default_config, "temperature", explicit),
                    max_iterations=_prefer_explicit(req, default_config, "max_iterations", explicit),
                    tools=req.tools or default_config.tools,
                    name=req.name or default_config.name,
                    uid=req.uid or default_config.uid,
                    enable_research_flow=default_config.enable_research_flow,
                    context={"project_name": req.project_name},
                )
                # Commit only once every step succeeded, so a partial failure
                # cannot mix project messages/runner with default settings.
                runner, messages, config = project_runner, example_messages, project_config
        except ImportError as e:
            missing = getattr(e, "name", None)
            # The run module itself (or one of its parent packages) is absent.
            if missing and (missing == module_name or module_name.startswith(missing + ".")):
                logger.warning(
                    "Project '%s' has no custom run.py, falling back to default.",
                    req.project_name,
                )
            else:
                logger.exception("Error INSIDE %s", module_name)
        except Exception:
            logger.exception(
                "Unexpected error loading project environment for %s", req.project_name
            )

    if runner is None:
        runner = _get_runner()  # ensures the global default runner is configured
        config = RunConfig(
            model=req.model,
            temperature=req.temperature,
            max_iterations=req.max_iterations,
            tools=req.tools,
            name=req.name,
            uid=req.uid,
            context={"project_name": req.project_name} if req.project_name else {},
        )

    # Start the run in the background and wait only until it reports the trace_id.
    trace_id_future: asyncio.Future = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(
        _run_with_trace_signal(messages, config, trace_id_future, runner_instance=runner)
    )
    trace_id = await trace_id_future
    # A very short run may already be over; registering it would leave a stale entry.
    if not task.done():
        _running_tasks[trace_id] = task
    return RunResponse(
        trace_id=trace_id,
        status="started",
        message=f"Execution started. Watch via WebSocket: /api/traces/{trace_id}/watch",
    )
async def _cleanup_incomplete_tool_calls(store, trace_id: str, after_sequence: int) -> int:
    """
    Find a safe insertion point so new messages never land inside an incomplete tool-call batch.

    Cases handled:
    1. ``after_sequence`` is an assistant message with ``tool_calls``: the new
       messages would directly follow it, before any of its tool responses,
       so the batch is incomplete on that path. Move back before that assistant.
    2. ``after_sequence`` is a tool response, but other responses of the same
       batch are not on the chain between it and the assistant. Move back
       before the assistant.

    Only tool responses on the parent chain from ``after_sequence`` up to the
    owning assistant count.  Responses that exist later in the trace do not:
    the inserted messages would follow ``after_sequence`` directly, so those
    responses would no longer sit between the assistant and the new messages.

    Args:
        store: TraceStore (only ``get_trace_messages`` is used).
        trace_id: Trace ID.
        after_sequence: Insertion position requested by the user.

    Returns:
        ``after_sequence`` when it is safe. Otherwise the owning assistant's
        ``parent_sequence``, or ``assistant.sequence - 1`` when it has no parent.
    """
    all_messages = await store.get_trace_messages(trace_id)
    if not all_messages:
        return after_sequence
    by_seq = {msg.sequence: msg for msg in all_messages}
    target = by_seq.get(after_sequence)
    if target is None:
        return after_sequence

    # Locate the owning assistant message. While walking up from a tool
    # response, collect the tool_call_ids answered on that path.
    answered_ids = set()
    assistant_msg = None
    if target.role == "assistant":
        assistant_msg = target
    elif target.role == "tool":
        cur = target
        visited = set()  # guards against a corrupted parent cycle
        while cur is not None and cur.role == "tool" and cur.sequence not in visited:
            visited.add(cur.sequence)
            tool_call_id = getattr(cur, "tool_call_id", None)
            if tool_call_id:
                answered_ids.add(tool_call_id)
            parent_seq = cur.parent_sequence
            cur = by_seq.get(parent_seq) if parent_seq is not None else None
        if cur is not None and cur.role == "assistant":
            assistant_msg = cur
    if assistant_msg is None:
        return after_sequence

    # Does this assistant issue tool_calls at all?
    content = assistant_msg.content
    if not isinstance(content, dict) or not content.get("tool_calls"):
        return after_sequence
    expected_ids = {
        tc["id"] for tc in content["tool_calls"] if isinstance(tc, dict) and tc.get("id")
    }
    if not expected_ids:
        return after_sequence

    missing = expected_ids - answered_ids
    if not missing:
        # Every tool response precedes the insertion point: complete batch.
        return after_sequence

    # Incomplete: move the insertion point back before the assistant.
    safe = assistant_msg.parent_sequence
    if safe is None:
        # The assistant is the first message; there is no earlier parent.
        safe = assistant_msg.sequence - 1
    logger.info(
        "检测到不完整的工具调用 (assistant seq=%d, 缺少 %d/%d tool responses),"
        "自动回退插入点:%d -> %d",
        assistant_msg.sequence, len(missing), len(expected_ids),
        after_sequence, safe,
    )
    return safe
def _parse_sequence_from_message_id(message_id: str) -> int:
    """Extract the trailing sequence number from a message_id.

    Message ids look like ``{trace_id}-{sequence:04d}``. Everything after the
    last ``-`` is parsed as an integer; an id without any ``-`` is parsed whole.

    Raises:
        HTTPException: 422 when the trailing part is not an integer.
    """
    _, _, suffix = message_id.rpartition("-")
    try:
        return int(suffix)
    except ValueError:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid after_message_id format: {message_id!r}",
        )
@router.post("/{trace_id}/run", response_model=RunResponse)
async def run_trace(trace_id: str, req: TraceRunRequest):
    """
    Run an existing trace (unified resume + rewind).

    - ``after_message_id`` null/omitted: continue from the end of the trace.
    - ``after_message_id`` set: run from after that message; the runner decides
      whether this is a continuation or a rewind.
    - empty ``messages`` + ``after_message_id`` set: regenerate from that
      position without inserting new messages.

    Incomplete tool calls are handled automatically: if new messages would be
    inserted inside a tool-call batch whose tool responses are missing, the
    insertion point is moved back before the owning assistant message.

    Raises:
        HTTPException: 422 for a malformed ``after_message_id``, 404 when the
            trace does not exist, 409 when the trace is already running.
    """
    from agent.core.runner import RunConfig
    import importlib

    runner = _get_runner()

    # Convert the public message_id into the internal sequence number.
    after_sequence: Optional[int] = None
    if req.after_message_id is not None:
        after_sequence = _parse_sequence_from_message_id(req.after_message_id)

    # Stays None when the runner has no trace store.
    trace = None
    if runner.trace_store:
        trace = await runner.trace_store.get_trace(trace_id)
        if not trace:
            raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
        # Never insert new messages into the middle of an incomplete tool-call batch.
        if after_sequence is not None and req.messages:
            adjusted_seq = await _cleanup_incomplete_tool_calls(
                runner.trace_store, trace_id, after_sequence
            )
            if adjusted_seq != after_sequence:
                logger.info("已自动调整插入位置:%s -> %s", after_sequence, adjusted_seq)
                after_sequence = adjusted_seq

    # Refuse a second concurrent run of the same trace.
    existing = _running_tasks.get(trace_id)
    if existing is not None and not existing.done():
        # Race window: the store already reports stopped/failed/completed (the
        # cancel signal was processed) but the old task's finally block has not
        # run yet. It is then safe to discard the old task and allow the resume.
        store_trace = None
        if runner.trace_store:
            store_trace = await runner.trace_store.get_trace(trace_id)
        if store_trace and store_trace.status in ("stopped", "failed", "completed"):
            logger.info(
                "run_trace: task for %s not done yet but store status=%r, "
                "forcing cleanup to allow resume",
                trace_id, store_trace.status,
            )
            # Use the captured task: the old task may have finished and removed
            # its own registry entry during the await above.
            existing.cancel()
            if _running_tasks.get(trace_id) is existing:
                _running_tasks.pop(trace_id, None)
        else:
            raise HTTPException(status_code=409, detail="Trace is already running")

    # If the trace context names an example project, reload that project's runner.
    trace_context = (trace.context if trace is not None else None) or {}
    project_name = trace_context.get("project_name")
    if project_name:
        module_name = f"examples.{project_name}.run"
        try:
            example_module = importlib.import_module(module_name)
            if hasattr(example_module, "init_project_env"):
                logger.info("Trace %s 绑定了项目 %s,动态加载执行环境...", trace_id, project_name)
                # Only the runner is reused; the example's messages and default
                # config are not applied when resuming.
                project_runner, _project_msgs, _default_config = (
                    await example_module.init_project_env()
                )
                runner = project_runner
        except ImportError as e:
            missing = getattr(e, "name", None)
            # The run module itself (or one of its parent packages) is absent.
            if missing and (missing == module_name or module_name.startswith(missing + ".")):
                logger.warning(
                    "Project '%s' has no custom run.py, keeping default runner.", project_name
                )
            else:
                logger.exception("Error INSIDE %s during resume", module_name)
        except Exception:
            logger.exception(
                "Unexpected error loading run.py environment for project %s in trace %s",
                project_name, trace_id,
            )

    config = RunConfig(trace_id=trace_id, after_sequence=after_sequence)

    # Resuming a stopped trace: flip it back to running and notify the frontend.
    if runner.trace_store and trace_id:
        current_trace = await runner.trace_store.get_trace(trace_id)
        if current_trace and current_trace.status == "stopped":
            await runner.trace_store.update_trace(trace_id, status="running")
            from agent.trace.websocket import broadcast_trace_status_changed
            await broadcast_trace_status_changed(trace_id, "running")

    task = asyncio.create_task(
        _run_in_background(trace_id, req.messages, config, runner_instance=runner)
    )
    _running_tasks[trace_id] = task
    mode = "rewind" if after_sequence is not None else "continue"
    return RunResponse(
        trace_id=trace_id,
        status="started",
        message=f"Run ({mode}) started. Watch via WebSocket: /api/traces/{trace_id}/watch",
    )
@router.post("/{trace_id}/stop", response_model=StopResponse)
async def stop_trace(trace_id: str):
    """
    Stop a running trace.

    First asks the runner to set its cancel signal; per the runner's
    contract the agent loop checks it before the next LLM call and the trace
    status becomes "stopped".  If the runner has no active run for the trace,
    a still-running background task from the registry is cancelled directly.
    A finished registry entry is pruned and reported as not running.

    Returns:
        StopResponse with status "stopping" when a stop was initiated, or
        "not_running" when nothing was running.
    """
    runner = _get_runner()
    if await runner.stop(trace_id):
        return StopResponse(trace_id=trace_id, status="stopping")

    # The runner does not know this trace (it may already be finished).
    # Drop any registry entry; only a live task needs cancelling.
    task = _running_tasks.pop(trace_id, None)
    if task is not None and not task.done():
        task.cancel()
        return StopResponse(trace_id=trace_id, status="stopping")
    return StopResponse(trace_id=trace_id, status="not_running")
@router.post("/{trace_id}/reflect", response_model=ReflectResponse)
async def reflect_trace(trace_id: str, req: ReflectRequest):
    """
    Trigger a reflection run on a trace.

    Runs the trace with ``force_side_branch=["reflection"]``, a multi-turn side
    branch in which the LLM may call tools (e.g. knowledge_search,
    knowledge_save).  Reflection messages are marked as a side branch
    (branch_type="reflection") and are not on the main path.

    Returns immediately; the reflection runs in the background and can be
    watched over the trace's WebSocket.

    Raises:
        HTTPException: 503 without a trace store, 404 for an unknown trace,
            409 while the trace is still running.
    """
    from agent.core.runner import RunConfig

    runner = _get_runner()
    if not runner.trace_store:
        raise HTTPException(status_code=503, detail="TraceStore not configured")

    trace = await runner.trace_store.get_trace(trace_id)
    if not trace:
        raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")

    existing = _running_tasks.get(trace_id)
    if existing is not None and not existing.done():
        raise HTTPException(status_code=409, detail="Cannot reflect on a running trace. Stop it first.")

    config = RunConfig(
        trace_id=trace_id,
        model=trace.model or "gpt-4o",
        force_side_branch=["reflection"],  # list form
        max_iterations=20,  # enough turns for the side branch
        enable_prompt_caching=True,
    )

    # An optional focus is passed to the side branch as an extra user message.
    messages = []
    if req.focus:
        messages = [{"role": "user", "content": f"反思重点:{req.focus}"}]

    # _run_in_background consumes the run and clears the registry entry when done.
    task = asyncio.create_task(
        _run_in_background(trace_id, messages, config, runner_instance=runner)
    )
    _running_tasks[trace_id] = task
    return ReflectResponse(
        trace_id=trace_id,
        reflection="反思任务已启动,通过 WebSocket 监听实时更新",
    )
@router.post("/{trace_id}/compact", response_model=CompactResponse)
async def compact_trace(trace_id: str):
    """
    Compact a trace's context.

    Runs the trace with ``force_side_branch=["compression"]``, a multi-turn side
    branch in which the LLM may call tools (e.g. goal).  Compression messages
    are marked as a side branch (branch_type="compression") and are not on the
    main path.

    Returns immediately.  Message counts are not known at that point, so both
    counts are reported as 0; follow progress over the trace's WebSocket.

    Raises:
        HTTPException: 503 without a trace store, 404 for an unknown trace,
            409 while the trace is still running.
    """
    from agent.core.runner import RunConfig

    runner = _get_runner()
    if not runner.trace_store:
        raise HTTPException(status_code=503, detail="TraceStore not configured")

    trace = await runner.trace_store.get_trace(trace_id)
    if not trace:
        raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")

    existing = _running_tasks.get(trace_id)
    if existing is not None and not existing.done():
        raise HTTPException(status_code=409, detail="Cannot compact a running trace. Stop it first.")

    config = RunConfig(
        trace_id=trace_id,
        model=trace.model or "gpt-4o",
        force_side_branch=["compression"],  # list form
        max_iterations=20,  # enough turns for the side branch
        enable_prompt_caching=True,
    )

    # _run_in_background consumes the run and clears the registry entry when done.
    task = asyncio.create_task(
        _run_in_background(trace_id, [], config, runner_instance=runner)
    )
    _running_tasks[trace_id] = task
    return CompactResponse(
        trace_id=trace_id,
        previous_count=0,  # not available yet; watch via WebSocket
        new_count=0,
        message="压缩任务已启动,通过 WebSocket 监听实时更新",
    )
def _trace_activity_info(trace, idle_seconds: float = 30.0) -> Dict[str, Any]:
    """Summarize a stored trace's status and how recently it was active.

    ``is_active`` / ``seconds_since_activity`` are only reported when the trace
    has a ``last_activity_at`` datetime; activity older than *idle_seconds*
    counts as inactive.
    """
    info: Dict[str, Any] = {"status": trace.status}
    last_activity = getattr(trace, "last_activity_at", None)
    if isinstance(last_activity, datetime):
        # datetime.now(tzinfo) matches the timestamp's awareness (naive stays
        # naive), so the subtraction never mixes naive and aware values.
        elapsed = (datetime.now(last_activity.tzinfo) - last_activity).total_seconds()
        info["is_active"] = elapsed < idle_seconds
        info["seconds_since_activity"] = int(elapsed)
    return info


@router.get("/running", tags=["run"])
async def list_running():
    """List traces whose background task is still alive.

    Finished tasks are pruned from the registry.  When a trace store is
    available, each entry is enriched with the stored status and whether the
    trace was active within the last 30 seconds.  A store failure for a single
    trace is logged and that entry is returned without the extra details.
    """
    runner = _get_runner()
    running = []
    for tid, task in list(_running_tasks.items()):
        if task.done():
            # Prune finished tasks whose own cleanup has not removed them yet.
            if _running_tasks.get(tid) is task:
                _running_tasks.pop(tid, None)
            continue
        trace_info: Dict[str, Any] = {"trace_id": tid, "is_active": True}
        if runner.trace_store:
            try:
                trace = await runner.trace_store.get_trace(tid)
                if trace:
                    trace_info.update(_trace_activity_info(trace))
            except Exception:
                # Best effort: still report the task, just without store details.
                logger.warning(
                    "list_running: could not load details for trace %s", tid, exc_info=True
                )
        running.append(trace_info)
    return {"running": running}
async def reconcile_traces():
    """
    Reconcile stale "running" states (intended for startup).

    After a crash or restart, the store may still mark traces as running
    although no in-memory task exists for them; each such trace is forced to
    "stopped".  Does nothing (with a warning) when no runner or trace store
    has been injected.  Failures are logged, and one failing trace does not
    prevent the others from being reconciled.
    """
    # Read the global directly: _get_runner() would raise HTTPException
    # instead of letting us skip when no runner is configured yet.
    runner = _runner
    if not runner or not runner.trace_store:
        logger.warning("[Reconciliation] Runner or TraceStore not initialized, skipping.")
        return
    try:
        running_traces = await runner.trace_store.list_traces(status="running", limit=1000)
    except Exception:
        logger.exception("[Reconciliation] Failed to reconcile traces")
        return

    count = 0
    for trace in running_traces or []:
        try:
            tid = trace.trace_id
            task = _running_tasks.get(tid)
            # A live task in this process means the trace really is running.
            # (On first startup the registry is empty, so everything is stale.)
            if task is not None and not task.done():
                continue
            logger.info("[Reconciliation] Fixing trace %s: running -> stopped", tid)
            await runner.trace_store.update_trace(
                tid,
                status="stopped",
                result_summary="[Reconciliation] 任务由于服务重启或异常中断已自动停止。",
            )
        except Exception:
            logger.exception("[Reconciliation] Failed to reconcile trace %r", trace)
            continue
        count += 1

    if count > 0:
        logger.info("[Reconciliation] Successfully reconciled %d traces.", count)
- # ===== 经验 API =====
@experiences_router.get("/experiences")
async def list_experiences():
    """Return the contents of the experiences file.

    The path is the runner's ``experiences_path`` attribute, defaulting to
    ``./.cache/experiences.md``.  An unset path or a missing file yields empty
    content.  The file is read in the default executor so the event loop is
    not blocked.
    """
    runner = _get_runner()
    experiences_path = getattr(runner, "experiences_path", "./.cache/experiences.md")
    if not experiences_path:
        return {"content": "", "path": experiences_path}

    def _read() -> str:
        with open(experiences_path, "r", encoding="utf-8") as f:
            return f.read()

    try:
        content = await asyncio.get_running_loop().run_in_executor(None, _read)
    except FileNotFoundError:
        # EAFP instead of exists()+open(): no race if the file disappears.
        content = ""
    return {"content": content, "path": experiences_path}
|