trace-api.md 7.7 KB

Trace 模块 - 执行记录存储

执行轨迹记录和存储的后端实现


架构概览

职责定位agent/execution 模块负责所有 Trace/Message 相关功能

agent/execution/
├── models.py          # Trace/Message 数据模型
├── protocols.py       # TraceStore 存储接口
├── fs_store.py        # 文件系统存储实现
├── api.py             # RESTful API
└── websocket.py       # WebSocket 实时推送

设计原则

  • 高内聚:所有 Trace 相关代码在一个模块
  • 松耦合:核心模型不依赖 FastAPI
  • 可扩展:易于添加 PostgreSQL 等存储实现

核心模型

Trace - 执行轨迹

一次完整的 LLM 交互(单次调用或 Agent 任务)

trace = Trace.create(mode="agent", task="探索代码库")

trace.trace_id        # UUID
trace.mode            # "call" | "agent"
trace.task            # 任务描述
trace.status          # "running" | "completed" | "failed"
trace.total_messages  # Message 总数
trace.total_tokens    # Token 总数
trace.total_cost      # 总成本
trace.current_goal_id # 当前焦点 goal

实现agent/execution/models.py:Trace

Message - 执行消息

对应 LLM API 消息,加上元数据。通过 goal_idbranch_id 关联 GoalTree 中的目标。

# assistant 消息(模型返回,可能含 text + tool_calls)
assistant_msg = Message.create(
    trace_id=trace.trace_id,
    role="assistant",
    goal_id="3",                    # 内部 ID(纯自增)
    branch_id=None,                 # 主线消息
    content={"text": "...", "tool_calls": [...]},
)

# 分支内的 tool 消息
tool_msg = Message.create(
    trace_id=trace.trace_id,
    role="tool",
    goal_id="1",                    # 分支内的 goal ID(分支内独立编号)
    branch_id="A",                  # 分支 A
    tool_call_id="call_abc123",
    content="工具执行结果",
)

实现agent/execution/models.py:Message


存储接口

TraceStore Protocol

class TraceStore(Protocol):
    # Trace 操作
    async def create_trace(self, trace: Trace) -> str: ...
    async def get_trace(self, trace_id: str) -> Optional[Trace]: ...
    async def update_trace(self, trace_id: str, **updates) -> None: ...
    async def list_traces(self, ...) -> List[Trace]: ...

    # GoalTree 操作
    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]: ...
    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None: ...
    async def add_goal(self, trace_id: str, goal: Goal) -> None: ...
    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None: ...

    # Branch 操作(分支独立存储)
    async def create_branch(self, trace_id: str, branch: BranchContext) -> None: ...
    async def get_branch(self, trace_id: str, branch_id: str) -> Optional[BranchContext]: ...
    async def get_branch_detail(self, trace_id: str, branch_id: str) -> Optional[BranchDetail]: ...
    async def update_branch(self, trace_id: str, branch_id: str, **updates) -> None: ...
    async def list_branches(self, trace_id: str) -> Dict[str, BranchContext]: ...

    # Message 操作
    async def add_message(self, message: Message) -> str: ...
    async def get_message(self, message_id: str) -> Optional[Message]: ...
    async def get_trace_messages(self, trace_id: str) -> List[Message]: ...
    async def get_messages_by_goal(self, trace_id: str, goal_id: str) -> List[Message]: ...
    async def get_messages_by_branch(self, trace_id: str, branch_id: str) -> List[Message]: ...
    async def update_message(self, message_id: str, **updates) -> None: ...

    # 事件流(WebSocket 断线续传)
    async def get_events(self, trace_id: str, since_event_id: int) -> List[Dict]: ...
    async def append_event(self, trace_id: str, event_type: str, payload: Dict) -> int: ...

实现agent/execution/protocols.py

FileSystemTraceStore

from agent.execution import FileSystemTraceStore

store = FileSystemTraceStore(base_path=".trace")

目录结构

.trace/{trace_id}/
├── meta.json           # Trace 元数据
├── goal.json           # 主线 GoalTree(扁平 JSON,通过 parent_id 构建层级)
├── messages/           # 主线 Messages(每条独立文件)
│   ├── {message_id}.json
│   └── ...
├── branches/           # 分支数据(独立存储)
│   ├── A/
│   │   ├── meta.json   # BranchContext 元数据
│   │   ├── goal.json   # 分支 A 的 GoalTree
│   │   └── messages/   # 分支 A 的 Messages
│   └── B/
│       └── ...
└── events.jsonl        # 事件流(WebSocket 续传)

实现agent/execution/fs_store.py


REST API 端点

1. 列出 Traces

GET /api/traces?mode=agent&status=running&limit=20

2. 获取 Trace + GoalTree + 分支元数据

GET /api/traces/{trace_id}

返回 Trace 元数据、主线 GoalTree(扁平列表,含所有 Goal 及其 stats)、分支元数据(不含分支内部 GoalTree)。

3. 获取 Messages

GET /api/traces/{trace_id}/messages?goal_id=3
GET /api/traces/{trace_id}/messages?branch_id=A

返回指定 Goal 或分支关联的所有 Messages(用于查看执行详情)。

4. 获取分支详情(按需加载)

GET /api/traces/{trace_id}/branches/{branch_id}

返回分支完整详情,包括分支内的 GoalTree(用于展开分支时加载)。

实现agent/execution/api.py


WebSocket 事件

连接

ws://localhost:8000/api/traces/{trace_id}/watch?since_event_id=0

事件类型

事件 触发时机 payload
connected WebSocket 连接成功 trace_id, current_event_id, goal_tree, branches(分支元数据)
goal_added 新增 Goal goal 完整数据(含 stats, parent_id, branch_id, type)
goal_updated Goal 状态变化(含级联完成) goal_id, updates, affected_goals(含 cumulative_stats)
message_added 新 Message message 数据(含 goal_id, branch_id),affected_goals,affected_branches
branch_started 分支开始探索 explore_start_id, branch 元数据
branch_goal_added 分支内新增 Goal branch_id, goal
branch_completed 分支完成 explore_start_id, branch_id, summary, cumulative_stats, last_message
explore_completed 所有分支完成 explore_start_id, merge_summary
trace_completed 执行完成 统计信息

Stats 更新逻辑

每次添加 Message 时,后端执行:

  1. 更新对应 Goal 的 self_stats
  2. 沿 parent_id 链向上更新所有祖先的 cumulative_stats
  3. 如果是分支内 Message(branch_id 非空),还需更新所属 Branch 的 cumulative_stats
  4. message_added 事件的 affected_goalsaffected_branches 中推送所有受影响的最新 stats

分支统计

  • Branch 的 cumulative_stats:分支内所有 Goals 的累计统计
  • explore_start Goal 的 cumulative_stats:所有关联分支的 cumulative_stats 之和

实现agent/execution/websocket.py


使用场景

Agent 执行时记录

from agent import AgentRunner
from agent.execution import FileSystemTraceStore

store = FileSystemTraceStore(base_path=".trace")
runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)

async for event in runner.run(task="探索代码库"):
    print(event)  # Trace 或 Message

相关文档