run_api.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767
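"""
Trace control API — create / run / stop / reflect / compact.

Provides endpoints that trigger and control Agent execution. An AgentRunner
instance must be injected through set_runner() before they can be used.
Execution happens asynchronously in the background; clients follow real-time
updates over WebSocket (/api/traces/{trace_id}/watch).

Endpoints:
    POST /api/traces                                  — create a new Trace and run it
    POST /api/traces/{id}/run                         — run (unified continue + rewind)
    POST /api/traces/{id}/stop                        — stop a running Trace
    POST /api/traces/{id}/reflect                     — start a reflection run on the trace
                                                        (original note: results are appended to
                                                        the experiences file — TODO confirm)
    POST /api/traces/{id}/compact                     — start a context-compression run
    GET  /api/traces/{id}/extractions                 — list extraction entries pending review
    POST /api/traces/{id}/extractions/{eid}/review    — review one extraction entry
    POST /api/traces/{id}/extractions/commit          — commit reviewed extraction entries
    GET  /api/traces/running                          — list running Traces
    GET  /api/experiences                             — read the experiences file
"""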
  13. import asyncio
  14. import logging
  15. import re
  16. import uuid
  17. import os
  18. from datetime import datetime
  19. from typing import Any, Dict, List, Optional
  20. from fastapi import APIRouter, HTTPException
  21. from pydantic import BaseModel, Field
  22. logger = logging.getLogger(__name__)
  23. router = APIRouter(prefix="/api/traces", tags=["run"])
  24. # 经验 API 使用独立 prefix
  25. experiences_router = APIRouter(prefix="/api", tags=["experiences"])
  26. # ===== 全局 Runner(由 api_server.py 注入)=====
  27. _runner = None
  28. def set_runner(runner):
  29. """注入 AgentRunner 实例"""
  30. global _runner
  31. _runner = runner
  32. def _get_runner():
  33. if _runner is None:
  34. raise HTTPException(
  35. status_code=503,
  36. detail="AgentRunner not configured. Server is in read-only mode.",
  37. )
  38. return _runner
  39. # ===== Request / Response 模型 =====
  40. class CreateRequest(BaseModel):
  41. """新建执行"""
  42. messages: List[Dict[str, Any]] = Field(
  43. ...,
  44. description="OpenAI SDK 格式的输入消息。可包含 system + user 消息;若无 system 消息则从 skills 自动构建",
  45. )
  46. model: str = Field("gpt-4o", description="模型名称")
  47. temperature: float = Field(0.3)
  48. max_iterations: int = Field(200)
  49. tools: Optional[List[str]] = Field(None, description="工具白名单(None = 全部)")
  50. name: Optional[str] = Field(None, description="任务名称(None = 自动生成)")
  51. uid: Optional[str] = Field(None)
  52. project_name: Optional[str] = Field(None, description="示例项目名称,若提供则动态加载其执行环境")
  53. class TraceRunRequest(BaseModel):
  54. """运行(统一续跑 + 回溯)"""
  55. messages: List[Dict[str, Any]] = Field(
  56. default_factory=list,
  57. description="追加的新消息(可为空,用于重新生成场景)",
  58. )
  59. after_message_id: Optional[str] = Field(
  60. None,
  61. description="从哪条消息后续跑。None = 从末尾续跑,message_id = 从该消息后运行(自动判断续跑/回溯)",
  62. )
  63. class ReflectRequest(BaseModel):
  64. """反思请求"""
  65. focus: Optional[str] = Field(None, description="反思重点(可选)")
  66. class RunResponse(BaseModel):
  67. """操作响应(立即返回,后台执行)"""
  68. trace_id: str
  69. status: str = "started"
  70. message: str = ""
  71. class StopResponse(BaseModel):
  72. """停止响应"""
  73. trace_id: str
  74. status: str # "stopping" | "not_running"
  75. class ReflectResponse(BaseModel):
  76. """反思响应"""
  77. trace_id: str
  78. reflection: str
  79. class CompactResponse(BaseModel):
  80. """压缩响应"""
  81. trace_id: str
  82. previous_count: int
  83. new_count: int
  84. message: str = ""
  85. # ===== 提取审核(见 agent/docs/memory-plan.md 第三节) =====
  86. class PendingExtractionModel(BaseModel):
  87. extraction_id: str
  88. sequence: Optional[int] = None
  89. goal_id: Optional[str] = None
  90. branch_id: Optional[str] = None
  91. payload: Dict[str, Any]
  92. reviewed: bool = False
  93. decision: Optional[str] = None
  94. committed: bool = False
  95. class ListExtractionsResponse(BaseModel):
  96. trace_id: str
  97. count: int
  98. items: List[PendingExtractionModel]
  99. class ReviewRequest(BaseModel):
  100. decision: str = Field(..., description="approve / edit / discard")
  101. edited_payload: Optional[Dict[str, Any]] = Field(
  102. None, description="decision=edit 时必填;只对本次 review 生效"
  103. )
  104. class ReviewResponse(BaseModel):
  105. trace_id: str
  106. extraction_id: str
  107. decision: str
  108. class CommitResponse(BaseModel):
  109. trace_id: str
  110. committed_count: int
  111. failed_count: int
  112. skipped_count: int
  113. committed: List[str]
  114. knowledge_ids: List[str]
  115. failed: List[Dict[str, str]]
  116. skipped: List[str]
  117. # ===== 后台执行 =====
  118. _running_tasks: Dict[str, asyncio.Task] = {}
  119. async def _run_in_background(trace_id: str, messages: List[Dict], config, runner_instance=None):
  120. """后台执行 agent,消费 run() 的所有 yield"""
  121. runner = runner_instance or _get_runner()
  122. try:
  123. async for _item in runner.run(messages=messages, config=config):
  124. pass # WebSocket 广播由 runner 内部的 store 事件驱动
  125. except Exception as e:
  126. logger.error(f"Background run failed for {trace_id}: {e}")
  127. finally:
  128. _running_tasks.pop(trace_id, None)
  129. async def _run_with_trace_signal(
  130. messages: List[Dict], config, trace_id_future: asyncio.Future, runner_instance=None
  131. ):
  132. """后台执行 agent,通过 Future 将 trace_id 传回给等待的 endpoint"""
  133. from agent.trace.models import Trace
  134. runner = runner_instance or _get_runner()
  135. trace_id: Optional[str] = None
  136. try:
  137. async for item in runner.run(messages=messages, config=config):
  138. if isinstance(item, Trace) and not trace_id_future.done():
  139. trace_id = item.trace_id
  140. trace_id_future.set_result(trace_id)
  141. except Exception as e:
  142. if not trace_id_future.done():
  143. trace_id_future.set_exception(e)
  144. logger.error(f"Background run failed: {e}")
  145. finally:
  146. if trace_id:
  147. _running_tasks.pop(trace_id, None)
  148. # ===== 路由 =====
  149. @router.post("", response_model=RunResponse)
  150. async def create_and_run(req: CreateRequest):
  151. """
  152. 新建 Trace 并开始执行
  153. 立即返回 trace_id,后台异步执行。
  154. 通过 WebSocket /api/traces/{trace_id}/watch 监听实时更新。
  155. """
  156. import importlib
  157. from agent.core.runner import RunConfig
  158. runner = None
  159. config = None
  160. messages = req.messages
  161. if req.project_name:
  162. try:
  163. # 动态加载对应 example 的 run.py
  164. module_name = f"examples.{req.project_name}.run"
  165. example_module = importlib.import_module(module_name)
  166. if hasattr(example_module, "init_project_env"):
  167. # 获取该 example 专属的 runner, 带上下文 messages, 以及默认 config
  168. runner, example_messages, default_config = await example_module.init_project_env(req.messages)
  169. messages = example_messages
  170. # 合并请求配置和 example 默认配置
  171. config = RunConfig(
  172. model=req.model or default_config.model,
  173. temperature=req.temperature if req.temperature is not None else default_config.temperature,
  174. max_iterations=req.max_iterations or default_config.max_iterations,
  175. tools=req.tools or default_config.tools,
  176. name=req.name or default_config.name,
  177. uid=req.uid or default_config.uid,
  178. enable_research_flow=default_config.enable_research_flow,
  179. context={"project_name": req.project_name}
  180. )
  181. except ImportError as e:
  182. if getattr(e, "name", None) == module_name:
  183. logger.warning(f"Project '{req.project_name}' has no custom run.py, falling back to default.")
  184. else:
  185. import traceback
  186. logger.error(f"Error INSIDE {module_name}:\n{traceback.format_exc()}")
  187. except Exception as e:
  188. import traceback
  189. logger.error(f"Unexpected error loading project environment for {req.project_name}:\n{traceback.format_exc()}")
  190. if not runner:
  191. _get_runner() # 验证全局默认 Runner 已配置
  192. config = RunConfig(
  193. model=req.model,
  194. temperature=req.temperature,
  195. max_iterations=req.max_iterations,
  196. tools=req.tools,
  197. name=req.name,
  198. uid=req.uid,
  199. context={"project_name": req.project_name} if req.project_name else {}
  200. )
  201. # 启动后台执行,通过 Future 等待 trace_id(Phase 1 完成后即返回)
  202. trace_id_future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
  203. task = asyncio.create_task(
  204. _run_with_trace_signal(messages, config, trace_id_future, runner_instance=runner)
  205. )
  206. trace_id = await trace_id_future
  207. _running_tasks[trace_id] = task
  208. return RunResponse(
  209. trace_id=trace_id,
  210. status="started",
  211. message=f"Execution started. Watch via WebSocket: /api/traces/{trace_id}/watch",
  212. )
  213. async def _cleanup_incomplete_tool_calls(store, trace_id: str, after_sequence: int) -> int:
  214. """
  215. 找到安全的插入点,保证不会把新消息插在一个不完整的工具调用序列中间。
  216. 场景:
  217. 1. after_sequence 刚好是一条带 tool_calls 的 assistant 消息,
  218. 但其部分/全部 tool response 还没生成 → 回退到该 assistant 之前。
  219. 2. after_sequence 是某条 tool response,但同一批 tool_calls 中
  220. 还有其他 response 未生成 → 回退到该 assistant 之前。
  221. 核心逻辑:从 after_sequence 往前找,定位到包含它的那条 assistant 消息,
  222. 检查该 assistant 的所有 tool_calls 是否都有对应的 tool response。
  223. 如果不完整,就把截断点回退到该 assistant 消息之前(即其 parent_sequence)。
  224. Args:
  225. store: TraceStore
  226. trace_id: Trace ID
  227. after_sequence: 用户指定的插入位置
  228. Returns:
  229. 调整后的安全截断点(<= after_sequence)
  230. """
  231. all_messages = await store.get_trace_messages(trace_id)
  232. if not all_messages:
  233. return after_sequence
  234. by_seq = {msg.sequence: msg for msg in all_messages}
  235. target = by_seq.get(after_sequence)
  236. if target is None:
  237. return after_sequence
  238. # 找到"所属的 assistant 消息":
  239. # - 如果 target 本身是 assistant → 就是它
  240. # - 如果 target 是 tool → 沿 parent_sequence 往上找 assistant
  241. assistant_msg = None
  242. if target.role == "assistant":
  243. assistant_msg = target
  244. elif target.role == "tool":
  245. cur = target
  246. while cur and cur.role == "tool":
  247. parent_seq = cur.parent_sequence
  248. cur = by_seq.get(parent_seq) if parent_seq is not None else None
  249. if cur and cur.role == "assistant":
  250. assistant_msg = cur
  251. if assistant_msg is None:
  252. return after_sequence
  253. # 该 assistant 是否带 tool_calls?
  254. content = assistant_msg.content
  255. if not isinstance(content, dict) or not content.get("tool_calls"):
  256. return after_sequence
  257. # 收集所有 tool_call_ids
  258. expected_ids = set()
  259. for tc in content["tool_calls"]:
  260. if isinstance(tc, dict) and tc.get("id"):
  261. expected_ids.add(tc["id"])
  262. if not expected_ids:
  263. return after_sequence
  264. # 查找已有的 tool responses
  265. found_ids = set()
  266. for msg in all_messages:
  267. if msg.role == "tool" and msg.tool_call_id in expected_ids:
  268. found_ids.add(msg.tool_call_id)
  269. missing = expected_ids - found_ids
  270. if not missing:
  271. # 全部 tool response 都在,这是一个完整的序列
  272. return after_sequence
  273. # 不完整 → 回退到 assistant 之前
  274. safe = assistant_msg.parent_sequence
  275. if safe is None:
  276. # assistant 已经是第一条消息,没有更早的位置
  277. safe = assistant_msg.sequence - 1
  278. logger.info(
  279. "检测到不完整的工具调用 (assistant seq=%d, 缺少 %d/%d tool responses),"
  280. "自动回退插入点:%d -> %d",
  281. assistant_msg.sequence, len(missing), len(expected_ids),
  282. after_sequence, safe,
  283. )
  284. return safe
  285. def _parse_sequence_from_message_id(message_id: str) -> int:
  286. """从 message_id 末尾解析 sequence 整数(格式:{trace_id}-{sequence:04d})"""
  287. try:
  288. return int(message_id.rsplit("-", 1)[-1])
  289. except (ValueError, IndexError):
  290. raise HTTPException(
  291. status_code=422,
  292. detail=f"Invalid after_message_id format: {message_id!r}",
  293. )
  294. @router.post("/{trace_id}/run", response_model=RunResponse)
  295. async def run_trace(trace_id: str, req: TraceRunRequest):
  296. """
  297. 运行已有 Trace(统一续跑 + 回溯)
  298. - after_message_id 为 null(或省略):从末尾续跑
  299. - after_message_id 为 message_id 字符串:从该消息后运行(Runner 自动判断续跑/回溯)
  300. - messages 为空 + after_message_id 有值:重新生成(从该位置重跑,不插入新消息)
  301. **自动清理不完整工具调用**:
  302. 如果人工插入 message 的位置打断了一个工具调用过程(assistant 消息有 tool_calls
  303. 但缺少对应的 tool responses),框架会自动检测并调整插入位置,确保不会产生不一致的状态。
  304. """
  305. from agent.core.runner import RunConfig
  306. import importlib
  307. runner = _get_runner()
  308. # 将 message_id 转换为内部使用的 sequence 整数
  309. after_sequence: Optional[int] = None
  310. if req.after_message_id is not None:
  311. after_sequence = _parse_sequence_from_message_id(req.after_message_id)
  312. # 验证 trace 存在
  313. if runner.trace_store:
  314. trace = await runner.trace_store.get_trace(trace_id)
  315. if not trace:
  316. raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
  317. # 自动检查并清理不完整的工具调用
  318. if after_sequence is not None and req.messages:
  319. adjusted_seq = await _cleanup_incomplete_tool_calls(
  320. runner.trace_store, trace_id, after_sequence
  321. )
  322. if adjusted_seq != after_sequence:
  323. logger.info(
  324. f"已自动调整插入位置:{after_sequence} -> {adjusted_seq}"
  325. )
  326. after_sequence = adjusted_seq
  327. # 检查是否已在运行
  328. if trace_id in _running_tasks and not _running_tasks[trace_id].done():
  329. # 竞态窗口修复:task 还没退出,但 store 里 trace 已经是 stopped/failed
  330. # 这发生在 cancel_event 触发后 → store 更新 → task finally 块还未执行之间
  331. # 此时可以安全地强制清除旧 task,允许续跑
  332. store_trace = None
  333. if runner.trace_store:
  334. store_trace = await runner.trace_store.get_trace(trace_id)
  335. if store_trace and store_trace.status in ("stopped", "failed", "completed"):
  336. logger.info(
  337. f"run_trace: task for {trace_id} not done yet but store status={store_trace.status!r}, "
  338. "forcing cleanup to allow resume"
  339. )
  340. _running_tasks[trace_id].cancel()
  341. _running_tasks.pop(trace_id, None)
  342. else:
  343. raise HTTPException(status_code=409, detail="Trace is already running")
  344. # 检测 trace 中是否包含 project_name 环境定义
  345. trace_context = trace.context or {}
  346. project_name = trace_context.get("project_name")
  347. if project_name:
  348. try:
  349. module_name = f"examples.{project_name}.run"
  350. example_module = importlib.import_module(module_name)
  351. if hasattr(example_module, "init_project_env"):
  352. logger.info(f"Trace {trace_id} 绑定了项目 {project_name},动态加载执行环境...")
  353. project_runner, project_msgs, default_config = await example_module.init_project_env()
  354. runner = project_runner # 发生替换
  355. except ImportError as e:
  356. if getattr(e, "name", None) == module_name:
  357. logger.warning(f"Project '{project_name}' has no custom run.py, keeping default runner.")
  358. else:
  359. import traceback
  360. logger.error(f"Error INSIDE {module_name} during resume:\n{traceback.format_exc()}")
  361. except Exception as e:
  362. import traceback
  363. logger.error(f"Unexpected error loading run.py environment for project {project_name} in trace {trace_id}:\n{traceback.format_exc()}")
  364. config = RunConfig(trace_id=trace_id, after_sequence=after_sequence)
  365. # 恢复运行时,将状态从 stopped 改回 running,并广播状态变化
  366. if runner.trace_store and trace_id:
  367. current_trace = await runner.trace_store.get_trace(trace_id)
  368. if current_trace and current_trace.status == "stopped":
  369. await runner.trace_store.update_trace(trace_id, status="running")
  370. # 广播状态变化给前端
  371. from agent.trace.websocket import broadcast_trace_status_changed
  372. await broadcast_trace_status_changed(trace_id, "running")
  373. task = asyncio.create_task(_run_in_background(trace_id, req.messages, config, runner_instance=runner))
  374. _running_tasks[trace_id] = task
  375. mode = "rewind" if after_sequence is not None else "continue"
  376. return RunResponse(
  377. trace_id=trace_id,
  378. status="started",
  379. message=f"Run ({mode}) started. Watch via WebSocket: /api/traces/{trace_id}/watch",
  380. )
  381. @router.post("/{trace_id}/stop", response_model=StopResponse)
  382. async def stop_trace(trace_id: str):
  383. """
  384. 停止运行中的 Trace
  385. 设置取消信号,agent loop 在下一个 LLM 调用前检查并退出。
  386. Trace 状态置为 "stopped"。
  387. """
  388. runner = _get_runner()
  389. # 通过 runner 的 stop 方法设置取消信号
  390. stopped = await runner.stop(trace_id)
  391. if not stopped:
  392. # 检查是否在 _running_tasks 但 runner 不知道(可能已完成)
  393. if trace_id in _running_tasks:
  394. task = _running_tasks[trace_id]
  395. if not task.done():
  396. task.cancel()
  397. _running_tasks.pop(trace_id, None)
  398. return StopResponse(trace_id=trace_id, status="stopping")
  399. return StopResponse(trace_id=trace_id, status="not_running")
  400. return StopResponse(trace_id=trace_id, status="stopping")
  401. @router.post("/{trace_id}/reflect", response_model=ReflectResponse)
  402. async def reflect_trace(trace_id: str, req: ReflectRequest):
  403. """
  404. 触发反思
  405. 通过 force_side_branch="reflection" 触发侧分支多轮 agent 模式,
  406. LLM 可以调用工具(如 knowledge_search, knowledge_save)进行多轮推理。
  407. 反思消息标记为侧分支(branch_type="reflection"),不在主路径上。
  408. """
  409. from agent.core.runner import RunConfig
  410. runner = _get_runner()
  411. if not runner.trace_store:
  412. raise HTTPException(status_code=503, detail="TraceStore not configured")
  413. # 验证 trace 存在
  414. trace = await runner.trace_store.get_trace(trace_id)
  415. if not trace:
  416. raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
  417. # 检查是否仍在运行
  418. if trace_id in _running_tasks and not _running_tasks[trace_id].done():
  419. raise HTTPException(status_code=409, detail="Cannot reflect on a running trace. Stop it first.")
  420. # 使用 force_side_branch 触发反思侧分支
  421. config = RunConfig(
  422. trace_id=trace_id,
  423. model=trace.model or "gpt-4o",
  424. force_side_branch=["reflection"], # 使用列表格式
  425. max_iterations=20, # 给侧分支足够的轮次
  426. enable_prompt_caching=True,
  427. )
  428. # 如果有 focus,可以通过追加消息传递(可选)
  429. messages = []
  430. if req.focus:
  431. messages = [{"role": "user", "content": f"反思重点:{req.focus}"}]
  432. # 启动反思任务(后台执行)
  433. task = asyncio.create_task(_run_trace_background(runner, messages, config))
  434. _running_tasks[trace_id] = task
  435. return ReflectResponse(
  436. trace_id=trace_id,
  437. reflection="反思任务已启动,通过 WebSocket 监听实时更新",
  438. )
  439. @router.get("/{trace_id}/extractions", response_model=ListExtractionsResponse)
  440. async def list_extractions(trace_id: str, include_reviewed: bool = False):
  441. """列出 trace 的待审核提取条目。"""
  442. runner = _get_runner()
  443. if not runner.trace_store:
  444. raise HTTPException(status_code=503, detail="TraceStore not configured")
  445. from agent.trace.extraction_review import list_pending
  446. pendings = await list_pending(
  447. runner.trace_store, trace_id, include_reviewed=include_reviewed
  448. )
  449. return ListExtractionsResponse(
  450. trace_id=trace_id,
  451. count=len(pendings),
  452. items=[
  453. PendingExtractionModel(
  454. extraction_id=p.extraction_id,
  455. sequence=p.sequence,
  456. goal_id=p.goal_id,
  457. branch_id=p.branch_id,
  458. payload=p.payload,
  459. reviewed=p.reviewed,
  460. decision=p.decision,
  461. committed=p.committed,
  462. )
  463. for p in pendings
  464. ],
  465. )
  466. @router.post(
  467. "/{trace_id}/extractions/{extraction_id}/review",
  468. response_model=ReviewResponse,
  469. )
  470. async def review_extraction(trace_id: str, extraction_id: str, req: ReviewRequest):
  471. """对单条 pending 提交 review 决策(approve/edit/discard)。"""
  472. runner = _get_runner()
  473. if not runner.trace_store:
  474. raise HTTPException(status_code=503, detail="TraceStore not configured")
  475. if req.decision not in ("approve", "edit", "discard"):
  476. raise HTTPException(
  477. status_code=400,
  478. detail=f"decision must be approve/edit/discard, got {req.decision}",
  479. )
  480. if req.decision == "edit" and not req.edited_payload:
  481. raise HTTPException(
  482. status_code=400, detail="decision=edit 必须提供 edited_payload"
  483. )
  484. from agent.trace.extraction_review import review_one
  485. await review_one(
  486. runner.trace_store,
  487. trace_id,
  488. extraction_id,
  489. req.decision, # type: ignore[arg-type]
  490. edited_payload=req.edited_payload,
  491. )
  492. return ReviewResponse(
  493. trace_id=trace_id, extraction_id=extraction_id, decision=req.decision
  494. )
  495. @router.post("/{trace_id}/extractions/commit", response_model=CommitResponse)
  496. async def commit_extractions(trace_id: str):
  497. """批量把已 approved/edited 的条目上传到 KnowHub。"""
  498. runner = _get_runner()
  499. if not runner.trace_store:
  500. raise HTTPException(status_code=503, detail="TraceStore not configured")
  501. from agent.trace.extraction_review import commit_approved
  502. report = await commit_approved(runner.trace_store, trace_id)
  503. return CommitResponse(
  504. trace_id=trace_id,
  505. committed_count=len(report.committed),
  506. failed_count=len(report.failed),
  507. skipped_count=len(report.skipped),
  508. committed=report.committed,
  509. knowledge_ids=report.knowledge_ids,
  510. failed=report.failed,
  511. skipped=report.skipped,
  512. )
  513. @router.post("/{trace_id}/compact", response_model=CompactResponse)
  514. async def compact_trace(trace_id: str):
  515. """
  516. 压缩 Trace 的上下文 (Compact)
  517. 通过 force_side_branch="compression" 触发侧分支多轮 agent 模式,
  518. LLM 可以调用工具(如 goal)进行多轮推理。
  519. 压缩消息标记为侧分支(branch_type="compression"),不在主路径上。
  520. """
  521. from agent.core.runner import RunConfig
  522. runner = _get_runner()
  523. if not runner.trace_store:
  524. raise HTTPException(status_code=503, detail="TraceStore not configured")
  525. # 验证 trace 存在
  526. trace = await runner.trace_store.get_trace(trace_id)
  527. if not trace:
  528. raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
  529. # 检查是否仍在运行
  530. if trace_id in _running_tasks and not _running_tasks[trace_id].done():
  531. raise HTTPException(status_code=409, detail="Cannot compact a running trace. Stop it first.")
  532. # 使用 force_side_branch 触发压缩侧分支
  533. config = RunConfig(
  534. trace_id=trace_id,
  535. model=trace.model or "gpt-4o",
  536. force_side_branch=["compression"], # 使用列表格式
  537. max_iterations=20, # 给侧分支足够的轮次
  538. enable_prompt_caching=True,
  539. )
  540. # 启动压缩任务(后台执行)
  541. task = asyncio.create_task(_run_trace_background(runner, [], config))
  542. _running_tasks[trace_id] = task
  543. return CompactResponse(
  544. trace_id=trace_id,
  545. previous_count=0, # 无法立即获取,需通过 WebSocket 监听
  546. new_count=0,
  547. message="压缩任务已启动,通过 WebSocket 监听实时更新",
  548. )
  549. @router.get("/running", tags=["run"])
  550. async def list_running():
  551. """列出正在运行的 Trace(包含活跃状态判断)"""
  552. from datetime import datetime, timedelta
  553. runner = _get_runner()
  554. running = []
  555. for tid, task in list(_running_tasks.items()):
  556. if task.done():
  557. _running_tasks.pop(tid, None)
  558. else:
  559. # 获取trace详情,检查最后活动时间
  560. trace_info = {"trace_id": tid, "is_active": True}
  561. if runner.trace_store:
  562. try:
  563. trace = await runner.trace_store.get_trace(tid)
  564. if trace:
  565. # 判断是否真正活跃:最后活动时间在30秒内
  566. if hasattr(trace, 'last_activity_at') and trace.last_activity_at:
  567. time_since_activity = (datetime.now() - trace.last_activity_at).total_seconds()
  568. trace_info["is_active"] = time_since_activity < 30
  569. trace_info["seconds_since_activity"] = int(time_since_activity)
  570. trace_info["status"] = trace.status
  571. except Exception:
  572. pass
  573. running.append(trace_info)
  574. return {"running": running}
  575. async def reconcile_traces():
  576. """
  577. 状态对齐:启动时清理残留的 running 状态。
  578. 当服务异常停止或重启后,磁盘上的 trace 状态可能仍显示为 running,
  579. 但对应的内存任务已不存在。本函数将其强制标记为 stopped。
  580. """
  581. runner = _get_runner()
  582. if not runner or not runner.trace_store:
  583. logger.warning("[Reconciliation] Runner or TraceStore not initialized, skipping.")
  584. return
  585. try:
  586. # 获取所有 running 状态的 trace
  587. running_traces = await runner.trace_store.list_traces(status="running", limit=1000)
  588. if not running_traces:
  589. return
  590. count = 0
  591. for trace in running_traces:
  592. tid = trace.trace_id
  593. # 如果不在活跃任务字典中(服务初次启动时此字典为空),则视为异常残留
  594. if tid not in _running_tasks:
  595. logger.info(f"[Reconciliation] Fixing trace {tid}: running -> stopped")
  596. await runner.trace_store.update_trace(
  597. tid,
  598. status="stopped",
  599. result_summary="[Reconciliation] 任务由于服务重启或异常中断已自动停止。"
  600. )
  601. count += 1
  602. if count > 0:
  603. logger.info(f"[Reconciliation] Successfully reconciled {count} traces.")
  604. except Exception as e:
  605. logger.error(f"[Reconciliation] Failed to reconcile traces: {e}")
  606. # ===== 经验 API =====
  607. @experiences_router.get("/experiences")
  608. async def list_experiences():
  609. """读取经验文件内容"""
  610. runner = _get_runner()
  611. experiences_path = getattr(runner, "experiences_path", "./.cache/experiences.md")
  612. if not experiences_path or not os.path.exists(experiences_path):
  613. return {"content": "", "path": experiences_path}
  614. with open(experiences_path, "r", encoding="utf-8") as f:
  615. content = f.read()
  616. return {"content": content, "path": experiences_path}