# Trace Module - Context Management + Visualization

> API for recording, storing, and visualizing execution traces

---

## Architecture Overview

**Responsibility**: the `agent/trace` module owns all Trace/Step functionality.

```
agent/trace/
├── models.py          # Trace/Step data models
├── protocols.py       # TraceStore storage interface
├── memory_store.py    # In-memory storage implementation
├── api.py             # RESTful API (lazy loading)
└── websocket.py       # WebSocket real-time push
```

**Design principles**:

- ✅ **High cohesion**: all Trace-related code lives in one module
- ✅ **Loose coupling**: the core models do not depend on FastAPI
- ✅ **Extensible**: PostgreSQL/Neo4j implementations are easy to add

---

## Core Models

### Trace - Execution Trace

One complete LLM interaction (a single call or an Agent task).

```python
from agent.trace import Trace

trace = Trace.create(
    mode="agent",
    task="Explore the codebase",
    agent_type="researcher"
)

# Fields
trace.trace_id      # UUID
trace.mode          # "call" | "agent"
trace.task          # Task description
trace.status        # "running" | "completed" | "failed"
trace.total_steps   # Total number of Steps
trace.total_tokens  # Total number of tokens
trace.total_cost    # Total cost
```

### Step - Execution Step

An atomic operation within a Trace. Steps link to their parents and form a tree.

```python
from agent.trace import Step

step = Step.create(
    trace_id=trace.trace_id,
    step_type="action",
    sequence=1,
    description="glob_files",
    parent_id=parent_step_id,  # Links the Step into the tree
    data={
        "tool_name": "glob_files",
        "arguments": {"pattern": "**/*.py"}
    }
)

# Step types
# - goal: goal / plan item
# - thought: thinking / analysis
# - action: tool call
# - result: tool result
# - response: final reply
# - memory_read/write: memory operations
# - feedback: human feedback
```

---

## Storage Interface

### TraceStore Protocol

Defines the interface that every storage implementation must follow.

```python
from typing import List, Optional

from agent.trace import Step, Trace, TraceStore

class MyCustomStore:
    """Implements every method of the TraceStore interface."""

    async def create_trace(self, trace: Trace) -> str: ...
    async def get_trace(self, trace_id: str) -> Optional[Trace]: ...
    async def list_traces(self, ...) -> List[Trace]: ...
    async def add_step(self, step: Step) -> str: ...
    async def get_step(self, step_id: str) -> Optional[Step]: ...
    async def get_trace_steps(self, trace_id: str) -> List[Step]: ...
    async def get_step_children(self, step_id: str) -> List[Step]: ...
```

### MemoryTraceStore

In-memory storage implementation (for development and testing).

```python
from agent.trace import MemoryTraceStore

store = MemoryTraceStore()

# Usage
trace_id = await store.create_trace(trace)
trace = await store.get_trace(trace_id)
steps = await store.get_trace_steps(trace_id)
```

---

## API Service

### Starting the Service

```bash
# 1. Install dependencies
pip install -r requirements.txt

# 2. Start the service
python api_server.py

# 3. Open the API docs
open http://localhost:8000/docs
```

### RESTful Endpoints

#### 1. List Traces

```http
GET /api/traces?mode=agent&status=running&limit=20
```

**Response**:

```json
{
  "traces": [
    {
      "trace_id": "abc123",
      "mode": "agent",
      "task": "Explore the codebase",
      "status": "running",
      "total_steps": 15,
      "total_tokens": 5000,
      "total_cost": 0.05
    }
  ]
}
```

#### 2. Get the Full Tree (small Traces)

```http
GET /api/traces/{trace_id}/tree
```

**Response**: the complete, recursive Step tree

#### 3. Lazy-Load Nodes (large Traces)

```http
GET /api/traces/{trace_id}/node/{step_id}?expand=true&max_depth=2
```

**Parameters**:

- `step_id`: Step ID (`null` means the root level)
- `expand`: whether to load child nodes
- `max_depth`: recursion depth (1-10)

**Core algorithm**: a compact level-by-level lazy load (< 30 lines)

```python
async def _build_tree(store, trace_id, step_id, expand, max_depth, current_depth):
    # 1. Fetch the nodes at the current level
    if step_id is None:
        # Root level: the Steps of this Trace that have no parent
        steps = await store.get_trace_steps(trace_id)
        nodes = [s for s in steps if s.parent_id is None]
    else:
        nodes = await store.get_step_children(step_id)

    # 2. Build the response
    result = []
    for step in nodes:
        node_dict = step.to_dict()
        node_dict["children"] = []

        # 3. Recursively load children (optional)
        if expand and current_depth < max_depth:
            node_dict["children"] = await _build_tree(
                store, trace_id, step.step_id, expand, max_depth, current_depth + 1
            )

        result.append(node_dict)

    return result
```

### WebSocket Push

Listen in real time for updates to a Trace that is still running.

```javascript
// Connect
const ws = new WebSocket(`/api/traces/${trace_id}/watch`)

// Events
ws.onmessage = (e) => {
  const event = JSON.parse(e.data)

  switch (event.event) {
    case "connected":
      console.log("Connected")
      break
    case "step_added":
      // A new Step was added
      addStepToTree(event.step)
      break
    case "step_updated":
      // A Step's status changed
      updateStep(event.step_id, event.updates)
      break
    case "trace_completed":
      // The Trace finished
      console.log("Completed")
      ws.close()
      break
  }
}
```

---

## Usage Scenarios

### 1. Record a Trace While an Agent Runs

```python
from agent import AgentRunner
from agent.trace import MemoryTraceStore

# Initialize
store = MemoryTraceStore()
runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)

# Run the Agent (the Trace is recorded automatically)
async for event in runner.run(task="Explore the codebase"):
    print(event)

# Query Traces
traces = await store.list_traces(mode="agent", limit=10)
steps = await store.get_trace_steps(traces[0].trace_id)
```

### 2. Frontend Visualization (small Traces)

```javascript
// Load the complete tree in one request
const response = await fetch(`/api/traces/${traceId}/tree`)
const { root_steps } = await response.json()

// Render the tree
renderTree(root_steps)
```

### 3. Frontend Visualization (large Traces)

```javascript
// Lazy loading: fetch only the root nodes
const response = await fetch(`/api/traces/${traceId}/node/null?expand=false`)
const { children } = await response.json()

// When the user expands a node
async function expandNode(stepId) {
  const response = await fetch(
    `/api/traces/${traceId}/node/${stepId}?expand=true&max_depth=1`
  )
  const { children } = await response.json()
  return children
}
```

### 4. Monitor a Running Task in Real Time

```javascript
// Listen over WebSocket
const ws = new WebSocket(`/api/traces/${traceId}/watch`)
ws.onmessage = (e) => {
  const event = JSON.parse(e.data)
  if (event.event === "step_added") {
    // Append the new Step to the UI as it arrives
    appendStep(event.step)
  }
}
```

---

## Extending the Storage Layer

### PostgreSQL Implementation (future)

```python
from typing import List

from agent.trace import TraceStore, Trace, Step

class PostgreSQLTraceStore:
    """PostgreSQL storage implementation."""

    def __init__(self, connection_string: str):
        self.pool = create_pool(connection_string)

    async def create_trace(self, trace: Trace) -> str:
        async with self.pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO traces (...) VALUES (...)",
                trace.to_dict()
            )
        return trace.trace_id

    async def get_step_children(self, step_id: str) -> List[Step]:
        # Use a recursive CTE to optimize the query
        # (the anchor term below selects the direct children)
        query = """
        WITH RECURSIVE subtree AS (
            SELECT * FROM steps WHERE parent_id = $1
        )
        SELECT * FROM subtree ORDER BY sequence
        """
        # ...
```

---

## Import Paths (the only correct way)

```python
# ✅ Recommended imports
from agent.trace import Trace, Step, StepType, Status
from agent.trace import TraceStore, MemoryTraceStore

# ✅ Top-level imports (equivalent)
from agent import Trace, Step, TraceStore

# ❌ Old imports (removed; these now fail)
from agent.models.trace import Trace  # ModuleNotFoundError
from agent.storage.protocols import TraceStore  # ImportError
```

---

## Performance Optimization

### Small Traces (< 100 Steps)

- **Recommended**: load everything at once with `/tree`
- **Pros**: fewest requests, best frontend experience
- **Cons**: a larger single response

### Large Traces (> 100 Steps)

- **Recommended**: lazy-load with `/node/{step_id}`
- **Pros**: loads on demand, small memory footprint
- **Cons**: requires multiple requests

### WebSocket vs Polling

- **Running tasks**: use WebSocket (real-time push)
- **Historical tasks**: use the RESTful API (static data)

---

## Related Documents

- [agent/trace/models.py](../agent/trace/models.py) - Trace/Step model definitions
- [agent/trace/api.py](../agent/trace/api.py) - RESTful API implementation
- [api_server.py](../api_server.py) - FastAPI application entry point
- [requirements.txt](../requirements.txt) - FastAPI dependencies
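
---

## Appendix: Trying the Lazy-Load Algorithm Without FastAPI

The level-by-level lazy load behind `/node/{step_id}` can be tried in isolation. The following is a minimal sketch, not the module's actual code: `DemoStep`, `DemoStore`, and `build_tree` are hypothetical stand-ins that mirror only the `parent_id`, `to_dict()`, `get_trace_steps`, and `get_step_children` pieces described in this document. It also assumes that depth counting starts at 1, so `max_depth=1` returns a single level; the real endpoint may count differently.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class DemoStep:
    """Hypothetical stand-in for Step; only the fields the algorithm reads."""
    step_id: str
    trace_id: str
    parent_id: Optional[str] = None
    description: str = ""

    def to_dict(self) -> dict:
        return {
            "step_id": self.step_id,
            "parent_id": self.parent_id,
            "description": self.description,
        }


class DemoStore:
    """Hypothetical in-memory store with the two lookups the algorithm needs."""

    def __init__(self) -> None:
        self._steps: Dict[str, DemoStep] = {}

    async def add_step(self, step: DemoStep) -> str:
        self._steps[step.step_id] = step
        return step.step_id

    async def get_trace_steps(self, trace_id: str) -> List[DemoStep]:
        return [s for s in self._steps.values() if s.trace_id == trace_id]

    async def get_step_children(self, step_id: str) -> List[DemoStep]:
        return [s for s in self._steps.values() if s.parent_id == step_id]


async def build_tree(store, trace_id, step_id=None, expand=False,
                     max_depth=1, current_depth=1):
    # Pick the nodes for this level: roots when step_id is None,
    # otherwise the direct children of step_id.
    if step_id is None:
        steps = await store.get_trace_steps(trace_id)
        nodes = [s for s in steps if s.parent_id is None]
    else:
        nodes = await store.get_step_children(step_id)

    result = []
    for step in nodes:
        node = step.to_dict()
        node["children"] = []
        # Descend only when asked to, and only while below the depth limit.
        if expand and current_depth < max_depth:
            node["children"] = await build_tree(
                store, trace_id, step.step_id, expand, max_depth, current_depth + 1
            )
        result.append(node)
    return result


async def demo():
    store = DemoStore()
    # A three-level chain: s1 -> s2 -> s3
    await store.add_step(DemoStep("s1", "t1", None, "goal"))
    await store.add_step(DemoStep("s2", "t1", "s1", "thought"))
    await store.add_step(DemoStep("s3", "t1", "s2", "action"))
    roots = await build_tree(store, "t1")                          # root level only
    full = await build_tree(store, "t1", expand=True, max_depth=3)  # three levels
    return roots, full


roots, full = asyncio.run(demo())
print(len(roots), full[0]["children"][0]["step_id"])
```

Each call touches one level of the tree plus whatever depth the caller asks for, which is why the large-Trace recommendation above trades a bigger single response for several smaller requests.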