这是一个基于 LLM 的可扩展 Agent 框架,核心理念是 "所有 Agent 都是 Trace"。框架支持多步工具调用、计划管理、子 Agent 协作、回溯重跑和上下文压缩。
定义位置: agent/trace/models.py
@dataclass
class Message:
"""LLM 交互中的单条消息"""
message_id: str # 唯一标识
trace_id: str # 所属 Trace
role: Literal["user", "assistant", "tool", "system"]
sequence: int # 消息序号(全局递增)
parent_sequence: Optional[int] # 父消息序号(构建消息树)
goal_id: Optional[str] # 关联的 Goal ID
# 内容
content: Union[str, Dict, List] # 消息内容(支持多模态)
tool_call_id: Optional[str] # 工具调用 ID
# 统计
tokens: int = 0
cost: float = 0.0
duration_ms: int = 0
created_at: datetime
关键特性:
sequence 和 parent_sequence 构建消息树(支持回溯)goal_id 关联到执行计划定义位置: agent/trace/models.py
@dataclass
class Trace:
"""一次完整的 LLM 交互轨迹"""
trace_id: str
mode: Literal["call", "agent"] # 单次调用 or Agent 模式
# Agent 模式特有
task: Optional[str] # 任务描述
agent_type: Optional[str] # Agent 类型(explore/analyst等)
# 父子关系(Sub-Trace)
parent_trace_id: Optional[str] # 父 Trace ID
parent_goal_id: Optional[str] # 启动该 Trace 的 Goal
# 状态
status: Literal["running", "completed", "failed", "stopped"]
# 统计
total_messages: int = 0
total_tokens: int = 0
total_prompt_tokens: int = 0
total_completion_tokens: int = 0
total_reasoning_tokens: int = 0 # o1/DeepSeek R1 推理 tokens
total_cache_creation_tokens: int = 0
total_cache_read_tokens: int = 0
total_cost: float = 0.0
total_duration_ms: int = 0
# 进度追踪
last_sequence: int = 0 # 最新消息序号
head_sequence: int = 0 # 当前主路径头节点(用于续跑)
# 配置
model: Optional[str]
tools: Optional[List[Dict]] # 工具定义
llm_params: Dict[str, Any]
# 当前焦点 Goal
current_goal_id: Optional[str]
关键特性:
parent_trace_id 构建 Agent 层级关系head_sequence 支持回溯重跑(rewind)定义位置: agent/trace/goal_models.py
@dataclass
class Goal:
"""执行目标节点"""
id: str # 纯自增 ID("1", "2", "3")
description: str # 目标描述
reason: str # 创建理由
parent_id: Optional[str] # 父 Goal(构建层级)
type: GoalType = "normal" # "normal" | "agent_call"
status: GoalStatus = "pending" # "pending" | "in_progress" | "completed" | "abandoned"
summary: Optional[str] # 完成/放弃时的总结
# agent_call 特有
sub_trace_ids: Optional[List[Dict[str, str]]] # 启动的子 Trace
agent_call_mode: Optional[str] # "explore" | "delegate" | "sequential"
# 统计
self_stats: GoalStats # 自身统计
cumulative_stats: GoalStats # 累计统计(含子孙)
# 知识注入
knowledge: Optional[List[Dict]] # 相关知识列表
@dataclass
class GoalTree:
"""目标树 - 管理整个执行计划"""
mission: str # 总体任务
goals: List[Goal] # 扁平列表(通过 parent_id 构建层级)
current_id: Optional[str] # 当前焦点 Goal
_next_id: int = 1 # 下一个 Goal ID
# 方法
def add_goal(...) # 添加新 Goal
def update_goal(...) # 更新 Goal 状态
def get_goal(...) # 获取 Goal
def get_children(...) # 获取子 Goals
def get_pending_goals(...) # 获取待执行 Goals
def rewind_to(...) # 回溯到指定时间点
关键特性:
parent_id 构建层级(避免递归复杂度)agent_call 类型标记子 Agent 启动点定义位置: agent/memory/models.py
@dataclass
class Experience:
"""经验规则(条件 + 规则 + 证据)"""
exp_id: str
scope: str # "agent:{type}" 或 "user:{uid}"
# 核心三元组
condition: str # 什么情况下适用
rule: str # 应该怎么做
evidence: List[str] # 证据(step_ids)
# 元数据
source: Literal["execution", "feedback", "manual"]
confidence: float = 0.5
usage_count: int = 0
success_rate: float = 0.0
@dataclass
class Skill:
"""技能 - 从经验归纳的高层知识"""
skill_id: str
scope: str
name: str
description: str
category: str # "search", "reasoning", "writing"
# 层次结构
parent_id: Optional[str]
# 内容
content: Optional[str] # 完整 Markdown 内容
guidelines: List[str]
derived_from: List[str] # experience_ids
version: int = 1
关键特性:
scope 实现权限隔离(agent 级 / user 级)定义位置: agent/tools/models.py
@dataclass
class ToolContext:
"""工具执行上下文"""
trace_id: str
goal_id: Optional[str]
uid: Optional[str]
agent_type: Optional[str]
# 回调
trace_store: Optional[TraceStore]
llm_call: Optional[Callable]
# 知识管理
knowledge_config: Optional[KnowledgeConfig]
@dataclass
class ToolResult:
"""工具执行结果"""
output: str # 返回给 LLM 的文本
base64_image: Optional[str] # 可选图片(如截图)
error: Optional[str] # 错误信息
# 子 Agent 专用
sub_trace_id: Optional[str]
# 模型使用(工具内部调用 LLM 时)
model_usage: Optional[Dict]
入口: AgentRunner.run() - agent/core/runner.py:301
执行分为 3 个阶段:
async def _prepare_trace(messages, config):
if config.trace_id:
# 续跑或回溯:加载已有 Trace 和 GoalTree
trace = await trace_store.get_trace(config.trace_id)
goal_tree = await trace_store.get_goal_tree(config.trace_id)
if config.after_sequence < trace.head_sequence:
# 回溯模式:重置 GoalTree 到指定时间点
sequence = await _rewind(trace_id, after_sequence, goal_tree)
else:
# 续跑模式:从 last_sequence + 1 开始
sequence = trace.last_sequence + 1
else:
# 新建:创建 Trace 和 GoalTree
trace = Trace.create(...)
goal_tree = GoalTree(mission=task, goals=[])
sequence = 1
return trace, goal_tree, sequence
关键点:
config.trace_id 和 config.after_sequence 区分新建/续跑/回溯goal_tree.rewind_to() 清理未来的 Goalsasync def _build_history(trace_id, messages, goal_tree, config, sequence):
# 1. 加载历史消息(从 head_sequence 开始)
history_messages = await trace_store.get_messages(
trace_id,
after_sequence=trace.head_sequence
)
history = [msg.to_llm_format() for msg in history_messages]
# 2. 构建 system prompt
system_prompt = await _build_system_prompt(config)
history.insert(0, {"role": "system", "content": system_prompt})
# 3. 追加新消息
for msg in messages:
message = Message.create(
trace_id=trace_id,
role=msg["role"],
sequence=sequence,
content=msg["content"],
parent_sequence=head_seq
)
await trace_store.add_message(message)
history.append(msg)
head_seq = sequence
sequence += 1
return history, sequence, created_messages, head_seq
关键点:
head_sequence 加载历史(支持回溯后的分支)parent_sequence 链接到主路径async def _agent_loop(trace, history, goal_tree, config, sequence):
for iteration in range(config.max_iterations):
# 1. 上下文压缩(两级策略)
if needs_compression(history):
# Level 1: 过滤已完成的 Goals
history = filter_by_goal_status(history, goal_tree)
# Level 2: LLM 总结(如果仍超阈值)
if still_too_large(history):
history = await _compress_history(history, goal_tree)
# 2. 周期性注入上下文(每 10 轮)
if iteration % 10 == 0:
context = _build_context_injection(trace, goal_tree)
history.append({"role": "system", "content": context})
# 3. 调用 LLM
result = await llm_call(
messages=history,
model=config.model,
tools=tool_schemas,
temperature=config.temperature
)
# 4. 保存 assistant 消息
assistant_msg = Message.create(
role="assistant",
content=result["content"],
sequence=sequence,
...
)
await trace_store.add_message(assistant_msg)
history.append(assistant_msg.to_llm_format())
sequence += 1
# 5. 执行工具调用
if result.get("tool_calls"):
for tc in result["tool_calls"]:
tool_result = await _execute_tool(tc, context)
# 保存 tool 消息
tool_msg = Message.create(
role="tool",
content=tool_result.output,
sequence=sequence,
...
)
await trace_store.add_message(tool_msg)
history.append(tool_msg.to_llm_format())
sequence += 1
# 6. 检查终止条件
if no_tool_calls or reached_goal:
break
# 7. 完成反思
if config.enable_reflection:
reflection = await _generate_reflection(history)
await trace_store.update_trace(trace_id, summary=reflection)
关键点:
位置: agent/trace/compaction.py
def filter_by_goal_status(messages, goal_tree):
"""保留 system、user 和活跃 Goal 的消息"""
active_goal_ids = {
g.id for g in goal_tree.goals
if g.status in ["pending", "in_progress"]
}
filtered = []
for msg in messages:
if msg["role"] in ["system", "user"]:
filtered.append(msg)
elif msg.get("goal_id") in active_goal_ids:
filtered.append(msg)
return filtered
async def _compress_history(history, goal_tree):
"""使用 LLM 总结已完成的 Goals"""
completed_goals = [g for g in goal_tree.goals if g.status == "completed"]
for goal in completed_goals:
# 提取该 Goal 的所有消息
goal_messages = [m for m in history if m.get("goal_id") == goal.id]
# 调用 LLM 总结
summary_prompt = build_compression_prompt(goal, goal_messages)
summary = await llm_call(messages=[{"role": "user", "content": summary_prompt}])
# 替换为单条总结消息
summary_msg = {
"role": "system",
"content": f"[Goal {goal.id} 总结]\n{summary}"
}
# 从 history 中移除原消息,插入总结
history = [m for m in history if m.get("goal_id") != goal.id]
history.append(summary_msg)
return history
关键点:
位置: agent/tools/registry.py
@tool(
description="读取文件内容",
requires_confirmation=False,
hidden_params=["uid", "context"]
)
async def read_file(
file_path: str,
uid: str = "",
context: Optional[ToolContext] = None
) -> ToolResult:
"""读取指定文件的内容
Args:
file_path: 文件路径
"""
content = Path(file_path).read_text()
return ToolResult(output=content)
关键特性:
@tool 装饰器自动生成 OpenAI Tool Schemahidden_params 不暴露给 LLM(由框架注入)inject_params 支持自动注入上下文async def _execute_tool(tool_call, context):
tool_name = tool_call["function"]["name"]
args = json.loads(tool_call["function"]["arguments"])
# 1. 获取工具函数
tool_func = registry.get_tool(tool_name)
# 2. 注入隐藏参数
args["uid"] = context.uid
args["context"] = context
# 3. 执行工具
try:
result = await tool_func(**args)
return result
except Exception as e:
return ToolResult(error=str(e))
位置: agent/tools/builtin/subagent.py
@tool(description="启动子 Agent 执行任务")
async def agent(
mission: str,
agent_type: str = "explore",
context: Optional[ToolContext] = None
) -> ToolResult:
"""启动子 Agent
Args:
mission: 子任务描述
agent_type: Agent 类型(explore/analyst/delegate)
"""
# 1. 创建子 Trace
sub_config = RunConfig(
agent_type=agent_type,
parent_trace_id=context.trace_id,
parent_goal_id=context.goal_id,
...
)
# 2. 运行子 Agent
sub_runner = AgentRunner(...)
result = await sub_runner.run_result(
messages=[{"role": "user", "content": mission}],
config=sub_config
)
# 3. 返回结果
return ToolResult(
output=result["summary"],
sub_trace_id=result["trace_id"]
)
关键点:
parent_trace_id 和 parent_goal_id 关联父 Agent位置: agent/memory/skills/
Skill 是注入到 system prompt 的领域知识,支持:
planning.md: 计划管理和 Goal 工具使用research.md: 搜索和内容研究browser.md: 浏览器自动化core.md: 核心能力# 数据分析 Skill
## 能力描述
你是一个数据分析专家,擅长从数据中提取洞察。
## 工作流程
1. 理解分析目标
2. 探索数据结构
3. 清洗和转换数据
4. 应用统计方法
5. 可视化结果
6. 撰写分析报告
## 工具使用
- 使用 `read_file` 加载数据
- 使用 `bash_command` 运行 Python 脚本
- 使用 `write_file` 保存结果
加载方式:
config = RunConfig(
skills=["planning", "research", "data_analysis"],
...
)
位置: agent/core/presets.py
预设定义不同类型 Agent 的工具权限和参数:
AGENT_PRESETS = {
"explore": {
"skills": ["planning", "research"],
"tools": ["read_file", "glob_files", "grep_content", "search_posts"],
"max_iterations": 50,
},
"analyst": {
"skills": ["planning", "research"],
"tools": ["read_file", "bash_command", "write_file"],
"max_iterations": 100,
},
"browser": {
"skills": ["browser"],
"tools": ["browser_*"], # 所有浏览器工具
"max_iterations": 200,
},
}
使用方式:
config = RunConfig(agent_type="explore", ...)
位置: agent/tools/builtin/knowledge.py
支持知识的保存、检索和注入:
@tool(description="保存知识条目")
async def knowledge_save(
title: str,
content: str,
tags: List[str],
context: Optional[ToolContext] = None
) -> ToolResult:
"""保存知识到 KnowHub"""
knowledge_id = await knowhub_client.save(
title=title,
content=content,
tags=tags,
scope=f"user:{context.uid}"
)
return ToolResult(output=f"已保存知识: {knowledge_id}")
@tool(description="搜索知识库")
async def knowledge_search(
query: str,
limit: int = 5,
context: Optional[ToolContext] = None
) -> ToolResult:
"""搜索相关知识"""
results = await knowhub_client.search(
query=query,
scope=f"user:{context.uid}",
limit=limit
)
return ToolResult(output=format_knowledge(results))
# 在 Goal 创建时自动检索相关知识
async def _inject_knowledge_to_goal(goal, context):
if context.knowledge_config.auto_inject:
results = await knowledge_search(
query=goal.description,
limit=3
)
goal.knowledge = results
位置: agent/docs/multimodal.md
支持图片和 PDF 处理:
# 工具返回图片
@tool(description="截图")
async def browser_screenshot(...) -> ToolResult:
screenshot_data = await browser.screenshot()
return ToolResult(
output="截图已生成",
base64_image=screenshot_data
)
# 框架自动转换为 LLM 格式
tool_content = [
{"type": "text", "text": "截图已生成"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
]
@tool(description="读取 PDF")
async def read_pdf(file_path: str) -> ToolResult:
"""提取 PDF 文本和图片"""
import fitz # PyMuPDF
doc = fitz.open(file_path)
text = ""
images = []
for page in doc:
text += page.get_text()
for img in page.get_images():
images.append(extract_image(img))
return ToolResult(
output=text,
base64_image=images[0] if images else None
)
位置: agent/trace/store.py
.trace/
├── {trace_id}/
│ ├── trace.json # Trace 元数据
│ ├── goal_tree.json # GoalTree
│ ├── messages/
│ │ ├── 1.json # Message 1
│ │ ├── 2.json # Message 2
│ │ ├── 3.json
│ │ └── 3.png # 截图(与 message 同名)
│ └── model_usage.jsonl # 模型使用记录
class FileSystemTraceStore(TraceStore):
async def add_message(self, message: Message):
"""保存消息"""
msg_file = self._get_messages_dir(message.trace_id) / f"{message.sequence}.json"
msg_file.write_text(json.dumps(message.to_dict(), ensure_ascii=False, indent=2))
async def get_messages(self, trace_id: str, after_sequence: int = 0):
"""加载消息"""
msg_dir = self._get_messages_dir(trace_id)
messages = []
for file in sorted(msg_dir.glob("*.json")):
seq = int(file.stem)
if seq > after_sequence:
data = json.loads(file.read_text())
messages.append(Message.from_dict(data))
return messages
@tool 装饰器简化工具开发@tool 装饰器添加业务工具用户输入
↓
Phase 1: 准备 Trace(新建/续跑/回溯)
↓
Phase 2: 构建历史(加载消息 + 注入 Skills)
↓
Phase 3: Agent 循环
├─ 上下文压缩(过滤 + 总结)
├─ 周期性注入(GoalTree + 协作者)
├─ 调用 LLM
├─ 执行工具(可能启动子 Agent)
├─ 更新 GoalTree
└─ 检查终止条件
↓
完成反思(生成总结)
↓
返回结果
这个框架通过清晰的数据结构和执行流程,实现了高度可扩展的 Agent 系统,能够适配各种复杂的业务场景。