# Trace 模块 - 执行记录存储 > 执行轨迹记录和存储的后端实现 --- ## 架构概览 **职责定位**:`agent/execution` 模块负责所有 Trace/Message 相关功能 ``` agent/execution/ ├── models.py # Trace/Message 数据模型 ├── protocols.py # TraceStore 存储接口 ├── fs_store.py # 文件系统存储实现 ├── api.py # RESTful API └── websocket.py # WebSocket 实时推送 ``` **设计原则**: - **高内聚**:所有 Trace 相关代码在一个模块 - **松耦合**:核心模型不依赖 FastAPI - **可扩展**:易于添加 PostgreSQL 等存储实现 --- ## 核心模型 ### Trace - 执行轨迹 一次完整的 LLM 交互(单次调用或 Agent 任务) ```python trace = Trace.create(mode="agent", task="探索代码库") trace.trace_id # UUID trace.mode # "call" | "agent" trace.task # 任务描述 trace.status # "running" | "completed" | "failed" trace.total_messages # Message 总数 trace.total_tokens # Token 总数 trace.total_cost # 总成本 trace.current_goal_id # 当前焦点 goal ``` **实现**:`agent/execution/models.py:Trace` ### Message - 执行消息 对应 LLM API 消息,加上元数据。通过 `goal_id` 和 `branch_id` 关联 GoalTree 中的目标。 ```python # assistant 消息(模型返回,可能含 text + tool_calls) assistant_msg = Message.create( trace_id=trace.trace_id, role="assistant", goal_id="3", # 内部 ID(纯自增) branch_id=None, # 主线消息 content={"text": "...", "tool_calls": [...]}, ) # 分支内的 tool 消息 tool_msg = Message.create( trace_id=trace.trace_id, role="tool", goal_id="1", # 分支内的 goal ID(分支内独立编号) branch_id="A", # 分支 A tool_call_id="call_abc123", content="工具执行结果", ) ``` **description 字段**(系统自动生成): - `assistant` 消息:优先取 content 中的 text,若无 text 则生成 "tool call: XX, XX" - `tool` 消息:使用 tool name **实现**:`agent/execution/models.py:Message` --- ## 存储接口 ### TraceStore Protocol ```python class TraceStore(Protocol): # Trace 操作 async def create_trace(self, trace: Trace) -> str: ... async def get_trace(self, trace_id: str) -> Optional[Trace]: ... async def update_trace(self, trace_id: str, **updates) -> None: ... async def list_traces(self, ...) -> List[Trace]: ... # GoalTree 操作 async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]: ... async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None: ... async def add_goal(self, trace_id: str, goal: Goal) -> None: ... async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None: ... # Branch 操作(分支独立存储) async def create_branch(self, trace_id: str, branch: BranchContext) -> None: ... async def get_branch(self, trace_id: str, branch_id: str) -> Optional[BranchContext]: ... async def get_branch_detail(self, trace_id: str, branch_id: str) -> Optional[BranchDetail]: ... async def update_branch(self, trace_id: str, branch_id: str, **updates) -> None: ... async def list_branches(self, trace_id: str) -> Dict[str, BranchContext]: ... # Message 操作 async def add_message(self, message: Message) -> str: ... async def get_message(self, message_id: str) -> Optional[Message]: ... async def get_trace_messages(self, trace_id: str) -> List[Message]: ... async def get_messages_by_goal(self, trace_id: str, goal_id: str) -> List[Message]: ... async def get_messages_by_branch(self, trace_id: str, branch_id: str) -> List[Message]: ... async def update_message(self, message_id: str, **updates) -> None: ... # 事件流(WebSocket 断线续传) async def get_events(self, trace_id: str, since_event_id: int) -> List[Dict]: ... async def append_event(self, trace_id: str, event_type: str, payload: Dict) -> int: ... ``` **实现**:`agent/execution/protocols.py` ### FileSystemTraceStore ```python from agent.execution import FileSystemTraceStore store = FileSystemTraceStore(base_path=".trace") ``` **目录结构**: ``` .trace/{trace_id}/ ├── meta.json # Trace 元数据 ├── goal.json # 主线 GoalTree(扁平 JSON,通过 parent_id 构建层级) ├── messages/ # 主线 Messages(每条独立文件) │ ├── {message_id}.json │ └── ... ├── branches/ # 分支数据(独立存储) │ ├── A/ │ │ ├── meta.json # BranchContext 元数据 │ │ ├── goal.json # 分支 A 的 GoalTree │ │ └── messages/ # 分支 A 的 Messages │ └── B/ │ └── ... └── events.jsonl # 事件流(WebSocket 续传) ``` **实现**:`agent/execution/fs_store.py` --- ## REST API 端点 ### 1. 列出 Traces ```http GET /api/traces?mode=agent&status=running&limit=20 ``` ### 2. 获取 Trace + GoalTree + 分支元数据 ```http GET /api/traces/{trace_id} ``` 返回 Trace 元数据、主线 GoalTree(扁平列表,含所有 Goal 及其 stats)、分支元数据(不含分支内部 GoalTree)。 ### 3. 获取 Messages ```http GET /api/traces/{trace_id}/messages?goal_id=3 GET /api/traces/{trace_id}/messages?branch_id=A ``` 返回指定 Goal 或分支关联的所有 Messages(用于查看执行详情)。 ### 4. 获取分支详情(按需加载) ```http GET /api/traces/{trace_id}/branches/{branch_id} ``` 返回分支完整详情,包括分支内的 GoalTree(用于展开分支时加载)。 **实现**:`agent/execution/api.py` --- ## WebSocket 事件 ### 连接 ``` ws://localhost:8000/api/traces/{trace_id}/watch?since_event_id=0 ``` ### 事件类型 | 事件 | 触发时机 | payload | |------|---------|---------| | `connected` | WebSocket 连接成功 | trace_id, current_event_id, goal_tree, branches(分支元数据) | | `goal_added` | 新增 Goal | goal 完整数据(含 stats, parent_id, branch_id, type) | | `goal_updated` | Goal 状态变化(含级联完成) | goal_id, updates, affected_goals(含 cumulative_stats) | | `message_added` | 新 Message | message 数据(含 goal_id, branch_id),affected_goals,affected_branches | | `branch_started` | 分支开始探索 | explore_start_id, branch 元数据 | | `branch_goal_added` | 分支内新增 Goal | branch_id, goal | | `branch_completed` | 分支完成 | explore_start_id, branch_id, summary, cumulative_stats, last_message | | `explore_completed` | 所有分支完成 | explore_start_id, merge_summary | | `trace_completed` | 执行完成 | 统计信息 | ### Stats 更新逻辑 每次添加 Message 时,后端执行: 1. 更新对应 Goal 的 `self_stats` 2. 沿 `parent_id` 链向上更新所有祖先的 `cumulative_stats` 3. 如果是分支内 Message(branch_id 非空),还需更新所属 Branch 的 `cumulative_stats` 4. 在 `message_added` 事件的 `affected_goals` 和 `affected_branches` 中推送所有受影响的最新 stats **分支统计**: - Branch 的 `cumulative_stats`:分支内所有 Goals 的累计统计 - explore_start Goal 的 `cumulative_stats`:所有关联分支的 cumulative_stats 之和 **实现**:`agent/execution/websocket.py` --- ## 使用场景 ### Agent 执行时记录 ```python from agent import AgentRunner from agent.execution import FileSystemTraceStore store = FileSystemTraceStore(base_path=".trace") runner = AgentRunner(trace_store=store, llm_call=my_llm_fn) async for event in runner.run(task="探索代码库"): print(event) # Trace 或 Message ``` --- ## 相关文档 - [frontend/API.md](../frontend/API.md) - 前端对接 API 文档 - [docs/context-management.md](./context-management.md) - Context 管理完整设计 - [agent/goal/models.py](../agent/goal/models.py) - GoalTree 模型定义