执行轨迹记录和存储的后端实现
职责定位:agent/trace 模块负责所有 Trace/Message 相关功能
agent/trace/
├── models.py # Trace/Message 数据模型
├── goal_models.py # Goal/GoalTree 数据模型
├── protocols.py # TraceStore 存储接口
├── store.py # 文件系统存储实现
├── trace_id.py # Trace ID 生成工具
├── api.py # RESTful 查询 API
├── run_api.py # 控制 API(run/stop/reflect)
├── websocket.py # WebSocket 实时推送
├── goal_tool.py # goal 工具(计划管理)
└── compaction.py # Context 压缩
设计原则:
一次完整的 LLM 交互(单次调用或 Agent 任务)。每个 Sub-Agent 都是独立的 Trace。
# 主 Trace
main_trace = Trace.create(mode="agent", task="探索代码库")
# Sub-Trace(由 delegate 或 explore 工具创建)
sub_trace = Trace(
trace_id="2f8d3a1c...@explore-20260204220012-001",
mode="agent",
task="探索 JWT 认证方案",
parent_trace_id="2f8d3a1c-4b6e-4f9a-8c2d-1e5b7a9f3c4d",
parent_goal_id="3",
agent_type="explore",
status="running"
)
# 字段说明
trace.trace_id # UUID(主 Trace)或 {parent}@{mode}-{timestamp}-{seq}(Sub-Trace)
trace.mode # "call" | "agent"
trace.task # 任务描述
trace.parent_trace_id # 父 Trace ID(Sub-Trace 专用)
trace.parent_goal_id # 触发的父 Goal ID(Sub-Trace 专用)
trace.agent_type # Agent 类型:explore, delegate 等
trace.status # "running" | "completed" | "failed" | "stopped"
trace.total_messages # Message 总数
trace.total_tokens # Token 总数
trace.total_cost # 总成本
trace.current_goal_id # 当前焦点 goal
trace.head_sequence # 当前主路径头节点 sequence(用于 build_llm_messages)
Trace ID 格式:
2f8d3a1c-4b6e-4f9a-8c2d-1e5b7a9f3c4d{parent_uuid}@{mode}-{timestamp}-{seq},例如 2f8d3a1c...@explore-20260204220012-001实现:agent/trace/models.py:Trace
对应 LLM API 消息,加上元数据。通过 goal_id 关联 GoalTree 中的目标。通过 parent_sequence 形成消息树。
# assistant 消息(模型返回,可能含 text + tool_calls)
assistant_msg = Message.create(
trace_id=trace.trace_id,
role="assistant",
goal_id="3", # Goal ID(Trace 内部自增)
content={"text": "...", "tool_calls": [...]},
parent_sequence=5, # 父消息的 sequence
)
# tool 消息
tool_msg = Message.create(
trace_id=trace.trace_id,
role="tool",
goal_id="5",
tool_call_id="call_abc123",
content="工具执行结果",
parent_sequence=6,
)
parent_sequence:指向父消息的 sequence,构成消息树。主路径 = 从 trace.head_sequence 沿 parent chain 回溯到 root。
description 字段(系统自动生成):
assistant 消息:优先取 content 中的 text,若无 text 则生成 "tool call: XX, XX"tool 消息:使用 tool name实现:agent/trace/models.py:Message
class TraceStore(Protocol):
# Trace 操作
async def create_trace(self, trace: Trace) -> str: ...
async def get_trace(self, trace_id: str) -> Optional[Trace]: ...
async def update_trace(self, trace_id: str, **updates) -> None: ...
async def list_traces(self, ...) -> List[Trace]: ...
# GoalTree 操作(每个 Trace 有独立的 GoalTree)
async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]: ...
async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None: ...
async def add_goal(self, trace_id: str, goal: Goal) -> None: ...
async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None: ...
# Message 操作
async def add_message(self, message: Message) -> str: ...
async def get_message(self, message_id: str) -> Optional[Message]: ...
async def get_trace_messages(self, trace_id: str) -> List[Message]: ...
async def get_main_path_messages(self, trace_id: str, head_sequence: int) -> List[Message]: ...
async def get_messages_by_goal(self, trace_id: str, goal_id: str) -> List[Message]: ...
async def update_message(self, message_id: str, **updates) -> None: ...
# 事件流(WebSocket 断线续传)
async def get_events(self, trace_id: str, since_event_id: int) -> List[Dict]: ...
async def append_event(self, trace_id: str, event_type: str, payload: Dict) -> int: ...
实现:agent/trace/protocols.py
from agent.trace import FileSystemTraceStore
store = FileSystemTraceStore(base_path=".trace")
目录结构:
.trace/
├── 2f8d3a1c-4b6e-4f9a-8c2d-1e5b7a9f3c4d/ # 主 Trace
│ ├── meta.json # Trace 元数据
│ ├── goal.json # GoalTree(扁平 JSON)
│ ├── messages/ # Messages
│ │ ├── {message_id}.json
│ │ └── ...
│ └── events.jsonl # 事件流
│
├── 2f8d3a1c...@explore-20260204220012-001/ # Sub-Trace A
│ ├── meta.json # parent_trace_id 指向主 Trace
│ ├── goal.json # 独立的 GoalTree
│ ├── messages/
│ └── events.jsonl
│
└── 2f8d3a1c...@explore-20260204220012-002/ # Sub-Trace B
└── ...
关键变化(相比旧设计):
branches/ 子目录实现:agent/trace/store.py
GET /api/traces?mode=agent&status=running&limit=20
返回所有 Traces(包括主 Trace 和 Sub-Traces)。
GET /api/traces/{trace_id}
返回:
parent_trace_id == trace_id 的 Traces)GET /api/traces/{trace_id}/messages?mode=main_path&head=15&goal_id=3
返回指定 Trace 的 Messages。参数:
mode: main_path(默认)| all — 返回主路径消息或全部消息head: 可选 sequence 值 — 指定主路径的 head(默认用 trace.head_sequence,仅 mode=main_path 有效)goal_id: 可选,按 Goal 过滤实现:agent/trace/api.py
需在 api_server.py 中配置 Runner。执行在后台异步进行,通过 WebSocket 监听进度。
POST /api/traces
Content-Type: application/json
{
"messages": [
{"role": "system", "content": "自定义 system prompt(可选,不传则从 skills 自动构建)"},
{"role": "user", "content": "分析项目架构"}
],
"model": "gpt-4o",
"temperature": 0.3,
"max_iterations": 200,
"tools": null,
"name": "任务名称",
"uid": "user_id"
}
POST /api/traces/{trace_id}/run
Content-Type: application/json
{
"messages": [{"role": "user", "content": "..."}],
"after_sequence": null
}
after_sequence: null(或省略)→ 从末尾续跑after_sequence: N(主路径上且 < head)→ 回溯到 sequence N 后运行messages: [] + after_sequence: N → 重新生成Runner 根据 after_sequence 与 head_sequence 的关系自动判断续跑/回溯行为。
POST /api/traces/{trace_id}/stop
设置取消信号,agent loop 在下一个检查点退出,Trace 状态置为 stopped。
GET /api/traces/running
POST /api/traces/{trace_id}/reflect
Content-Type: application/json
{
"focus": "可选,反思重点"
}
在 trace 末尾追加一条包含反思 prompt 的 user message,作为侧枝运行。
使用 max_iterations=1, tools=[] 进行单轮无工具 LLM 调用,生成经验总结,
结果自动追加到 ./cache/experiences.md。head_sequence 通过 try/finally 保证恢复。
GET /api/experiences
返回 ./cache/experiences.md 的文件内容。
实现:agent/trace/run_api.py
ws://localhost:8000/api/traces/{trace_id}/watch?since_event_id=0
| 事件 | 触发时机 | payload |
|---|---|---|
connected |
WebSocket 连接成功 | trace_id, current_event_id, goal_tree, sub_traces |
goal_added |
新增 Goal | goal 完整数据(含 stats, parent_id, type) |
goal_updated |
Goal 状态变化(含级联完成) | goal_id, updates, affected_goals(含级联完成的父节点) |
message_added |
新 Message | message 数据(含 goal_id),affected_goals |
sub_trace_started |
Sub-Trace 开始执行 | trace_id, parent_goal_id, agent_type, task |
sub_trace_completed |
Sub-Trace 完成 | trace_id, status, summary, stats |
rewind |
回溯执行 | after_sequence, head_sequence, goal_tree_snapshot |
trace_completed |
执行完成 | 统计信息 |
每次添加 Message 时,后端执行:
self_statsparent_id 链向上更新所有祖先的 cumulative_statsmessage_added 事件的 affected_goals 中推送所有受影响的 Goal 及其最新 stats当所有子 Goals 都完成时,自动完成父 Goal:
status == "completed"status = "completed"goal_updated 事件的 affected_goals 中包含级联完成的父节点实现:agent/trace/websocket.py
并行探索多个方向:
from agent.goal.explore import explore_tool
result = await explore_tool(
current_trace_id="main_trace_id",
current_goal_id="3",
branches=["JWT 方案", "Session 方案"],
store=store,
run_agent=run_agent_func
)
将大任务委托给独立 Sub-Agent:
from agent.goal.delegate import delegate_tool
result = await delegate_tool(
current_trace_id="main_trace_id",
current_goal_id="3",
task="实现用户登录功能",
store=store,
run_agent=run_agent_func
)
from agent import AgentRunner
from agent.trace import FileSystemTraceStore
store = FileSystemTraceStore(base_path=".trace")
runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)
async for event in runner.run(task="探索代码库"):
print(event) # Trace 或 Message
# 获取主 Trace 的所有 Sub-Traces
all_traces = await store.list_traces(limit=1000)
sub_traces = [t for t in all_traces if t.parent_trace_id == main_trace_id]