"""
FileSystem Trace Store - file-system backed Trace storage.

Used to share data across processes; data is persisted under the ``.trace/``
directory.

Directory layout::

    .trace/{trace_id}/
    ├── meta.json          # Trace metadata
    ├── goal.json          # GoalTree (flat JSON; hierarchy rebuilt via parent_id)
    ├── model_usage.json   # Per-model token usage (written by record_model_usage)
    ├── messages/          # Messages (one file per message)
    │   ├── {message_id}.json
    │   └── ...
    └── events.jsonl       # Event stream (used for WebSocket resume)

A Sub-Trace is a fully independent Trace with its own directory::

    .trace/{parent_id}@{mode}-{timestamp}-{seq}/
    ├── meta.json          # parent_trace_id points to the parent Trace
    ├── goal.json
    ├── messages/
    └── events.jsonl

JSON documents are replaced atomically (temp file + ``os.replace``) so that a
reader in another process does not observe a truncated / half-written file.
"""

import json
import logging
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from .goal_models import Goal, GoalStats, GoalTree
from .models import Message, Trace

logger = logging.getLogger(__name__)


class FileSystemTraceStore:
    """File-system Trace store.

    Every Trace lives in its own directory under ``base_path``; see the module
    docstring for the on-disk layout.
    """

    def __init__(self, base_path: str = ".trace"):
        self.base_path = Path(base_path)
        # parents=True so a nested base path (e.g. "data/.trace") also works.
        self.base_path.mkdir(parents=True, exist_ok=True)

    # ===== Path helpers =====

    def _get_trace_dir(self, trace_id: str) -> Path:
        """Return the directory of a trace."""
        return self.base_path / trace_id

    def _get_meta_file(self, trace_id: str) -> Path:
        """Return the path of the trace's meta.json."""
        return self._get_trace_dir(trace_id) / "meta.json"

    def _get_goal_file(self, trace_id: str) -> Path:
        """Return the path of the trace's goal.json."""
        return self._get_trace_dir(trace_id) / "goal.json"

    def _get_messages_dir(self, trace_id: str) -> Path:
        """Return the trace's messages directory."""
        return self._get_trace_dir(trace_id) / "messages"

    def _get_message_file(self, trace_id: str, message_id: str) -> Path:
        """Return the path of a single message file."""
        return self._get_messages_dir(trace_id) / f"{message_id}.json"

    def _get_events_file(self, trace_id: str) -> Path:
        """Return the path of the trace's events.jsonl."""
        return self._get_trace_dir(trace_id) / "events.jsonl"

    def _get_model_usage_file(self, trace_id: str) -> Path:
        """Return the path of the trace's model_usage.json."""
        return self._get_trace_dir(trace_id) / "model_usage.json"

    # ===== Serialization helpers =====

    @staticmethod
    def _write_json(path: Path, data: Any) -> None:
        """Write ``data`` as pretty-printed UTF-8 JSON, replacing ``path`` atomically.

        The payload is written to a uniquely named sibling temp file and then
        moved over ``path`` with ``os.replace`` (an atomic rename on POSIX), so
        concurrent readers see either the previous or the new document rather
        than a truncated one. The temp name ends in ``.tmp`` so it never matches
        the ``*.json`` globs used when listing messages.
        """
        payload = json.dumps(data, indent=2, ensure_ascii=False)
        tmp_path = path.with_name(f".{path.name}.{os.getpid()}.{uuid.uuid4().hex}.tmp")
        try:
            tmp_path.write_text(payload, encoding="utf-8")
            os.replace(tmp_path, path)
        except BaseException:
            # Do not leave stray temp files behind if the write or rename fails.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    @staticmethod
    def _trace_from_meta(data: Dict[str, Any]) -> Trace:
        """Build a Trace from raw meta.json data, parsing its ISO-8601 datetime fields."""
        for key in ("created_at", "completed_at"):
            if data.get(key):
                data[key] = datetime.fromisoformat(data[key])
        return Trace.from_dict(data)

    @staticmethod
    def _message_token_count(message: Message) -> int:
        """Return a message's total token count.

        Prefers the legacy ``tokens`` field; when it is empty, falls back to
        ``prompt_tokens + completion_tokens``. Trace totals and Goal stats both
        use this so that they agree with each other.
        """
        if message.tokens:
            return message.tokens
        return (message.prompt_tokens or 0) + (message.completion_tokens or 0)

    # ===== Trace operations =====

    async def create_trace(self, trace: Trace) -> str:
        """Create the on-disk structure for a new Trace and return its ``trace_id``.

        Creates the ``messages/`` directory, writes ``meta.json`` and touches an
        empty ``events.jsonl``. Existing directories are reused.
        """
        trace_id = trace.trace_id

        # Creates the trace directory as well (parents=True).
        self._get_messages_dir(trace_id).mkdir(parents=True, exist_ok=True)

        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

        # Create an empty events.jsonl
        self._get_events_file(trace_id).touch()

        return trace_id

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Load a Trace from its meta.json, or return None if it does not exist."""
        meta_file = self._get_meta_file(trace_id)
        if not meta_file.exists():
            return None

        data = json.loads(meta_file.read_text(encoding="utf-8"))
        return self._trace_from_meta(data)

    async def update_trace(self, trace_id: str, **updates) -> None:
        """Set the given attributes on a stored Trace and persist it.

        Keys that are not attributes of the Trace are ignored; a missing Trace
        is a no-op.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return

        for key, value in updates.items():
            if hasattr(trace, key):
                setattr(trace, key, value)

        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

    async def list_traces(
        self,
        mode: Optional[str] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 50
    ) -> List[Trace]:
        """List Traces matching all given filters, newest first, at most ``limit``.

        Directories without a readable meta.json are skipped (and logged).
        """
        if not self.base_path.exists():
            return []

        traces: List[Trace] = []
        for trace_dir in self.base_path.iterdir():
            if not trace_dir.is_dir():
                continue

            meta_file = trace_dir / "meta.json"
            if not meta_file.exists():
                continue

            try:
                data = json.loads(meta_file.read_text(encoding="utf-8"))

                # Filter on the raw JSON before building the Trace object.
                if mode and data.get("mode") != mode:
                    continue
                if agent_type and data.get("agent_type") != agent_type:
                    continue
                if uid and data.get("uid") != uid:
                    continue
                if status and data.get("status") != status:
                    continue

                traces.append(self._trace_from_meta(data))
            except Exception:
                # Best effort: one corrupt trace must not break listing, but don't hide it.
                logger.warning("Skipping unreadable trace metadata: %s", meta_file, exc_info=True)
                continue

        # Sort newest first
        traces.sort(key=lambda t: t.created_at, reverse=True)
        return traces[:limit]

    # ===== GoalTree operations =====

    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]:
        """Load the trace's GoalTree, or None if it is missing or unreadable."""
        goal_file = self._get_goal_file(trace_id)
        if not goal_file.exists():
            return None

        try:
            data = json.loads(goal_file.read_text(encoding="utf-8"))
            return GoalTree.from_dict(data)
        except Exception:
            logger.warning("Failed to load goal tree: %s", goal_file, exc_info=True)
            return None

    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None:
        """Overwrite the trace's goal.json with the complete ``tree``."""
        self._write_json(self._get_goal_file(trace_id), tree.to_dict())

    async def add_goal(self, trace_id: str, goal: Goal) -> None:
        """Append ``goal`` to the trace's GoalTree and emit a ``goal_added`` event.

        No-op if the trace has no GoalTree yet.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        tree.goals.append(goal)
        await self.update_goal_tree(trace_id, tree)

        # Push goal_added event
        event_data = {
            "goal": goal.to_dict(),
            "parent_id": goal.parent_id
        }
        await self.append_event(trace_id, "goal_added", event_data)

        # Print a short, truncated summary of the goal to the console.
        desc_preview = goal.description[:80] + "..." if len(goal.description) > 80 else goal.description
        print(f"[Goal Added] ID={goal.id}, Parent={goal.parent_id or 'root'}")
        print(f" 📝 {desc_preview}")
        if goal.reason:
            reason_preview = goal.reason[:60] + "..." if len(goal.reason) > 60 else goal.reason
            print(f" 💡 {reason_preview}")

    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None:
        """Update fields of a Goal and emit a ``goal_updated`` event.

        ``self_stats`` / ``cumulative_stats`` may be given as plain dicts; they
        are converted to GoalStats. Unknown keys are ignored. When the status
        becomes ``completed``, ancestors whose children are all finished are
        auto-completed too, and reported in ``affected_goals``.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        goal = tree.find(goal_id)
        if not goal:
            return

        for key, value in updates.items():
            if hasattr(goal, key):
                # Stats fields may arrive as dicts (e.g. from JSON).
                if key in ["self_stats", "cumulative_stats"] and isinstance(value, dict):
                    value = GoalStats.from_dict(value)
                setattr(goal, key, value)

        await self.update_goal_tree(trace_id, tree)

        affected_goals = [{"goal_id": goal_id, "updates": updates}]
        if updates.get("status") == "completed":
            # Cascade: if all siblings are finished, the parent completes as well.
            cascade_completed = await self._check_cascade_completion(trace_id, goal)
            affected_goals.extend(cascade_completed)

        await self.append_event(trace_id, "goal_updated", {
            "goal_id": goal_id,
            "updates": updates,
            "affected_goals": affected_goals
        })
        logger.debug(
            "Pushed goal_updated event: goal_id=%s, updates=%s, affected=%d",
            goal_id, updates, len(affected_goals),
        )

    async def _check_cascade_completion(
        self,
        trace_id: str,
        completed_goal: Goal
    ) -> List[Dict[str, Any]]:
        """
        Cascade completion: if all children of a Goal are finished, complete that Goal.

        A child counts as finished when its status is ``completed`` or
        ``abandoned``. Recurses upward through the ancestors.

        Args:
            trace_id: Trace ID
            completed_goal: the Goal that was just completed

        Returns:
            List of ancestor Goals that were auto-completed (as event dicts).
        """
        if not completed_goal.parent_id:
            return []

        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return []

        affected = []
        parent = tree.find(completed_goal.parent_id)

        if not parent:
            return []

        children = tree.get_children(parent.id)

        all_completed = all(
            child.status in ["completed", "abandoned"]
            for child in children
        )

        if all_completed and parent.status != "completed":
            parent.status = "completed"
            if not parent.summary:
                # Generate an automatic summary
                completed_count = sum(1 for c in children if c.status == "completed")
                parent.summary = f"所有子目标已完成 ({completed_count}/{len(children)})"

            await self.update_goal_tree(trace_id, tree)

            affected.append({
                "goal_id": parent.id,
                "status": "completed",
                "summary": parent.summary,
                "cumulative_stats": parent.cumulative_stats.to_dict()
            })

            # Recurse to the grandparent
            grandparent_affected = await self._check_cascade_completion(trace_id, parent)
            affected.extend(grandparent_affected)

        return affected

    # ===== Message operations =====

    async def add_message(self, message: Message) -> str:
        """
        Persist a Message and update all derived statistics.

        Writes the message file, updates the Trace totals, updates the linked
        Goal's self_stats and the cumulative_stats of it and its ancestors,
        appends a ``message_added`` event and tries to broadcast it over
        WebSocket (broadcast failures are logged, not raised).
        """
        trace_id = message.trace_id

        # 1. Write the message file
        self._write_json(self._get_message_file(trace_id, message.message_id), message.to_dict())

        # 2. Update trace statistics
        trace = await self.get_trace(trace_id)
        if trace:
            trace.total_messages += 1
            trace.last_sequence = max(trace.last_sequence, message.sequence)

            # Detailed token accounting
            if message.prompt_tokens:
                trace.total_prompt_tokens += message.prompt_tokens
            if message.completion_tokens:
                trace.total_completion_tokens += message.completion_tokens
            if message.reasoning_tokens:
                trace.total_reasoning_tokens += message.reasoning_tokens
            if message.cache_creation_tokens:
                trace.total_cache_creation_tokens += message.cache_creation_tokens
            if message.cache_read_tokens:
                trace.total_cache_read_tokens += message.cache_read_tokens

            # Backward compatibility: keep the aggregate total_tokens in sync too.
            trace.total_tokens += self._message_token_count(message)

            if message.cost:
                trace.total_cost += message.cost
            if message.duration_ms:
                trace.total_duration_ms += message.duration_ms

            await self.update_trace(
                trace_id,
                total_messages=trace.total_messages,
                last_sequence=trace.last_sequence,
                total_tokens=trace.total_tokens,
                total_prompt_tokens=trace.total_prompt_tokens,
                total_completion_tokens=trace.total_completion_tokens,
                total_reasoning_tokens=trace.total_reasoning_tokens,
                total_cache_creation_tokens=trace.total_cache_creation_tokens,
                total_cache_read_tokens=trace.total_cache_read_tokens,
                total_cost=trace.total_cost,
                total_duration_ms=trace.total_duration_ms
            )

        # 3. Update Goal stats
        await self._update_goal_stats(trace_id, message)

        # 4. Append the message_added event
        affected_goals = await self._get_affected_goals(trace_id, message)
        event_id = await self.append_event(trace_id, "message_added", {
            "message": message.to_dict(),
            "affected_goals": affected_goals
        })
        if event_id:
            try:
                from . import websocket as trace_ws

                await trace_ws.broadcast_message_added(
                    trace_id=trace_id,
                    event_id=event_id,
                    message_dict=message.to_dict(),
                    affected_goals=affected_goals,
                )
            except Exception:
                logger.exception("Failed to broadcast message_added (trace_id=%s, event_id=%s)", trace_id, event_id)

        return message.message_id

    async def _update_goal_stats(self, trace_id: str, message: Message) -> None:
        """Add ``message`` to its Goal's self_stats and to the cumulative_stats
        of that Goal and every ancestor."""
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        goal = tree.find(message.goal_id)
        if not goal:
            return

        # Same token fallback as the Trace totals in add_message.
        tokens = self._message_token_count(message)
        cost = message.cost

        def _accumulate(stats: GoalStats) -> None:
            stats.message_count += 1
            if tokens:
                stats.total_tokens += tokens
            if cost:
                stats.total_cost += cost

        _accumulate(goal.self_stats)
        # TODO: update preview (tool-call summary)

        # cumulative_stats covers the goal itself plus each ancestor up the chain.
        current: Optional[Goal] = goal
        while current is not None:
            _accumulate(current.cumulative_stats)
            current = tree.find(current.parent_id) if current.parent_id else None

        await self.update_goal_tree(trace_id, tree)

    async def _get_affected_goals(self, trace_id: str, message: Message) -> List[Dict[str, Any]]:
        """Return the Goals affected by ``message``: its own Goal (self + cumulative
        stats) followed by every ancestor (cumulative stats only)."""
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return []

        goal = tree.find(message.goal_id)
        if not goal:
            return []

        affected = [{
            "goal_id": goal.id,
            "self_stats": goal.self_stats.to_dict(),
            "cumulative_stats": goal.cumulative_stats.to_dict()
        }]

        current_goal = goal
        while current_goal.parent_id:
            parent = tree.find(current_goal.parent_id)
            if not parent:
                break
            affected.append({
                "goal_id": parent.id,
                "cumulative_stats": parent.cumulative_stats.to_dict()
            })
            current_goal = parent

        return affected

    async def get_message(self, message_id: str) -> Optional[Message]:
        """Find a Message by id by scanning every trace directory; None if absent."""
        if not self.base_path.exists():
            return None

        for trace_dir in self.base_path.iterdir():
            if not trace_dir.is_dir():
                continue

            message_file = trace_dir / "messages" / f"{message_id}.json"
            if message_file.exists():
                try:
                    data = json.loads(message_file.read_text(encoding="utf-8"))
                    return Message.from_dict(data)
                except Exception:
                    # Keep scanning other traces, but record the bad file.
                    logger.warning("Failed to load message file: %s", message_file, exc_info=True)

        return None

    async def get_trace_messages(
        self,
        trace_id: str,
    ) -> List[Message]:
        """Return all Messages of a Trace (every branch), sorted by sequence.

        Unreadable message files are skipped and logged.
        """
        messages_dir = self._get_messages_dir(trace_id)
        if not messages_dir.exists():
            return []

        messages: List[Message] = []
        for message_file in messages_dir.glob("*.json"):
            try:
                data = json.loads(message_file.read_text(encoding="utf-8"))
                messages.append(Message.from_dict(data))
            except Exception:
                logger.warning("Skipping unreadable message file: %s", message_file, exc_info=True)
                continue

        messages.sort(key=lambda m: m.sequence)
        return messages

    async def get_main_path_messages(
        self,
        trace_id: str,
        head_sequence: int
    ) -> List[Message]:
        """
        Return the path from ``head_sequence`` back to the root via ``parent_sequence``.

        This is a generic path-tracing function: as long as
        ``trace.head_sequence`` points at the main path, the result is the main
        path. Side-branch messages are skipped naturally because main-path
        parents never point into a side branch. Tracing stops at a missing
        sequence or if a cycle is detected (corrupt data).

        Returns:
            Path Messages in ascending order (root → head).
        """
        all_messages = await self.get_trace_messages(trace_id)
        messages_by_seq = {m.sequence: m for m in all_messages}

        path: List[Message] = []
        visited = set()
        seq = head_sequence
        while seq is not None and seq not in visited:
            msg = messages_by_seq.get(seq)
            if not msg:
                break
            visited.add(seq)
            path.append(msg)
            seq = msg.parent_sequence

        if seq is not None and seq in visited:
            logger.warning("Cycle in parent_sequence chain (trace_id=%s, sequence=%s)", trace_id, seq)

        path.reverse()
        return path

    async def get_messages_by_goal(
        self,
        trace_id: str,
        goal_id: str
    ) -> List[Message]:
        """Return all Messages linked to ``goal_id``, sorted by sequence."""
        all_messages = await self.get_trace_messages(trace_id)
        return [m for m in all_messages if m.goal_id == goal_id]

    async def update_message(self, message_id: str, **updates) -> None:
        """Set the given attributes on a stored Message and persist it.

        Unknown keys are ignored; a missing Message is a no-op.
        """
        message = await self.get_message(message_id)
        if not message:
            return

        for key, value in updates.items():
            if hasattr(message, key):
                setattr(message, key, value)

        self._write_json(self._get_message_file(message.trace_id, message_id), message.to_dict())

    async def abandon_messages_after(self, trace_id: str, cutoff_sequence: int) -> List[str]:
        """
        Mark every ``active`` message with ``sequence > cutoff_sequence`` as ``abandoned``.

        Returns the ids of the messages that were abandoned.
        """
        all_messages = await self.get_trace_messages(trace_id)
        abandoned_ids = []
        now = datetime.now()

        for msg in all_messages:
            if msg.sequence > cutoff_sequence and msg.status == "active":
                msg.status = "abandoned"
                msg.abandoned_at = now
                self._write_json(self._get_message_file(trace_id, msg.message_id), msg.to_dict())
                abandoned_ids.append(msg.message_id)

        return abandoned_ids

    # ===== Model usage tracking =====

    async def record_model_usage(
        self,
        trace_id: str,
        sequence: int,
        role: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        cache_read_tokens: int = 0,
        tool_name: Optional[str] = None,
    ) -> None:
        """
        Record one model call in model_usage.json.

        Updates the summary totals, the per-(model, source) aggregate and appends
        a timeline entry. ``source`` is ``"agent"`` for assistant calls and
        ``"tool"`` / ``"tool:{tool_name}"`` otherwise.

        Args:
            trace_id: Trace ID
            sequence: message sequence number
            role: role (assistant/tool)
            model: model name
            prompt_tokens: input tokens
            completion_tokens: output tokens
            cache_read_tokens: cache-read tokens
            tool_name: tool name (when role is tool)
        """
        usage_file = self._get_model_usage_file(trace_id)

        if usage_file.exists():
            data = json.loads(usage_file.read_text(encoding="utf-8"))
        else:
            data = {
                "summary": {
                    "total_models": 0,
                    "total_tokens": 0,
                    "total_cache_read_tokens": 0,
                    "agent_tokens": 0,
                    "tool_tokens": 0,
                },
                "models": [],
                "timeline": [],
            }

        # Update summary
        total_tokens = prompt_tokens + completion_tokens
        data["summary"]["total_tokens"] += total_tokens
        data["summary"]["total_cache_read_tokens"] += cache_read_tokens

        if role == "assistant":
            data["summary"]["agent_tokens"] += total_tokens
            source = "agent"
        else:
            data["summary"]["tool_tokens"] += total_tokens
            source = f"tool:{tool_name}" if tool_name else "tool"

        # Update the per-(model, source) aggregate
        model_entry = next(
            (m for m in data["models"] if m["model"] == model and m["source"] == source),
            None,
        )

        if model_entry:
            model_entry["prompt_tokens"] += prompt_tokens
            model_entry["completion_tokens"] += completion_tokens
            model_entry["total_tokens"] += total_tokens
            model_entry["cache_read_tokens"] += cache_read_tokens
            model_entry["call_count"] += 1
        else:
            data["models"].append({
                "model": model,
                "source": source,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": total_tokens,
                "cache_read_tokens": cache_read_tokens,
                "call_count": 1,
            })
            data["summary"]["total_models"] = len(data["models"])

        # Append to the timeline
        timeline_entry = {
            "sequence": sequence,
            "role": role,
            "model": model,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        }
        if cache_read_tokens > 0:
            timeline_entry["cache_read_tokens"] = cache_read_tokens
        if tool_name:
            timeline_entry["tool_name"] = tool_name

        data["timeline"].append(timeline_entry)

        self._write_json(usage_file, data)

    # ===== Event stream (for WebSocket resume) =====

    async def get_events(
        self,
        trace_id: str,
        since_event_id: int = 0
    ) -> List[Dict[str, Any]]:
        """Return events with ``event_id > since_event_id`` in file order.

        Blank and malformed lines (e.g. a tail line still being written) are skipped.
        """
        events_file = self._get_events_file(trace_id)
        if not events_file.exists():
            return []

        events: List[Dict[str, Any]] = []
        with events_file.open('r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                    if event.get("event_id", 0) > since_event_id:
                        events.append(event)
                except Exception:
                    logger.debug("Skipping malformed event line in %s", events_file)
                    continue

        return events

    async def append_event(
        self,
        trace_id: str,
        event_type: str,
        payload: Dict[str, Any]
    ) -> int:
        """Append an event to events.jsonl and return its new ``event_id``.

        The id is ``trace.last_event_id + 1`` and is persisted to meta.json.
        Returns 0 (and writes nothing) when the trace does not exist.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return 0

        trace.last_event_id += 1
        event_id = trace.last_event_id

        await self.update_trace(trace_id, last_event_id=event_id)

        event = {
            "event_id": event_id,
            "event": event_type,
            "ts": datetime.now().isoformat(),
            **payload
        }

        events_file = self._get_events_file(trace_id)
        with events_file.open('a', encoding='utf-8') as f:
            f.write(json.dumps(event, ensure_ascii=False) + '\n')

        return event_id