执行轨迹记录和存储的后端实现
职责定位:agent/execution 模块负责所有 Trace/Message 相关功能
agent/execution/
├── models.py # Trace/Message 数据模型
├── protocols.py # TraceStore 存储接口
├── fs_store.py # 文件系统存储实现
├── api.py # RESTful API
└── websocket.py # WebSocket 实时推送
设计原则:
一次完整的 LLM 交互(单次调用或 Agent 任务)
trace = Trace.create(mode="agent", task="探索代码库")
trace.trace_id # UUID
trace.mode # "call" | "agent"
trace.task # 任务描述
trace.status # "running" | "completed" | "failed"
trace.total_messages # Message 总数
trace.total_tokens # Token 总数
trace.total_cost # 总成本
trace.current_goal_id # 当前焦点 goal
实现:agent/execution/models.py:Trace
对应 LLM API 消息,加上元数据。通过 goal_id 和 branch_id 关联 GoalTree 中的目标。
# assistant 消息(模型返回,可能含 text + tool_calls)
assistant_msg = Message.create(
trace_id=trace.trace_id,
role="assistant",
goal_id="3", # 内部 ID(纯自增)
branch_id=None, # 主线消息
content={"text": "...", "tool_calls": [...]},
)
# 分支内的 tool 消息
tool_msg = Message.create(
trace_id=trace.trace_id,
role="tool",
goal_id="1", # 分支内的 goal ID(分支内独立编号)
branch_id="A", # 分支 A
tool_call_id="call_abc123",
content="工具执行结果",
)
实现:agent/execution/models.py:Message
class TraceStore(Protocol):
# Trace 操作
async def create_trace(self, trace: Trace) -> str: ...
async def get_trace(self, trace_id: str) -> Optional[Trace]: ...
async def update_trace(self, trace_id: str, **updates) -> None: ...
async def list_traces(self, ...) -> List[Trace]: ...
# GoalTree 操作
async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]: ...
async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None: ...
async def add_goal(self, trace_id: str, goal: Goal) -> None: ...
async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None: ...
# Branch 操作(分支独立存储)
async def create_branch(self, trace_id: str, branch: BranchContext) -> None: ...
async def get_branch(self, trace_id: str, branch_id: str) -> Optional[BranchContext]: ...
async def get_branch_detail(self, trace_id: str, branch_id: str) -> Optional[BranchDetail]: ...
async def update_branch(self, trace_id: str, branch_id: str, **updates) -> None: ...
async def list_branches(self, trace_id: str) -> Dict[str, BranchContext]: ...
# Message 操作
async def add_message(self, message: Message) -> str: ...
async def get_message(self, message_id: str) -> Optional[Message]: ...
async def get_trace_messages(self, trace_id: str) -> List[Message]: ...
async def get_messages_by_goal(self, trace_id: str, goal_id: str) -> List[Message]: ...
async def get_messages_by_branch(self, trace_id: str, branch_id: str) -> List[Message]: ...
async def update_message(self, message_id: str, **updates) -> None: ...
# 事件流(WebSocket 断线续传)
async def get_events(self, trace_id: str, since_event_id: int) -> List[Dict]: ...
async def append_event(self, trace_id: str, event_type: str, payload: Dict) -> int: ...
实现:agent/execution/protocols.py
from agent.execution import FileSystemTraceStore
store = FileSystemTraceStore(base_path=".trace")
目录结构:
.trace/{trace_id}/
├── meta.json # Trace 元数据
├── goal.json # 主线 GoalTree(扁平 JSON,通过 parent_id 构建层级)
├── messages/ # 主线 Messages(每条独立文件)
│ ├── {message_id}.json
│ └── ...
├── branches/ # 分支数据(独立存储)
│ ├── A/
│ │ ├── meta.json # BranchContext 元数据
│ │ ├── goal.json # 分支 A 的 GoalTree
│ │ └── messages/ # 分支 A 的 Messages
│ └── B/
│ └── ...
└── events.jsonl # 事件流(WebSocket 续传)
实现:agent/execution/fs_store.py
GET /api/traces?mode=agent&status=running&limit=20
GET /api/traces/{trace_id}
返回 Trace 元数据、主线 GoalTree(扁平列表,含所有 Goal 及其 stats)、分支元数据(不含分支内部 GoalTree)。
GET /api/traces/{trace_id}/messages?goal_id=3
GET /api/traces/{trace_id}/messages?branch_id=A
返回指定 Goal 或分支关联的所有 Messages(用于查看执行详情)。
GET /api/traces/{trace_id}/branches/{branch_id}
返回分支完整详情,包括分支内的 GoalTree(用于展开分支时加载)。
实现:agent/execution/api.py
ws://localhost:8000/api/traces/{trace_id}/watch?since_event_id=0
| 事件 | 触发时机 | payload |
|---|---|---|
connected |
WebSocket 连接成功 | trace_id, current_event_id, goal_tree, branches(分支元数据) |
goal_added |
新增 Goal | goal 完整数据(含 stats, parent_id, branch_id, type) |
goal_updated |
Goal 状态变化(含级联完成) | goal_id, updates, affected_goals(含 cumulative_stats) |
message_added |
新 Message | message 数据(含 goal_id, branch_id),affected_goals,affected_branches |
branch_started |
分支开始探索 | explore_start_id, branch 元数据 |
branch_goal_added |
分支内新增 Goal | branch_id, goal |
branch_completed |
分支完成 | explore_start_id, branch_id, summary, cumulative_stats, last_message |
explore_completed |
所有分支完成 | explore_start_id, merge_summary |
trace_completed |
执行完成 | 统计信息 |
每次添加 Message 时,后端执行:
self_statsparent_id 链向上更新所有祖先的 cumulative_statscumulative_statsmessage_added 事件的 affected_goals 和 affected_branches 中推送所有受影响的最新 stats分支统计:
cumulative_stats:分支内所有 Goals 的累计统计cumulative_stats:所有关联分支的 cumulative_stats 之和实现:agent/execution/websocket.py
from agent import AgentRunner
from agent.execution import FileSystemTraceStore
store = FileSystemTraceStore(base_path=".trace")
runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)
async for event in runner.run(task="探索代码库"):
print(event) # Trace 或 Message