| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620 |
- """
- FileSystem Trace Store - 文件系统存储实现
- 用于跨进程数据共享,数据持久化到 .trace/ 目录
- 目录结构:
- .trace/{trace_id}/
- ├── meta.json # Trace 元数据
- ├── goal.json # 主线 GoalTree(扁平 JSON,通过 parent_id 构建层级)
- ├── messages/ # 主线 Messages(每条独立文件)
- │ ├── {message_id}.json
- │ └── ...
- ├── branches/ # 分支数据(独立存储)
- │ ├── A/
- │ │ ├── meta.json # BranchContext 元数据
- │ │ ├── goal.json # 分支 A 的 GoalTree
- │ │ └── messages/ # 分支 A 的 Messages
- │ └── B/
- │ └── ...
- └── events.jsonl # 事件流(WebSocket 续传)
- """
- import json
- import os
- from pathlib import Path
- from typing import Dict, List, Optional, Any
- from datetime import datetime
- from agent.execution.models import Trace, Message
- from agent.goal.models import GoalTree, Goal, BranchContext, GoalStats
class FileSystemTraceStore:
    """File-system backed Trace store.

    Each trace lives under ``{base_path}/{trace_id}/`` as plain JSON files (see
    the module docstring for the layout), so several processes can share data
    by pointing at the same directory. No locking is performed: every update is
    a read-modify-write of the relevant JSON file.

    All files are read and written as UTF-8. JSON is dumped with
    ``ensure_ascii=False`` (non-ASCII text is stored verbatim), so relying on
    the platform default encoding could fail or mis-decode on systems whose
    locale encoding is not UTF-8.
    """

    def __init__(self, base_path: str = ".trace"):
        """Root the store at ``base_path``, creating it (and missing parents) if needed."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    # ===== Low-level file helpers =====

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Load and return the JSON document stored (as UTF-8) in ``path``."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _write_json(path: Path, data: Any) -> None:
        """Write ``data`` to ``path`` as 2-space-indented UTF-8 JSON, overwriting it."""
        path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")

    @staticmethod
    def _parse_datetimes(data: Dict[str, Any], *keys: str) -> Dict[str, Any]:
        """Replace each truthy ``data[key]`` (an ISO-8601 string) with a datetime, in place.

        Returns ``data`` for convenience.
        """
        for key in keys:
            if data.get(key):
                data[key] = datetime.fromisoformat(data[key])
        return data

    def _load_message(self, message_file: Path) -> Message:
        """Deserialize one message file into a ``Message`` (``created_at`` parsed)."""
        data = self._parse_datetimes(self._read_json(message_file), "created_at")
        return Message(**data)

    # ===== Path helpers =====

    def _get_trace_dir(self, trace_id: str) -> Path:
        """Directory holding everything for ``trace_id``."""
        return self.base_path / trace_id

    def _get_meta_file(self, trace_id: str) -> Path:
        """Path of the trace's ``meta.json``."""
        return self._get_trace_dir(trace_id) / "meta.json"

    def _get_goal_file(self, trace_id: str) -> Path:
        """Path of the main-line ``goal.json``."""
        return self._get_trace_dir(trace_id) / "goal.json"

    def _get_messages_dir(self, trace_id: str) -> Path:
        """Directory of main-line message files."""
        return self._get_trace_dir(trace_id) / "messages"

    def _get_message_file(
        self,
        trace_id: str,
        message_id: str,
        branch_id: Optional[str] = None
    ) -> Path:
        """Path of a message file: main line by default, or inside ``branch_id``'s messages dir."""
        if branch_id:
            messages_dir = self._get_branch_messages_dir(trace_id, branch_id)
        else:
            messages_dir = self._get_messages_dir(trace_id)
        return messages_dir / f"{message_id}.json"

    def _get_branches_dir(self, trace_id: str) -> Path:
        """Directory containing one sub-directory per branch."""
        return self._get_trace_dir(trace_id) / "branches"

    def _get_branch_dir(self, trace_id: str, branch_id: str) -> Path:
        """Directory of a single branch."""
        return self._get_branches_dir(trace_id) / branch_id

    def _get_branch_meta_file(self, trace_id: str, branch_id: str) -> Path:
        """Path of a branch's ``meta.json`` (BranchContext)."""
        return self._get_branch_dir(trace_id, branch_id) / "meta.json"

    def _get_branch_goal_file(self, trace_id: str, branch_id: str) -> Path:
        """Path of a branch's ``goal.json``."""
        return self._get_branch_dir(trace_id, branch_id) / "goal.json"

    def _get_branch_messages_dir(self, trace_id: str, branch_id: str) -> Path:
        """Directory of a branch's message files."""
        return self._get_branch_dir(trace_id, branch_id) / "messages"

    def _get_events_file(self, trace_id: str) -> Path:
        """Path of the trace's ``events.jsonl`` event log."""
        return self._get_trace_dir(trace_id) / "events.jsonl"

    # ===== Trace operations =====

    async def create_trace(self, trace: Trace) -> str:
        """Create the on-disk layout for ``trace`` and return its ``trace_id``.

        Creates the trace, ``messages/`` and ``branches/`` directories (reusing
        existing ones), writes ``meta.json`` from ``trace.to_dict()`` and makes
        sure an ``events.jsonl`` file exists (an existing one is not truncated).
        """
        trace_id = trace.trace_id
        for directory in (
            self._get_trace_dir(trace_id),
            self._get_messages_dir(trace_id),
            self._get_branches_dir(trace_id),
        ):
            directory.mkdir(parents=True, exist_ok=True)
        self._write_json(self._get_meta_file(trace_id), trace.to_dict())
        self._get_events_file(trace_id).touch()
        return trace_id

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Load the Trace from ``meta.json``; ``None`` if the file does not exist.

        A corrupt ``meta.json`` is not swallowed: the JSON/constructor error propagates.
        """
        meta_file = self._get_meta_file(trace_id)
        if not meta_file.exists():
            return None
        data = self._read_json(meta_file)
        return Trace(**self._parse_datetimes(data, "created_at", "completed_at"))

    async def update_trace(self, trace_id: str, **updates) -> None:
        """Set the given attributes on the stored Trace and write it back.

        Keys that are not attributes of the Trace are ignored; a missing trace is a no-op.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return
        for key, value in updates.items():
            if hasattr(trace, key):
                setattr(trace, key, value)
        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

    async def list_traces(
        self,
        mode: Optional[str] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 50
    ) -> List[Trace]:
        """List stored Traces, newest first, at most ``limit`` of them.

        A trace is kept only if its ``meta.json`` field equals every truthy
        filter (``mode``, ``agent_type``, ``uid``, ``status``). Directories
        without a readable, valid ``meta.json`` are skipped. Traces without a
        ``created_at`` sort after dated ones.
        """
        if not self.base_path.exists():
            return []

        # Only truthy filters apply (a None/empty filter means "any").
        filters = {
            key: value
            for key, value in (
                ("mode", mode),
                ("agent_type", agent_type),
                ("uid", uid),
                ("status", status),
            )
            if value
        }

        traces = []
        for trace_dir in self.base_path.iterdir():
            meta_file = trace_dir / "meta.json"
            if not trace_dir.is_dir() or not meta_file.exists():
                continue
            try:
                data = self._read_json(meta_file)
                if any(data.get(key) != value for key, value in filters.items()):
                    continue
                traces.append(Trace(**self._parse_datetimes(data, "created_at", "completed_at")))
            except Exception:
                # Best-effort listing: skip unreadable or invalid trace metadata.
                continue

        # Newest first. The leading bool keeps a missing created_at (None) from
        # ever being compared with a datetime, which would raise TypeError.
        traces.sort(key=lambda t: (t.created_at is not None, t.created_at), reverse=True)
        return traces[:limit]

    # ===== GoalTree operations =====

    def _read_goal_tree(self, goal_file: Path) -> Optional[GoalTree]:
        """Load a GoalTree from ``goal_file``; ``None`` if it is missing or unreadable."""
        if not goal_file.exists():
            return None
        try:
            return GoalTree.from_dict(self._read_json(goal_file))
        except Exception:
            # Best-effort: a corrupt goal file is treated as "no tree".
            return None

    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]:
        """Main-line GoalTree of ``trace_id``, or ``None`` if missing/unreadable."""
        return self._read_goal_tree(self._get_goal_file(trace_id))

    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None:
        """Overwrite the main-line ``goal.json`` with ``tree``."""
        self._write_json(self._get_goal_file(trace_id), tree.to_dict())

    async def add_goal(self, trace_id: str, goal: Goal) -> None:
        """Append ``goal`` to the main-line GoalTree; no-op if there is no tree yet."""
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return
        tree.goals.append(goal)
        await self.update_goal_tree(trace_id, tree)

    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None:
        """Set attributes on one main-line Goal and save the tree.

        ``self_stats``/``cumulative_stats`` given as dicts are converted with
        ``GoalStats.from_dict``. Unknown keys, a missing tree or a missing goal
        are ignored.
        """
        tree = await self.get_goal_tree(trace_id)
        if not tree:
            return
        goal = tree.find(goal_id)
        if not goal:
            return
        for key, value in updates.items():
            if hasattr(goal, key):
                if key in ("self_stats", "cumulative_stats") and isinstance(value, dict):
                    value = GoalStats.from_dict(value)
                setattr(goal, key, value)
        await self.update_goal_tree(trace_id, tree)

    # ===== Branch operations =====

    async def create_branch(self, trace_id: str, branch: BranchContext) -> None:
        """Create the branch directory and its ``messages/`` dir, and write its ``meta.json``."""
        self._get_branch_dir(trace_id, branch.id).mkdir(parents=True, exist_ok=True)
        self._get_branch_messages_dir(trace_id, branch.id).mkdir(exist_ok=True)
        self._write_json(self._get_branch_meta_file(trace_id, branch.id), branch.to_dict())

    async def get_branch(self, trace_id: str, branch_id: str) -> Optional[BranchContext]:
        """Load a branch's BranchContext; ``None`` if missing or unreadable."""
        meta_file = self._get_branch_meta_file(trace_id, branch_id)
        if not meta_file.exists():
            return None
        try:
            return BranchContext.from_dict(self._read_json(meta_file))
        except Exception:
            return None

    async def get_branch_goal_tree(self, trace_id: str, branch_id: str) -> Optional[GoalTree]:
        """GoalTree of ``branch_id``, or ``None`` if missing/unreadable."""
        return self._read_goal_tree(self._get_branch_goal_file(trace_id, branch_id))

    async def update_branch_goal_tree(self, trace_id: str, branch_id: str, tree: GoalTree) -> None:
        """Overwrite the branch's ``goal.json`` with ``tree``."""
        self._write_json(self._get_branch_goal_file(trace_id, branch_id), tree.to_dict())

    async def update_branch(self, trace_id: str, branch_id: str, **updates) -> None:
        """Set attributes on a stored BranchContext and write it back.

        A ``cumulative_stats`` dict is converted with ``GoalStats.from_dict``.
        Unknown keys and a missing branch are ignored.
        """
        branch = await self.get_branch(trace_id, branch_id)
        if not branch:
            return
        for key, value in updates.items():
            if hasattr(branch, key):
                if key == "cumulative_stats" and isinstance(value, dict):
                    value = GoalStats.from_dict(value)
                setattr(branch, key, value)
        self._write_json(self._get_branch_meta_file(trace_id, branch_id), branch.to_dict())

    async def list_branches(self, trace_id: str) -> Dict[str, BranchContext]:
        """Map branch id -> BranchContext for every readable branch of ``trace_id``."""
        branches_dir = self._get_branches_dir(trace_id)
        if not branches_dir.exists():
            return {}
        branches = {}
        for branch_dir in branches_dir.iterdir():
            meta_file = branch_dir / "meta.json"
            if not branch_dir.is_dir() or not meta_file.exists():
                continue
            try:
                branch = BranchContext.from_dict(self._read_json(meta_file))
            except Exception:
                # Best-effort listing: skip unreadable branch metadata.
                continue
            branches[branch.id] = branch
        return branches

    # ===== Message operations =====

    async def add_message(self, message: Message) -> str:
        """Persist ``message`` and update everything derived from it.

        1. Write the message JSON into the main-line or branch ``messages/``
           directory (chosen by ``message.branch_id``).
        2. Bump the Trace totals (message count, last sequence, tokens, cost,
           duration).
        3. Update the associated Goal's ``self_stats`` and the
           ``cumulative_stats`` of that goal and all its ancestors.
        4. Append a ``message_added`` event listing the affected goals.

        Returns the message id.
        """
        trace_id = message.trace_id
        branch_id = message.branch_id

        # 1. Message file
        message_file = self._get_message_file(trace_id, message.message_id, branch_id)
        self._write_json(message_file, message.to_dict())

        # 2. Trace totals
        trace = await self.get_trace(trace_id)
        if trace:
            trace.total_messages += 1
            trace.last_sequence = max(trace.last_sequence, message.sequence)
            if message.tokens:
                trace.total_tokens += message.tokens
            if message.cost:
                trace.total_cost += message.cost
            if message.duration_ms:
                trace.total_duration_ms += message.duration_ms
            # trace_id is the positional argument; only the stat fields are updates.
            await self.update_trace(
                trace_id,
                total_messages=trace.total_messages,
                last_sequence=trace.last_sequence,
                total_tokens=trace.total_tokens,
                total_cost=trace.total_cost,
                total_duration_ms=trace.total_duration_ms
            )

        # 3. Goal stats
        await self._update_goal_stats(trace_id, message)

        # 4. message_added event
        affected_goals = await self._get_affected_goals(trace_id, message)
        await self.append_event(trace_id, "message_added", {
            "message": message.to_dict(),
            "affected_goals": affected_goals
        })
        return message.message_id

    async def _load_tree_for(self, trace_id: str, branch_id: Optional[str]) -> Optional[GoalTree]:
        """GoalTree of branch ``branch_id``, or of the main line when ``branch_id`` is falsy."""
        if branch_id:
            return await self.get_branch_goal_tree(trace_id, branch_id)
        return await self.get_goal_tree(trace_id)

    async def _save_tree_for(self, trace_id: str, branch_id: Optional[str], tree: GoalTree) -> None:
        """Save ``tree`` to branch ``branch_id``, or to the main line when it is falsy."""
        if branch_id:
            await self.update_branch_goal_tree(trace_id, branch_id, tree)
        else:
            await self.update_goal_tree(trace_id, tree)

    @staticmethod
    def _iter_ancestors(tree: GoalTree, goal: Goal):
        """Yield ``goal``'s ancestors nearest-first by following ``parent_id`` links.

        Stops when a parent cannot be found in ``tree``. It also stops if a
        ``parent_id`` points back to a goal already visited. Such a cycle can
        only come from corrupt data, and would otherwise loop forever.
        """
        visited = {goal.id}
        current = goal
        while current.parent_id and current.parent_id not in visited:
            parent = tree.find(current.parent_id)
            if not parent:
                return
            visited.add(current.parent_id)
            yield parent
            current = parent

    @staticmethod
    def _accumulate(stats: GoalStats, message: Message) -> None:
        """Count one message into ``stats``, adding its tokens/cost when they are truthy."""
        stats.message_count += 1
        if message.tokens:
            stats.total_tokens += message.tokens
        if message.cost:
            stats.total_cost += message.cost

    async def _update_goal_stats(self, trace_id: str, message: Message) -> None:
        """Add ``message`` to its Goal's ``self_stats`` and to the ``cumulative_stats``
        of that Goal and every ancestor, then save the tree.

        No-op when the relevant tree or goal does not exist.
        """
        tree = await self._load_tree_for(trace_id, message.branch_id)
        if not tree:
            return
        goal = tree.find(message.goal_id)
        if not goal:
            return

        self._accumulate(goal.self_stats, message)
        # TODO: update preview (tool-call summary)
        for target in (goal, *self._iter_ancestors(tree, goal)):
            self._accumulate(target.cumulative_stats, message)

        await self._save_tree_for(trace_id, message.branch_id, tree)

    async def _get_affected_goals(self, trace_id: str, message: Message) -> List[Dict[str, Any]]:
        """Stats snapshot of the goals touched by ``message``.

        The first entry is the message's own goal (``self_stats`` and
        ``cumulative_stats``). It is followed by each ancestor, nearest first,
        with ``cumulative_stats`` only. Returns ``[]`` if the tree or goal is
        missing.
        """
        tree = await self._load_tree_for(trace_id, message.branch_id)
        if not tree:
            return []
        goal = tree.find(message.goal_id)
        if not goal:
            return []

        affected = [{
            "goal_id": goal.id,
            "self_stats": goal.self_stats.to_dict(),
            "cumulative_stats": goal.cumulative_stats.to_dict()
        }]
        affected.extend(
            {
                "goal_id": ancestor.id,
                "cumulative_stats": ancestor.cumulative_stats.to_dict()
            }
            for ancestor in self._iter_ancestors(tree, goal)
        )
        return affected

    @staticmethod
    def _iter_message_candidates(trace_dir: Path, file_name: str):
        """Yield where ``file_name`` could live in ``trace_dir``: main line first, then each branch."""
        yield trace_dir / "messages" / file_name
        branches_dir = trace_dir / "branches"
        if branches_dir.exists():
            for branch_dir in branches_dir.iterdir():
                if branch_dir.is_dir():
                    yield branch_dir / "messages" / file_name

    async def get_message(self, message_id: str) -> Optional[Message]:
        """Find a message by id by scanning every trace (main line, then branches).

        Unreadable candidate files are skipped. Returns ``None`` if nothing
        matches, including when the base directory no longer exists.
        """
        if not self.base_path.exists():
            return None
        file_name = f"{message_id}.json"
        for trace_dir in self.base_path.iterdir():
            if not trace_dir.is_dir():
                continue
            for message_file in self._iter_message_candidates(trace_dir, file_name):
                if not message_file.exists():
                    continue
                try:
                    return self._load_message(message_file)
                except Exception:
                    # Corrupt candidate: keep looking elsewhere.
                    continue
        return None

    async def get_trace_messages(
        self,
        trace_id: str,
        branch_id: Optional[str] = None
    ) -> List[Message]:
        """All readable messages of the main line (or of ``branch_id``), ordered by ``sequence``."""
        if branch_id:
            messages_dir = self._get_branch_messages_dir(trace_id, branch_id)
        else:
            messages_dir = self._get_messages_dir(trace_id)
        if not messages_dir.exists():
            return []

        messages = []
        for message_file in messages_dir.glob("*.json"):
            try:
                messages.append(self._load_message(message_file))
            except Exception:
                # Best-effort: skip unreadable message files.
                continue
        messages.sort(key=lambda m: m.sequence)
        return messages

    async def get_messages_by_goal(
        self,
        trace_id: str,
        goal_id: str,
        branch_id: Optional[str] = None
    ) -> List[Message]:
        """Messages of the main line (or ``branch_id``) whose ``goal_id`` matches."""
        all_messages = await self.get_trace_messages(trace_id, branch_id)
        return [m for m in all_messages if m.goal_id == goal_id]

    async def update_message(self, message_id: str, **updates) -> None:
        """Set attributes on a stored message and write it back.

        The file is written under the message's (possibly updated)
        ``trace_id``/``branch_id``. Unknown keys and a missing message are
        ignored.
        """
        message = await self.get_message(message_id)
        if not message:
            return
        for key, value in updates.items():
            if hasattr(message, key):
                setattr(message, key, value)
        message_file = self._get_message_file(message.trace_id, message_id, message.branch_id)
        self._write_json(message_file, message.to_dict())

    # ===== Event stream (for WebSocket resume after disconnect) =====

    async def get_events(
        self,
        trace_id: str,
        since_event_id: int = 0
    ) -> List[Dict[str, Any]]:
        """Events of ``trace_id`` whose ``event_id`` is greater than ``since_event_id``.

        Lines that are blank, not valid JSON or lack a comparable ``event_id``
        are skipped.
        """
        events_file = self._get_events_file(trace_id)
        if not events_file.exists():
            return []

        events = []
        # errors="replace": a partially appended multi-byte character (e.g. from
        # a concurrent writer) yields an unparsable line that is skipped below
        # instead of aborting the whole read with UnicodeDecodeError.
        with events_file.open("r", encoding="utf-8", errors="replace") as f:
            for line in f:
                try:
                    event = json.loads(line.strip())
                    if event.get("event_id", 0) > since_event_id:
                        events.append(event)
                except Exception:
                    continue
        return events

    async def append_event(
        self,
        trace_id: str,
        event_type: str,
        payload: Dict[str, Any]
    ) -> int:
        """Append an event to ``events.jsonl`` and return its id (0 if the trace is missing).

        The id is the trace's ``last_event_id`` + 1, which is persisted first.
        Payload keys are merged into the event after ``event_id``/``event``/``ts``.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return 0

        event_id = trace.last_event_id + 1
        await self.update_trace(trace_id, last_event_id=event_id)

        event = {
            "event_id": event_id,
            "event": event_type,
            "ts": datetime.now().isoformat(),
            **payload
        }
        with self._get_events_file(trace_id).open("a", encoding="utf-8") as f:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")
        return event_id
|