| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- """
- FileSystem Trace Store - 文件系统存储实现
- 用于跨进程数据共享,数据持久化到 .trace/ 目录
- """
- import json
- import os
- from pathlib import Path
- from typing import Dict, List, Optional, Any
- from datetime import datetime
- from agent.execution.models import Trace, Step
class FileSystemTraceStore:
    """File-system backed Trace store.

    Persists traces under ``base_path`` so that several processes can share
    them. Directory layout::

        .trace/
        ├── trace-001/
        │   ├── meta.json        # Trace metadata
        │   ├── steps/
        │   │   ├── step-1.json  # one file per step
        │   │   ├── step-2.json
        │   │   └── step-3.json
        │   └── events.jsonl     # event stream (for WebSocket resume)
        └── trace-002/
            └── ...

    All files are read and written as UTF-8: the JSON is dumped with
    ``ensure_ascii=False``, so non-ASCII text must not depend on the platform
    default encoding. Whole-file JSON writes go to a temporary sibling file
    that is then moved into place with ``os.replace``, so a reader in another
    process never sees a half-written ``meta.json`` or step file.

    Read-modify-write sequences (e.g. the counters in ``meta.json``) are not
    locked; concurrent writers to the same trace can still lose updates.
    """

    def __init__(self, base_path: str = ".trace"):
        self.base_path = Path(base_path)
        # parents=True so a nested base_path whose parents do not exist works.
        self.base_path.mkdir(parents=True, exist_ok=True)

    # ===== Path helpers =====

    def _get_trace_dir(self, trace_id: str) -> Path:
        """Return the directory of one trace."""
        return self.base_path / trace_id

    def _get_meta_file(self, trace_id: str) -> Path:
        """Return the path of the trace's meta.json."""
        return self._get_trace_dir(trace_id) / "meta.json"

    def _get_steps_dir(self, trace_id: str) -> Path:
        """Return the trace's steps/ directory."""
        return self._get_trace_dir(trace_id) / "steps"

    def _get_step_file(self, trace_id: str, step_id: str) -> Path:
        """Return the path of one step file inside a trace."""
        return self._get_steps_dir(trace_id) / f"{step_id}.json"

    def _get_events_file(self, trace_id: str) -> Path:
        """Return the path of the trace's events.jsonl."""
        return self._get_trace_dir(trace_id) / "events.jsonl"

    # ===== Serialization helpers =====

    @staticmethod
    def _write_json(path: Path, data: Dict[str, Any]) -> None:
        """Atomically replace ``path`` with ``data`` as indented UTF-8 JSON.

        The temp file ends in ``.tmp``, so ``steps/*.json`` globs never pick
        it up.
        """
        tmp_path = path.with_name(f".{path.name}.{os.getpid()}.tmp")
        try:
            tmp_path.write_text(
                json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
            )
            # Atomic swap: readers see either the old or the new content.
            os.replace(tmp_path, path)
        except BaseException:
            # Do not leave stray temp files behind on failure.
            if tmp_path.exists():
                tmp_path.unlink()
            raise

    @staticmethod
    def _read_json(path: Path) -> Dict[str, Any]:
        """Load a UTF-8 JSON file."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _trace_from_dict(data: Dict[str, Any]) -> Trace:
        """Build a Trace from meta.json data, parsing its ISO datetime fields."""
        for key in ("created_at", "completed_at"):
            if data.get(key):
                data[key] = datetime.fromisoformat(data[key])
        return Trace(**data)

    @staticmethod
    def _step_from_dict(data: Dict[str, Any]) -> Step:
        """Build a Step from step-file data, parsing its ``created_at`` field."""
        if data.get("created_at"):
            data["created_at"] = datetime.fromisoformat(data["created_at"])
        return Step(**data)

    def _save_trace(self, trace_id: str, trace: Trace) -> None:
        """Write ``trace`` to the meta.json of ``trace_id``."""
        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

    def _save_step(self, trace_id: str, step_id: str, step: Step) -> None:
        """Write the full view of ``step`` to its file inside ``trace_id``."""
        self._write_json(
            self._get_step_file(trace_id, step_id), step.to_dict(view="full")
        )

    # ===== Trace operations =====

    async def create_trace(self, trace: Trace) -> str:
        """Create the on-disk layout for a new Trace and return its id.

        Creates the trace and steps/ directories, writes meta.json and an
        empty events.jsonl.
        """
        trace_id = trace.trace_id
        self._get_trace_dir(trace_id).mkdir(exist_ok=True)
        self._get_steps_dir(trace_id).mkdir(exist_ok=True)
        self._save_trace(trace_id, trace)
        self._get_events_file(trace_id).touch()
        return trace_id

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Load a Trace from its meta.json, or return None if it does not exist."""
        meta_file = self._get_meta_file(trace_id)
        if not meta_file.exists():
            return None
        return self._trace_from_dict(self._read_json(meta_file))

    async def update_trace(self, trace_id: str, **updates) -> None:
        """Set attributes of a stored Trace and persist it.

        Keys in ``updates`` that are not attributes of the Trace are ignored.
        Updating a trace that does not exist is a no-op.
        """
        trace = await self.get_trace(trace_id)
        if trace is None:
            return

        for key, value in updates.items():
            if hasattr(trace, key):
                setattr(trace, key, value)

        self._save_trace(trace_id, trace)

    async def list_traces(
        self,
        mode: Optional[str] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 50
    ) -> List[Trace]:
        """List stored Traces, newest first.

        Each non-None filter must equal the corresponding meta.json field.
        Unreadable or corrupt trace directories are skipped. At most ``limit``
        traces are returned.
        """
        if not self.base_path.exists():
            return []

        traces: List[Trace] = []
        for trace_dir in self.base_path.iterdir():
            meta_file = trace_dir / "meta.json"
            if not trace_dir.is_dir() or not meta_file.exists():
                continue

            try:
                data = self._read_json(meta_file)

                if mode and data.get("mode") != mode:
                    continue
                if agent_type and data.get("agent_type") != agent_type:
                    continue
                if uid and data.get("uid") != uid:
                    continue
                if status and data.get("status") != status:
                    continue

                traces.append(self._trace_from_dict(data))
            except Exception:
                # Best effort: skip corrupt or partially-migrated traces.
                continue

        # Newest first; traces without created_at sort last instead of
        # raising a TypeError when compared with datetimes.
        traces.sort(
            key=lambda t: (t.created_at is not None, t.created_at),
            reverse=True,
        )
        return traces[:limit]

    # ===== Step operations =====

    async def add_step(self, step: Step) -> str:
        """Persist a new Step and update everything that depends on it.

        Writes the step file and adds the step's tokens, cost and duration to
        the trace totals. If the step has a parent, it marks that parent (in
        the same trace) as having children. Finally it appends a
        ``step_added`` event. Returns the step id.
        """
        trace_id = step.trace_id

        # 1. Write the step file.
        self._save_step(trace_id, step.step_id, step)

        # 2. Update the trace-level statistics.
        trace = await self.get_trace(trace_id)
        if trace is not None:
            trace.total_steps += 1
            trace.last_sequence = max(trace.last_sequence, step.sequence)
            if step.tokens:
                trace.total_tokens += step.tokens
            if step.cost:
                trace.total_cost += step.cost
            if step.duration_ms:
                trace.total_duration_ms += step.duration_ms
            self._save_trace(trace_id, trace)

        # 3. Update the parent's UI fields. The lookup is restricted to this
        #    trace: step ids are only unique within a trace's steps/
        #    directory, and the parent file written back below lives in this
        #    trace. A global scan could pick another trace's same-named step
        #    and copy its data in here.
        if step.parent_id:
            parent = await self.get_step(step.parent_id, trace_id=trace_id)
            if parent is not None:
                parent.has_children = True
                parent.children_count += 1
                self._save_step(trace_id, step.parent_id, parent)

        # 4. Append the step_added event.
        await self.append_event(trace_id, "step_added", {
            "step_id": step.step_id,
            "step_type": step.step_type,
            "parent_id": step.parent_id,
            "sequence": step.sequence,
        })

        return step.step_id

    async def get_step(
        self, step_id: str, trace_id: Optional[str] = None
    ) -> Optional[Step]:
        """Load a Step by id, or return None if no readable file is found.

        Args:
            step_id: Id of the step.
            trace_id: If given, only that trace is searched. Otherwise every
                trace directory is scanned and the first readable match is
                returned. The scan follows ``Path.iterdir`` order, so an id
                that exists in several traces is ambiguous.
        """
        if trace_id is not None:
            candidates = [self._get_step_file(trace_id, step_id)]
        elif self.base_path.exists():
            candidates = [
                trace_dir / "steps" / f"{step_id}.json"
                for trace_dir in self.base_path.iterdir()
                if trace_dir.is_dir()
            ]
        else:
            return None

        for step_file in candidates:
            if not step_file.exists():
                continue
            try:
                return self._step_from_dict(self._read_json(step_file))
            except Exception:
                # Corrupt file: keep looking in the remaining traces.
                continue
        return None

    async def get_trace_steps(self, trace_id: str) -> List[Step]:
        """Return all readable Steps of a trace, sorted by ``sequence``."""
        steps_dir = self._get_steps_dir(trace_id)
        if not steps_dir.exists():
            return []

        steps: List[Step] = []
        for step_file in steps_dir.glob("*.json"):
            try:
                steps.append(self._step_from_dict(self._read_json(step_file)))
            except Exception:
                # Skip corrupt files.
                continue

        steps.sort(key=lambda s: s.sequence)
        return steps

    async def get_step_children(
        self, step_id: str, trace_id: Optional[str] = None
    ) -> List[Step]:
        """Return the direct children of a Step, sorted by ``sequence``.

        Args:
            step_id: Id of the parent step.
            trace_id: If given, only that trace is searched. Otherwise every
                trace is scanned, which also returns steps from other traces
                whose ``parent_id`` happens to equal ``step_id``.
        """
        # TODO: avoid the full scan by maintaining an index file.
        if trace_id is not None:
            steps_dirs = [self._get_steps_dir(trace_id)]
        elif self.base_path.exists():
            steps_dirs = [
                trace_dir / "steps"
                for trace_dir in self.base_path.iterdir()
                if trace_dir.is_dir()
            ]
        else:
            steps_dirs = []

        children: List[Step] = []
        for steps_dir in steps_dirs:
            if not steps_dir.exists():
                continue
            for step_file in steps_dir.glob("*.json"):
                try:
                    data = self._read_json(step_file)
                    if data.get("parent_id") == step_id:
                        children.append(self._step_from_dict(data))
                except Exception:
                    continue

        children.sort(key=lambda s: s.sequence)
        return children

    async def update_step(self, step_id: str, **updates) -> None:
        """Set attributes of a stored Step and persist it.

        The step is located with a scan over all traces (see ``get_step``).
        Keys that are not attributes of the Step are ignored. A missing step
        is a no-op.
        """
        step = await self.get_step(step_id)
        if step is None:
            return

        for key, value in updates.items():
            if hasattr(step, key):
                setattr(step, key, value)

        self._save_step(step.trace_id, step_id, step)

    # ===== Event stream (used for WebSocket resume after disconnect) =====

    async def get_events(
        self,
        trace_id: str,
        since_event_id: int = 0
    ) -> List[Dict[str, Any]]:
        """Return events whose ``event_id`` is greater than ``since_event_id``.

        Events are returned in file (append) order. Lines that cannot be
        parsed are skipped.
        """
        events_file = self._get_events_file(trace_id)
        if not events_file.exists():
            return []

        events: List[Dict[str, Any]] = []
        with events_file.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                    if event.get("event_id", 0) > since_event_id:
                        events.append(event)
                except Exception:
                    # Best effort: skip malformed lines.
                    continue
        return events

    async def append_event(
        self,
        trace_id: str,
        event_type: str,
        payload: Dict[str, Any]
    ) -> int:
        """Append an event to the trace's events.jsonl and return its id.

        The id is ``last_event_id + 1`` from meta.json, and the new value is
        written back before the line is appended. Keys in ``payload`` are
        merged after the standard fields, so a payload key with the same name
        overrides ``event_id``/``event``/``ts``. Returns 0 (and writes nothing)
        if the trace does not exist.
        """
        trace = await self.get_trace(trace_id)
        if trace is None:
            return 0

        event_id = trace.last_event_id + 1
        trace.last_event_id = event_id
        self._save_trace(trace_id, trace)

        event = {
            "event_id": event_id,
            "event": event_type,
            "ts": datetime.now().isoformat(),
            **payload
        }

        with self._get_events_file(trace_id).open("a", encoding="utf-8") as f:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")

        return event_id
|