trace-api.md 8.4 KB

Trace 模块 - Context 管理 + 可视化

执行轨迹记录、存储和可视化 API


架构概览

职责定位agent/execution 模块负责所有 Trace/Step 相关功能

agent/execution/
├── models.py          # Trace/Step 数据模型
├── protocols.py       # TraceStore 存储接口
├── memory_store.py    # 内存存储实现
├── api.py             # RESTful API(懒加载)
└── websocket.py       # WebSocket 实时推送

设计原则

  • 高内聚:所有 Trace 相关代码在一个模块
  • 松耦合:核心模型不依赖 FastAPI
  • 可扩展:易于添加 PostgreSQL/Neo4j 实现

核心模型

Trace - 执行轨迹

一次完整的 LLM 交互(单次调用或 Agent 任务)

from agent.trace import Trace

trace = Trace.create(
    mode="agent",
    task="探索代码库",
    agent_type="researcher"
)

# 字段说明
trace.trace_id        # UUID
trace.mode            # "call" | "agent"
trace.task            # 任务描述
trace.status          # "running" | "completed" | "failed"
trace.total_steps     # Step 总数
trace.total_tokens    # Token 总数
trace.total_cost      # 总成本

Step - 执行步骤

Trace 中的原子操作,形成树结构

from agent.trace import Step

step = Step.create(
    trace_id=trace.trace_id,
    step_type="action",
    sequence=1,
    description="glob_files",
    parent_id=parent_step_id,  # 树结构
    data={
        "tool_name": "glob_files",
        "arguments": {"pattern": "**/*.py"}
    }
)

# Step 类型
# - goal: 目标/计划项
# - thought: 思考/分析
# - action: 工具调用
# - result: 工具结果
# - response: 最终回复
# - memory_read/write: 记忆操作
# - feedback: 人工反馈

存储接口

TraceStore Protocol

定义所有存储实现必须遵守的接口

from agent.trace import TraceStore

class MyCustomStore:
    """实现 TraceStore 接口的所有方法"""

    async def create_trace(self, trace: Trace) -> str: ...
    async def get_trace(self, trace_id: str) -> Optional[Trace]: ...
    async def list_traces(self, ...) -> List[Trace]: ...

    async def add_step(self, step: Step) -> str: ...
    async def get_step(self, step_id: str) -> Optional[Step]: ...
    async def get_trace_steps(self, trace_id: str) -> List[Step]: ...
    async def get_step_children(self, step_id: str) -> List[Step]: ...

FileSystemTraceStore

文件系统存储实现(支持跨进程和持久化)

from agent.execution import FileSystemTraceStore

store = FileSystemTraceStore(base_path=".trace")

# 使用方法
trace_id = await store.create_trace(trace)
trace = await store.get_trace(trace_id)
steps = await store.get_trace_steps(trace_id)

API 服务

启动服务

# 1. 安装依赖
pip install -r requirements.txt

# 2. 启动服务
python api_server.py

# 3. 访问 API 文档
open http://localhost:8000/docs

RESTful 端点

1. 列出 Traces

GET /api/traces?mode=agent&status=running&limit=20

响应

{
  "traces": [
    {
      "trace_id": "abc123",
      "mode": "agent",
      "task": "探索代码库",
      "status": "running",
      "total_steps": 15,
      "total_tokens": 5000,
      "total_cost": 0.05
    }
  ]
}

2. 获取完整树(小型 Trace)

GET /api/traces/{trace_id}/tree

响应:递归 Step 树(完整)

3. 懒加载节点(大型 Trace)

GET /api/traces/{trace_id}/node/{step_id}?expand=true&max_depth=2

参数

  • step_id: Step ID(null 表示根节点)
  • expand: 是否加载子节点
  • max_depth: 递归深度(1-10)

核心算法:简洁的层级懒加载(< 30 行)

