""" FileSystem Trace Store - 文件系统存储实现 用于跨进程数据共享,数据持久化到 .trace/ 目录 目录结构: .trace/{trace_id}/ ├── meta.json # Trace 元数据 ├── goal.json # GoalTree(扁平 JSON,通过 parent_id 构建层级) ├── messages/ # Messages(每条独立文件) │ ├── {message_id}.json │ └── ... └── events.jsonl # 事件流(WebSocket 续传) Sub-Trace 是完全独立的 Trace,有自己的目录: .trace/{parent_id}@{mode}-{timestamp}-{seq}/ ├── meta.json # parent_trace_id 指向父 Trace ├── goal.json ├── messages/ └── events.jsonl """ import json import os from pathlib import Path from typing import Dict, List, Optional, Any from datetime import datetime from .models import Trace, Message from .goal_models import GoalTree, Goal, GoalStats class FileSystemTraceStore: """文件系统 Trace 存储""" def __init__(self, base_path: str = ".trace"): self.base_path = Path(base_path) self.base_path.mkdir(exist_ok=True) def _get_trace_dir(self, trace_id: str) -> Path: """获取 trace 目录""" return self.base_path / trace_id def _get_meta_file(self, trace_id: str) -> Path: """获取 meta.json 文件路径""" return self._get_trace_dir(trace_id) / "meta.json" def _get_goal_file(self, trace_id: str) -> Path: """获取 goal.json 文件路径""" return self._get_trace_dir(trace_id) / "goal.json" def _get_messages_dir(self, trace_id: str) -> Path: """获取 messages 目录""" return self._get_trace_dir(trace_id) / "messages" def _get_message_file(self, trace_id: str, message_id: str) -> Path: """获取 message 文件路径""" return self._get_messages_dir(trace_id) / f"{message_id}.json" def _get_events_file(self, trace_id: str) -> Path: """获取 events.jsonl 文件路径""" return self._get_trace_dir(trace_id) / "events.jsonl" # ===== Trace 操作 ===== async def create_trace(self, trace: Trace) -> str: """创建新的 Trace""" trace_dir = self._get_trace_dir(trace.trace_id) trace_dir.mkdir(exist_ok=True) # 创建 messages 目录 messages_dir = self._get_messages_dir(trace.trace_id) messages_dir.mkdir(exist_ok=True) # 写入 meta.json meta_file = self._get_meta_file(trace.trace_id) meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False)) # 创建空的 events.jsonl events_file = self._get_events_file(trace.trace_id) events_file.touch() return trace.trace_id async def get_trace(self, trace_id: str) -> Optional[Trace]: """获取 Trace""" meta_file = self._get_meta_file(trace_id) if not meta_file.exists(): return None data = json.loads(meta_file.read_text()) # 解析 datetime 字段 if data.get("created_at"): data["created_at"] = datetime.fromisoformat(data["created_at"]) if data.get("completed_at"): data["completed_at"] = datetime.fromisoformat(data["completed_at"]) return Trace(**data) async def update_trace(self, trace_id: str, **updates) -> None: """更新 Trace""" trace = await self.get_trace(trace_id) if not trace: return # 更新字段 for key, value in updates.items(): if hasattr(trace, key): setattr(trace, key, value) # 写回文件 meta_file = self._get_meta_file(trace_id) meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False)) async def list_traces( self, mode: Optional[str] = None, agent_type: Optional[str] = None, uid: Optional[str] = None, status: Optional[str] = None, limit: int = 50 ) -> List[Trace]: """列出 Traces""" traces = [] if not self.base_path.exists(): return [] for trace_dir in self.base_path.iterdir(): if not trace_dir.is_dir(): continue meta_file = trace_dir / "meta.json" if not meta_file.exists(): continue try: data = json.loads(meta_file.read_text()) # 过滤 if mode and data.get("mode") != mode: continue if agent_type and data.get("agent_type") != agent_type: continue if uid and data.get("uid") != uid: continue if status and data.get("status") != status: continue # 解析 datetime if data.get("created_at"): data["created_at"] = datetime.fromisoformat(data["created_at"]) if data.get("completed_at"): data["completed_at"] = datetime.fromisoformat(data["completed_at"]) traces.append(Trace(**data)) except Exception: continue # 排序(最新的在前) traces.sort(key=lambda t: t.created_at, reverse=True) return traces[:limit] # ===== GoalTree 操作 ===== async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]: """获取 GoalTree""" goal_file = self._get_goal_file(trace_id) if not goal_file.exists(): return None try: data = json.loads(goal_file.read_text()) return GoalTree.from_dict(data) except Exception: return None async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None: """更新完整 GoalTree""" goal_file = self._get_goal_file(trace_id) goal_file.write_text(json.dumps(tree.to_dict(), indent=2, ensure_ascii=False)) async def add_goal(self, trace_id: str, goal: Goal) -> None: """添加 Goal 到 GoalTree""" tree = await self.get_goal_tree(trace_id) if not tree: return tree.goals.append(goal) await self.update_goal_tree(trace_id, tree) # 推送 goal_added 事件 event_data = { "goal": goal.to_dict(), "parent_id": goal.parent_id } await self.append_event(trace_id, "goal_added", event_data) print(f"[DEBUG] Pushed goal_added event: goal_id={goal.id}, parent_id={goal.parent_id}") async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None: """更新 Goal 字段""" tree = await self.get_goal_tree(trace_id) if not tree: return goal = tree.find(goal_id) if not goal: return # 更新字段 for key, value in updates.items(): if hasattr(goal, key): # 特殊处理 stats 字段(可能是 dict) if key in ["self_stats", "cumulative_stats"] and isinstance(value, dict): value = GoalStats.from_dict(value) setattr(goal, key, value) await self.update_goal_tree(trace_id, tree) # 推送 goal_updated 事件 # 如果状态变为 completed,检查是否需要级联完成父 Goal affected_goals = [{"goal_id": goal_id, "updates": updates}] if updates.get("status") == "completed": # 检查级联完成:如果所有兄弟 Goal 都完成,父 Goal 也完成 cascade_completed = await self._check_cascade_completion(trace_id, goal) affected_goals.extend(cascade_completed) await self.append_event(trace_id, "goal_updated", { "goal_id": goal_id, "updates": updates, "affected_goals": affected_goals }) print(f"[DEBUG] Pushed goal_updated event: goal_id={goal_id}, updates={updates}, affected={len(affected_goals)}") async def _check_cascade_completion( self, trace_id: str, completed_goal: Goal ) -> List[Dict[str, Any]]: """ 检查级联完成:如果一个 Goal 的所有子 Goal 都完成,则自动完成父 Goal Args: trace_id: Trace ID completed_goal: 刚完成的 Goal Returns: 受影响的父 Goals 列表(自动完成的) """ if not completed_goal.parent_id: return [] tree = await self.get_goal_tree(trace_id) if not tree: return [] affected = [] parent = tree.find(completed_goal.parent_id) if not parent: return [] # 获取父 Goal 的所有子 Goal children = tree.get_children(parent.id) # 检查是否所有子 Goal 都已完成(排除 abandoned) all_completed = all( child.status in ["completed", "abandoned"] for child in children ) if all_completed and parent.status != "completed": # 自动完成父 Goal parent.status = "completed" if not parent.summary: # 生成自动摘要 completed_count = sum(1 for c in children if c.status == "completed") parent.summary = f"所有子目标已完成 ({completed_count}/{len(children)})" await self.update_goal_tree(trace_id, tree) affected.append({ "goal_id": parent.id, "status": "completed", "summary": parent.summary, "cumulative_stats": parent.cumulative_stats.to_dict() }) # 递归检查祖父 Goal grandparent_affected = await self._check_cascade_completion(trace_id, parent) affected.extend(grandparent_affected) return affected # ===== Message 操作 ===== async def add_message(self, message: Message) -> str: """ 添加 Message 自动更新关联 Goal 的 stats(self_stats 和祖先的 cumulative_stats) """ trace_id = message.trace_id # 1. 写入 message 文件 messages_dir = self._get_messages_dir(trace_id) message_file = messages_dir / f"{message.message_id}.json" message_file.write_text(json.dumps(message.to_dict(), indent=2, ensure_ascii=False), encoding="utf-8") # 2. 更新 trace 统计 trace = await self.get_trace(trace_id) if trace: trace.total_messages += 1 trace.last_sequence = max(trace.last_sequence, message.sequence) # 累计 tokens(完整版) if message.prompt_tokens: trace.total_prompt_tokens += message.prompt_tokens if message.completion_tokens: trace.total_completion_tokens += message.completion_tokens if message.reasoning_tokens: trace.total_reasoning_tokens += message.reasoning_tokens if message.cache_creation_tokens: trace.total_cache_creation_tokens += message.cache_creation_tokens if message.cache_read_tokens: trace.total_cache_read_tokens += message.cache_read_tokens # 向后兼容:也更新 total_tokens if message.tokens: trace.total_tokens += message.tokens elif message.prompt_tokens or message.completion_tokens: trace.total_tokens += (message.prompt_tokens or 0) + (message.completion_tokens or 0) if message.cost: trace.total_cost += message.cost if message.duration_ms: trace.total_duration_ms += message.duration_ms # 更新 Trace await self.update_trace( trace_id, total_messages=trace.total_messages, last_sequence=trace.last_sequence, total_tokens=trace.total_tokens, total_prompt_tokens=trace.total_prompt_tokens, total_completion_tokens=trace.total_completion_tokens, total_reasoning_tokens=trace.total_reasoning_tokens, total_cache_creation_tokens=trace.total_cache_creation_tokens, total_cache_read_tokens=trace.total_cache_read_tokens, total_cost=trace.total_cost, total_duration_ms=trace.total_duration_ms ) # 3. 更新 Goal stats await self._update_goal_stats(trace_id, message) # 4. 追加 message_added 事件 affected_goals = await self._get_affected_goals(trace_id, message) await self.append_event(trace_id, "message_added", { "message": message.to_dict(), "affected_goals": affected_goals }) return message.message_id async def _update_goal_stats(self, trace_id: str, message: Message) -> None: """更新 Goal 的 self_stats 和祖先的 cumulative_stats""" tree = await self.get_goal_tree(trace_id) if not tree: return # 找到关联的 Goal goal = tree.find(message.goal_id) if not goal: return # 更新自身 self_stats goal.self_stats.message_count += 1 if message.tokens: goal.self_stats.total_tokens += message.tokens if message.cost: goal.self_stats.total_cost += message.cost # TODO: 更新 preview(工具调用摘要) # 更新自身 cumulative_stats goal.cumulative_stats.message_count += 1 if message.tokens: goal.cumulative_stats.total_tokens += message.tokens if message.cost: goal.cumulative_stats.total_cost += message.cost # 沿祖先链向上更新 cumulative_stats current_goal = goal while current_goal.parent_id: parent = tree.find(current_goal.parent_id) if not parent: break parent.cumulative_stats.message_count += 1 if message.tokens: parent.cumulative_stats.total_tokens += message.tokens if message.cost: parent.cumulative_stats.total_cost += message.cost current_goal = parent # 保存更新后的 tree await self.update_goal_tree(trace_id, tree) async def _get_affected_goals(self, trace_id: str, message: Message) -> List[Dict[str, Any]]: """获取受影响的 Goals(自身 + 所有祖先)""" tree = await self.get_goal_tree(trace_id) if not tree: return [] goal = tree.find(message.goal_id) if not goal: return [] affected = [] # 添加自身(包含 self_stats 和 cumulative_stats) affected.append({ "goal_id": goal.id, "self_stats": goal.self_stats.to_dict(), "cumulative_stats": goal.cumulative_stats.to_dict() }) # 添加所有祖先(仅 cumulative_stats) current_goal = goal while current_goal.parent_id: parent = tree.find(current_goal.parent_id) if not parent: break affected.append({ "goal_id": parent.id, "cumulative_stats": parent.cumulative_stats.to_dict() }) current_goal = parent return affected return affected async def get_message(self, message_id: str) -> Optional[Message]: """获取 Message(扫描所有 trace)""" for trace_dir in self.base_path.iterdir(): if not trace_dir.is_dir(): continue # 检查 messages 目录 message_file = trace_dir / "messages" / f"{message_id}.json" if message_file.exists(): try: data = json.loads(message_file.read_text()) return Message.from_dict(data) except Exception: pass return None async def get_trace_messages( self, trace_id: str ) -> List[Message]: """获取 Trace 的所有 Messages""" messages_dir = self._get_messages_dir(trace_id) if not messages_dir.exists(): return [] messages = [] for message_file in messages_dir.glob("*.json"): try: data = json.loads(message_file.read_text()) messages.append(Message.from_dict(data)) except Exception: continue # 按 sequence 排序 messages.sort(key=lambda m: m.sequence) return messages async def get_messages_by_goal( self, trace_id: str, goal_id: str ) -> List[Message]: """获取指定 Goal 关联的所有 Messages""" all_messages = await self.get_trace_messages(trace_id) return [m for m in all_messages if m.goal_id == goal_id] async def update_message(self, message_id: str, **updates) -> None: """更新 Message 字段""" message = await self.get_message(message_id) if not message: return # 更新字段 for key, value in updates.items(): if hasattr(message, key): setattr(message, key, value) # 确定文件路径 messages_dir = self._get_messages_dir(message.trace_id) message_file = messages_dir / f"{message_id}.json" message_file.write_text(json.dumps(message.to_dict(), indent=2, ensure_ascii=False)) # ===== 事件流操作(用于 WebSocket 断线续传)===== async def get_events( self, trace_id: str, since_event_id: int = 0 ) -> List[Dict[str, Any]]: """获取事件流""" events_file = self._get_events_file(trace_id) if not events_file.exists(): return [] events = [] with events_file.open('r') as f: for line in f: try: event = json.loads(line.strip()) if event.get("event_id", 0) > since_event_id: events.append(event) except Exception: continue return events async def append_event( self, trace_id: str, event_type: str, payload: Dict[str, Any] ) -> int: """追加事件,返回 event_id""" # 获取 trace 并递增 event_id trace = await self.get_trace(trace_id) if not trace: return 0 trace.last_event_id += 1 event_id = trace.last_event_id # 更新 trace 的 last_event_id await self.update_trace(trace_id, last_event_id=event_id) # 创建事件 event = { "event_id": event_id, "event": event_type, "ts": datetime.now().isoformat(), **payload } # 追加到 events.jsonl events_file = self._get_events_file(trace_id) with events_file.open('a', encoding='utf-8') as f: f.write(json.dumps(event, ensure_ascii=False) + '\n') return event_id