执行轨迹记录、存储和可视化 API
职责定位:agent/trace 模块负责所有 Trace/Step 相关功能
agent/trace/
├── models.py # Trace/Step 数据模型
├── protocols.py # TraceStore 存储接口
├── memory_store.py # 内存存储实现
├── api.py # RESTful API(懒加载)
└── websocket.py # WebSocket 实时推送
设计原则:
一次完整的 LLM 交互(单次调用或 Agent 任务)
from agent.trace import Trace
trace = Trace.create(
mode="agent",
task="探索代码库",
agent_type="researcher"
)
# 字段说明
trace.trace_id # UUID
trace.mode # "call" | "agent"
trace.task # 任务描述
trace.status # "running" | "completed" | "failed"
trace.total_steps # Step 总数
trace.total_tokens # Token 总数
trace.total_cost # 总成本
Trace 中的原子操作,形成树结构
from agent.trace import Step
step = Step.create(
trace_id=trace.trace_id,
step_type="action",
sequence=1,
description="glob_files",
parent_id=parent_step_id, # 树结构
data={
"tool_name": "glob_files",
"arguments": {"pattern": "**/*.py"}
}
)
# Step 类型
# - goal: 目标/计划项
# - thought: 思考/分析
# - action: 工具调用
# - result: 工具结果
# - response: 最终回复
# - memory_read/write: 记忆操作
# - feedback: 人工反馈
定义所有存储实现必须遵守的接口
from agent.trace import TraceStore
class MyCustomStore:
"""实现 TraceStore 接口的所有方法"""
async def create_trace(self, trace: Trace) -> str: ...
async def get_trace(self, trace_id: str) -> Optional[Trace]: ...
async def list_traces(self, ...) -> List[Trace]: ...
async def add_step(self, step: Step) -> str: ...
async def get_step(self, step_id: str) -> Optional[Step]: ...
async def get_trace_steps(self, trace_id: str) -> List[Step]: ...
async def get_step_children(self, step_id: str) -> List[Step]: ...
内存存储实现(用于开发和测试)
from agent.trace import MemoryTraceStore
store = MemoryTraceStore()
# 使用方法
trace_id = await store.create_trace(trace)
trace = await store.get_trace(trace_id)
steps = await store.get_trace_steps(trace_id)
# 1. 安装依赖
pip install -r requirements.txt
# 2. 启动服务
python api_server.py
# 3. 访问 API 文档
open http://localhost:8000/docs
GET /api/traces?mode=agent&status=running&limit=20
响应:
{
"traces": [
{
"trace_id": "abc123",
"mode": "agent",
"task": "探索代码库",
"status": "running",
"total_steps": 15,
"total_tokens": 5000,
"total_cost": 0.05
}
]
}
GET /api/traces/{trace_id}/tree
响应:递归 Step 树(完整)
GET /api/traces/{trace_id}/node/{step_id}?expand=true&max_depth=2
参数:
step_id: Step ID(null 表示根节点)expand: 是否加载子节点max_depth: 递归深度(1-10)核心算法:简洁的层级懒加载(< 30 行)
async def _build_tree(store, trace_id, step_id, expand, max_depth, current_depth):
# 1. 获取当前层节点
if step_id is None:
nodes = [s for s in steps if s.parent_id is None]
else:
nodes = await store.get_step_children(step_id)
# 2. 构建响应
result = []
for step in nodes:
node_dict = step.to_dict()
node_dict["children"] = []
# 3. 递归加载子节点(可选)
if expand and current_depth < max_depth:
node_dict["children"] = await _build_tree(...)
result.append(node_dict)
return result
实时监听进行中 Trace 的更新
// 连接
ws = new WebSocket(`/api/traces/${trace_id}/watch`)
// 事件
ws.onmessage = (e) => {
const event = JSON.parse(e.data)
switch (event.event) {
case "connected":
console.log("已连接")
break
case "step_added":
// 新增 Step
addStepToTree(event.step)
break
case "step_updated":
// Step 状态更新
updateStep(event.step_id, event.updates)
break
case "trace_completed":
// Trace 完成
console.log("完成")
ws.close()
break
}
}
from agent import AgentRunner
from agent.trace import MemoryTraceStore
# 初始化
store = MemoryTraceStore()
runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)
# 执行 Agent(自动记录 Trace)
async for event in runner.run(task="探索代码库"):
print(event)
# 查询 Trace
traces = await store.list_traces(mode="agent", limit=10)
steps = await store.get_trace_steps(traces[0].trace_id)
// 一次性加载完整树
const response = await fetch(`/api/traces/${traceId}/tree`)
const { root_steps } = await response.json()
// 渲染树
renderTree(root_steps)
// 懒加载:只加载根节点
const response = await fetch(`/api/traces/${traceId}/node/null?expand=false`)
const { children } = await response.json()
// 用户点击展开时
async function expandNode(stepId) {
const response = await fetch(
`/api/traces/${traceId}/node/${stepId}?expand=true&max_depth=1`
)
const { children } = await response.json()
return children
}
// WebSocket 监听
ws = new WebSocket(`/api/traces/${traceId}/watch`)
ws.onmessage = (e) => {
const event = JSON.parse(e.data)
if (event.event === "step_added") {
// 实时添加新 Step 到 UI
appendStep(event.step)
}
}
from agent.trace import TraceStore, Trace, Step
class PostgreSQLTraceStore:
"""PostgreSQL 存储实现"""
def __init__(self, connection_string: str):
self.pool = create_pool(connection_string)
async def create_trace(self, trace: Trace) -> str:
async with self.pool.acquire() as conn:
await conn.execute(
"INSERT INTO traces (...) VALUES (...)",
trace.to_dict()
)
return trace.trace_id
async def get_step_children(self, step_id: str) -> List[Step]:
# 使用递归 CTE 优化查询
query = """
WITH RECURSIVE subtree AS (
SELECT * FROM steps WHERE parent_id = $1
)
SELECT * FROM subtree ORDER BY sequence
"""
# ...
# ✅ 推荐导入
from agent.trace import Trace, Step, StepType, Status
from agent.trace import TraceStore, MemoryTraceStore
# ✅ 顶层导入(等价)
from agent import Trace, Step, TraceStore
# ❌ 旧导入(已删除,会报错)
from agent.models.trace import Trace # ModuleNotFoundError
from agent.storage.protocols import TraceStore # ImportError
/tree 一次性加载/node/{step_id} 懒加载