run_api.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640
  1. """
  2. Trace 控制 API — 新建 / 运行 / 停止 / 反思
  3. 提供 POST 端点触发 Agent 执行和控制。需要通过 set_runner() 注入 AgentRunner 实例。
  4. 执行在后台异步进行,客户端通过 WebSocket (/api/traces/{trace_id}/watch) 监听实时更新。
  5. 端点:
  6. POST /api/traces — 新建 Trace 并执行
  7. POST /api/traces/{id}/run — 运行(统一续跑 + 回溯)
  8. POST /api/traces/{id}/stop — 停止运行中的 Trace
  9. POST /api/traces/{id}/reflect — 反思,在 trace 末尾追加反思 prompt 运行,结果追加到 experiences 文件
  10. GET /api/traces/running — 列出正在运行的 Trace
  11. GET /api/experiences — 读取经验文件内容
  12. """
  13. import asyncio
  14. import logging
  15. import re
  16. import uuid
  17. import os
  18. from datetime import datetime
  19. from typing import Any, Dict, List, Optional
  20. from fastapi import APIRouter, HTTPException
  21. from pydantic import BaseModel, Field
  22. logger = logging.getLogger(__name__)
  23. router = APIRouter(prefix="/api/traces", tags=["run"])
  24. # 经验 API 使用独立 prefix
  25. experiences_router = APIRouter(prefix="/api", tags=["experiences"])
  26. # ===== 全局 Runner(由 api_server.py 注入)=====
  27. _runner = None
  28. def set_runner(runner):
  29. """注入 AgentRunner 实例"""
  30. global _runner
  31. _runner = runner
  32. def _get_runner():
  33. if _runner is None:
  34. raise HTTPException(
  35. status_code=503,
  36. detail="AgentRunner not configured. Server is in read-only mode.",
  37. )
  38. return _runner
  39. # ===== Request / Response 模型 =====
  40. class CreateRequest(BaseModel):
  41. """新建执行"""
  42. messages: List[Dict[str, Any]] = Field(
  43. ...,
  44. description="OpenAI SDK 格式的输入消息。可包含 system + user 消息;若无 system 消息则从 skills 自动构建",
  45. )
  46. model: str = Field("gpt-4o", description="模型名称")
  47. temperature: float = Field(0.3)
  48. max_iterations: int = Field(200)
  49. tools: Optional[List[str]] = Field(None, description="工具白名单(None = 全部)")
  50. name: Optional[str] = Field(None, description="任务名称(None = 自动生成)")
  51. uid: Optional[str] = Field(None)
  52. project_name: Optional[str] = Field(None, description="示例项目名称,若提供则动态加载其执行环境")
  53. class TraceRunRequest(BaseModel):
  54. """运行(统一续跑 + 回溯)"""
  55. messages: List[Dict[str, Any]] = Field(
  56. default_factory=list,
  57. description="追加的新消息(可为空,用于重新生成场景)",
  58. )
  59. after_message_id: Optional[str] = Field(
  60. None,
  61. description="从哪条消息后续跑。None = 从末尾续跑,message_id = 从该消息后运行(自动判断续跑/回溯)",
  62. )
  63. class ReflectRequest(BaseModel):
  64. """反思请求"""
  65. focus: Optional[str] = Field(None, description="反思重点(可选)")
  66. class RunResponse(BaseModel):
  67. """操作响应(立即返回,后台执行)"""
  68. trace_id: str
  69. status: str = "started"
  70. message: str = ""
  71. class StopResponse(BaseModel):
  72. """停止响应"""
  73. trace_id: str
  74. status: str # "stopping" | "not_running"
  75. class ReflectResponse(BaseModel):
  76. """反思响应"""
  77. trace_id: str
  78. reflection: str
  79. class CompactResponse(BaseModel):
  80. """压缩响应"""
  81. trace_id: str
  82. previous_count: int
  83. new_count: int
  84. message: str = ""
  85. # ===== 后台执行 =====
  86. _running_tasks: Dict[str, asyncio.Task] = {}
  87. async def _run_in_background(trace_id: str, messages: List[Dict], config, runner_instance=None):
  88. """后台执行 agent,消费 run() 的所有 yield"""
  89. runner = runner_instance or _get_runner()
  90. try:
  91. async for _item in runner.run(messages=messages, config=config):
  92. pass # WebSocket 广播由 runner 内部的 store 事件驱动
  93. except Exception as e:
  94. logger.error(f"Background run failed for {trace_id}: {e}")
  95. finally:
  96. _running_tasks.pop(trace_id, None)
  97. async def _run_with_trace_signal(
  98. messages: List[Dict], config, trace_id_future: asyncio.Future, runner_instance=None
  99. ):
  100. """后台执行 agent,通过 Future 将 trace_id 传回给等待的 endpoint"""
  101. from agent.trace.models import Trace
  102. runner = runner_instance or _get_runner()
  103. trace_id: Optional[str] = None
  104. try:
  105. async for item in runner.run(messages=messages, config=config):
  106. if isinstance(item, Trace) and not trace_id_future.done():
  107. trace_id = item.trace_id
  108. trace_id_future.set_result(trace_id)
  109. except Exception as e:
  110. if not trace_id_future.done():
  111. trace_id_future.set_exception(e)
  112. logger.error(f"Background run failed: {e}")
  113. finally:
  114. if trace_id:
  115. _running_tasks.pop(trace_id, None)
  116. # ===== 路由 =====
  117. @router.post("", response_model=RunResponse)
  118. async def create_and_run(req: CreateRequest):
  119. """
  120. 新建 Trace 并开始执行
  121. 立即返回 trace_id,后台异步执行。
  122. 通过 WebSocket /api/traces/{trace_id}/watch 监听实时更新。
  123. """
  124. import importlib
  125. from agent.core.runner import RunConfig
  126. runner = None
  127. config = None
  128. messages = req.messages
  129. if req.project_name:
  130. try:
  131. # 动态加载对应 example 的 run.py
  132. module_name = f"examples.{req.project_name}.run"
  133. example_module = importlib.import_module(module_name)
  134. if hasattr(example_module, "init_project_env"):
  135. # 获取该 example 专属的 runner, 带上下文 messages, 以及默认 config
  136. runner, example_messages, default_config = await example_module.init_project_env(req.messages)
  137. messages = example_messages
  138. # 合并请求配置和 example 默认配置
  139. config = RunConfig(
  140. model=req.model or default_config.model,
  141. temperature=req.temperature if req.temperature is not None else default_config.temperature,
  142. max_iterations=req.max_iterations or default_config.max_iterations,
  143. tools=req.tools or default_config.tools,
  144. name=req.name or default_config.name,
  145. uid=req.uid or default_config.uid,
  146. enable_research_flow=default_config.enable_research_flow,
  147. context={"project_name": req.project_name}
  148. )
  149. except ImportError as e:
  150. if getattr(e, "name", None) == module_name:
  151. logger.warning(f"Project '{req.project_name}' has no custom run.py, falling back to default.")
  152. else:
  153. import traceback
  154. logger.error(f"Error INSIDE {module_name}:\n{traceback.format_exc()}")
  155. except Exception as e:
  156. import traceback
  157. logger.error(f"Unexpected error loading project environment for {req.project_name}:\n{traceback.format_exc()}")
  158. if not runner:
  159. _get_runner() # 验证全局默认 Runner 已配置
  160. config = RunConfig(
  161. model=req.model,
  162. temperature=req.temperature,
  163. max_iterations=req.max_iterations,
  164. tools=req.tools,
  165. name=req.name,
  166. uid=req.uid,
  167. context={"project_name": req.project_name} if req.project_name else {}
  168. )
  169. # 启动后台执行,通过 Future 等待 trace_id(Phase 1 完成后即返回)
  170. trace_id_future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
  171. task = asyncio.create_task(
  172. _run_with_trace_signal(messages, config, trace_id_future, runner_instance=runner)
  173. )
  174. trace_id = await trace_id_future
  175. _running_tasks[trace_id] = task
  176. return RunResponse(
  177. trace_id=trace_id,
  178. status="started",
  179. message=f"Execution started. Watch via WebSocket: /api/traces/{trace_id}/watch",
  180. )
  181. async def _cleanup_incomplete_tool_calls(store, trace_id: str, after_sequence: int) -> int:
  182. """
  183. 找到安全的插入点,保证不会把新消息插在一个不完整的工具调用序列中间。
  184. 场景:
  185. 1. after_sequence 刚好是一条带 tool_calls 的 assistant 消息,
  186. 但其部分/全部 tool response 还没生成 → 回退到该 assistant 之前。
  187. 2. after_sequence 是某条 tool response,但同一批 tool_calls 中
  188. 还有其他 response 未生成 → 回退到该 assistant 之前。
  189. 核心逻辑:从 after_sequence 往前找,定位到包含它的那条 assistant 消息,
  190. 检查该 assistant 的所有 tool_calls 是否都有对应的 tool response。
  191. 如果不完整,就把截断点回退到该 assistant 消息之前(即其 parent_sequence)。
  192. Args:
  193. store: TraceStore
  194. trace_id: Trace ID
  195. after_sequence: 用户指定的插入位置
  196. Returns:
  197. 调整后的安全截断点(<= after_sequence)
  198. """
  199. all_messages = await store.get_trace_messages(trace_id)
  200. if not all_messages:
  201. return after_sequence
  202. by_seq = {msg.sequence: msg for msg in all_messages}
  203. target = by_seq.get(after_sequence)
  204. if target is None:
  205. return after_sequence
  206. # 找到"所属的 assistant 消息":
  207. # - 如果 target 本身是 assistant → 就是它
  208. # - 如果 target 是 tool → 沿 parent_sequence 往上找 assistant
  209. assistant_msg = None
  210. if target.role == "assistant":
  211. assistant_msg = target
  212. elif target.role == "tool":
  213. cur = target
  214. while cur and cur.role == "tool":
  215. parent_seq = cur.parent_sequence
  216. cur = by_seq.get(parent_seq) if parent_seq is not None else None
  217. if cur and cur.role == "assistant":
  218. assistant_msg = cur
  219. if assistant_msg is None:
  220. return after_sequence
  221. # 该 assistant 是否带 tool_calls?
  222. content = assistant_msg.content
  223. if not isinstance(content, dict) or not content.get("tool_calls"):
  224. return after_sequence
  225. # 收集所有 tool_call_ids
  226. expected_ids = set()
  227. for tc in content["tool_calls"]:
  228. if isinstance(tc, dict) and tc.get("id"):
  229. expected_ids.add(tc["id"])
  230. if not expected_ids:
  231. return after_sequence
  232. # 查找已有的 tool responses
  233. found_ids = set()
  234. for msg in all_messages:
  235. if msg.role == "tool" and msg.tool_call_id in expected_ids:
  236. found_ids.add(msg.tool_call_id)
  237. missing = expected_ids - found_ids
  238. if not missing:
  239. # 全部 tool response 都在,这是一个完整的序列
  240. return after_sequence
  241. # 不完整 → 回退到 assistant 之前
  242. safe = assistant_msg.parent_sequence
  243. if safe is None:
  244. # assistant 已经是第一条消息,没有更早的位置
  245. safe = assistant_msg.sequence - 1
  246. logger.info(
  247. "检测到不完整的工具调用 (assistant seq=%d, 缺少 %d/%d tool responses),"
  248. "自动回退插入点:%d -> %d",
  249. assistant_msg.sequence, len(missing), len(expected_ids),
  250. after_sequence, safe,
  251. )
  252. return safe
  253. def _parse_sequence_from_message_id(message_id: str) -> int:
  254. """从 message_id 末尾解析 sequence 整数(格式:{trace_id}-{sequence:04d})"""
  255. try:
  256. return int(message_id.rsplit("-", 1)[-1])
  257. except (ValueError, IndexError):
  258. raise HTTPException(
  259. status_code=422,
  260. detail=f"Invalid after_message_id format: {message_id!r}",
  261. )
  262. @router.post("/{trace_id}/run", response_model=RunResponse)
  263. async def run_trace(trace_id: str, req: TraceRunRequest):
  264. """
  265. 运行已有 Trace(统一续跑 + 回溯)
  266. - after_message_id 为 null(或省略):从末尾续跑
  267. - after_message_id 为 message_id 字符串:从该消息后运行(Runner 自动判断续跑/回溯)
  268. - messages 为空 + after_message_id 有值:重新生成(从该位置重跑,不插入新消息)
  269. **自动清理不完整工具调用**:
  270. 如果人工插入 message 的位置打断了一个工具调用过程(assistant 消息有 tool_calls
  271. 但缺少对应的 tool responses),框架会自动检测并调整插入位置,确保不会产生不一致的状态。
  272. """
  273. from agent.core.runner import RunConfig
  274. import importlib
  275. runner = _get_runner()
  276. # 将 message_id 转换为内部使用的 sequence 整数
  277. after_sequence: Optional[int] = None
  278. if req.after_message_id is not None:
  279. after_sequence = _parse_sequence_from_message_id(req.after_message_id)
  280. # 验证 trace 存在
  281. if runner.trace_store:
  282. trace = await runner.trace_store.get_trace(trace_id)
  283. if not trace:
  284. raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
  285. # 自动检查并清理不完整的工具调用
  286. if after_sequence is not None and req.messages:
  287. adjusted_seq = await _cleanup_incomplete_tool_calls(
  288. runner.trace_store, trace_id, after_sequence
  289. )
  290. if adjusted_seq != after_sequence:
  291. logger.info(
  292. f"已自动调整插入位置:{after_sequence} -> {adjusted_seq}"
  293. )
  294. after_sequence = adjusted_seq
  295. # 检查是否已在运行
  296. if trace_id in _running_tasks and not _running_tasks[trace_id].done():
  297. # 竞态窗口修复:task 还没退出,但 store 里 trace 已经是 stopped/failed
  298. # 这发生在 cancel_event 触发后 → store 更新 → task finally 块还未执行之间
  299. # 此时可以安全地强制清除旧 task,允许续跑
  300. store_trace = None
  301. if runner.trace_store:
  302. store_trace = await runner.trace_store.get_trace(trace_id)
  303. if store_trace and store_trace.status in ("stopped", "failed", "completed"):
  304. logger.info(
  305. f"run_trace: task for {trace_id} not done yet but store status={store_trace.status!r}, "
  306. "forcing cleanup to allow resume"
  307. )
  308. _running_tasks[trace_id].cancel()
  309. _running_tasks.pop(trace_id, None)
  310. else:
  311. raise HTTPException(status_code=409, detail="Trace is already running")
  312. # 检测 trace 中是否包含 project_name 环境定义
  313. trace_context = trace.context or {}
  314. project_name = trace_context.get("project_name")
  315. if project_name:
  316. try:
  317. module_name = f"examples.{project_name}.run"
  318. example_module = importlib.import_module(module_name)
  319. if hasattr(example_module, "init_project_env"):
  320. logger.info(f"Trace {trace_id} 绑定了项目 {project_name},动态加载执行环境...")
  321. project_runner, project_msgs, default_config = await example_module.init_project_env()
  322. runner = project_runner # 发生替换
  323. except ImportError as e:
  324. if getattr(e, "name", None) == module_name:
  325. logger.warning(f"Project '{project_name}' has no custom run.py, keeping default runner.")
  326. else:
  327. import traceback
  328. logger.error(f"Error INSIDE {module_name} during resume:\n{traceback.format_exc()}")
  329. except Exception as e:
  330. import traceback
  331. logger.error(f"Unexpected error loading run.py environment for project {project_name} in trace {trace_id}:\n{traceback.format_exc()}")
  332. config = RunConfig(trace_id=trace_id, after_sequence=after_sequence)
  333. # 恢复运行时,将状态从 stopped 改回 running,并广播状态变化
  334. if runner.trace_store and trace_id:
  335. current_trace = await runner.trace_store.get_trace(trace_id)
  336. if current_trace and current_trace.status == "stopped":
  337. await runner.trace_store.update_trace(trace_id, status="running")
  338. # 广播状态变化给前端
  339. from agent.trace.websocket import broadcast_trace_status_changed
  340. await broadcast_trace_status_changed(trace_id, "running")
  341. task = asyncio.create_task(_run_in_background(trace_id, req.messages, config, runner_instance=runner))
  342. _running_tasks[trace_id] = task
  343. mode = "rewind" if after_sequence is not None else "continue"
  344. return RunResponse(
  345. trace_id=trace_id,
  346. status="started",
  347. message=f"Run ({mode}) started. Watch via WebSocket: /api/traces/{trace_id}/watch",
  348. )
  349. @router.post("/{trace_id}/stop", response_model=StopResponse)
  350. async def stop_trace(trace_id: str):
  351. """
  352. 停止运行中的 Trace
  353. 设置取消信号,agent loop 在下一个 LLM 调用前检查并退出。
  354. Trace 状态置为 "stopped"。
  355. """
  356. runner = _get_runner()
  357. # 通过 runner 的 stop 方法设置取消信号
  358. stopped = await runner.stop(trace_id)
  359. if not stopped:
  360. # 检查是否在 _running_tasks 但 runner 不知道(可能已完成)
  361. if trace_id in _running_tasks:
  362. task = _running_tasks[trace_id]
  363. if not task.done():
  364. task.cancel()
  365. _running_tasks.pop(trace_id, None)
  366. return StopResponse(trace_id=trace_id, status="stopping")
  367. return StopResponse(trace_id=trace_id, status="not_running")
  368. return StopResponse(trace_id=trace_id, status="stopping")
  369. @router.post("/{trace_id}/reflect", response_model=ReflectResponse)
  370. async def reflect_trace(trace_id: str, req: ReflectRequest):
  371. """
  372. 触发反思
  373. 通过 force_side_branch="reflection" 触发侧分支多轮 agent 模式,
  374. LLM 可以调用工具(如 knowledge_search, knowledge_save)进行多轮推理。
  375. 反思消息标记为侧分支(branch_type="reflection"),不在主路径上。
  376. """
  377. from agent.core.runner import RunConfig
  378. runner = _get_runner()
  379. if not runner.trace_store:
  380. raise HTTPException(status_code=503, detail="TraceStore not configured")
  381. # 验证 trace 存在
  382. trace = await runner.trace_store.get_trace(trace_id)
  383. if not trace:
  384. raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
  385. # 检查是否仍在运行
  386. if trace_id in _running_tasks and not _running_tasks[trace_id].done():
  387. raise HTTPException(status_code=409, detail="Cannot reflect on a running trace. Stop it first.")
  388. # 使用 force_side_branch 触发反思侧分支
  389. config = RunConfig(
  390. trace_id=trace_id,
  391. model=trace.model or "gpt-4o",
  392. force_side_branch=["reflection"], # 使用列表格式
  393. max_iterations=20, # 给侧分支足够的轮次
  394. enable_prompt_caching=True,
  395. )
  396. # 如果有 focus,可以通过追加消息传递(可选)
  397. messages = []
  398. if req.focus:
  399. messages = [{"role": "user", "content": f"反思重点:{req.focus}"}]
  400. # 启动反思任务(后台执行)
  401. task = asyncio.create_task(_run_trace_background(runner, messages, config))
  402. _running_tasks[trace_id] = task
  403. return ReflectResponse(
  404. trace_id=trace_id,
  405. reflection="反思任务已启动,通过 WebSocket 监听实时更新",
  406. )
  407. @router.post("/{trace_id}/compact", response_model=CompactResponse)
  408. async def compact_trace(trace_id: str):
  409. """
  410. 压缩 Trace 的上下文 (Compact)
  411. 通过 force_side_branch="compression" 触发侧分支多轮 agent 模式,
  412. LLM 可以调用工具(如 goal)进行多轮推理。
  413. 压缩消息标记为侧分支(branch_type="compression"),不在主路径上。
  414. """
  415. from agent.core.runner import RunConfig
  416. runner = _get_runner()
  417. if not runner.trace_store:
  418. raise HTTPException(status_code=503, detail="TraceStore not configured")
  419. # 验证 trace 存在
  420. trace = await runner.trace_store.get_trace(trace_id)
  421. if not trace:
  422. raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
  423. # 检查是否仍在运行
  424. if trace_id in _running_tasks and not _running_tasks[trace_id].done():
  425. raise HTTPException(status_code=409, detail="Cannot compact a running trace. Stop it first.")
  426. # 使用 force_side_branch 触发压缩侧分支
  427. config = RunConfig(
  428. trace_id=trace_id,
  429. model=trace.model or "gpt-4o",
  430. force_side_branch=["compression"], # 使用列表格式
  431. max_iterations=20, # 给侧分支足够的轮次
  432. enable_prompt_caching=True,
  433. )
  434. # 启动压缩任务(后台执行)
  435. task = asyncio.create_task(_run_trace_background(runner, [], config))
  436. _running_tasks[trace_id] = task
  437. return CompactResponse(
  438. trace_id=trace_id,
  439. previous_count=0, # 无法立即获取,需通过 WebSocket 监听
  440. new_count=0,
  441. message="压缩任务已启动,通过 WebSocket 监听实时更新",
  442. )
  443. @router.get("/running", tags=["run"])
  444. async def list_running():
  445. """列出正在运行的 Trace(包含活跃状态判断)"""
  446. from datetime import datetime, timedelta
  447. runner = _get_runner()
  448. running = []
  449. for tid, task in list(_running_tasks.items()):
  450. if task.done():
  451. _running_tasks.pop(tid, None)
  452. else:
  453. # 获取trace详情,检查最后活动时间
  454. trace_info = {"trace_id": tid, "is_active": True}
  455. if runner.trace_store:
  456. try:
  457. trace = await runner.trace_store.get_trace(tid)
  458. if trace:
  459. # 判断是否真正活跃:最后活动时间在30秒内
  460. if hasattr(trace, 'last_activity_at') and trace.last_activity_at:
  461. time_since_activity = (datetime.now() - trace.last_activity_at).total_seconds()
  462. trace_info["is_active"] = time_since_activity < 30
  463. trace_info["seconds_since_activity"] = int(time_since_activity)
  464. trace_info["status"] = trace.status
  465. except Exception:
  466. pass
  467. running.append(trace_info)
  468. return {"running": running}
  469. async def reconcile_traces():
  470. """
  471. 状态对齐:启动时清理残留的 running 状态。
  472. 当服务异常停止或重启后,磁盘上的 trace 状态可能仍显示为 running,
  473. 但对应的内存任务已不存在。本函数将其强制标记为 stopped。
  474. """
  475. runner = _get_runner()
  476. if not runner or not runner.trace_store:
  477. logger.warning("[Reconciliation] Runner or TraceStore not initialized, skipping.")
  478. return
  479. try:
  480. # 获取所有 running 状态的 trace
  481. running_traces = await runner.trace_store.list_traces(status="running", limit=1000)
  482. if not running_traces:
  483. return
  484. count = 0
  485. for trace in running_traces:
  486. tid = trace.trace_id
  487. # 如果不在活跃任务字典中(服务初次启动时此字典为空),则视为异常残留
  488. if tid not in _running_tasks:
  489. logger.info(f"[Reconciliation] Fixing trace {tid}: running -> stopped")
  490. await runner.trace_store.update_trace(
  491. tid,
  492. status="stopped",
  493. result_summary="[Reconciliation] 任务由于服务重启或异常中断已自动停止。"
  494. )
  495. count += 1
  496. if count > 0:
  497. logger.info(f"[Reconciliation] Successfully reconciled {count} traces.")
  498. except Exception as e:
  499. logger.error(f"[Reconciliation] Failed to reconcile traces: {e}")
  500. # ===== 经验 API =====
  501. @experiences_router.get("/experiences")
  502. async def list_experiences():
  503. """读取经验文件内容"""
  504. runner = _get_runner()
  505. experiences_path = getattr(runner, "experiences_path", "./.cache/experiences.md")
  506. if not experiences_path or not os.path.exists(experiences_path):
  507. return {"content": "", "path": experiences_path}
  508. with open(experiences_path, "r", encoding="utf-8") as f:
  509. content = f.read()
  510. return {"content": content, "path": experiences_path}