fs_store.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. """
  2. FileSystem Trace Store - 文件系统存储实现
  3. 用于跨进程数据共享,数据持久化到 .trace/ 目录
  4. """
  5. import json
  6. import os
  7. from pathlib import Path
  8. from typing import Dict, List, Optional, Any
  9. from datetime import datetime
  10. from agent.execution.models import Trace, Step
  11. class FileSystemTraceStore:
  12. """文件系统 Trace 存储
  13. 目录结构:
  14. .trace/
  15. ├── trace-001/
  16. │ ├── meta.json # Trace 元数据
  17. │ ├── steps/
  18. │ │ ├── step-1.json # 每个 step 独立文件
  19. │ │ ├── step-2.json
  20. │ │ └── step-3.json
  21. │ └── events.jsonl # 事件流(WebSocket 续传)
  22. └── trace-002/
  23. └── ...
  24. """
  25. def __init__(self, base_path: str = ".trace"):
  26. self.base_path = Path(base_path)
  27. self.base_path.mkdir(exist_ok=True)
  28. def _get_trace_dir(self, trace_id: str) -> Path:
  29. """获取 trace 目录"""
  30. return self.base_path / trace_id
  31. def _get_meta_file(self, trace_id: str) -> Path:
  32. """获取 meta.json 文件路径"""
  33. return self._get_trace_dir(trace_id) / "meta.json"
  34. def _get_steps_dir(self, trace_id: str) -> Path:
  35. """获取 steps 目录"""
  36. return self._get_trace_dir(trace_id) / "steps"
  37. def _get_step_file(self, trace_id: str, step_id: str) -> Path:
  38. """获取 step 文件路径"""
  39. return self._get_steps_dir(trace_id) / f"{step_id}.json"
  40. def _get_events_file(self, trace_id: str) -> Path:
  41. """获取 events.jsonl 文件路径"""
  42. return self._get_trace_dir(trace_id) / "events.jsonl"
  43. # ===== Trace 操作 =====
  44. async def create_trace(self, trace: Trace) -> str:
  45. """创建新的 Trace"""
  46. trace_dir = self._get_trace_dir(trace.trace_id)
  47. trace_dir.mkdir(exist_ok=True)
  48. # 创建 steps 目录
  49. steps_dir = self._get_steps_dir(trace.trace_id)
  50. steps_dir.mkdir(exist_ok=True)
  51. # 写入 meta.json
  52. meta_file = self._get_meta_file(trace.trace_id)
  53. meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
  54. # 创建空的 events.jsonl
  55. events_file = self._get_events_file(trace.trace_id)
  56. events_file.touch()
  57. return trace.trace_id
  58. async def get_trace(self, trace_id: str) -> Optional[Trace]:
  59. """获取 Trace"""
  60. meta_file = self._get_meta_file(trace_id)
  61. if not meta_file.exists():
  62. return None
  63. data = json.loads(meta_file.read_text())
  64. # 解析 datetime 字段
  65. if data.get("created_at"):
  66. data["created_at"] = datetime.fromisoformat(data["created_at"])
  67. if data.get("completed_at"):
  68. data["completed_at"] = datetime.fromisoformat(data["completed_at"])
  69. return Trace(**data)
  70. async def update_trace(self, trace_id: str, **updates) -> None:
  71. """更新 Trace"""
  72. trace = await self.get_trace(trace_id)
  73. if not trace:
  74. return
  75. # 更新字段
  76. for key, value in updates.items():
  77. if hasattr(trace, key):
  78. setattr(trace, key, value)
  79. # 写回文件
  80. meta_file = self._get_meta_file(trace_id)
  81. meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
  82. async def list_traces(
  83. self,
  84. mode: Optional[str] = None,
  85. agent_type: Optional[str] = None,
  86. uid: Optional[str] = None,
  87. status: Optional[str] = None,
  88. limit: int = 50
  89. ) -> List[Trace]:
  90. """列出 Traces"""
  91. traces = []
  92. # 遍历所有 trace 目录
  93. if not self.base_path.exists():
  94. return []
  95. for trace_dir in self.base_path.iterdir():
  96. if not trace_dir.is_dir():
  97. continue
  98. meta_file = trace_dir / "meta.json"
  99. if not meta_file.exists():
  100. continue
  101. try:
  102. data = json.loads(meta_file.read_text())
  103. # 过滤
  104. if mode and data.get("mode") != mode:
  105. continue
  106. if agent_type and data.get("agent_type") != agent_type:
  107. continue
  108. if uid and data.get("uid") != uid:
  109. continue
  110. if status and data.get("status") != status:
  111. continue
  112. # 解析 datetime
  113. if data.get("created_at"):
  114. data["created_at"] = datetime.fromisoformat(data["created_at"])
  115. if data.get("completed_at"):
  116. data["completed_at"] = datetime.fromisoformat(data["completed_at"])
  117. traces.append(Trace(**data))
  118. except Exception:
  119. # 跳过损坏的文件
  120. continue
  121. # 排序(最新的在前)
  122. traces.sort(key=lambda t: t.created_at, reverse=True)
  123. return traces[:limit]
  124. # ===== Step 操作 =====
  125. async def add_step(self, step: Step) -> str:
  126. """添加 Step"""
  127. trace_id = step.trace_id
  128. # 1. 写入 step 文件
  129. step_file = self._get_step_file(trace_id, step.step_id)
  130. step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False))
  131. # 2. 更新 trace 的统计信息
  132. trace = await self.get_trace(trace_id)
  133. if trace:
  134. trace.total_steps += 1
  135. trace.last_sequence = max(trace.last_sequence, step.sequence)
  136. # 累加 tokens 和 cost
  137. if step.tokens:
  138. trace.total_tokens += step.tokens
  139. if step.cost:
  140. trace.total_cost += step.cost
  141. if step.duration_ms:
  142. trace.total_duration_ms += step.duration_ms
  143. # 写回 meta.json
  144. meta_file = self._get_meta_file(trace_id)
  145. meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
  146. # 3. 更新父节点的 UI 字段
  147. if step.parent_id:
  148. parent = await self.get_step(step.parent_id)
  149. if parent:
  150. parent.has_children = True
  151. parent.children_count += 1
  152. # 写回父节点文件
  153. parent_file = self._get_step_file(trace_id, step.parent_id)
  154. parent_file.write_text(json.dumps(parent.to_dict(view="full"), indent=2, ensure_ascii=False))
  155. # 4. 追加 step_added 事件
  156. await self.append_event(trace_id, "step_added", {
  157. "step_id": step.step_id,
  158. "step_type": step.step_type,
  159. "parent_id": step.parent_id,
  160. "sequence": step.sequence,
  161. })
  162. return step.step_id
  163. async def get_step(self, step_id: str) -> Optional[Step]:
  164. """获取 Step(扫描所有 trace)"""
  165. for trace_dir in self.base_path.iterdir():
  166. if not trace_dir.is_dir():
  167. continue
  168. step_file = trace_dir / "steps" / f"{step_id}.json"
  169. if step_file.exists():
  170. try:
  171. data = json.loads(step_file.read_text())
  172. # 解析 datetime
  173. if data.get("created_at"):
  174. data["created_at"] = datetime.fromisoformat(data["created_at"])
  175. return Step(**data)
  176. except Exception:
  177. continue
  178. return None
  179. async def get_trace_steps(self, trace_id: str) -> List[Step]:
  180. """获取 Trace 的所有 Steps"""
  181. steps_dir = self._get_steps_dir(trace_id)
  182. if not steps_dir.exists():
  183. return []
  184. steps = []
  185. for step_file in steps_dir.glob("*.json"):
  186. try:
  187. data = json.loads(step_file.read_text())
  188. # 解析 datetime
  189. if data.get("created_at"):
  190. data["created_at"] = datetime.fromisoformat(data["created_at"])
  191. steps.append(Step(**data))
  192. except Exception:
  193. # 跳过损坏的文件
  194. continue
  195. # 按 sequence 排序
  196. steps.sort(key=lambda s: s.sequence)
  197. return steps
  198. async def get_step_children(self, step_id: str) -> List[Step]:
  199. """获取 Step 的子节点"""
  200. # 需要扫描所有 trace 的所有 steps
  201. # TODO: 可以优化为维护索引文件
  202. children = []
  203. for trace_dir in self.base_path.iterdir():
  204. if not trace_dir.is_dir():
  205. continue
  206. steps_dir = trace_dir / "steps"
  207. if not steps_dir.exists():
  208. continue
  209. for step_file in steps_dir.glob("*.json"):
  210. try:
  211. data = json.loads(step_file.read_text())
  212. if data.get("parent_id") == step_id:
  213. # 解析 datetime
  214. if data.get("created_at"):
  215. data["created_at"] = datetime.fromisoformat(data["created_at"])
  216. children.append(Step(**data))
  217. except Exception:
  218. continue
  219. # 按 sequence 排序
  220. children.sort(key=lambda s: s.sequence)
  221. return children
  222. async def update_step(self, step_id: str, **updates) -> None:
  223. """更新 Step 字段"""
  224. step = await self.get_step(step_id)
  225. if not step:
  226. return
  227. # 更新字段
  228. for key, value in updates.items():
  229. if hasattr(step, key):
  230. setattr(step, key, value)
  231. # 写回文件
  232. step_file = self._get_step_file(step.trace_id, step_id)
  233. step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False))
  234. # ===== 事件流操作(用于 WebSocket 断线续传)=====
  235. async def get_events(
  236. self,
  237. trace_id: str,
  238. since_event_id: int = 0
  239. ) -> List[Dict[str, Any]]:
  240. """获取事件流"""
  241. events_file = self._get_events_file(trace_id)
  242. if not events_file.exists():
  243. return []
  244. events = []
  245. with events_file.open('r') as f:
  246. for line in f:
  247. try:
  248. event = json.loads(line.strip())
  249. if event.get("event_id", 0) > since_event_id:
  250. events.append(event)
  251. except Exception:
  252. continue
  253. return events
  254. async def append_event(
  255. self,
  256. trace_id: str,
  257. event_type: str,
  258. payload: Dict[str, Any]
  259. ) -> int:
  260. """追加事件,返回 event_id"""
  261. # 获取 trace 并递增 event_id
  262. trace = await self.get_trace(trace_id)
  263. if not trace:
  264. return 0
  265. trace.last_event_id += 1
  266. event_id = trace.last_event_id
  267. # 更新 trace 的 last_event_id
  268. await self.update_trace(trace_id, last_event_id=event_id)
  269. # 创建事件
  270. event = {
  271. "event_id": event_id,
  272. "event": event_type,
  273. "ts": datetime.now().isoformat(),
  274. **payload
  275. }
  276. # 追加到 events.jsonl
  277. events_file = self._get_events_file(trace_id)
  278. with events_file.open('a') as f:
  279. f.write(json.dumps(event, ensure_ascii=False) + '\n')
  280. return event_id