"""
FileSystem Trace Store (fs_store.py) - file-system-backed storage implementation.

Used for sharing data across processes; data is persisted under the .trace/ directory.
"""
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any

from agent.execution.models import Trace, Step
  11. class FileSystemTraceStore:
  12. """文件系统 Trace 存储
  13. 目录结构:
  14. .trace/
  15. ├── trace-001/
  16. │ ├── meta.json # Trace 元数据
  17. │ ├── steps/
  18. │ │ ├── step-1.json # 每个 step 独立文件
  19. │ │ ├── step-2.json
  20. │ │ └── step-3.json
  21. │ └── events.jsonl # 事件流(WebSocket 续传)
  22. └── trace-002/
  23. └── ...
  24. """
  25. def __init__(self, base_path: str = ".trace"):
  26. self.base_path = Path(base_path)
  27. self.base_path.mkdir(exist_ok=True)
  28. def _get_trace_dir(self, trace_id: str) -> Path:
  29. """获取 trace 目录"""
  30. return self.base_path / trace_id
  31. def _get_meta_file(self, trace_id: str) -> Path:
  32. """获取 meta.json 文件路径"""
  33. return self._get_trace_dir(trace_id) / "meta.json"
  34. def _get_steps_dir(self, trace_id: str) -> Path:
  35. """获取 steps 目录"""
  36. return self._get_trace_dir(trace_id) / "steps"
  37. def _get_step_file(self, trace_id: str, step_id: str) -> Path:
  38. """获取 step 文件路径"""
  39. return self._get_steps_dir(trace_id) / f"{step_id}.json"
  40. def _get_events_file(self, trace_id: str) -> Path:
  41. """获取 events.jsonl 文件路径"""
  42. return self._get_trace_dir(trace_id) / "events.jsonl"
  43. # ===== Trace 操作 =====
  44. async def create_trace(self, trace: Trace) -> str:
  45. """创建新的 Trace"""
  46. trace_dir = self._get_trace_dir(trace.trace_id)
  47. trace_dir.mkdir(exist_ok=True)
  48. # 创建 steps 目录
  49. steps_dir = self._get_steps_dir(trace.trace_id)
  50. steps_dir.mkdir(exist_ok=True)
  51. # 写入 meta.json
  52. meta_file = self._get_meta_file(trace.trace_id)
  53. meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
  54. # 创建空的 events.jsonl
  55. events_file = self._get_events_file(trace.trace_id)
  56. events_file.touch()
  57. return trace.trace_id
  58. async def get_trace(self, trace_id: str) -> Optional[Trace]:
  59. """获取 Trace"""
  60. meta_file = self._get_meta_file(trace_id)
  61. if not meta_file.exists():
  62. return None
  63. data = json.loads(meta_file.read_text())
  64. # 解析 datetime 字段
  65. if data.get("created_at"):
  66. data["created_at"] = datetime.fromisoformat(data["created_at"])
  67. if data.get("completed_at"):
  68. data["completed_at"] = datetime.fromisoformat(data["completed_at"])
  69. return Trace(**data)
  70. async def update_trace(self, trace_id: str, **updates) -> None:
  71. """更新 Trace"""
  72. trace = await self.get_trace(trace_id)
  73. if not trace:
  74. return
  75. # 更新字段
  76. for key, value in updates.items():
  77. if hasattr(trace, key):
  78. setattr(trace, key, value)
  79. # 写回文件
  80. meta_file = self._get_meta_file(trace_id)
  81. meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
  82. async def list_traces(
  83. self,
  84. mode: Optional[str] = None,
  85. agent_type: Optional[str] = None,
  86. uid: Optional[str] = None,
  87. status: Optional[str] = None,
  88. limit: int = 50
  89. ) -> List[Trace]:
  90. """列出 Traces"""
  91. traces = []
  92. # 遍历所有 trace 目录
  93. if not self.base_path.exists():
  94. return []
  95. for trace_dir in self.base_path.iterdir():
  96. if not trace_dir.is_dir():
  97. continue
  98. meta_file = trace_dir / "meta.json"
  99. if not meta_file.exists():
  100. continue
  101. try:
  102. data = json.loads(meta_file.read_text())
  103. # 过滤
  104. if mode and data.get("mode") != mode:
  105. continue
  106. if agent_type and data.get("agent_type") != agent_type:
  107. continue
  108. if uid and data.get("uid") != uid:
  109. continue
  110. if status and data.get("status") != status:
  111. continue
  112. # 解析 datetime
  113. if data.get("created_at"):
  114. data["created_at"] = datetime.fromisoformat(data["created_at"])
  115. if data.get("completed_at"):
  116. data["completed_at"] = datetime.fromisoformat(data["completed_at"])
  117. traces.append(Trace(**data))
  118. except Exception:
  119. # 跳过损坏的文件
  120. continue
  121. # 排序(最新的在前)
  122. traces.sort(key=lambda t: t.created_at, reverse=True)
  123. return traces[:limit]
  124. # ===== Step 操作 =====
  125. async def add_step(self, step: Step) -> str:
  126. """添加 Step"""
  127. trace_id = step.trace_id
  128. # 1. 写入 step 文件
  129. step_file = self._get_step_file(trace_id, step.step_id)
  130. step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False))
  131. # 2. 更新 trace 的统计信息
  132. trace = await self.get_trace(trace_id)
  133. if trace:
  134. trace.total_steps += 1
  135. trace.last_sequence = max(trace.last_sequence, step.sequence)
  136. # 累加 tokens 和 cost
  137. if step.tokens:
  138. trace.total_tokens += step.tokens
  139. if step.cost:
  140. trace.total_cost += step.cost
  141. if step.duration_ms:
  142. trace.total_duration_ms += step.duration_ms
  143. # 写回 meta.json
  144. meta_file = self._get_meta_file(trace_id)
  145. meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
  146. # 3. 更新父节点的 UI 字段
  147. if step.parent_id:
  148. parent = await self.get_step(step.parent_id)
  149. if parent:
  150. parent.has_children = True
  151. parent.children_count += 1
  152. # 写回父节点文件
  153. parent_file = self._get_step_file(trace_id, step.parent_id)
  154. parent_file.write_text(json.dumps(parent.to_dict(view="full"), indent=2, ensure_ascii=False))
  155. # 4. 追加 step_added 事件(包含完整 compact 视图,用于断线续传)
  156. await self.append_event(trace_id, "step_added", {
  157. "step": step.to_dict(view="compact")
  158. })
  159. return step.step_id
  160. async def get_step(self, step_id: str) -> Optional[Step]:
  161. """获取 Step(扫描所有 trace)"""
  162. for trace_dir in self.base_path.iterdir():
  163. if not trace_dir.is_dir():
  164. continue
  165. step_file = trace_dir / "steps" / f"{step_id}.json"
  166. if step_file.exists():
  167. try:
  168. data = json.loads(step_file.read_text())
  169. # 解析 datetime
  170. if data.get("created_at"):
  171. data["created_at"] = datetime.fromisoformat(data["created_at"])
  172. return Step(**data)
  173. except Exception:
  174. continue
  175. return None
  176. async def get_trace_steps(self, trace_id: str) -> List[Step]:
  177. """获取 Trace 的所有 Steps"""
  178. steps_dir = self._get_steps_dir(trace_id)
  179. if not steps_dir.exists():
  180. return []
  181. steps = []
  182. for step_file in steps_dir.glob("*.json"):
  183. try:
  184. data = json.loads(step_file.read_text())
  185. # 解析 datetime
  186. if data.get("created_at"):
  187. data["created_at"] = datetime.fromisoformat(data["created_at"])
  188. steps.append(Step(**data))
  189. except Exception:
  190. # 跳过损坏的文件
  191. continue
  192. # 按 sequence 排序
  193. steps.sort(key=lambda s: s.sequence)
  194. return steps
  195. async def get_step_children(self, step_id: str) -> List[Step]:
  196. """获取 Step 的子节点"""
  197. # 需要扫描所有 trace 的所有 steps
  198. # TODO: 可以优化为维护索引文件
  199. children = []
  200. for trace_dir in self.base_path.iterdir():
  201. if not trace_dir.is_dir():
  202. continue
  203. steps_dir = trace_dir / "steps"
  204. if not steps_dir.exists():
  205. continue
  206. for step_file in steps_dir.glob("*.json"):
  207. try:
  208. data = json.loads(step_file.read_text())
  209. if data.get("parent_id") == step_id:
  210. # 解析 datetime
  211. if data.get("created_at"):
  212. data["created_at"] = datetime.fromisoformat(data["created_at"])
  213. children.append(Step(**data))
  214. except Exception:
  215. continue
  216. # 按 sequence 排序
  217. children.sort(key=lambda s: s.sequence)
  218. return children
  219. async def update_step(self, step_id: str, **updates) -> None:
  220. """更新 Step 字段"""
  221. step = await self.get_step(step_id)
  222. if not step:
  223. return
  224. # 更新字段
  225. for key, value in updates.items():
  226. if hasattr(step, key):
  227. setattr(step, key, value)
  228. # 写回文件
  229. step_file = self._get_step_file(step.trace_id, step_id)
  230. step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False))
  231. # ===== 事件流操作(用于 WebSocket 断线续传)=====
  232. async def get_events(
  233. self,
  234. trace_id: str,
  235. since_event_id: int = 0
  236. ) -> List[Dict[str, Any]]:
  237. """获取事件流"""
  238. events_file = self._get_events_file(trace_id)
  239. if not events_file.exists():
  240. return []
  241. events = []
  242. with events_file.open('r') as f:
  243. for line in f:
  244. try:
  245. event = json.loads(line.strip())
  246. if event.get("event_id", 0) > since_event_id:
  247. events.append(event)
  248. except Exception:
  249. continue
  250. return events
  251. async def append_event(
  252. self,
  253. trace_id: str,
  254. event_type: str,
  255. payload: Dict[str, Any]
  256. ) -> int:
  257. """追加事件,返回 event_id"""
  258. # 获取 trace 并递增 event_id
  259. trace = await self.get_trace(trace_id)
  260. if not trace:
  261. return 0
  262. trace.last_event_id += 1
  263. event_id = trace.last_event_id
  264. # 更新 trace 的 last_event_id
  265. await self.update_trace(trace_id, last_event_id=event_id)
  266. # 创建事件
  267. event = {
  268. "event_id": event_id,
  269. "event": event_type,
  270. "ts": datetime.now().isoformat(),
  271. **payload
  272. }
  273. # 追加到 events.jsonl
  274. events_file = self._get_events_file(trace_id)
  275. with events_file.open('a') as f:
  276. f.write(json.dumps(event, ensure_ascii=False) + '\n')
  277. return event_id