| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748 |
- """
- FileSystem Trace Store - 文件系统存储实现
- 用于跨进程数据共享,数据持久化到 .trace/ 目录
- 目录结构:
- .trace/{trace_id}/
- ├── meta.json # Trace 元数据
- ├── goal.json # GoalTree(扁平 JSON,通过 parent_id 构建层级)
- ├── messages/ # Messages(每条独立文件)
- │ ├── {message_id}.json
- │ └── ...
- ├── events.jsonl # 事件流(WebSocket 续传)
- └── model_usage.json # 模型使用统计(首次调用 record_model_usage 时创建)
- Sub-Trace 是完全独立的 Trace,有自己的目录:
- .trace/{parent_id}@{mode}-{timestamp}-{seq}/
- ├── meta.json # parent_trace_id 指向父 Trace
- ├── goal.json
- ├── messages/
- └── events.jsonl
- """
- import json
- import os
- import logging
- from pathlib import Path
- from typing import Dict, List, Optional, Any
- from datetime import datetime
- from .models import Trace, Message
- from .goal_models import GoalTree, Goal, GoalStats
- logger = logging.getLogger(__name__)  # module-level logger; FileSystemTraceStore uses it to report broadcast failures
class FileSystemTraceStore:
    """File-system backed Trace store.

    Each trace is persisted under ``{base_path}/{trace_id}/``:

    - ``meta.json``        -- the serialized ``Trace`` (``trace.to_dict()``)
    - ``goal.json``        -- the serialized ``GoalTree`` (``tree.to_dict()``)
    - ``messages/``        -- one ``{message_id}.json`` file per Message
    - ``events.jsonl``     -- append-only event log, one JSON object per line
    - ``model_usage.json`` -- per-model token usage, created by
      ``record_model_usage`` on first use

    The public methods are ``async`` but perform blocking file I/O directly.
    Updates are read-modify-write without any file locking, so concurrent
    writers to the same trace may race.
    """

    def __init__(self, base_path: str = ".trace"):
        """Create a store rooted at ``base_path`` (the directory is created if missing)."""
        self.base_path = Path(base_path)
        # parents=True so nested roots such as "data/.trace" also work.
        self.base_path.mkdir(parents=True, exist_ok=True)

    # ===== Path helpers =====

    def _get_trace_dir(self, trace_id: str) -> Path:
        """Return the directory of a trace."""
        return self.base_path / trace_id

    def _get_meta_file(self, trace_id: str) -> Path:
        """Return the path of a trace's meta.json."""
        return self._get_trace_dir(trace_id) / "meta.json"

    def _get_goal_file(self, trace_id: str) -> Path:
        """Return the path of a trace's goal.json."""
        return self._get_trace_dir(trace_id) / "goal.json"

    def _get_messages_dir(self, trace_id: str) -> Path:
        """Return the directory holding a trace's message files."""
        return self._get_trace_dir(trace_id) / "messages"

    def _get_message_file(self, trace_id: str, message_id: str) -> Path:
        """Return the path of a single message file."""
        return self._get_messages_dir(trace_id) / f"{message_id}.json"

    def _get_events_file(self, trace_id: str) -> Path:
        """Return the path of a trace's events.jsonl."""
        return self._get_trace_dir(trace_id) / "events.jsonl"

    def _get_model_usage_file(self, trace_id: str) -> Path:
        """Return the path of a trace's model_usage.json."""
        return self._get_trace_dir(trace_id) / "model_usage.json"

    # ===== Serialization helpers =====

    @staticmethod
    def _write_json(path: Path, data: Any) -> None:
        """Write ``data`` to ``path`` as indented UTF-8 JSON, keeping non-ASCII text."""
        path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")

    @staticmethod
    def _trace_from_dict(data: Dict[str, Any]) -> Trace:
        """Build a ``Trace`` from a meta.json dict, parsing its ISO datetime fields.

        ``data`` is modified in place.
        """
        for key in ("created_at", "completed_at"):
            if data.get(key):
                data[key] = datetime.fromisoformat(data[key])
        return Trace(**data)

    @staticmethod
    def _message_tokens(message: Message) -> int:
        """Return the aggregate token count attributed to ``message``.

        Uses ``message.tokens`` when set. Otherwise it falls back to
        prompt + completion tokens. Trace totals and goal stats both use this
        helper, so they always agree.
        """
        if message.tokens:
            return message.tokens
        return (message.prompt_tokens or 0) + (message.completion_tokens or 0)

    @staticmethod
    def _iter_ancestors(tree: GoalTree, goal: Goal):
        """Yield ``goal``'s ancestors, from its parent up to the root.

        Stops at the first parent id that cannot be found in ``tree``. It also
        stops when a goal id repeats, so a cyclic ``parent_id`` chain in a
        corrupted goal.json cannot cause an infinite loop.
        """
        seen = {goal.id}
        current = goal
        while current.parent_id and current.parent_id not in seen:
            parent = tree.find(current.parent_id)
            if not parent:
                return
            seen.add(parent.id)
            yield parent
            current = parent

    # ===== Trace operations =====

    async def create_trace(self, trace: Trace) -> str:
        """Create the on-disk layout for a new trace and return its id.

        Creates the trace and ``messages/`` directories, writes ``meta.json``
        and ensures ``events.jsonl`` exists.
        """
        trace_dir = self._get_trace_dir(trace.trace_id)
        trace_dir.mkdir(parents=True, exist_ok=True)
        self._get_messages_dir(trace.trace_id).mkdir(exist_ok=True)
        self._write_json(self._get_meta_file(trace.trace_id), trace.to_dict())
        # touch() does not truncate, so an existing event log is preserved.
        self._get_events_file(trace.trace_id).touch()
        return trace.trace_id

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Load a trace from its meta.json; return ``None`` if it does not exist."""
        meta_file = self._get_meta_file(trace_id)
        if not meta_file.exists():
            return None
        return self._trace_from_dict(json.loads(meta_file.read_text(encoding="utf-8")))

    async def update_trace(self, trace_id: str, **updates) -> None:
        """Set attributes on a stored trace and write it back to meta.json.

        Keys that are not attributes of the trace are ignored; a missing trace
        is a no-op.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return
        for key, value in updates.items():
            if hasattr(trace, key):
                setattr(trace, key, value)
        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

    async def list_traces(
        self,
        mode: Optional[str] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 50
    ) -> List[Trace]:
        """List stored traces, newest first, optionally filtered by meta fields.

        A filter is applied only when its value is truthy. Directories without
        a meta.json are skipped. Unreadable meta files are logged and skipped.

        Returns:
            At most ``limit`` traces sorted by ``created_at`` descending.
        """
        if not self.base_path.exists():
            return []

        filters = {"mode": mode, "agent_type": agent_type, "uid": uid, "status": status}
        traces = []
        for trace_dir in self.base_path.iterdir():
            meta_file = trace_dir / "meta.json"
            if not trace_dir.is_dir() or not meta_file.exists():
                continue
            try:
                data = json.loads(meta_file.read_text(encoding="utf-8"))
                if any(value and data.get(key) != value for key, value in filters.items()):
                    continue
                traces.append(self._trace_from_dict(data))
            except Exception:
                # Best effort: one corrupt trace must not break the listing.
                logger.warning("Skipping unreadable trace metadata: %s", meta_file, exc_info=True)

        # Newest first; traces lacking created_at sort last instead of raising TypeError.
        traces.sort(key=lambda t: (t.created_at is not None, t.created_at), reverse=True)
        return traces[:limit]

    # ===== GoalTree operations =====

    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]:
        """Load a trace's GoalTree; ``None`` if goal.json is missing or unreadable."""
        goal_file = self._get_goal_file(trace_id)
        if not goal_file.exists():
            return None
        try:
            data = json.loads(goal_file.read_text(encoding="utf-8"))
            return GoalTree.from_dict(data)
        except Exception:
            logger.warning("Failed to load goal tree: %s", goal_file, exc_info=True)
            return None

    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None:
        """Overwrite goal.json with the complete ``tree``."""
        self._write_json(self._get_goal_file(trace_id), tree.to_dict())

    async def add_goal(self, trace_id: str, goal: Goal) -> None:
        """Append ``goal`` to the trace's GoalTree and emit a ``goal_added`` event.

        No-op when the trace has no goal tree yet.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        tree.goals.append(goal)
        await self.update_goal_tree(trace_id, tree)

        event_data = {
            "goal": goal.to_dict(),
            "parent_id": goal.parent_id
        }
        await self.append_event(trace_id, "goal_added", event_data)

        # Console summary of the new goal (descriptions/reasons truncated).
        desc_preview = goal.description[:80] + "..." if len(goal.description) > 80 else goal.description
        print(f"[Goal Added] ID={goal.id}, Parent={goal.parent_id or 'root'}")
        print(f"   📝 {desc_preview}")
        if goal.reason:
            reason_preview = goal.reason[:60] + "..." if len(goal.reason) > 60 else goal.reason
            print(f"   💡 {reason_preview}")

    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None:
        """Update fields of one Goal and emit a ``goal_updated`` event.

        ``self_stats`` / ``cumulative_stats`` given as dicts are converted with
        ``GoalStats.from_dict``. Unknown keys are ignored. When ``status``
        becomes ``"completed"``, parent goals are completed in cascade (see
        ``_check_cascade_completion``). They are reported in the event's
        ``affected_goals``.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        goal = tree.find(goal_id)
        if not goal:
            return

        for key, value in updates.items():
            if hasattr(goal, key):
                if key in ["self_stats", "cumulative_stats"] and isinstance(value, dict):
                    value = GoalStats.from_dict(value)
                setattr(goal, key, value)

        await self.update_goal_tree(trace_id, tree)

        affected_goals = [{"goal_id": goal_id, "updates": updates}]
        if updates.get("status") == "completed":
            cascade_completed = await self._check_cascade_completion(trace_id, goal)
            affected_goals.extend(cascade_completed)

        await self.append_event(trace_id, "goal_updated", {
            "goal_id": goal_id,
            "updates": updates,
            "affected_goals": affected_goals
        })
        logger.debug(
            "Pushed goal_updated event: goal_id=%s, updates=%s, affected=%d",
            goal_id, updates, len(affected_goals),
        )

    async def _check_cascade_completion(
        self,
        trace_id: str,
        completed_goal: Goal
    ) -> List[Dict[str, Any]]:
        """Complete ancestors whose children are all finished.

        If every child of ``completed_goal``'s parent is ``completed`` or
        ``abandoned``, the parent is marked ``completed``. It also gets an
        automatic summary if it has none. The check then recurses to the
        grandparent.

        Args:
            trace_id: Trace ID.
            completed_goal: The goal that was just completed.

        Returns:
            One entry per parent goal that was auto-completed.
        """
        if not completed_goal.parent_id:
            return []

        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return []

        parent = tree.find(completed_goal.parent_id)
        if not parent:
            return []

        children = tree.get_children(parent.id)
        # Abandoned children do not block the parent from completing.
        all_completed = all(
            child.status in ["completed", "abandoned"]
            for child in children
        )

        affected = []
        # The status check also terminates the recursion on cyclic parent links.
        if all_completed and parent.status != "completed":
            parent.status = "completed"
            if not parent.summary:
                completed_count = sum(1 for c in children if c.status == "completed")
                parent.summary = f"所有子目标已完成 ({completed_count}/{len(children)})"

            await self.update_goal_tree(trace_id, tree)

            affected.append({
                "goal_id": parent.id,
                "status": "completed",
                "summary": parent.summary,
                "cumulative_stats": parent.cumulative_stats.to_dict()
            })

            affected.extend(await self._check_cascade_completion(trace_id, parent))

        return affected

    # ===== Message operations =====

    async def add_message(self, message: Message) -> str:
        """Persist a message and propagate its statistics.

        The method performs these steps in order:

        1. Writes ``messages/{message_id}.json``.
        2. Adds the message's counts, tokens, cost and duration to the trace
           totals.
        3. Updates the goal's ``self_stats`` and every ancestor's
           ``cumulative_stats``.
        4. Appends a ``message_added`` event and broadcasts it over WebSocket.
           Broadcast failures are logged, not raised.

        Returns:
            The message id.
        """
        trace_id = message.trace_id
        message_dict = message.to_dict()

        # 1. Message file.
        self._write_json(self._get_message_file(trace_id, message.message_id), message_dict)

        # 2. Trace totals.
        trace = await self.get_trace(trace_id)
        if trace:
            trace.total_messages += 1
            trace.last_sequence = max(trace.last_sequence, message.sequence)

            # Detailed token counters.
            if message.prompt_tokens:
                trace.total_prompt_tokens += message.prompt_tokens
            if message.completion_tokens:
                trace.total_completion_tokens += message.completion_tokens
            if message.reasoning_tokens:
                trace.total_reasoning_tokens += message.reasoning_tokens
            if message.cache_creation_tokens:
                trace.total_cache_creation_tokens += message.cache_creation_tokens
            if message.cache_read_tokens:
                trace.total_cache_read_tokens += message.cache_read_tokens

            # Backward-compatible aggregate total.
            trace.total_tokens += self._message_tokens(message)

            if message.cost:
                trace.total_cost += message.cost
            if message.duration_ms:
                trace.total_duration_ms += message.duration_ms

            await self.update_trace(
                trace_id,
                total_messages=trace.total_messages,
                last_sequence=trace.last_sequence,
                total_tokens=trace.total_tokens,
                total_prompt_tokens=trace.total_prompt_tokens,
                total_completion_tokens=trace.total_completion_tokens,
                total_reasoning_tokens=trace.total_reasoning_tokens,
                total_cache_creation_tokens=trace.total_cache_creation_tokens,
                total_cache_read_tokens=trace.total_cache_read_tokens,
                total_cost=trace.total_cost,
                total_duration_ms=trace.total_duration_ms
            )

        # 3. Goal stats.
        await self._update_goal_stats(trace_id, message)

        # 4. Event log + live broadcast.
        affected_goals = await self._get_affected_goals(trace_id, message)
        event_id = await self.append_event(trace_id, "message_added", {
            "message": message_dict,
            "affected_goals": affected_goals
        })
        if event_id:
            try:
                # Local import; presumably avoids an import cycle with the websocket module.
                from . import websocket as trace_ws
                await trace_ws.broadcast_message_added(
                    trace_id=trace_id,
                    event_id=event_id,
                    message_dict=message_dict,
                    affected_goals=affected_goals,
                )
            except Exception:
                logger.exception("Failed to broadcast message_added (trace_id=%s, event_id=%s)", trace_id, event_id)

        return message.message_id

    async def _update_goal_stats(self, trace_id: str, message: Message) -> None:
        """Add ``message`` to its goal's stats and its ancestors' cumulative stats.

        The goal itself gets both ``self_stats`` and ``cumulative_stats``
        updated. Each ancestor only gets ``cumulative_stats`` updated.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return

        goal = tree.find(message.goal_id)
        if not goal:
            return

        tokens = self._message_tokens(message)
        cost = message.cost

        def _accumulate(stats: GoalStats) -> None:
            stats.message_count += 1
            if tokens:
                stats.total_tokens += tokens
            if cost:
                stats.total_cost += cost

        _accumulate(goal.self_stats)
        # TODO: update preview (tool-call summary)
        _accumulate(goal.cumulative_stats)
        for ancestor in self._iter_ancestors(tree, goal):
            _accumulate(ancestor.cumulative_stats)

        await self.update_goal_tree(trace_id, tree)

    async def _get_affected_goals(self, trace_id: str, message: Message) -> List[Dict[str, Any]]:
        """Return stat snapshots for the message's goal and all of its ancestors.

        The goal's entry carries both ``self_stats`` and ``cumulative_stats``.
        Ancestor entries carry only ``cumulative_stats``.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return []

        goal = tree.find(message.goal_id)
        if not goal:
            return []

        affected = [{
            "goal_id": goal.id,
            "self_stats": goal.self_stats.to_dict(),
            "cumulative_stats": goal.cumulative_stats.to_dict()
        }]
        affected.extend(
            {"goal_id": ancestor.id, "cumulative_stats": ancestor.cumulative_stats.to_dict()}
            for ancestor in self._iter_ancestors(tree, goal)
        )
        return affected

    async def get_message(self, message_id: str) -> Optional[Message]:
        """Find a message by id by scanning every trace directory.

        Returns the first readable match, or ``None``.
        """
        if not self.base_path.exists():
            return None
        for trace_dir in self.base_path.iterdir():
            if not trace_dir.is_dir():
                continue
            message_file = self._get_message_file(trace_dir.name, message_id)
            if not message_file.exists():
                continue
            try:
                return Message.from_dict(json.loads(message_file.read_text(encoding="utf-8")))
            except Exception:
                logger.warning("Failed to load message file: %s", message_file, exc_info=True)
        return None

    async def get_trace_messages(
        self,
        trace_id: str,
    ) -> List[Message]:
        """Return all messages of a trace (every branch), sorted by ``sequence``.

        Unreadable message files are logged and skipped.
        """
        messages_dir = self._get_messages_dir(trace_id)
        if not messages_dir.exists():
            return []

        messages = []
        for message_file in messages_dir.glob("*.json"):
            try:
                messages.append(Message.from_dict(json.loads(message_file.read_text(encoding="utf-8"))))
            except Exception:
                logger.warning("Skipping unreadable message file: %s", message_file, exc_info=True)

        messages.sort(key=lambda m: m.sequence)
        return messages

    async def get_main_path_messages(
        self,
        trace_id: str,
        head_sequence: int
    ) -> List[Message]:
        """Return the main path, from the root to ``head_sequence``.

        The path is built by following ``parent_sequence`` links back from the
        head. The walk stops at a missing sequence or a repeated one, so a
        cyclic chain cannot loop forever.

        Returns:
            Messages on the path, in ascending (root -> head) order.
        """
        all_messages = await self.get_trace_messages(trace_id)
        messages_by_seq = {m.sequence: m for m in all_messages}

        path = []
        visited = set()
        seq = head_sequence
        while seq is not None and seq not in visited:
            msg = messages_by_seq.get(seq)
            if not msg:
                break
            visited.add(seq)
            path.append(msg)
            seq = msg.parent_sequence

        path.reverse()
        return path

    async def get_messages_by_goal(
        self,
        trace_id: str,
        goal_id: str
    ) -> List[Message]:
        """Return the trace's messages whose ``goal_id`` equals ``goal_id``."""
        all_messages = await self.get_trace_messages(trace_id)
        return [m for m in all_messages if m.goal_id == goal_id]

    async def update_message(self, message_id: str, **updates) -> None:
        """Set attributes on a stored message and write it back.

        Keys that are not attributes of the message are ignored; a missing
        message is a no-op.
        """
        message = await self.get_message(message_id)
        if not message:
            return

        for key, value in updates.items():
            if hasattr(message, key):
                setattr(message, key, value)

        self._write_json(self._get_message_file(message.trace_id, message_id), message.to_dict())

    async def abandon_messages_after(self, trace_id: str, cutoff_sequence: int) -> List[str]:
        """Mark active messages with ``sequence > cutoff_sequence`` as abandoned.

        Each affected message gets ``status="abandoned"`` and an
        ``abandoned_at`` timestamp, and is rewritten on disk.

        Returns:
            Ids of the messages that were abandoned.
        """
        all_messages = await self.get_trace_messages(trace_id)
        abandoned_ids = []
        now = datetime.now()

        for msg in all_messages:
            if msg.sequence > cutoff_sequence and msg.status == "active":
                msg.status = "abandoned"
                msg.abandoned_at = now
                self._write_json(self._get_message_file(trace_id, msg.message_id), msg.to_dict())
                abandoned_ids.append(msg.message_id)

        return abandoned_ids

    # ===== Model usage tracking =====

    async def record_model_usage(
        self,
        trace_id: str,
        sequence: int,
        role: str,
        model: str,
        prompt_tokens: int,
        completion_tokens: int,
        cache_read_tokens: int = 0,
        tool_name: Optional[str] = None,
    ) -> None:
        """Record one model call in the trace's model_usage.json.

        The file holds a ``summary`` of totals, a ``models`` list aggregated
        per (model, source) pair, and a ``timeline`` of individual calls.
        Source is ``"agent"`` for ``role == "assistant"``. Otherwise it is
        ``"tool:{tool_name}"``, or just ``"tool"`` when there is no tool name.

        Args:
            trace_id: Trace ID.
            sequence: Message sequence number.
            role: Caller role (assistant/tool).
            model: Model name.
            prompt_tokens: Input tokens.
            completion_tokens: Output tokens.
            cache_read_tokens: Tokens read from cache.
            tool_name: Tool name (when role is tool).
        """
        usage_file = self._get_model_usage_file(trace_id)

        if usage_file.exists():
            data = json.loads(usage_file.read_text(encoding="utf-8"))
        else:
            data = {
                "summary": {
                    "total_models": 0,
                    "total_tokens": 0,
                    "total_cache_read_tokens": 0,
                    "agent_tokens": 0,
                    "tool_tokens": 0,
                },
                "models": [],
                "timeline": [],
            }

        total_tokens = prompt_tokens + completion_tokens
        summary = data["summary"]
        summary["total_tokens"] += total_tokens
        summary["total_cache_read_tokens"] += cache_read_tokens

        if role == "assistant":
            summary["agent_tokens"] += total_tokens
            source = "agent"
        else:
            summary["tool_tokens"] += total_tokens
            source = f"tool:{tool_name}" if tool_name else "tool"

        model_entry = next(
            (m for m in data["models"] if m["model"] == model and m["source"] == source),
            None,
        )
        if model_entry:
            model_entry["prompt_tokens"] += prompt_tokens
            model_entry["completion_tokens"] += completion_tokens
            model_entry["total_tokens"] += total_tokens
            model_entry["cache_read_tokens"] += cache_read_tokens
            model_entry["call_count"] += 1
        else:
            data["models"].append({
                "model": model,
                "source": source,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": total_tokens,
                "cache_read_tokens": cache_read_tokens,
                "call_count": 1,
            })
        summary["total_models"] = len(data["models"])

        timeline_entry = {
            "sequence": sequence,
            "role": role,
            "model": model,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        }
        if cache_read_tokens > 0:
            timeline_entry["cache_read_tokens"] = cache_read_tokens
        if tool_name:
            timeline_entry["tool_name"] = tool_name
        data["timeline"].append(timeline_entry)

        self._write_json(usage_file, data)

    # ===== Event stream (for WebSocket resume) =====

    async def get_events(
        self,
        trace_id: str,
        since_event_id: int = 0
    ) -> List[Dict[str, Any]]:
        """Return events with ``event_id > since_event_id``, in file order.

        Blank and malformed lines are skipped. A malformed line may be, for
        example, a partially written last line.
        """
        events_file = self._get_events_file(trace_id)
        if not events_file.exists():
            return []

        events = []
        # Events are written as UTF-8 with ensure_ascii=False, so read them as
        # UTF-8 explicitly instead of relying on the locale's default encoding.
        with events_file.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                    if event.get("event_id", 0) > since_event_id:
                        events.append(event)
                except (ValueError, TypeError, AttributeError):
                    logger.debug("Skipping malformed event line in %s", events_file)
                    continue
        return events

    async def append_event(
        self,
        trace_id: str,
        event_type: str,
        payload: Dict[str, Any]
    ) -> int:
        """Append an event to events.jsonl and return its id.

        The id is the trace's ``last_event_id`` plus one. It is persisted to
        meta.json before the event line is written. Returns 0 when the trace
        does not exist, and nothing is written in that case.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return 0

        trace.last_event_id += 1
        event_id = trace.last_event_id
        await self.update_trace(trace_id, last_event_id=event_id)

        event = {
            "event_id": event_id,
            "event": event_type,
            "ts": datetime.now().isoformat(),
            **payload
        }

        events_file = self._get_events_file(trace_id)
        with events_file.open('a', encoding='utf-8') as f:
            f.write(json.dumps(event, ensure_ascii=False) + '\n')

        return event_id
|