"""
FileSystem Trace Store - file-system backed storage implementation.

Used for sharing data across processes; data is persisted under the
``.trace/`` directory. All JSON files are read and written as UTF-8.

Directory layout::

    .trace/{trace_id}/
    ├── meta.json            # Trace metadata
    ├── goal.json            # Main-line GoalTree (flat JSON; hierarchy via parent_id)
    ├── messages/            # Main-line Messages (one file per message)
    │   ├── {message_id}.json
    │   └── ...
    ├── branches/            # Branch data (stored separately)
    │   ├── A/
    │   │   ├── meta.json    # BranchContext metadata
    │   │   ├── goal.json    # GoalTree of branch A
    │   │   └── messages/    # Messages of branch A
    │   └── B/
    │       └── ...
    └── events.jsonl         # Event stream (WebSocket resume)
"""

import json
import os
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime

from agent.execution.models import Trace, Message
from agent.goal.models import GoalTree, Goal, BranchContext, GoalStats


class FileSystemTraceStore:
    """File-system backed Trace store.

    Methods are declared ``async`` but perform blocking file I/O directly.
    All JSON/JSONL files are encoded as UTF-8 so that non-ASCII content
    (written verbatim because of ``ensure_ascii=False``) round-trips
    regardless of the platform's locale encoding.
    """

    def __init__(self, base_path: str = ".trace"):
        """Root the store at ``base_path``, creating it (and any parents) if missing."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    # ===== Low-level helpers =====

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Read and decode a UTF-8 JSON file."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _write_json(path: Path, data: Any) -> None:
        """Write ``data`` as pretty-printed UTF-8 JSON, keeping non-ASCII characters verbatim."""
        path.write_text(
            json.dumps(data, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )

    @staticmethod
    def _parse_trace(data: Dict[str, Any]) -> Trace:
        """Build a Trace from its meta.json dict, parsing ISO datetime fields in place."""
        for key in ("created_at", "completed_at"):
            if data.get(key):
                data[key] = datetime.fromisoformat(data[key])
        return Trace(**data)

    @classmethod
    def _load_message(cls, path: Path) -> Message:
        """Load a Message from a JSON file, parsing ``created_at`` if present."""
        data = cls._read_json(path)
        if data.get("created_at"):
            data["created_at"] = datetime.fromisoformat(data["created_at"])
        return Message(**data)

    @staticmethod
    def _ancestors(tree: GoalTree, goal: Goal) -> List[Goal]:
        """Return ``goal``'s ancestors, nearest parent first.

        Stops when a parent id cannot be resolved in ``tree``. Also stops if a
        ``parent_id`` cycle is detected, which would otherwise loop forever.
        """
        ancestors: List[Goal] = []
        seen = {goal.id}
        current = goal
        while current.parent_id and current.parent_id not in seen:
            parent = tree.find(current.parent_id)
            if not parent:
                break
            ancestors.append(parent)
            seen.add(parent.id)
            current = parent
        return ancestors

    @staticmethod
    def _accumulate(stats: GoalStats, message: Message) -> None:
        """Add one message's count, tokens and cost to ``stats`` in place."""
        stats.message_count += 1
        if message.tokens:
            stats.total_tokens += message.tokens
        if message.cost:
            stats.total_cost += message.cost

    # ===== Path helpers =====

    def _get_trace_dir(self, trace_id: str) -> Path:
        """Return the trace directory."""
        return self.base_path / trace_id

    def _get_meta_file(self, trace_id: str) -> Path:
        """Return the path of the trace's meta.json."""
        return self._get_trace_dir(trace_id) / "meta.json"

    def _get_goal_file(self, trace_id: str) -> Path:
        """Return the path of the main-line goal.json."""
        return self._get_trace_dir(trace_id) / "goal.json"

    def _get_messages_dir(self, trace_id: str) -> Path:
        """Return the main-line messages directory."""
        return self._get_trace_dir(trace_id) / "messages"

    def _get_message_file(self, trace_id: str, message_id: str) -> Path:
        """Return the path of a main-line message file."""
        return self._get_messages_dir(trace_id) / f"{message_id}.json"

    def _get_branches_dir(self, trace_id: str) -> Path:
        """Return the branches directory."""
        return self._get_trace_dir(trace_id) / "branches"

    def _get_branch_dir(self, trace_id: str, branch_id: str) -> Path:
        """Return the directory of a single branch."""
        return self._get_branches_dir(trace_id) / branch_id

    def _get_branch_meta_file(self, trace_id: str, branch_id: str) -> Path:
        """Return the path of a branch's meta.json."""
        return self._get_branch_dir(trace_id, branch_id) / "meta.json"

    def _get_branch_goal_file(self, trace_id: str, branch_id: str) -> Path:
        """Return the path of a branch's goal.json."""
        return self._get_branch_dir(trace_id, branch_id) / "goal.json"

    def _get_branch_messages_dir(self, trace_id: str, branch_id: str) -> Path:
        """Return a branch's messages directory."""
        return self._get_branch_dir(trace_id, branch_id) / "messages"

    def _get_events_file(self, trace_id: str) -> Path:
        """Return the path of the trace's events.jsonl."""
        return self._get_trace_dir(trace_id) / "events.jsonl"

    def _messages_dir_for(self, trace_id: str, branch_id: Optional[str]) -> Path:
        """Return the branch messages dir when ``branch_id`` is truthy, else the main-line one."""
        if branch_id:
            return self._get_branch_messages_dir(trace_id, branch_id)
        return self._get_messages_dir(trace_id)

    async def _load_tree_for(self, trace_id: str, branch_id: Optional[str]) -> Optional[GoalTree]:
        """Load the branch GoalTree when ``branch_id`` is truthy, else the main-line one."""
        if branch_id:
            return await self.get_branch_goal_tree(trace_id, branch_id)
        return await self.get_goal_tree(trace_id)

    async def _save_tree_for(self, trace_id: str, branch_id: Optional[str], tree: GoalTree) -> None:
        """Persist ``tree`` to the branch goal.json when ``branch_id`` is truthy, else the main-line one."""
        if branch_id:
            await self.update_branch_goal_tree(trace_id, branch_id, tree)
        else:
            await self.update_goal_tree(trace_id, tree)

    # ===== Trace operations =====

    async def create_trace(self, trace: Trace) -> str:
        """Create the on-disk layout for a new Trace and return its trace_id.

        Creates ``messages/`` and ``branches/``, writes ``meta.json`` (overwriting
        an existing one) and touches an empty ``events.jsonl``.
        """
        trace_id = trace.trace_id
        self._get_trace_dir(trace_id).mkdir(parents=True, exist_ok=True)
        self._get_messages_dir(trace_id).mkdir(exist_ok=True)
        self._get_branches_dir(trace_id).mkdir(exist_ok=True)
        self._write_json(self._get_meta_file(trace_id), trace.to_dict())
        self._get_events_file(trace_id).touch()
        return trace_id

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Load a Trace from its meta.json; return None if the file does not exist.

        A corrupt meta.json propagates the decode error rather than being hidden.
        """
        meta_file = self._get_meta_file(trace_id)
        if not meta_file.exists():
            return None
        return self._parse_trace(self._read_json(meta_file))

    async def update_trace(self, trace_id: str, **updates) -> None:
        """Set the given attributes on the stored Trace and write it back.

        Keys that are not attributes of Trace are ignored; a missing trace is a no-op.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return

        for key, value in updates.items():
            if hasattr(trace, key):
                setattr(trace, key, value)

        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

    async def list_traces(
        self,
        mode: Optional[str] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 50
    ) -> List[Trace]:
        """List Traces, newest first, optionally filtered by exact field match.

        A filter whose value is falsy is not applied. Trace directories that
        cannot be read or parsed are skipped.

        Args:
            mode / agent_type / uid / status: optional exact-match filters.
            limit: maximum number of traces returned.
        """
        if not self.base_path.exists():
            return []

        filters = {"mode": mode, "agent_type": agent_type, "uid": uid, "status": status}
        traces: List[Trace] = []

        for trace_dir in self.base_path.iterdir():
            meta_file = trace_dir / "meta.json"
            if not trace_dir.is_dir() or not meta_file.exists():
                continue
            try:
                data = self._read_json(meta_file)
                if any(
                    expected and data.get(field) != expected
                    for field, expected in filters.items()
                ):
                    continue
                traces.append(self._parse_trace(data))
            except Exception:
                # Best effort: one unreadable trace must not break the listing.
                continue

        traces.sort(key=lambda t: t.created_at, reverse=True)
        return traces[:limit]

    # ===== GoalTree operations =====

    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]:
        """Load the main-line GoalTree; None if missing or unparseable."""
        goal_file = self._get_goal_file(trace_id)
        if not goal_file.exists():
            return None
        try:
            return GoalTree.from_dict(self._read_json(goal_file))
        except Exception:
            return None

    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None:
        """Overwrite the main-line goal.json with ``tree``."""
        self._write_json(self._get_goal_file(trace_id), tree.to_dict())

    async def add_goal(self, trace_id: str, goal: Goal) -> None:
        """Append ``goal`` to the main-line GoalTree (no-op if no tree exists)."""
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return
        tree.goals.append(goal)
        await self.update_goal_tree(trace_id, tree)

    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None:
        """Set attributes on one Goal of the main-line GoalTree and save the tree.

        ``self_stats`` / ``cumulative_stats`` given as dicts are converted to
        GoalStats. Unknown keys are ignored; a missing tree or goal is a no-op.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        goal = tree.find(goal_id)
        if not goal:
            return

        for key, value in updates.items():
            if hasattr(goal, key):
                if key in ("self_stats", "cumulative_stats") and isinstance(value, dict):
                    value = GoalStats.from_dict(value)
                setattr(goal, key, value)

        await self.update_goal_tree(trace_id, tree)

    # ===== Branch operations =====

    async def create_branch(self, trace_id: str, branch: BranchContext) -> None:
        """Create a branch directory with its messages dir and write its meta.json."""
        self._get_branch_dir(trace_id, branch.id).mkdir(parents=True, exist_ok=True)
        self._get_branch_messages_dir(trace_id, branch.id).mkdir(exist_ok=True)
        self._write_json(self._get_branch_meta_file(trace_id, branch.id), branch.to_dict())

    async def get_branch(self, trace_id: str, branch_id: str) -> Optional[BranchContext]:
        """Load a branch's BranchContext; None if missing or unparseable."""
        meta_file = self._get_branch_meta_file(trace_id, branch_id)
        if not meta_file.exists():
            return None
        try:
            return BranchContext.from_dict(self._read_json(meta_file))
        except Exception:
            return None

    async def get_branch_goal_tree(self, trace_id: str, branch_id: str) -> Optional[GoalTree]:
        """Load a branch's GoalTree; None if missing or unparseable."""
        goal_file = self._get_branch_goal_file(trace_id, branch_id)
        if not goal_file.exists():
            return None
        try:
            return GoalTree.from_dict(self._read_json(goal_file))
        except Exception:
            return None

    async def update_branch_goal_tree(self, trace_id: str, branch_id: str, tree: GoalTree) -> None:
        """Overwrite a branch's goal.json with ``tree``."""
        self._write_json(self._get_branch_goal_file(trace_id, branch_id), tree.to_dict())

    async def update_branch(self, trace_id: str, branch_id: str, **updates) -> None:
        """Set attributes on a branch's BranchContext and write it back.

        ``cumulative_stats`` given as a dict is converted to GoalStats. Unknown
        keys are ignored; a missing branch is a no-op.
        """
        branch = await self.get_branch(trace_id, branch_id)
        if not branch:
            return

        for key, value in updates.items():
            if hasattr(branch, key):
                if key == "cumulative_stats" and isinstance(value, dict):
                    value = GoalStats.from_dict(value)
                setattr(branch, key, value)

        self._write_json(self._get_branch_meta_file(trace_id, branch_id), branch.to_dict())

    async def list_branches(self, trace_id: str) -> Dict[str, BranchContext]:
        """Return all readable branch contexts of a trace, keyed by branch id."""
        branches_dir = self._get_branches_dir(trace_id)
        if not branches_dir.exists():
            return {}

        branches: Dict[str, BranchContext] = {}
        for branch_dir in branches_dir.iterdir():
            meta_file = branch_dir / "meta.json"
            if not branch_dir.is_dir() or not meta_file.exists():
                continue
            try:
                branch = BranchContext.from_dict(self._read_json(meta_file))
            except Exception:
                # Skip unreadable branch metadata.
                continue
            branches[branch.id] = branch
        return branches

    # ===== Message operations =====

    async def add_message(self, message: Message) -> str:
        """Persist a Message and propagate its statistics.

        Steps:
            1. write the message file (branch or main line);
            2. update trace totals (messages, last_sequence, tokens, cost, duration);
            3. update the linked Goal's self/cumulative stats and its ancestors'
               cumulative stats;
            4. append a ``message_added`` event listing the affected goals.

        Returns:
            The message_id.
        """
        trace_id = message.trace_id

        # 1. Write the message file.
        messages_dir = self._messages_dir_for(trace_id, message.branch_id)
        self._write_json(messages_dir / f"{message.message_id}.json", message.to_dict())

        # 2. Update trace-level statistics.
        trace = await self.get_trace(trace_id)
        if trace:
            trace.total_messages += 1
            trace.last_sequence = max(trace.last_sequence, message.sequence)
            if message.tokens:
                trace.total_tokens += message.tokens
            if message.cost:
                trace.total_cost += message.cost
            if message.duration_ms:
                trace.total_duration_ms += message.duration_ms

            # trace_id is the positional argument; it is not repeated in **updates.
            await self.update_trace(
                trace_id,
                total_messages=trace.total_messages,
                last_sequence=trace.last_sequence,
                total_tokens=trace.total_tokens,
                total_cost=trace.total_cost,
                total_duration_ms=trace.total_duration_ms
            )

        # 3. Update Goal stats.
        await self._update_goal_stats(trace_id, message)

        # 4. Emit the message_added event.
        affected_goals = await self._get_affected_goals(trace_id, message)
        await self.append_event(trace_id, "message_added", {
            "message": message.to_dict(),
            "affected_goals": affected_goals
        })

        return message.message_id

    async def _update_goal_stats(self, trace_id: str, message: Message) -> None:
        """Add ``message`` to its Goal's self/cumulative stats and to every ancestor's cumulative stats."""
        tree = await self._load_tree_for(trace_id, message.branch_id)
        if not tree:
            return

        goal = tree.find(message.goal_id)
        if not goal:
            return

        self._accumulate(goal.self_stats, message)
        # TODO: update preview (tool-call summary)
        self._accumulate(goal.cumulative_stats, message)

        for ancestor in self._ancestors(tree, goal):
            self._accumulate(ancestor.cumulative_stats, message)

        await self._save_tree_for(trace_id, message.branch_id, tree)

    async def _get_affected_goals(self, trace_id: str, message: Message) -> List[Dict[str, Any]]:
        """Return stats snapshots for the message's Goal and all of its ancestors.

        The Goal itself carries both ``self_stats`` and ``cumulative_stats``;
        ancestors carry only ``cumulative_stats``.
        """
        tree = await self._load_tree_for(trace_id, message.branch_id)
        if not tree:
            return []

        goal = tree.find(message.goal_id)
        if not goal:
            return []

        affected: List[Dict[str, Any]] = [{
            "goal_id": goal.id,
            "self_stats": goal.self_stats.to_dict(),
            "cumulative_stats": goal.cumulative_stats.to_dict()
        }]
        affected.extend(
            {
                "goal_id": ancestor.id,
                "cumulative_stats": ancestor.cumulative_stats.to_dict()
            }
            for ancestor in self._ancestors(tree, goal)
        )
        return affected

    async def get_message(self, message_id: str) -> Optional[Message]:
        """Find a Message by id by scanning every trace (main line first, then branches).

        Files that exist but cannot be parsed are skipped and the search continues.
        """
        if not self.base_path.exists():
            return None

        file_name = f"{message_id}.json"
        for trace_dir in self.base_path.iterdir():
            if not trace_dir.is_dir():
                continue

            candidates = [trace_dir / "messages" / file_name]
            branches_dir = trace_dir / "branches"
            if branches_dir.exists():
                candidates.extend(
                    branch_dir / "messages" / file_name
                    for branch_dir in branches_dir.iterdir()
                    if branch_dir.is_dir()
                )

            for message_file in candidates:
                if not message_file.exists():
                    continue
                try:
                    return self._load_message(message_file)
                except Exception:
                    continue
        return None

    async def get_trace_messages(
        self,
        trace_id: str,
        branch_id: Optional[str] = None
    ) -> List[Message]:
        """Return all readable Messages of the main line (or of ``branch_id``), sorted by sequence."""
        messages_dir = self._messages_dir_for(trace_id, branch_id)
        if not messages_dir.exists():
            return []

        messages: List[Message] = []
        for message_file in messages_dir.glob("*.json"):
            try:
                messages.append(self._load_message(message_file))
            except Exception:
                continue

        messages.sort(key=lambda m: m.sequence)
        return messages

    async def get_messages_by_goal(
        self,
        trace_id: str,
        goal_id: str,
        branch_id: Optional[str] = None
    ) -> List[Message]:
        """Return the Messages linked to ``goal_id`` (main line or ``branch_id``)."""
        all_messages = await self.get_trace_messages(trace_id, branch_id)
        return [m for m in all_messages if m.goal_id == goal_id]

    async def update_message(self, message_id: str, **updates) -> None:
        """Set attributes on a stored Message and rewrite its file.

        Unknown keys are ignored; a missing message is a no-op. The file
        location comes from the (possibly updated) trace_id/branch_id.
        """
        message = await self.get_message(message_id)
        if not message:
            return

        for key, value in updates.items():
            if hasattr(message, key):
                setattr(message, key, value)

        messages_dir = self._messages_dir_for(message.trace_id, message.branch_id)
        self._write_json(messages_dir / f"{message_id}.json", message.to_dict())

    # ===== Event stream (WebSocket reconnect/resume) =====

    async def get_events(
        self,
        trace_id: str,
        since_event_id: int = 0
    ) -> List[Dict[str, Any]]:
        """Return events with ``event_id`` > ``since_event_id``; malformed lines are skipped."""
        events_file = self._get_events_file(trace_id)
        if not events_file.exists():
            return []

        events: List[Dict[str, Any]] = []
        with events_file.open("r", encoding="utf-8") as f:
            for line in f:
                try:
                    event = json.loads(line.strip())
                    if event.get("event_id", 0) > since_event_id:
                        events.append(event)
                except Exception:
                    continue
        return events

    async def append_event(
        self,
        trace_id: str,
        event_type: str,
        payload: Dict[str, Any]
    ) -> int:
        """Append an event to events.jsonl and return its event_id.

        The id is ``trace.last_event_id + 1`` and is persisted to meta.json
        before the line is appended. Returns 0 (and writes nothing) if the
        trace does not exist. Payload keys are merged into the top-level event.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return 0

        event_id = trace.last_event_id + 1
        await self.update_trace(trace_id, last_event_id=event_id)

        event = {
            "event_id": event_id,
            "event": event_type,
            "ts": datetime.now().isoformat(),
            **payload
        }

        with self._get_events_file(trace_id).open("a", encoding="utf-8") as f:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")

        return event_id