Execution trace recording and storage: backend implementation
Scope: the agent/execution module owns all Trace/Message functionality.
```
agent/execution/
├── models.py      # Trace/Message data models
├── protocols.py   # TraceStore storage interface
├── fs_store.py    # File-system storage implementation
├── trace_id.py    # Trace ID generation utilities
├── api.py         # RESTful API
└── websocket.py   # WebSocket real-time push
```
Design principles:
Trace: one complete LLM interaction (a single call or an Agent task). Every Sub-Agent runs as its own independent Trace.
```python
from agent.execution.models import Trace

# Main Trace
main_trace = Trace.create(mode="agent", task="Explore the codebase")

# Sub-Trace (created by the delegate or explore tool)
sub_trace = Trace(
    trace_id="2f8d3a1c...@explore-20260204220012-001",
    mode="agent",
    task="Explore JWT authentication options",
    parent_trace_id="2f8d3a1c-4b6e-4f9a-8c2d-1e5b7a9f3c4d",
    parent_goal_id="3",
    agent_type="explore",
    status="running",
)

# Field reference
trace.trace_id          # UUID (main Trace) or {parent}@{mode}-{timestamp}-{seq} (Sub-Trace)
trace.mode              # "call" | "agent"
trace.task              # Task description
trace.parent_trace_id   # Parent Trace ID (Sub-Traces only)
trace.parent_goal_id    # ID of the parent Goal that spawned this Trace (Sub-Traces only)
trace.agent_type        # Agent type: explore, delegate, etc.
trace.status            # "running" | "completed" | "failed"
trace.total_messages    # Total number of Messages
trace.total_tokens      # Total number of tokens
trace.total_cost        # Total cost
trace.current_goal_id   # Goal currently in focus
```
Trace ID format:
- Main Trace: a standard UUID, e.g. 2f8d3a1c-4b6e-4f9a-8c2d-1e5b7a9f3c4d
- Sub-Trace: {parent_uuid}@{mode}-{timestamp}-{seq}, e.g. 2f8d3a1c...@explore-20260204220012-001

Implementation: agent/execution/models.py:Trace
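The ID format can be sketched in a few lines. These helpers are hypothetical and are not the API of trace_id.py. They assume uuid4 for main Traces, a local-time `YYYYMMDDHHMMSS` timestamp, and a three-digit zero-padded sequence. Parsing splits on the last "@", so if nested Sub-Traces reuse their full parent ID as a prefix, the parser still returns the direct parent.

```python
import uuid
from datetime import datetime
from typing import NamedTuple, Optional


class SubTraceId(NamedTuple):
    parent_id: str
    mode: str
    timestamp: str  # YYYYMMDDHHMMSS
    seq: int


def new_main_trace_id() -> str:
    # Main Traces: a plain UUID (uuid4 is an assumption)
    return str(uuid.uuid4())


def new_sub_trace_id(parent_id: str, mode: str, seq: int,
                     now: Optional[datetime] = None) -> str:
    # {parent}@{mode}-{timestamp}-{seq}, seq zero-padded to 3 digits
    now = now or datetime.now()
    return f"{parent_id}@{mode}-{now:%Y%m%d%H%M%S}-{seq:03d}"


def parse_trace_id(trace_id: str) -> Optional[SubTraceId]:
    # Main Trace IDs contain no "@": nothing to parse
    if "@" not in trace_id:
        return None
    parent_id, rest = trace_id.rsplit("@", 1)
    # rsplit keeps any "-" inside the mode name intact
    mode, timestamp, seq = rest.rsplit("-", 2)
    return SubTraceId(parent_id, mode, timestamp, int(seq))
```

Fixing `now` in tests makes the IDs deterministic. A real generator would also need to decide how `seq` is allocated across concurrent Sub-Traces.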
Message: maps to an LLM API message plus metadata. Each Message is linked to a goal in the GoalTree through goal_id.
```python
from agent.execution.models import Message

# assistant message (returned by the model; may contain text + tool_calls)
assistant_msg = Message.create(
    trace_id=trace.trace_id,
    role="assistant",
    goal_id="3",  # Goal ID (auto-incremented within the Trace)
    content={"text": "...", "tool_calls": [...]},
)

# tool message
tool_msg = Message.create(
    trace_id=trace.trace_id,
    role="tool",
    goal_id="5",
    tool_call_id="call_abc123",
    content="tool execution result",
)
```
description field (generated automatically):
- assistant messages: use the text in content if there is one; otherwise generate "tool call: XX, XX"
- tool messages: use the tool name

Implementation: agent/execution/models.py:Message
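The description rules amount to a small pure function. This sketch is hypothetical and makes two assumptions. First, `tool_calls` entries are OpenAI-style (`{"function": {"name": ...}}`). Second, the caller has already resolved a tool message's `tool_call_id` to a tool name and passes it in as `tool_name`.

```python
from typing import Any, Dict, List, Optional, Union


def tool_call_names(tool_calls: List[Dict[str, Any]]) -> List[str]:
    # Assumes OpenAI-style entries: {"id": ..., "function": {"name": ..., "arguments": ...}}
    return [call.get("function", {}).get("name", "unknown") for call in tool_calls]


def describe_message(role: str, content: Union[str, Dict[str, Any]],
                     tool_name: Optional[str] = None) -> str:
    if role == "assistant" and isinstance(content, dict):
        # Prefer the model's text; fall back to listing the tools it called
        text = content.get("text")
        if text:
            return text
        return "tool call: " + ", ".join(tool_call_names(content.get("tool_calls") or []))
    if role == "tool":
        # tool_call_id -> tool name is resolved by the caller (assumption)
        return tool_name or ""
    # Roles not covered by the rules above: use plain-string content as-is
    return content if isinstance(content, str) else ""
```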
```python
from typing import Dict, List, Optional, Protocol


class TraceStore(Protocol):
    # Trace operations
    async def create_trace(self, trace: Trace) -> str: ...
    async def get_trace(self, trace_id: str) -> Optional[Trace]: ...
    async def update_trace(self, trace_id: str, **updates) -> None: ...
    async def list_traces(self, ...) -> List[Trace]: ...

    # GoalTree operations (each Trace has its own GoalTree)
    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]: ...
    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None: ...
    async def add_goal(self, trace_id: str, goal: Goal) -> None: ...
    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None: ...

    # Message operations
    async def add_message(self, message: Message) -> str: ...
    async def get_message(self, message_id: str) -> Optional[Message]: ...
    async def get_trace_messages(self, trace_id: str) -> List[Message]: ...
    async def get_messages_by_goal(self, trace_id: str, goal_id: str) -> List[Message]: ...
    async def update_message(self, message_id: str, **updates) -> None: ...

    # Event stream (lets a WebSocket client resume after a disconnect)
    async def get_events(self, trace_id: str, since_event_id: int) -> List[Dict]: ...
    async def append_event(self, trace_id: str, event_type: str, payload: Dict) -> int: ...
```
Implementation: agent/execution/protocols.py
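The two event-stream methods are what make WebSocket resumption possible. Below is a minimal in-memory sketch of just that part of the protocol. The class name and record shape are hypothetical. The sketch assumes per-Trace event IDs starting at 1 and an exclusive `since_event_id` bound, meaning the client passes the last ID it has already seen.

```python
import asyncio
from typing import Any, Dict, List


class InMemoryEventLog:
    """Hypothetical in-memory stand-in for TraceStore's event-stream methods."""

    def __init__(self) -> None:
        # trace_id -> events in append order
        self._events: Dict[str, List[Dict[str, Any]]] = {}

    async def append_event(self, trace_id: str, event_type: str,
                           payload: Dict[str, Any]) -> int:
        events = self._events.setdefault(trace_id, [])
        # Per-Trace IDs starting at 1 (assumption). There is no await between
        # reading len() and appending, so this is atomic under asyncio.
        event_id = len(events) + 1
        events.append({"event_id": event_id, "event": event_type, "payload": payload})
        return event_id

    async def get_events(self, trace_id: str, since_event_id: int) -> List[Dict[str, Any]]:
        # Exclusive bound: only events the client has not seen yet
        return [e for e in self._events.get(trace_id, []) if e["event_id"] > since_event_id]


async def demo() -> None:
    log = InMemoryEventLog()
    await log.append_event("t1", "goal_added", {"goal": {"id": "1"}})
    await log.append_event("t1", "message_added", {"affected_goals": []})
    # A client that already saw event 1 reconnects with since_event_id=1
    print([e["event"] for e in await log.get_events("t1", since_event_id=1)])  # prints ['message_added']


if __name__ == "__main__":
    asyncio.run(demo())
```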
```python
from agent.execution import FileSystemTraceStore

store = FileSystemTraceStore(base_path=".trace")
```
Directory layout:
```
.trace/
├── 2f8d3a1c-4b6e-4f9a-8c2d-1e5b7a9f3c4d/      # Main Trace
│   ├── meta.json                               # Trace metadata
│   ├── goal.json                               # GoalTree (flat JSON)
│   ├── messages/                               # Messages
│   │   ├── {message_id}.json
│   │   └── ...
│   └── events.jsonl                            # Event stream
│
├── 2f8d3a1c...@explore-20260204220012-001/    # Sub-Trace A
│   ├── meta.json                               # parent_trace_id points to the main Trace
│   ├── goal.json                               # Its own GoalTree
│   ├── messages/
│   └── events.jsonl
│
└── 2f8d3a1c...@explore-20260204220012-002/    # Sub-Trace B
    └── ...
```
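Key changes (compared with the old design):
- No more branches/ subdirectory: each Sub-Trace gets its own top-level directory next to the main Trace.

Implementation: agent/execution/fs_store.py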
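The layout maps directly onto a few file operations. The sketch below is hypothetical and is not the FileSystemTraceStore code. It writes meta.json and per-Message JSON files and appends to events.jsonl. It assumes that the 1-based line number in events.jsonl serves as the event ID.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def _write_json(path: Path, data: Dict[str, Any]) -> Path:
    path.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False keeps non-ASCII task text readable on disk
    path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    return path


def write_meta(base: Path, trace_id: str, meta: Dict[str, Any]) -> Path:
    # Main Traces and Sub-Traces are sibling directories directly under base
    return _write_json(base / trace_id / "meta.json", meta)


def write_message(base: Path, trace_id: str, message_id: str, message: Dict[str, Any]) -> Path:
    # One JSON file per Message
    return _write_json(base / trace_id / "messages" / f"{message_id}.json", message)


def append_event(base: Path, trace_id: str, event: Dict[str, Any]) -> int:
    path = base / trace_id / "events.jsonl"
    path.parent.mkdir(parents=True, exist_ok=True)
    # The 1-based line number doubles as the event ID (assumption);
    # counting lines is O(n), so a real store would cache the counter.
    count = 0
    if path.exists():
        with path.open(encoding="utf-8") as f:
            count = sum(1 for _ in f)
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(event, ensure_ascii=False) + "\n")
    return count + 1


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        sub_id = "2f8d3a1c-4b6e-4f9a-8c2d-1e5b7a9f3c4d@explore-20260204220012-001"
        write_meta(base, sub_id, {"parent_trace_id": "2f8d3a1c-4b6e-4f9a-8c2d-1e5b7a9f3c4d"})
        write_message(base, sub_id, "msg-1", {"role": "tool", "content": "..."})
        print(append_event(base, sub_id, {"event": "message_added"}))  # prints 1
```

An append-only JSONL file lets readers that only need recent events stream the file without rewriting it.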
GET /api/traces?mode=agent&status=running&limit=20
Returns all Traces (both main Traces and Sub-Traces).
GET /api/traces/{trace_id}
Returns:
- Sub-Traces (the Traces whose parent_trace_id == trace_id)

GET /api/traces/{trace_id}/messages?goal_id=3
Returns the Messages of the given Trace, optionally filtered by Goal.
Implementation: agent/execution/api.py
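A client only needs to build these three URLs. The helper names below are hypothetical. The base URL `http://localhost:8000` is an assumption borrowed from the WebSocket example. Unset filters are left out so the server can apply its own defaults.

```python
from typing import Optional
from urllib.parse import quote, urlencode

# Assumption: the REST API is served from the same host as the WebSocket example
BASE_URL = "http://localhost:8000"


def list_traces_url(mode: Optional[str] = None, status: Optional[str] = None,
                    limit: Optional[int] = None, base: str = BASE_URL) -> str:
    # Leave out unset filters so the server applies its own defaults
    params = {"mode": mode, "status": status, "limit": limit}
    query = urlencode({k: v for k, v in params.items() if v is not None})
    return f"{base}/api/traces" + (f"?{query}" if query else "")


def trace_url(trace_id: str, base: str = BASE_URL) -> str:
    # "@" is a legal path character, so Sub-Trace IDs stay readable
    return f"{base}/api/traces/{quote(trace_id, safe='@')}"


def trace_messages_url(trace_id: str, goal_id: Optional[str] = None,
                       base: str = BASE_URL) -> str:
    url = trace_url(trace_id, base) + "/messages"
    if goal_id is not None:
        url += "?" + urlencode({"goal_id": goal_id})
    return url
```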
ws://localhost:8000/api/traces/{trace_id}/watch?since_event_id=0
| Event | Trigger | payload |
|---|---|---|
| connected | WebSocket connection established | trace_id, current_event_id, goal_tree, sub_traces |
| goal_added | New Goal added | Full goal data (including stats, parent_id, type) |
| goal_updated | Goal status changed (including cascading completion) | goal_id, updates, affected_goals (including parents completed by the cascade) |
| message_added | New Message | Message data (including goal_id), affected_goals |
| sub_trace_started | Sub-Trace starts running | trace_id, parent_goal_id, agent_type, task |
| sub_trace_completed | Sub-Trace finished | trace_id, status, summary, stats |
| trace_completed | Execution finished | Statistics |
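On the client side, these events can be folded into a local view of the Trace. The sketch below is hypothetical throughout. It assumes each event arrives as `(event_id, event_type, payload)` and that goal dicts carry their ID under `"id"`; neither shape is documented above. It leaves out `connected` because the shape of `goal_tree` is not specified. Tracking `last_event_id` lets the client rebuild the `since_event_id` URL when it reconnects, and it also drops events that are replayed twice.

```python
from typing import Any, Dict, List


class TraceView:
    """Hypothetical client-side mirror of one Trace, fed by /watch events."""

    def __init__(self, trace_id: str) -> None:
        self.trace_id = trace_id
        self.last_event_id = 0  # highest event_id applied so far
        self.goals: Dict[str, Dict[str, Any]] = {}
        self.sub_traces: Dict[str, Dict[str, Any]] = {}
        self.completed = False

    def watch_url(self, host: str = "localhost:8000") -> str:
        # On reconnect, resume right after the last applied event
        return (f"ws://{host}/api/traces/{self.trace_id}/watch"
                f"?since_event_id={self.last_event_id}")

    def _merge_goals(self, goals: List[Dict[str, Any]]) -> None:
        for goal in goals:
            self.goals.setdefault(goal["id"], {}).update(goal)

    def apply(self, event_id: int, event_type: str, payload: Dict[str, Any]) -> None:
        if event_id <= self.last_event_id:
            return  # already applied, e.g. replayed after a reconnect
        if event_type == "goal_added":
            self._merge_goals([payload["goal"]])
        elif event_type == "goal_updated":
            self.goals.setdefault(payload["goal_id"], {}).update(payload["updates"])
            # affected_goals carries parents completed by the cascade
            self._merge_goals(payload.get("affected_goals", []))
        elif event_type == "message_added":
            # Every Goal whose stats changed, with its latest stats
            self._merge_goals(payload.get("affected_goals", []))
        elif event_type == "sub_trace_started":
            self.sub_traces[payload["trace_id"]] = {"status": "running", **payload}
        elif event_type == "sub_trace_completed":
            self.sub_traces.setdefault(payload["trace_id"], {}).update(payload)
        elif event_type == "trace_completed":
            self.completed = True
        self.last_event_id = event_id
```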
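Each time a Message is added, the backend:
1. Updates the self_stats of the Goal the Message belongs to
2. Walks up the parent_id chain and updates the cumulative_stats of every ancestor
3. Pushes every affected Goal, with its latest stats, in the affected_goals of the message_added event

When all child Goals are completed, the parent Goal is completed automatically:
1. Check whether every child Goal has status == "completed"
2. If so, set the parent Goal's status = "completed"
3. Include the parents completed by the cascade in the affected_goals of the goal_updated event

Implementation: agent/execution/websocket.py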
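Both rules amount to walking the `parent_id` chain upward. This sketch uses hypothetical stand-in types (`GoalNode`, `GoalStats`), not the real Goal model. It assumes that `cumulative_stats` covers a Goal's own Messages plus all of its descendants' Messages, and that the cascade keeps climbing for as long as each parent's children are all completed. The returned ID lists correspond to what would go into `affected_goals`.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class GoalStats:
    messages: int = 0
    tokens: int = 0
    cost: float = 0.0

    def add(self, tokens: int, cost: float) -> None:
        self.messages += 1
        self.tokens += tokens
        self.cost += cost


@dataclass
class GoalNode:  # hypothetical, simplified stand-in for a Goal
    id: str
    parent_id: Optional[str] = None
    status: str = "pending"
    self_stats: GoalStats = field(default_factory=GoalStats)
    cumulative_stats: GoalStats = field(default_factory=GoalStats)


def on_message_added(goals: Dict[str, GoalNode], goal_id: str,
                     tokens: int, cost: float) -> List[str]:
    goal = goals[goal_id]
    goal.self_stats.add(tokens, cost)
    # The Goal itself plus every ancestor along the parent_id chain
    affected = [goal]
    while affected[-1].parent_id is not None:
        affected.append(goals[affected[-1].parent_id])
    for g in affected:
        # cumulative = own Messages + descendants' Messages (assumption)
        g.cumulative_stats.add(tokens, cost)
    return [g.id for g in affected]


def complete_goal(goals: Dict[str, GoalNode], goal_id: str) -> List[str]:
    goals[goal_id].status = "completed"
    completed = [goal_id]
    parent_id = goals[goal_id].parent_id
    while parent_id is not None:
        children = [g for g in goals.values() if g.parent_id == parent_id]
        if not all(g.status == "completed" for g in children):
            break  # some sibling is still open: the cascade stops here
        goals[parent_id].status = "completed"
        completed.append(parent_id)
        parent_id = goals[parent_id].parent_id
    return completed
```

Scanning all Goals for children is O(n) per level. That is fine for small trees, but a real store would probably keep a child index.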
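Explore several directions in parallel (explore tool):
```python
from agent.goal.explore import explore_tool

# Inside an async function; store and run_agent_func are defined elsewhere
result = await explore_tool(
    current_trace_id="main_trace_id",
    current_goal_id="3",
    branches=["JWT approach", "Session approach"],
    store=store,
    run_agent=run_agent_func,
)
```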
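One way to fan out branches is one Sub-Trace per branch, all run concurrently. This is a hypothetical sketch and not the explore_tool implementation. It assumes `run_agent(trace_id, task)` is a coroutine that returns a summary string. It uses `asyncio.gather(..., return_exceptions=True)` so that a failing branch is recorded as `"failed"` without hiding the results of the others.

```python
import asyncio
from datetime import datetime
from typing import Awaitable, Callable, Dict, List

# Hypothetical signature: run_agent(sub_trace_id, task) -> summary
RunAgent = Callable[[str, str], Awaitable[str]]


async def explore_in_parallel(parent_trace_id: str, parent_goal_id: str,
                              branches: List[str], run_agent: RunAgent) -> List[Dict[str, str]]:
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # One Sub-Trace ID per branch: {parent}@explore-{timestamp}-{seq}
    sub_ids = [f"{parent_trace_id}@explore-{stamp}-{i:03d}" for i in range(1, len(branches) + 1)]
    # Run every branch concurrently; results come back in input order
    results = await asyncio.gather(
        *(run_agent(tid, task) for tid, task in zip(sub_ids, branches)),
        return_exceptions=True,
    )
    return [
        {
            "trace_id": tid,
            "parent_goal_id": parent_goal_id,
            "task": task,
            "status": "failed" if isinstance(res, BaseException) else "completed",
            "summary": str(res),
        }
        for tid, task, res in zip(sub_ids, branches, results)
    ]
```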
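Delegate a large task to an independent Sub-Agent (delegate tool):
```python
from agent.goal.delegate import delegate_tool

# Inside an async function; store and run_agent_func are defined elsewhere
result = await delegate_tool(
    current_trace_id="main_trace_id",
    current_goal_id="3",
    task="Implement user login",
    store=store,
    run_agent=run_agent_func,
)
```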
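```python
import asyncio

from agent import AgentRunner
from agent.execution import FileSystemTraceStore


async def main() -> None:
    store = FileSystemTraceStore(base_path=".trace")
    # my_llm_fn: your own LLM call function (defined elsewhere)
    runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)
    async for event in runner.run(task="Explore the codebase"):
        print(event)  # a Trace or a Message


asyncio.run(main())
```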
```python
# Get all Sub-Traces of the main Trace
# (inside an async function; limit caps how many Traces are scanned)
all_traces = await store.list_traces(limit=1000)
sub_traces = [t for t in all_traces if t.parent_trace_id == main_trace_id]
```
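Filtering the list once per parent is O(n) per lookup. When you need Sub-Traces for many parents, a single pass can build a parent-to-children index from one `list_traces` result. `TraceInfo` is a hypothetical stand-in that exposes only the two fields used here. The `limit` cap still applies, so Traces beyond it will not appear in the index.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class TraceInfo:  # hypothetical stand-in with only the fields used here
    trace_id: str
    parent_trace_id: Optional[str] = None


def index_children(traces: List[TraceInfo]) -> Dict[str, List[TraceInfo]]:
    # parent_trace_id -> direct Sub-Traces, preserving input order
    children: Dict[str, List[TraceInfo]] = defaultdict(list)
    for t in traces:
        if t.parent_trace_id is not None:
            children[t.parent_trace_id].append(t)
    return dict(children)
```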