""" FileSystem Trace Store - 文件系统存储实现 用于跨进程数据共享,数据持久化到 .trace/ 目录 """ import json import os from pathlib import Path from typing import Dict, List, Optional, Any from datetime import datetime from agent.execution.models import Trace, Step class FileSystemTraceStore: """文件系统 Trace 存储 目录结构: .trace/ ├── trace-001/ │ ├── meta.json # Trace 元数据 │ ├── steps/ │ │ ├── step-1.json # 每个 step 独立文件 │ │ ├── step-2.json │ │ └── step-3.json │ └── events.jsonl # 事件流(WebSocket 续传) └── trace-002/ └── ... """ def __init__(self, base_path: str = ".trace"): self.base_path = Path(base_path) self.base_path.mkdir(exist_ok=True) def _get_trace_dir(self, trace_id: str) -> Path: """获取 trace 目录""" return self.base_path / trace_id def _get_meta_file(self, trace_id: str) -> Path: """获取 meta.json 文件路径""" return self._get_trace_dir(trace_id) / "meta.json" def _get_steps_dir(self, trace_id: str) -> Path: """获取 steps 目录""" return self._get_trace_dir(trace_id) / "steps" def _get_step_file(self, trace_id: str, step_id: str) -> Path: """获取 step 文件路径""" return self._get_steps_dir(trace_id) / f"{step_id}.json" def _get_events_file(self, trace_id: str) -> Path: """获取 events.jsonl 文件路径""" return self._get_trace_dir(trace_id) / "events.jsonl" # ===== Trace 操作 ===== async def create_trace(self, trace: Trace) -> str: """创建新的 Trace""" trace_dir = self._get_trace_dir(trace.trace_id) trace_dir.mkdir(exist_ok=True) # 创建 steps 目录 steps_dir = self._get_steps_dir(trace.trace_id) steps_dir.mkdir(exist_ok=True) # 写入 meta.json meta_file = self._get_meta_file(trace.trace_id) meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False)) # 创建空的 events.jsonl events_file = self._get_events_file(trace.trace_id) events_file.touch() return trace.trace_id async def get_trace(self, trace_id: str) -> Optional[Trace]: """获取 Trace""" meta_file = self._get_meta_file(trace_id) if not meta_file.exists(): return None data = json.loads(meta_file.read_text()) # 解析 datetime 字段 if data.get("created_at"): data["created_at"] = datetime.fromisoformat(data["created_at"]) if data.get("completed_at"): data["completed_at"] = datetime.fromisoformat(data["completed_at"]) return Trace(**data) async def update_trace(self, trace_id: str, **updates) -> None: """更新 Trace""" trace = await self.get_trace(trace_id) if not trace: return # 更新字段 for key, value in updates.items(): if hasattr(trace, key): setattr(trace, key, value) # 写回文件 meta_file = self._get_meta_file(trace_id) meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False)) async def list_traces( self, mode: Optional[str] = None, agent_type: Optional[str] = None, uid: Optional[str] = None, status: Optional[str] = None, limit: int = 50 ) -> List[Trace]: """列出 Traces""" traces = [] # 遍历所有 trace 目录 if not self.base_path.exists(): return [] for trace_dir in self.base_path.iterdir(): if not trace_dir.is_dir(): continue meta_file = trace_dir / "meta.json" if not meta_file.exists(): continue try: data = json.loads(meta_file.read_text()) # 过滤 if mode and data.get("mode") != mode: continue if agent_type and data.get("agent_type") != agent_type: continue if uid and data.get("uid") != uid: continue if status and data.get("status") != status: continue # 解析 datetime if data.get("created_at"): data["created_at"] = datetime.fromisoformat(data["created_at"]) if data.get("completed_at"): data["completed_at"] = datetime.fromisoformat(data["completed_at"]) traces.append(Trace(**data)) except Exception: # 跳过损坏的文件 continue # 排序(最新的在前) traces.sort(key=lambda t: t.created_at, reverse=True) return traces[:limit] # ===== Step 操作 ===== async def add_step(self, step: Step) -> str: """添加 Step""" trace_id = step.trace_id # 1. 写入 step 文件 step_file = self._get_step_file(trace_id, step.step_id) step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False)) # 2. 更新 trace 的统计信息 trace = await self.get_trace(trace_id) if trace: trace.total_steps += 1 trace.last_sequence = max(trace.last_sequence, step.sequence) # 累加 tokens 和 cost if step.tokens: trace.total_tokens += step.tokens if step.cost: trace.total_cost += step.cost if step.duration_ms: trace.total_duration_ms += step.duration_ms # 写回 meta.json meta_file = self._get_meta_file(trace_id) meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False)) # 3. 更新父节点的 UI 字段 if step.parent_id: parent = await self.get_step(step.parent_id) if parent: parent.has_children = True parent.children_count += 1 # 写回父节点文件 parent_file = self._get_step_file(trace_id, step.parent_id) parent_file.write_text(json.dumps(parent.to_dict(view="full"), indent=2, ensure_ascii=False)) # 4. 追加 step_added 事件 await self.append_event(trace_id, "step_added", { "step_id": step.step_id, "step_type": step.step_type, "parent_id": step.parent_id, "sequence": step.sequence, }) return step.step_id async def get_step(self, step_id: str) -> Optional[Step]: """获取 Step(扫描所有 trace)""" for trace_dir in self.base_path.iterdir(): if not trace_dir.is_dir(): continue step_file = trace_dir / "steps" / f"{step_id}.json" if step_file.exists(): try: data = json.loads(step_file.read_text()) # 解析 datetime if data.get("created_at"): data["created_at"] = datetime.fromisoformat(data["created_at"]) return Step(**data) except Exception: continue return None async def get_trace_steps(self, trace_id: str) -> List[Step]: """获取 Trace 的所有 Steps""" steps_dir = self._get_steps_dir(trace_id) if not steps_dir.exists(): return [] steps = [] for step_file in steps_dir.glob("*.json"): try: data = json.loads(step_file.read_text()) # 解析 datetime if data.get("created_at"): data["created_at"] = datetime.fromisoformat(data["created_at"]) steps.append(Step(**data)) except Exception: # 跳过损坏的文件 continue # 按 sequence 排序 steps.sort(key=lambda s: s.sequence) return steps async def get_step_children(self, step_id: str) -> List[Step]: """获取 Step 的子节点""" # 需要扫描所有 trace 的所有 steps # TODO: 可以优化为维护索引文件 children = [] for trace_dir in self.base_path.iterdir(): if not trace_dir.is_dir(): continue steps_dir = trace_dir / "steps" if not steps_dir.exists(): continue for step_file in steps_dir.glob("*.json"): try: data = json.loads(step_file.read_text()) if data.get("parent_id") == step_id: # 解析 datetime if data.get("created_at"): data["created_at"] = datetime.fromisoformat(data["created_at"]) children.append(Step(**data)) except Exception: continue # 按 sequence 排序 children.sort(key=lambda s: s.sequence) return children async def update_step(self, step_id: str, **updates) -> None: """更新 Step 字段""" step = await self.get_step(step_id) if not step: return # 更新字段 for key, value in updates.items(): if hasattr(step, key): setattr(step, key, value) # 写回文件 step_file = self._get_step_file(step.trace_id, step_id) step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False)) # ===== 事件流操作(用于 WebSocket 断线续传)===== async def get_events( self, trace_id: str, since_event_id: int = 0 ) -> List[Dict[str, Any]]: """获取事件流""" events_file = self._get_events_file(trace_id) if not events_file.exists(): return [] events = [] with events_file.open('r') as f: for line in f: try: event = json.loads(line.strip()) if event.get("event_id", 0) > since_event_id: events.append(event) except Exception: continue return events async def append_event( self, trace_id: str, event_type: str, payload: Dict[str, Any] ) -> int: """追加事件,返回 event_id""" # 获取 trace 并递增 event_id trace = await self.get_trace(trace_id) if not trace: return 0 trace.last_event_id += 1 event_id = trace.last_event_id # 更新 trace 的 last_event_id await self.update_trace(trace_id, last_event_id=event_id) # 创建事件 event = { "event_id": event_id, "event": event_type, "ts": datetime.now().isoformat(), **payload } # 追加到 events.jsonl events_file = self._get_events_file(trace_id) with events_file.open('a') as f: f.write(json.dumps(event, ensure_ascii=False) + '\n') return event_id