store.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748
  1. """
  2. FileSystem Trace Store - 文件系统存储实现
  3. 用于跨进程数据共享,数据持久化到 .trace/ 目录
  4. 目录结构:
  5. .trace/{trace_id}/
  6. ├── meta.json # Trace 元数据
  7. ├── goal.json # GoalTree(扁平 JSON,通过 parent_id 构建层级)
  8. ├── messages/ # Messages(每条独立文件)
  9. │ ├── {message_id}.json
  10. │ └── ...
  11. └── events.jsonl # 事件流(WebSocket 续传)
  12. Sub-Trace 是完全独立的 Trace,有自己的目录:
  13. .trace/{parent_id}@{mode}-{timestamp}-{seq}/
  14. ├── meta.json # parent_trace_id 指向父 Trace
  15. ├── goal.json
  16. ├── messages/
  17. └── events.jsonl
  18. """
  19. import json
  20. import os
  21. import logging
  22. from pathlib import Path
  23. from typing import Dict, List, Optional, Any
  24. from datetime import datetime
  25. from .models import Trace, Message
  26. from .goal_models import GoalTree, Goal, GoalStats
  27. logger = logging.getLogger(__name__)
  28. class FileSystemTraceStore:
  29. """文件系统 Trace 存储"""
  30. def __init__(self, base_path: str = ".trace"):
  31. self.base_path = Path(base_path)
  32. self.base_path.mkdir(exist_ok=True)
  33. def _get_trace_dir(self, trace_id: str) -> Path:
  34. """获取 trace 目录"""
  35. return self.base_path / trace_id
  36. def _get_meta_file(self, trace_id: str) -> Path:
  37. """获取 meta.json 文件路径"""
  38. return self._get_trace_dir(trace_id) / "meta.json"
  39. def _get_goal_file(self, trace_id: str) -> Path:
  40. """获取 goal.json 文件路径"""
  41. return self._get_trace_dir(trace_id) / "goal.json"
  42. def _get_messages_dir(self, trace_id: str) -> Path:
  43. """获取 messages 目录"""
  44. return self._get_trace_dir(trace_id) / "messages"
  45. def _get_message_file(self, trace_id: str, message_id: str) -> Path:
  46. """获取 message 文件路径"""
  47. return self._get_messages_dir(trace_id) / f"{message_id}.json"
  48. def _get_events_file(self, trace_id: str) -> Path:
  49. """获取 events.jsonl 文件路径"""
  50. return self._get_trace_dir(trace_id) / "events.jsonl"
  51. def _get_model_usage_file(self, trace_id: str) -> Path:
  52. """获取 model_usage.json 文件路径"""
  53. return self._get_trace_dir(trace_id) / "model_usage.json"
  54. # ===== Trace 操作 =====
  55. async def create_trace(self, trace: Trace) -> str:
  56. """创建新的 Trace"""
  57. trace_dir = self._get_trace_dir(trace.trace_id)
  58. trace_dir.mkdir(exist_ok=True)
  59. # 创建 messages 目录
  60. messages_dir = self._get_messages_dir(trace.trace_id)
  61. messages_dir.mkdir(exist_ok=True)
  62. # 写入 meta.json
  63. meta_file = self._get_meta_file(trace.trace_id)
  64. meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
  65. # 创建空的 events.jsonl
  66. events_file = self._get_events_file(trace.trace_id)
  67. events_file.touch()
  68. return trace.trace_id
  69. async def get_trace(self, trace_id: str) -> Optional[Trace]:
  70. """获取 Trace"""
  71. meta_file = self._get_meta_file(trace_id)
  72. if not meta_file.exists():
  73. return None
  74. data = json.loads(meta_file.read_text(encoding="utf-8"))
  75. # 解析 datetime 字段
  76. if data.get("created_at"):
  77. data["created_at"] = datetime.fromisoformat(data["created_at"])
  78. if data.get("completed_at"):
  79. data["completed_at"] = datetime.fromisoformat(data["completed_at"])
  80. return Trace.from_dict(data)
  81. async def update_trace(self, trace_id: str, **updates) -> None:
  82. """更新 Trace"""
  83. trace = await self.get_trace(trace_id)
  84. if not trace:
  85. return
  86. # 更新字段
  87. for key, value in updates.items():
  88. if hasattr(trace, key):
  89. setattr(trace, key, value)
  90. # 写回文件
  91. meta_file = self._get_meta_file(trace_id)
  92. meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
  93. async def list_traces(
  94. self,
  95. mode: Optional[str] = None,
  96. agent_type: Optional[str] = None,
  97. uid: Optional[str] = None,
  98. status: Optional[str] = None,
  99. limit: int = 50
  100. ) -> List[Trace]:
  101. """列出 Traces"""
  102. traces = []
  103. if not self.base_path.exists():
  104. return []
  105. for trace_dir in self.base_path.iterdir():
  106. if not trace_dir.is_dir():
  107. continue
  108. meta_file = trace_dir / "meta.json"
  109. if not meta_file.exists():
  110. continue
  111. try:
  112. data = json.loads(meta_file.read_text(encoding="utf-8"))
  113. # 过滤
  114. if mode and data.get("mode") != mode:
  115. continue
  116. if agent_type and data.get("agent_type") != agent_type:
  117. continue
  118. if uid and data.get("uid") != uid:
  119. continue
  120. if status and data.get("status") != status:
  121. continue
  122. # 解析 datetime
  123. if data.get("created_at"):
  124. data["created_at"] = datetime.fromisoformat(data["created_at"])
  125. if data.get("completed_at"):
  126. data["completed_at"] = datetime.fromisoformat(data["completed_at"])
  127. traces.append(Trace.from_dict(data))
  128. except Exception:
  129. continue
  130. # 排序(最新的在前)
  131. traces.sort(key=lambda t: t.created_at, reverse=True)
  132. return traces[:limit]
  133. # ===== GoalTree 操作 =====
  134. async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]:
  135. """获取 GoalTree"""
  136. goal_file = self._get_goal_file(trace_id)
  137. if not goal_file.exists():
  138. return None
  139. try:
  140. data = json.loads(goal_file.read_text(encoding="utf-8"))
  141. return GoalTree.from_dict(data)
  142. except Exception:
  143. return None
  144. async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None:
  145. """更新完整 GoalTree"""
  146. goal_file = self._get_goal_file(trace_id)
  147. goal_file.write_text(json.dumps(tree.to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
  148. async def add_goal(self, trace_id: str, goal: Goal) -> None:
  149. """添加 Goal 到 GoalTree"""
  150. tree = await self.get_goal_tree(trace_id)
  151. if not tree:
  152. return
  153. tree.goals.append(goal)
  154. await self.update_goal_tree(trace_id, tree)
  155. # 推送 goal_added 事件
  156. event_data = {
  157. "goal": goal.to_dict(),
  158. "parent_id": goal.parent_id
  159. }
  160. await self.append_event(trace_id, "goal_added", event_data)
  161. # 打印详细的 goal 信息
  162. desc_preview = goal.description[:80] + "..." if len(goal.description) > 80 else goal.description
  163. print(f"[Goal Added] ID={goal.id}, Parent={goal.parent_id or 'root'}")
  164. print(f" 📝 {desc_preview}")
  165. if goal.reason:
  166. reason_preview = goal.reason[:60] + "..." if len(goal.reason) > 60 else goal.reason
  167. print(f" 💡 {reason_preview}")
  168. async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None:
  169. """更新 Goal 字段"""
  170. tree = await self.get_goal_tree(trace_id)
  171. if not tree:
  172. return
  173. goal = tree.find(goal_id)
  174. if not goal:
  175. return
  176. # 更新字段
  177. for key, value in updates.items():
  178. if hasattr(goal, key):
  179. # 特殊处理 stats 字段(可能是 dict)
  180. if key in ["self_stats", "cumulative_stats"] and isinstance(value, dict):
  181. value = GoalStats.from_dict(value)
  182. setattr(goal, key, value)
  183. await self.update_goal_tree(trace_id, tree)
  184. # 推送 goal_updated 事件
  185. # 如果状态变为 completed,检查是否需要级联完成父 Goal
  186. affected_goals = [{"goal_id": goal_id, "updates": updates}]
  187. if updates.get("status") == "completed":
  188. # 检查级联完成:如果所有兄弟 Goal 都完成,父 Goal 也完成
  189. cascade_completed = await self._check_cascade_completion(trace_id, goal)
  190. affected_goals.extend(cascade_completed)
  191. await self.append_event(trace_id, "goal_updated", {
  192. "goal_id": goal_id,
  193. "updates": updates,
  194. "affected_goals": affected_goals
  195. })
  196. print(f"[DEBUG] Pushed goal_updated event: goal_id={goal_id}, updates={updates}, affected={len(affected_goals)}")
  197. async def _check_cascade_completion(
  198. self,
  199. trace_id: str,
  200. completed_goal: Goal
  201. ) -> List[Dict[str, Any]]:
  202. """
  203. 检查级联完成:如果一个 Goal 的所有子 Goal 都完成,则自动完成父 Goal
  204. Args:
  205. trace_id: Trace ID
  206. completed_goal: 刚完成的 Goal
  207. Returns:
  208. 受影响的父 Goals 列表(自动完成的)
  209. """
  210. if not completed_goal.parent_id:
  211. return []
  212. tree = await self.get_goal_tree(trace_id)
  213. if not tree:
  214. return []
  215. affected = []
  216. parent = tree.find(completed_goal.parent_id)
  217. if not parent:
  218. return []
  219. # 获取父 Goal 的所有子 Goal
  220. children = tree.get_children(parent.id)
  221. # 检查是否所有子 Goal 都已完成(排除 abandoned)
  222. all_completed = all(
  223. child.status in ["completed", "abandoned"]
  224. for child in children
  225. )
  226. if all_completed and parent.status != "completed":
  227. # 自动完成父 Goal
  228. parent.status = "completed"
  229. if not parent.summary:
  230. # 生成自动摘要
  231. completed_count = sum(1 for c in children if c.status == "completed")
  232. parent.summary = f"所有子目标已完成 ({completed_count}/{len(children)})"
  233. await self.update_goal_tree(trace_id, tree)
  234. affected.append({
  235. "goal_id": parent.id,
  236. "status": "completed",
  237. "summary": parent.summary,
  238. "cumulative_stats": parent.cumulative_stats.to_dict()
  239. })
  240. # 递归检查祖父 Goal
  241. grandparent_affected = await self._check_cascade_completion(trace_id, parent)
  242. affected.extend(grandparent_affected)
  243. return affected
  244. # ===== Message 操作 =====
  245. async def add_message(self, message: Message) -> str:
  246. """
  247. 添加 Message
  248. 自动更新关联 Goal 的 stats(self_stats 和祖先的 cumulative_stats)
  249. """
  250. trace_id = message.trace_id
  251. # 1. 写入 message 文件
  252. messages_dir = self._get_messages_dir(trace_id)
  253. message_file = messages_dir / f"{message.message_id}.json"
  254. message_file.write_text(json.dumps(message.to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
  255. # 2. 更新 trace 统计
  256. trace = await self.get_trace(trace_id)
  257. if trace:
  258. trace.total_messages += 1
  259. trace.last_sequence = max(trace.last_sequence, message.sequence)
  260. # 累计 tokens(完整版)
  261. if message.prompt_tokens:
  262. trace.total_prompt_tokens += message.prompt_tokens
  263. if message.completion_tokens:
  264. trace.total_completion_tokens += message.completion_tokens
  265. if message.reasoning_tokens:
  266. trace.total_reasoning_tokens += message.reasoning_tokens
  267. if message.cache_creation_tokens:
  268. trace.total_cache_creation_tokens += message.cache_creation_tokens
  269. if message.cache_read_tokens:
  270. trace.total_cache_read_tokens += message.cache_read_tokens
  271. # 向后兼容:也更新 total_tokens
  272. if message.tokens:
  273. trace.total_tokens += message.tokens
  274. elif message.prompt_tokens or message.completion_tokens:
  275. trace.total_tokens += (message.prompt_tokens or 0) + (message.completion_tokens or 0)
  276. if message.cost:
  277. trace.total_cost += message.cost
  278. if message.duration_ms:
  279. trace.total_duration_ms += message.duration_ms
  280. # 更新 Trace
  281. await self.update_trace(
  282. trace_id,
  283. total_messages=trace.total_messages,
  284. last_sequence=trace.last_sequence,
  285. total_tokens=trace.total_tokens,
  286. total_prompt_tokens=trace.total_prompt_tokens,
  287. total_completion_tokens=trace.total_completion_tokens,
  288. total_reasoning_tokens=trace.total_reasoning_tokens,
  289. total_cache_creation_tokens=trace.total_cache_creation_tokens,
  290. total_cache_read_tokens=trace.total_cache_read_tokens,
  291. total_cost=trace.total_cost,
  292. total_duration_ms=trace.total_duration_ms
  293. )
  294. # 3. 更新 Goal stats
  295. await self._update_goal_stats(trace_id, message)
  296. # 4. 追加 message_added 事件
  297. affected_goals = await self._get_affected_goals(trace_id, message)
  298. event_id = await self.append_event(trace_id, "message_added", {
  299. "message": message.to_dict(),
  300. "affected_goals": affected_goals
  301. })
  302. if event_id:
  303. try:
  304. from . import websocket as trace_ws
  305. await trace_ws.broadcast_message_added(
  306. trace_id=trace_id,
  307. event_id=event_id,
  308. message_dict=message.to_dict(),
  309. affected_goals=affected_goals,
  310. )
  311. except Exception:
  312. logger.exception("Failed to broadcast message_added (trace_id=%s, event_id=%s)", trace_id, event_id)
  313. return message.message_id
  314. async def _update_goal_stats(self, trace_id: str, message: Message) -> None:
  315. """更新 Goal 的 self_stats 和祖先的 cumulative_stats"""
  316. tree = await self.get_goal_tree(trace_id)
  317. if not tree:
  318. return
  319. # 找到关联的 Goal
  320. goal = tree.find(message.goal_id)
  321. if not goal:
  322. return
  323. # 更新自身 self_stats
  324. goal.self_stats.message_count += 1
  325. if message.tokens:
  326. goal.self_stats.total_tokens += message.tokens
  327. if message.cost:
  328. goal.self_stats.total_cost += message.cost
  329. # TODO: 更新 preview(工具调用摘要)
  330. # 更新自身 cumulative_stats
  331. goal.cumulative_stats.message_count += 1
  332. if message.tokens:
  333. goal.cumulative_stats.total_tokens += message.tokens
  334. if message.cost:
  335. goal.cumulative_stats.total_cost += message.cost
  336. # 沿祖先链向上更新 cumulative_stats
  337. current_goal = goal
  338. while current_goal.parent_id:
  339. parent = tree.find(current_goal.parent_id)
  340. if not parent:
  341. break
  342. parent.cumulative_stats.message_count += 1
  343. if message.tokens:
  344. parent.cumulative_stats.total_tokens += message.tokens
  345. if message.cost:
  346. parent.cumulative_stats.total_cost += message.cost
  347. current_goal = parent
  348. # 保存更新后的 tree
  349. await self.update_goal_tree(trace_id, tree)
  350. async def _get_affected_goals(self, trace_id: str, message: Message) -> List[Dict[str, Any]]:
  351. """获取受影响的 Goals(自身 + 所有祖先)"""
  352. tree = await self.get_goal_tree(trace_id)
  353. if not tree:
  354. return []
  355. goal = tree.find(message.goal_id)
  356. if not goal:
  357. return []
  358. affected = []
  359. # 添加自身(包含 self_stats 和 cumulative_stats)
  360. affected.append({
  361. "goal_id": goal.id,
  362. "self_stats": goal.self_stats.to_dict(),
  363. "cumulative_stats": goal.cumulative_stats.to_dict()
  364. })
  365. # 添加所有祖先(仅 cumulative_stats)
  366. current_goal = goal
  367. while current_goal.parent_id:
  368. parent = tree.find(current_goal.parent_id)
  369. if not parent:
  370. break
  371. affected.append({
  372. "goal_id": parent.id,
  373. "cumulative_stats": parent.cumulative_stats.to_dict()
  374. })
  375. current_goal = parent
  376. return affected
  377. return affected
  378. async def get_message(self, message_id: str) -> Optional[Message]:
  379. """获取 Message(扫描所有 trace)"""
  380. for trace_dir in self.base_path.iterdir():
  381. if not trace_dir.is_dir():
  382. continue
  383. # 检查 messages 目录
  384. message_file = trace_dir / "messages" / f"{message_id}.json"
  385. if message_file.exists():
  386. try:
  387. data = json.loads(message_file.read_text(encoding="utf-8"))
  388. return Message.from_dict(data)
  389. except Exception:
  390. pass
  391. return None
  392. async def get_trace_messages(
  393. self,
  394. trace_id: str,
  395. ) -> List[Message]:
  396. """获取 Trace 的所有 Messages(包含所有分支,按 sequence 排序)"""
  397. messages_dir = self._get_messages_dir(trace_id)
  398. if not messages_dir.exists():
  399. return []
  400. messages = []
  401. for message_file in messages_dir.glob("*.json"):
  402. try:
  403. data = json.loads(message_file.read_text(encoding="utf-8"))
  404. msg = Message.from_dict(data)
  405. messages.append(msg)
  406. except Exception:
  407. continue
  408. # 按 sequence 排序
  409. messages.sort(key=lambda m: m.sequence)
  410. return messages
  411. async def get_main_path_messages(
  412. self,
  413. trace_id: str,
  414. head_sequence: int
  415. ) -> List[Message]:
  416. """
  417. 获取主路径上的消息(从 head_sequence 沿 parent_sequence 链回溯到 root)
  418. Returns:
  419. 按 sequence 正序排列的主路径 Message 列表
  420. """
  421. # 加载所有消息,建立 sequence -> Message 索引
  422. all_messages = await self.get_trace_messages(trace_id)
  423. messages_by_seq = {m.sequence: m for m in all_messages}
  424. # 从 head 沿 parent chain 回溯
  425. path = []
  426. seq = head_sequence
  427. while seq is not None:
  428. msg = messages_by_seq.get(seq)
  429. if not msg:
  430. break
  431. path.append(msg)
  432. seq = msg.parent_sequence
  433. # 反转为正序(root → head)
  434. path.reverse()
  435. return path
  436. async def get_messages_by_goal(
  437. self,
  438. trace_id: str,
  439. goal_id: str
  440. ) -> List[Message]:
  441. """获取指定 Goal 关联的所有 Messages"""
  442. all_messages = await self.get_trace_messages(trace_id)
  443. return [m for m in all_messages if m.goal_id == goal_id]
  444. async def update_message(self, message_id: str, **updates) -> None:
  445. """更新 Message 字段"""
  446. message = await self.get_message(message_id)
  447. if not message:
  448. return
  449. # 更新字段
  450. for key, value in updates.items():
  451. if hasattr(message, key):
  452. setattr(message, key, value)
  453. # 确定文件路径
  454. messages_dir = self._get_messages_dir(message.trace_id)
  455. message_file = messages_dir / f"{message_id}.json"
  456. message_file.write_text(json.dumps(message.to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
  457. async def abandon_messages_after(self, trace_id: str, cutoff_sequence: int) -> List[str]:
  458. """
  459. 将 sequence > cutoff_sequence 的 active messages 标记为 abandoned。
  460. 返回被 abandon 的 message_id 列表。
  461. """
  462. all_messages = await self.get_trace_messages(trace_id)
  463. abandoned_ids = []
  464. now = datetime.now()
  465. for msg in all_messages:
  466. if msg.sequence > cutoff_sequence and msg.status == "active":
  467. msg.status = "abandoned"
  468. msg.abandoned_at = now
  469. # 直接写回文件
  470. message_file = self._get_messages_dir(trace_id) / f"{msg.message_id}.json"
  471. message_file.write_text(
  472. json.dumps(msg.to_dict(), indent=2, ensure_ascii=False),
  473. encoding="utf-8"
  474. )
  475. abandoned_ids.append(msg.message_id)
  476. return abandoned_ids
  477. # ===== 模型使用追踪 =====
  478. async def record_model_usage(
  479. self,
  480. trace_id: str,
  481. sequence: int,
  482. role: str,
  483. model: str,
  484. prompt_tokens: int,
  485. completion_tokens: int,
  486. cache_read_tokens: int = 0,
  487. tool_name: Optional[str] = None,
  488. ) -> None:
  489. """
  490. 记录模型使用情况到 model_usage.json
  491. Args:
  492. trace_id: Trace ID
  493. sequence: 消息序号
  494. role: 角色(assistant/tool)
  495. model: 模型名称
  496. prompt_tokens: 输入tokens
  497. completion_tokens: 输出tokens
  498. cache_read_tokens: 缓存读取tokens
  499. tool_name: 工具名称(role=tool时)
  500. """
  501. usage_file = self._get_model_usage_file(trace_id)
  502. # 读取现有数据
  503. if usage_file.exists():
  504. data = json.loads(usage_file.read_text(encoding="utf-8"))
  505. else:
  506. data = {
  507. "summary": {
  508. "total_models": 0,
  509. "total_tokens": 0,
  510. "total_cache_read_tokens": 0,
  511. "agent_tokens": 0,
  512. "tool_tokens": 0,
  513. },
  514. "models": [],
  515. "timeline": [],
  516. }
  517. # 更新summary
  518. total_tokens = prompt_tokens + completion_tokens
  519. data["summary"]["total_tokens"] += total_tokens
  520. data["summary"]["total_cache_read_tokens"] += cache_read_tokens
  521. if role == "assistant":
  522. data["summary"]["agent_tokens"] += total_tokens
  523. source = "agent"
  524. else:
  525. data["summary"]["tool_tokens"] += total_tokens
  526. source = f"tool:{tool_name}" if tool_name else "tool"
  527. # 更新models列表
  528. model_entry = None
  529. for m in data["models"]:
  530. if m["model"] == model and m["source"] == source:
  531. model_entry = m
  532. break
  533. if model_entry:
  534. model_entry["prompt_tokens"] += prompt_tokens
  535. model_entry["completion_tokens"] += completion_tokens
  536. model_entry["total_tokens"] += total_tokens
  537. model_entry["cache_read_tokens"] += cache_read_tokens
  538. model_entry["call_count"] += 1
  539. else:
  540. data["models"].append({
  541. "model": model,
  542. "source": source,
  543. "prompt_tokens": prompt_tokens,
  544. "completion_tokens": completion_tokens,
  545. "total_tokens": total_tokens,
  546. "cache_read_tokens": cache_read_tokens,
  547. "call_count": 1,
  548. })
  549. data["summary"]["total_models"] = len(data["models"])
  550. # 添加到timeline
  551. timeline_entry = {
  552. "sequence": sequence,
  553. "role": role,
  554. "model": model,
  555. "prompt_tokens": prompt_tokens,
  556. "completion_tokens": completion_tokens,
  557. }
  558. if cache_read_tokens > 0:
  559. timeline_entry["cache_read_tokens"] = cache_read_tokens
  560. if tool_name:
  561. timeline_entry["tool_name"] = tool_name
  562. data["timeline"].append(timeline_entry)
  563. # 写回文件
  564. usage_file.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
  565. # ===== 事件流操作(用于 WebSocket 断线续传)=====
  566. async def get_events(
  567. self,
  568. trace_id: str,
  569. since_event_id: int = 0
  570. ) -> List[Dict[str, Any]]:
  571. """获取事件流"""
  572. events_file = self._get_events_file(trace_id)
  573. if not events_file.exists():
  574. return []
  575. events = []
  576. with events_file.open('r', encoding='utf-8') as f:
  577. for line in f:
  578. try:
  579. event = json.loads(line.strip())
  580. if event.get("event_id", 0) > since_event_id:
  581. events.append(event)
  582. except Exception:
  583. continue
  584. return events
  585. async def append_event(
  586. self,
  587. trace_id: str,
  588. event_type: str,
  589. payload: Dict[str, Any]
  590. ) -> int:
  591. """追加事件,返回 event_id"""
  592. # 获取 trace 并递增 event_id
  593. trace = await self.get_trace(trace_id)
  594. if not trace:
  595. return 0
  596. trace.last_event_id += 1
  597. event_id = trace.last_event_id
  598. # 更新 trace 的 last_event_id
  599. await self.update_trace(trace_id, last_event_id=event_id)
  600. # 创建事件
  601. event = {
  602. "event_id": event_id,
  603. "event": event_type,
  604. "ts": datetime.now().isoformat(),
  605. **payload
  606. }
  607. # 追加到 events.jsonl
  608. events_file = self._get_events_file(trace_id)
  609. with events_file.open('a', encoding='utf-8') as f:
  610. f.write(json.dumps(event, ensure_ascii=False) + '\n')
  611. return event_id