"""
FileSystem Trace Store - file-system backed storage implementation.

Used for sharing data across processes; data is persisted under the
``.trace/`` directory.

Directory layout:
    .trace/{trace_id}/
    ├── meta.json        # Trace metadata
    ├── goal.json        # GoalTree (flat JSON; hierarchy built via parent_id)
    ├── messages/        # Messages (one file per message)
    │   ├── {message_id}.json
    │   └── ...
    └── events.jsonl     # Event stream (used for WebSocket resume)

A Sub-Trace is a fully independent Trace with its own directory:
    .trace/{parent_id}@{mode}-{timestamp}-{seq}/
    ├── meta.json        # parent_trace_id points at the parent Trace
    ├── goal.json
    ├── messages/
    └── events.jsonl
"""

import json
import os
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime

from agent.execution.models import Trace, Message
from agent.models.goal import GoalTree, Goal, GoalStats


class FileSystemTraceStore:
    """File-system backed Trace store.

    Every file is read and written as UTF-8 explicitly: the JSON is emitted
    with ``ensure_ascii=False``, so relying on the platform's locale encoding
    would corrupt (or fail on) non-ASCII content on non-UTF-8 systems.

    NOTE(review): no locking is performed anywhere in this class; all
    updates are plain read-modify-write cycles on the files.
    """

    # Encoding used for every file this store touches.
    _ENCODING = "utf-8"

    def __init__(self, base_path: str = ".trace"):
        """Create the store and make sure ``base_path`` exists.

        Args:
            base_path: Root directory that holds one sub-directory per trace.
        """
        self.base_path = Path(base_path)
        # parents=True so that a nested base path (e.g. "var/data/.trace")
        # works as well as a single-level one.
        self.base_path.mkdir(parents=True, exist_ok=True)

    # ===== Path helpers =====

    def _get_trace_dir(self, trace_id: str) -> Path:
        """Return the directory of the given trace."""
        return self.base_path / trace_id

    def _get_meta_file(self, trace_id: str) -> Path:
        """Return the path of the trace's ``meta.json``."""
        return self._get_trace_dir(trace_id) / "meta.json"

    def _get_goal_file(self, trace_id: str) -> Path:
        """Return the path of the trace's ``goal.json``."""
        return self._get_trace_dir(trace_id) / "goal.json"

    def _get_messages_dir(self, trace_id: str) -> Path:
        """Return the trace's ``messages`` directory."""
        return self._get_trace_dir(trace_id) / "messages"

    def _get_message_file(self, trace_id: str, message_id: str) -> Path:
        """Return the path of a single message file."""
        return self._get_messages_dir(trace_id) / f"{message_id}.json"

    def _get_events_file(self, trace_id: str) -> Path:
        """Return the path of the trace's ``events.jsonl``."""
        return self._get_trace_dir(trace_id) / "events.jsonl"

    # ===== JSON / data helpers =====

    @classmethod
    def _read_json(cls, path: Path) -> Any:
        """Read ``path`` as UTF-8 and parse it as JSON."""
        return json.loads(path.read_text(encoding=cls._ENCODING))

    @classmethod
    def _write_json(cls, path: Path, obj: Any) -> None:
        """Serialize ``obj`` as indented, non-ASCII-escaped JSON into ``path``."""
        path.write_text(
            json.dumps(obj, indent=2, ensure_ascii=False),
            encoding=cls._ENCODING,
        )

    @staticmethod
    def _parse_trace_datetimes(data: Dict[str, Any]) -> None:
        """Convert the ISO ``created_at`` / ``completed_at`` strings in place."""
        for key in ("created_at", "completed_at"):
            if data.get(key):
                data[key] = datetime.fromisoformat(data[key])

    @staticmethod
    def _bump_stats(stats: Any, message: Message) -> None:
        """Add one message (and its tokens / cost, when set) to ``stats``."""
        stats.message_count += 1
        if message.tokens:
            stats.total_tokens += message.tokens
        if message.cost:
            stats.total_cost += message.cost

    # ===== Trace operations =====

    async def create_trace(self, trace: Trace) -> str:
        """Create the on-disk layout for a new Trace.

        Creates the trace directory, its ``messages`` directory, writes
        ``meta.json`` from ``trace.to_dict()`` and touches ``events.jsonl``.

        Returns:
            The trace's ``trace_id``.
        """
        trace_dir = self._get_trace_dir(trace.trace_id)
        trace_dir.mkdir(parents=True, exist_ok=True)

        # Create the messages directory.
        messages_dir = self._get_messages_dir(trace.trace_id)
        messages_dir.mkdir(exist_ok=True)

        # Write meta.json.
        self._write_json(self._get_meta_file(trace.trace_id), trace.to_dict())

        # Create an empty events.jsonl.
        self._get_events_file(trace.trace_id).touch()

        return trace.trace_id

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Load a Trace from its ``meta.json``.

        Returns:
            The Trace, or ``None`` when ``meta.json`` does not exist.
        """
        meta_file = self._get_meta_file(trace_id)
        if not meta_file.exists():
            return None

        data = self._read_json(meta_file)
        # Datetime fields are stored as ISO strings.
        self._parse_trace_datetimes(data)
        return Trace(**data)

    async def update_trace(self, trace_id: str, **updates) -> None:
        """Update fields of a Trace and write it back.

        Keys in ``updates`` that are not attributes of the loaded Trace are
        ignored. Does nothing when the trace does not exist.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return

        # Apply the updates.
        for key, value in updates.items():
            if hasattr(trace, key):
                setattr(trace, key, value)

        # Write back to disk.
        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

    async def list_traces(
        self,
        mode: Optional[str] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 50
    ) -> List[Trace]:
        """List Traces, newest first.

        Each non-empty filter argument must equal the field of the same name
        in ``meta.json``. Trace directories whose metadata cannot be read or
        turned into a ``Trace`` are skipped.

        Args:
            mode: Required ``mode`` value, if given.
            agent_type: Required ``agent_type`` value, if given.
            uid: Required ``uid`` value, if given.
            status: Required ``status`` value, if given.
            limit: Maximum number of traces returned.

        Returns:
            Up to ``limit`` traces sorted by ``created_at`` descending.
        """
        traces = []

        if not self.base_path.exists():
            return []

        for trace_dir in self.base_path.iterdir():
            if not trace_dir.is_dir():
                continue

            meta_file = trace_dir / "meta.json"
            if not meta_file.exists():
                continue

            try:
                data = self._read_json(meta_file)

                # Filtering.
                if mode and data.get("mode") != mode:
                    continue
                if agent_type and data.get("agent_type") != agent_type:
                    continue
                if uid and data.get("uid") != uid:
                    continue
                if status and data.get("status") != status:
                    continue

                # Parse datetime fields.
                self._parse_trace_datetimes(data)

                traces.append(Trace(**data))
            except Exception:
                # Best effort: one unreadable trace must not break listing.
                continue

        # Sort, newest first.
        traces.sort(key=lambda t: t.created_at, reverse=True)

        return traces[:limit]

    # ===== GoalTree operations =====

    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]:
        """Load the GoalTree of a trace.

        Returns:
            The GoalTree, or ``None`` when ``goal.json`` is missing or cannot
            be loaded.
        """
        goal_file = self._get_goal_file(trace_id)
        if not goal_file.exists():
            return None

        try:
            return GoalTree.from_dict(self._read_json(goal_file))
        except Exception:
            return None

    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None:
        """Overwrite ``goal.json`` with the complete GoalTree."""
        self._write_json(self._get_goal_file(trace_id), tree.to_dict())

    async def add_goal(self, trace_id: str, goal: Goal) -> None:
        """Append a Goal to the GoalTree and emit a ``goal_added`` event.

        Does nothing when the trace has no loadable GoalTree.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        tree.goals.append(goal)
        await self.update_goal_tree(trace_id, tree)

        # Push the goal_added event.
        event_data = {
            "goal": goal.to_dict(),
            "parent_id": goal.parent_id
        }
        await self.append_event(trace_id, "goal_added", event_data)
        print(f"[DEBUG] Pushed goal_added event: goal_id={goal.id}, parent_id={goal.parent_id}")

    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None:
        """Update fields of a Goal and emit a ``goal_updated`` event.

        Keys in ``updates`` that are not attributes of the Goal are ignored.
        When the update sets ``status`` to ``"completed"``, parent goals may
        be completed automatically (see ``_check_cascade_completion``); they
        are reported in the event's ``affected_goals``.

        Does nothing when the GoalTree or the Goal cannot be found.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        goal = tree.find(goal_id)
        if not goal:
            return

        # Apply the updates.
        for key, value in updates.items():
            if hasattr(goal, key):
                # Stats fields may arrive as plain dicts; convert them.
                if key in ["self_stats", "cumulative_stats"] and isinstance(value, dict):
                    value = GoalStats.from_dict(value)
                setattr(goal, key, value)

        await self.update_goal_tree(trace_id, tree)

        # Push the goal_updated event.
        # If the status became completed, check whether parents must be
        # completed in cascade.
        affected_goals = [{"goal_id": goal_id, "updates": updates}]

        if updates.get("status") == "completed":
            # Cascade: if all sibling goals are done, the parent is done too.
            cascade_completed = await self._check_cascade_completion(trace_id, goal)
            affected_goals.extend(cascade_completed)

        await self.append_event(trace_id, "goal_updated", {
            "goal_id": goal_id,
            "updates": updates,
            "affected_goals": affected_goals
        })
        print(f"[DEBUG] Pushed goal_updated event: goal_id={goal_id}, updates={updates}, affected={len(affected_goals)}")

    async def _check_cascade_completion(
        self,
        trace_id: str,
        completed_goal: Goal
    ) -> List[Dict[str, Any]]:
        """Complete the parent Goal when all of its children are finished.

        A child counts as finished when its status is ``"completed"`` or
        ``"abandoned"``. When the parent gets completed here, the same check
        is applied recursively to the grandparent.

        Args:
            trace_id: Trace ID.
            completed_goal: The Goal that was just completed.

        Returns:
            One entry per parent Goal that was completed automatically.
        """
        if not completed_goal.parent_id:
            return []

        # The tree is re-read from disk so earlier saved updates are seen.
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return []

        affected = []
        parent = tree.find(completed_goal.parent_id)
        if not parent:
            return []

        # All children of the parent Goal.
        children = tree.get_children(parent.id)

        # Are all children finished (abandoned ones count as finished)?
        all_completed = all(
            child.status in ["completed", "abandoned"]
            for child in children
        )

        if all_completed and parent.status != "completed":
            # Complete the parent automatically.
            parent.status = "completed"
            if not parent.summary:
                # Generate an automatic summary.
                completed_count = sum(1 for c in children if c.status == "completed")
                parent.summary = f"所有子目标已完成 ({completed_count}/{len(children)})"

            await self.update_goal_tree(trace_id, tree)

            affected.append({
                "goal_id": parent.id,
                "status": "completed",
                "summary": parent.summary,
                "cumulative_stats": parent.cumulative_stats.to_dict()
            })

            # Recursively check the grandparent.
            grandparent_affected = await self._check_cascade_completion(trace_id, parent)
            affected.extend(grandparent_affected)

        return affected

    # ===== Message operations =====

    async def add_message(self, message: Message) -> str:
        """Persist a Message and update the derived statistics.

        Steps: write the message file, update the trace totals, update the
        stats of the associated Goal and its ancestors, then append a
        ``message_added`` event.

        Returns:
            The message's ``message_id``.
        """
        trace_id = message.trace_id

        # 1. Write the message file.
        message_file = self._get_message_file(trace_id, message.message_id)
        self._write_json(message_file, message.to_dict())

        # 2. Update the trace totals.
        trace = await self.get_trace(trace_id)
        if trace:
            trace.total_messages += 1
            trace.last_sequence = max(trace.last_sequence, message.sequence)

            # Accumulate tokens (split into prompt / completion).
            if message.prompt_tokens:
                trace.total_prompt_tokens += message.prompt_tokens
            if message.completion_tokens:
                trace.total_completion_tokens += message.completion_tokens

            # Backward compatibility: also maintain total_tokens.
            if message.tokens:
                trace.total_tokens += message.tokens
            elif message.prompt_tokens or message.completion_tokens:
                trace.total_tokens += (message.prompt_tokens or 0) + (message.completion_tokens or 0)

            if message.cost:
                trace.total_cost += message.cost
            if message.duration_ms:
                trace.total_duration_ms += message.duration_ms

            # Persist (trace_id is the positional argument, not an update).
            await self.update_trace(
                trace_id,
                total_messages=trace.total_messages,
                last_sequence=trace.last_sequence,
                total_tokens=trace.total_tokens,
                total_prompt_tokens=trace.total_prompt_tokens,
                total_completion_tokens=trace.total_completion_tokens,
                total_cost=trace.total_cost,
                total_duration_ms=trace.total_duration_ms
            )

        # 3. Update Goal stats.
        await self._update_goal_stats(trace_id, message)

        # 4. Append the message_added event.
        affected_goals = await self._get_affected_goals(trace_id, message)
        await self.append_event(trace_id, "message_added", {
            "message": message.to_dict(),
            "affected_goals": affected_goals
        })

        return message.message_id

    async def _update_goal_stats(self, trace_id: str, message: Message) -> None:
        """Update the Goal's ``self_stats`` and the ``cumulative_stats`` chain.

        The message is counted in the associated Goal's ``self_stats`` and
        ``cumulative_stats`` and in the ``cumulative_stats`` of every ancestor
        reachable through ``parent_id``. Does nothing when the GoalTree or the
        Goal cannot be found.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        # Find the associated Goal.
        goal = tree.find(message.goal_id)
        if not goal:
            return

        # Own self_stats.
        self._bump_stats(goal.self_stats, message)
        # TODO: update preview (tool-call summary)

        # Own cumulative_stats.
        self._bump_stats(goal.cumulative_stats, message)

        # Walk up the ancestor chain updating cumulative_stats.
        current_goal = goal
        while current_goal.parent_id:
            parent = tree.find(current_goal.parent_id)
            if not parent:
                break
            self._bump_stats(parent.cumulative_stats, message)
            current_goal = parent

        # Save the updated tree.
        await self.update_goal_tree(trace_id, tree)

    async def _get_affected_goals(self, trace_id: str, message: Message) -> List[Dict[str, Any]]:
        """Return the Goals affected by a message: its Goal plus all ancestors.

        The first entry (the message's own Goal) carries ``self_stats`` and
        ``cumulative_stats``; ancestor entries carry ``cumulative_stats``
        only. Returns an empty list when the GoalTree or the Goal cannot be
        found.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return []

        goal = tree.find(message.goal_id)
        if not goal:
            return []

        # The Goal itself (self_stats and cumulative_stats).
        affected = [{
            "goal_id": goal.id,
            "self_stats": goal.self_stats.to_dict(),
            "cumulative_stats": goal.cumulative_stats.to_dict()
        }]

        # All ancestors (cumulative_stats only).
        current_goal = goal
        while current_goal.parent_id:
            parent = tree.find(current_goal.parent_id)
            if not parent:
                break
            affected.append({
                "goal_id": parent.id,
                "cumulative_stats": parent.cumulative_stats.to_dict()
            })
            current_goal = parent

        return affected

    async def get_message(self, message_id: str) -> Optional[Message]:
        """Find a Message by id, scanning every trace directory.

        Returns:
            The Message, or ``None`` when no readable file with that id
            exists (or the base directory is missing).
        """
        # Same guard as list_traces: a missing base directory means no data.
        if not self.base_path.exists():
            return None

        for trace_dir in self.base_path.iterdir():
            if not trace_dir.is_dir():
                continue

            # Look in this trace's messages directory.
            message_file = trace_dir / "messages" / f"{message_id}.json"
            if message_file.exists():
                try:
                    return Message.from_dict(self._read_json(message_file))
                except Exception:
                    # Unreadable file: keep scanning the remaining traces.
                    continue

        return None

    async def get_trace_messages(
        self,
        trace_id: str
    ) -> List[Message]:
        """Return all Messages of a trace, ordered by ``sequence``.

        Message files that cannot be loaded are skipped.
        """
        messages_dir = self._get_messages_dir(trace_id)
        if not messages_dir.exists():
            return []

        messages = []
        for message_file in messages_dir.glob("*.json"):
            try:
                messages.append(Message.from_dict(self._read_json(message_file)))
            except Exception:
                continue

        # Order by sequence.
        messages.sort(key=lambda m: m.sequence)
        return messages

    async def get_messages_by_goal(
        self,
        trace_id: str,
        goal_id: str
    ) -> List[Message]:
        """Return all Messages of a trace whose ``goal_id`` matches."""
        all_messages = await self.get_trace_messages(trace_id)
        return [m for m in all_messages if m.goal_id == goal_id]

    async def update_message(self, message_id: str, **updates) -> None:
        """Update fields of a Message and write it back.

        Keys in ``updates`` that are not attributes of the Message are
        ignored. Does nothing when the message cannot be found.
        """
        message = await self.get_message(message_id)
        if not message:
            return

        # Apply the updates.
        for key, value in updates.items():
            if hasattr(message, key):
                setattr(message, key, value)

        # The file location is derived from the message's own trace_id.
        message_file = self._get_message_file(message.trace_id, message_id)
        self._write_json(message_file, message.to_dict())

    # ===== Event stream operations (for WebSocket reconnect/resume) =====

    async def get_events(
        self,
        trace_id: str,
        since_event_id: int = 0
    ) -> List[Dict[str, Any]]:
        """Return the events of a trace newer than ``since_event_id``.

        Lines of ``events.jsonl`` that cannot be parsed are skipped.

        Args:
            trace_id: Trace ID.
            since_event_id: Only events with a larger ``event_id`` are
                returned.
        """
        events_file = self._get_events_file(trace_id)
        if not events_file.exists():
            return []

        events = []
        with events_file.open('r', encoding=self._ENCODING) as f:
            for line in f:
                try:
                    event = json.loads(line.strip())
                    if event.get("event_id", 0) > since_event_id:
                        events.append(event)
                except Exception:
                    continue

        return events

    async def append_event(
        self,
        trace_id: str,
        event_type: str,
        payload: Dict[str, Any]
    ) -> int:
        """Append an event to ``events.jsonl``.

        The trace's ``last_event_id`` is incremented and saved first, then
        the event line is written. ``payload`` keys are merged into the event
        after ``event_id`` / ``event`` / ``ts``, so a payload key with one of
        those names takes precedence.

        Returns:
            The new ``event_id``, or ``0`` when the trace does not exist.
        """
        # Load the trace and increment its event id.
        trace = await self.get_trace(trace_id)
        if not trace:
            return 0

        trace.last_event_id += 1
        event_id = trace.last_event_id

        # Persist the trace's last_event_id.
        await self.update_trace(trace_id, last_event_id=event_id)

        # Build the event.
        event = {
            "event_id": event_id,
            "event": event_type,
            "ts": datetime.now().isoformat(),
            **payload
        }

        # Append to events.jsonl.
        events_file = self._get_events_file(trace_id)
        with events_file.open('a', encoding=self._ENCODING) as f:
            f.write(json.dumps(event, ensure_ascii=False) + '\n')

        return event_id