async def _build_tree(store, trace_id, step_id, expand, max_depth, current_depth):
    # 1. 获取当前层节点
    if step_id is None:
        nodes = [s for s in steps if s.parent_id is None]
    else:
        nodes = await store.get_step_children(step_id)

    # 2. 构建响应
    result = []
    for step in nodes:
        node_dict = step.to_dict()
        node_dict["children"] = []

        # 3. 递归加载子节点(可选)
        if expand and current_depth < max_depth:
            node_dict["children"] = await _build_tree(...)

        result.append(node_dict)

    return result

WebSocket 推送

实时监听进行中 Trace 的更新

// 连接
ws = new WebSocket(`/api/traces/${trace_id}/watch`)

// 事件
ws.onmessage = (e) => {
  const event = JSON.parse(e.data)

  switch (event.event) {
    case "connected":
      console.log("已连接")
      break
    case "step_added":
      // 新增 Step
      addStepToTree(event.step)
      break
    case "step_updated":
      // Step 状态更新
      updateStep(event.step_id, event.updates)
      break
    case "trace_completed":
      // Trace 完成
      console.log("完成")
      ws.close()
      break
  }
}

使用场景

1. Agent 执行时记录 Trace

from agent import AgentRunner
from agent.execution import FileSystemTraceStore

# 初始化
store = FileSystemTraceStore(base_path=".trace")
runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)

# 执行 Agent(自动记录 Trace)
async for event in runner.run(task="探索代码库"):
    print(event)

# 查询 Trace
traces = await store.list_traces(mode="agent", limit=10)
steps = await store.get_trace_steps(traces[0].trace_id)

2. 前端可视化(小型 Trace)

// 一次性加载完整树
const response = await fetch(`/api/traces/${traceId}/tree`)
const { root_steps } = await response.json()

// 渲染树
renderTree(root_steps)

3. 前端可视化(大型 Trace)

// 懒加载:只加载根节点
const response = await fetch(`/api/traces/${traceId}/node/null?expand=false`)
const { children } = await response.json()

// 用户点击展开时
async function expandNode(stepId) {
  const response = await fetch(
    `/api/traces/${traceId}/node/${stepId}?expand=true&max_depth=1`
  )
  const { children } = await response.json()
  return children
}

4. 实时监控进行中的任务

// WebSocket 监听
ws = new WebSocket(`/api/traces/${traceId}/watch`)
ws.onmessage = (e) => {
  const event = JSON.parse(e.data)
  if (event.event === "step_added") {
    // 实时添加新 Step 到 UI
    appendStep(event.step)
  }
}

扩展存储实现

PostgreSQL 实现(未来)

from agent.trace import TraceStore, Trace, Step

class PostgreSQLTraceStore:
    """PostgreSQL 存储实现"""

    def __init__(self, connection_string: str):
        self.pool = create_pool(connection_string)

    async def create_trace(self, trace: Trace) -> str:
        async with self.pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO traces (...) VALUES (...)",
                trace.to_dict()
            )
        return trace.trace_id

    async def get_step_children(self, step_id: str) -> List[Step]:
        # 使用递归 CTE 优化查询
        query = """
        WITH RECURSIVE subtree AS (
          SELECT * FROM steps WHERE parent_id = $1
        )
        SELECT * FROM subtree ORDER BY sequence
        """
        # ...

导入路径(唯一正确方式)

# ✅ 推荐导入
from agent.execution import Trace, Step, StepType, StepStatus
from agent.execution import TraceStore, FileSystemTraceStore

# ✅ 顶层导入(等价)
from agent import Trace, Step, TraceStore

# ❌ 旧导入(已删除,会报错)
from agent.models.trace import Trace  # ModuleNotFoundError
from agent.storage.protocols import TraceStore  # ImportError

性能优化

小型 Trace(< 100 Steps)

  • 推荐:使用 /tree 一次性加载
  • 优点:最少请求数,前端体验最优
  • 缺点:单次响应较大

大型 Trace(> 100 Steps)

  • 推荐:使用 /node/{step_id} 懒加载
  • 优点:按需加载,内存占用小
  • 缺点:需要多次请求

WebSocket vs 轮询

  • 进行中任务:使用 WebSocket(实时推送)
  • 历史任务:使用 RESTful(静态数据)

相关文档