# Reson Agent 框架执行流程深度分析 ## 一、框架概览 这是一个基于 LLM 的可扩展 Agent 框架,核心理念是 **"所有 Agent 都是 Trace"**。框架支持多步工具调用、计划管理、子 Agent 协作、回溯重跑和上下文压缩。 ### 核心特点 - **统一的执行追踪**:主 Agent、子 Agent、人类协助都统一为 Trace - **计划驱动**:通过 GoalTree 管理执行计划 - **工具生态**:支持文件操作、浏览器自动化、命令执行、知识管理等 - **记忆系统**:跨会话的 Experience 和 Skill 管理 - **上下文管理**:两级压缩策略(过滤 + LLM 总结) --- ## 二、核心数据结构 ### 2.1 感知层(Perception)- Message **定义位置**: `agent/trace/models.py` ```python @dataclass class Message: """LLM 交互中的单条消息""" message_id: str # 唯一标识 trace_id: str # 所属 Trace role: Literal["user", "assistant", "tool", "system"] sequence: int # 消息序号(全局递增) parent_sequence: Optional[int] # 父消息序号(构建消息树) goal_id: Optional[str] # 关联的 Goal ID # 内容 content: Union[str, Dict, List] # 消息内容(支持多模态) tool_call_id: Optional[str] # 工具调用 ID # 统计 tokens: int = 0 cost: float = 0.0 duration_ms: int = 0 created_at: datetime ``` **关键特性**: - 通过 `sequence` 和 `parent_sequence` 构建消息树(支持回溯) - 通过 `goal_id` 关联到执行计划 - 支持多模态内容(文本、图片、工具调用结果) ### 2.2 推理层(Reasoning)- Trace **定义位置**: `agent/trace/models.py` ```python @dataclass class Trace: """一次完整的 LLM 交互轨迹""" trace_id: str mode: Literal["call", "agent"] # 单次调用 or Agent 模式 # Agent 模式特有 task: Optional[str] # 任务描述 agent_type: Optional[str] # Agent 类型(explore/analyst等) # 父子关系(Sub-Trace) parent_trace_id: Optional[str] # 父 Trace ID parent_goal_id: Optional[str] # 启动该 Trace 的 Goal # 状态 status: Literal["running", "completed", "failed", "stopped"] # 统计 total_messages: int = 0 total_tokens: int = 0 total_prompt_tokens: int = 0 total_completion_tokens: int = 0 total_reasoning_tokens: int = 0 # o1/DeepSeek R1 推理 tokens total_cache_creation_tokens: int = 0 total_cache_read_tokens: int = 0 total_cost: float = 0.0 total_duration_ms: int = 0 # 进度追踪 last_sequence: int = 0 # 最新消息序号 head_sequence: int = 0 # 当前主路径头节点(用于续跑) # 配置 model: Optional[str] tools: Optional[List[Dict]] # 工具定义 llm_params: Dict[str, Any] # 当前焦点 Goal current_goal_id: Optional[str] ``` **关键特性**: - 通过 `parent_trace_id` 构建 Agent 层级关系 - `head_sequence` 支持回溯重跑(rewind) - 详细的 token 和成本统计 ### 2.3 计划层(Planning)- Goal & GoalTree **定义位置**: `agent/trace/goal_models.py` ```python @dataclass class Goal: """执行目标节点""" id: str # 纯自增 ID("1", "2", "3") description: str # 目标描述 reason: str # 创建理由 parent_id: Optional[str] # 父 Goal(构建层级) type: GoalType = "normal" # "normal" | "agent_call" status: GoalStatus = "pending" # "pending" | "in_progress" | "completed" | "abandoned" summary: Optional[str] # 完成/放弃时的总结 # agent_call 特有 sub_trace_ids: Optional[List[Dict[str, str]]] # 启动的子 Trace agent_call_mode: Optional[str] # "explore" | "delegate" | "sequential" # 统计 self_stats: GoalStats # 自身统计 cumulative_stats: GoalStats # 累计统计(含子孙) # 知识注入 knowledge: Optional[List[Dict]] # 相关知识列表 ``` ```python @dataclass class GoalTree: """目标树 - 管理整个执行计划""" mission: str # 总体任务 goals: List[Goal] # 扁平列表(通过 parent_id 构建层级) current_id: Optional[str] # 当前焦点 Goal _next_id: int = 1 # 下一个 Goal ID # 方法 def add_goal(...) # 添加新 Goal def update_goal(...) # 更新 Goal 状态 def get_goal(...) # 获取 Goal def get_children(...) # 获取子 Goals def get_pending_goals(...) # 获取待执行 Goals def rewind_to(...) # 回溯到指定时间点 ``` **关键特性**: - 扁平列表 + `parent_id` 构建层级(避免递归复杂度) - 支持 `agent_call` 类型标记子 Agent 启动点 - 统计信息分为自身和累计(用于可视化) ### 2.4 记忆层(Memory)- Experience & Skill **定义位置**: `agent/memory/models.py` ```python @dataclass class Experience: """经验规则(条件 + 规则 + 证据)""" exp_id: str scope: str # "agent:{type}" 或 "user:{uid}" # 核心三元组 condition: str # 什么情况下适用 rule: str # 应该怎么做 evidence: List[str] # 证据(step_ids) # 元数据 source: Literal["execution", "feedback", "manual"] confidence: float = 0.5 usage_count: int = 0 success_rate: float = 0.0 ``` ```python @dataclass class Skill: """技能 - 从经验归纳的高层知识""" skill_id: str scope: str name: str description: str category: str # "search", "reasoning", "writing" # 层次结构 parent_id: Optional[str] # 内容 content: Optional[str] # 完整 Markdown 内容 guidelines: List[str] derived_from: List[str] # experience_ids version: int = 1 ``` **关键特性**: - Experience 是底层规则,Skill 是高层知识 - 通过 `scope` 实现权限隔离(agent 级 / user 级) - Skill 支持 Markdown 格式,可直接注入 system prompt ### 2.5 执行层(Execution)- ToolResult & ToolContext **定义位置**: `agent/tools/models.py` ```python @dataclass class ToolContext: """工具执行上下文""" trace_id: str goal_id: Optional[str] uid: Optional[str] agent_type: Optional[str] # 回调 trace_store: Optional[TraceStore] llm_call: Optional[Callable] # 知识管理 knowledge_config: Optional[KnowledgeConfig] ``` ```python @dataclass class ToolResult: """工具执行结果""" output: str # 返回给 LLM 的文本 base64_image: Optional[str] # 可选图片(如截图) error: Optional[str] # 错误信息 # 子 Agent 专用 sub_trace_id: Optional[str] # 模型使用(工具内部调用 LLM 时) model_usage: Optional[Dict] ``` --- ## 三、执行流程详解 ### 3.1 核心执行循环 **入口**: `AgentRunner.run()` - `agent/core/runner.py:301` 执行分为 **3 个阶段**: #### Phase 1: PREPARE TRACE(准备阶段) ```python async def _prepare_trace(messages, config): if config.trace_id: # 续跑或回溯:加载已有 Trace 和 GoalTree trace = await trace_store.get_trace(config.trace_id) goal_tree = await trace_store.get_goal_tree(config.trace_id) if config.after_sequence < trace.head_sequence: # 回溯模式:重置 GoalTree 到指定时间点 sequence = await _rewind(trace_id, after_sequence, goal_tree) else: # 续跑模式:从 last_sequence + 1 开始 sequence = trace.last_sequence + 1 else: # 新建:创建 Trace 和 GoalTree trace = Trace.create(...) goal_tree = GoalTree(mission=task, goals=[]) sequence = 1 return trace, goal_tree, sequence ``` **关键点**: - 通过 `config.trace_id` 和 `config.after_sequence` 区分新建/续跑/回溯 - 回溯时调用 `goal_tree.rewind_to()` 清理未来的 Goals #### Phase 2: BUILD HISTORY(构建历史) ```python async def _build_history(trace_id, messages, goal_tree, config, sequence): # 1. 加载历史消息(从 head_sequence 开始) history_messages = await trace_store.get_messages( trace_id, after_sequence=trace.head_sequence ) history = [msg.to_llm_format() for msg in history_messages] # 2. 构建 system prompt system_prompt = await _build_system_prompt(config) history.insert(0, {"role": "system", "content": system_prompt}) # 3. 追加新消息 for msg in messages: message = Message.create( trace_id=trace_id, role=msg["role"], sequence=sequence, content=msg["content"], parent_sequence=head_seq ) await trace_store.add_message(message) history.append(msg) head_seq = sequence sequence += 1 return history, sequence, created_messages, head_seq ``` **关键点**: - 从 `head_sequence` 加载历史(支持回溯后的分支) - System prompt 包含 Skills、GoalTree、知识等 - 新消息通过 `parent_sequence` 链接到主路径 #### Phase 3: AGENT LOOP(执行循环) ```python async def _agent_loop(trace, history, goal_tree, config, sequence): for iteration in range(config.max_iterations): # 1. 上下文压缩(两级策略) if needs_compression(history): # Level 1: 过滤已完成的 Goals history = filter_by_goal_status(history, goal_tree) # Level 2: LLM 总结(如果仍超阈值) if still_too_large(history): history = await _compress_history(history, goal_tree) # 2. 周期性注入上下文(每 10 轮) if iteration % 10 == 0: context = _build_context_injection(trace, goal_tree) history.append({"role": "system", "content": context}) # 3. 调用 LLM result = await llm_call( messages=history, model=config.model, tools=tool_schemas, temperature=config.temperature ) # 4. 保存 assistant 消息 assistant_msg = Message.create( role="assistant", content=result["content"], sequence=sequence, ... ) await trace_store.add_message(assistant_msg) history.append(assistant_msg.to_llm_format()) sequence += 1 # 5. 执行工具调用 if result.get("tool_calls"): for tc in result["tool_calls"]: tool_result = await _execute_tool(tc, context) # 保存 tool 消息 tool_msg = Message.create( role="tool", content=tool_result.output, sequence=sequence, ... ) await trace_store.add_message(tool_msg) history.append(tool_msg.to_llm_format()) sequence += 1 # 6. 检查终止条件 if no_tool_calls or reached_goal: break # 7. 完成反思 if config.enable_reflection: reflection = await _generate_reflection(history) await trace_store.update_trace(trace_id, summary=reflection) ``` **关键点**: - **两级压缩**:先过滤已完成 Goals,再 LLM 总结 - **周期性上下文注入**:避免 Agent 迷失方向 - **工具执行**:支持同步和异步工具 - **完成反思**:生成任务总结 ### 3.2 上下文压缩策略 **位置**: `agent/trace/compaction.py` #### Level 1: 过滤策略 ```python def filter_by_goal_status(messages, goal_tree): """保留 system、user 和活跃 Goal 的消息""" active_goal_ids = { g.id for g in goal_tree.goals if g.status in ["pending", "in_progress"] } filtered = [] for msg in messages: if msg["role"] in ["system", "user"]: filtered.append(msg) elif msg.get("goal_id") in active_goal_ids: filtered.append(msg) return filtered ``` #### Level 2: LLM 总结 ```python async def _compress_history(history, goal_tree): """使用 LLM 总结已完成的 Goals""" completed_goals = [g for g in goal_tree.goals if g.status == "completed"] for goal in completed_goals: # 提取该 Goal 的所有消息 goal_messages = [m for m in history if m.get("goal_id") == goal.id] # 调用 LLM 总结 summary_prompt = build_compression_prompt(goal, goal_messages) summary = await llm_call(messages=[{"role": "user", "content": summary_prompt}]) # 替换为单条总结消息 summary_msg = { "role": "system", "content": f"[Goal {goal.id} 总结]\n{summary}" } # 从 history 中移除原消息,插入总结 history = [m for m in history if m.get("goal_id") != goal.id] history.append(summary_msg) return history ``` **关键点**: - Level 1 快速过滤,无需 LLM 调用 - Level 2 保留语义信息,避免信息丢失 - 压缩后的消息仍保留在 Trace 中(可回溯) ### 3.3 工具执行机制 **位置**: `agent/tools/registry.py` #### 工具注册 ```python @tool( description="读取文件内容", requires_confirmation=False, hidden_params=["uid", "context"] ) async def read_file( file_path: str, uid: str = "", context: Optional[ToolContext] = None ) -> ToolResult: """读取指定文件的内容 Args: file_path: 文件路径 """ content = Path(file_path).read_text() return ToolResult(output=content) ``` **关键特性**: - `@tool` 装饰器自动生成 OpenAI Tool Schema - `hidden_params` 不暴露给 LLM(由框架注入) - `inject_params` 支持自动注入上下文 #### 工具调用流程 ```python async def _execute_tool(tool_call, context): tool_name = tool_call["function"]["name"] args = json.loads(tool_call["function"]["arguments"]) # 1. 获取工具函数 tool_func = registry.get_tool(tool_name) # 2. 注入隐藏参数 args["uid"] = context.uid args["context"] = context # 3. 执行工具 try: result = await tool_func(**args) return result except Exception as e: return ToolResult(error=str(e)) ``` ### 3.4 子 Agent 协作 **位置**: `agent/tools/builtin/subagent.py` #### agent 工具 ```python @tool(description="启动子 Agent 执行任务") async def agent( mission: str, agent_type: str = "explore", context: Optional[ToolContext] = None ) -> ToolResult: """启动子 Agent Args: mission: 子任务描述 agent_type: Agent 类型(explore/analyst/delegate) """ # 1. 创建子 Trace sub_config = RunConfig( agent_type=agent_type, parent_trace_id=context.trace_id, parent_goal_id=context.goal_id, ... ) # 2. 运行子 Agent sub_runner = AgentRunner(...) result = await sub_runner.run_result( messages=[{"role": "user", "content": mission}], config=sub_config ) # 3. 返回结果 return ToolResult( output=result["summary"], sub_trace_id=result["trace_id"] ) ``` **关键点**: - 子 Agent 通过 `parent_trace_id` 和 `parent_goal_id` 关联父 Agent - 支持多种 Agent 类型(explore/analyst/delegate) - 子 Agent 的结果作为工具调用结果返回 --- ## 四、业务场景适配 ### 4.1 Skill 系统 **位置**: `agent/memory/skills/` Skill 是注入到 system prompt 的领域知识,支持: #### 内置 Skills - `planning.md`: 计划管理和 Goal 工具使用 - `research.md`: 搜索和内容研究 - `browser.md`: 浏览器自动化 - `core.md`: 核心能力 #### 自定义 Skills ```markdown # 数据分析 Skill ## 能力描述 你是一个数据分析专家,擅长从数据中提取洞察。 ## 工作流程 1. 理解分析目标 2. 探索数据结构 3. 清洗和转换数据 4. 应用统计方法 5. 可视化结果 6. 撰写分析报告 ## 工具使用 - 使用 `read_file` 加载数据 - 使用 `bash_command` 运行 Python 脚本 - 使用 `write_file` 保存结果 ``` **加载方式**: ```python config = RunConfig( skills=["planning", "research", "data_analysis"], ... ) ``` ### 4.2 Agent 预设 **位置**: `agent/core/presets.py` 预设定义不同类型 Agent 的工具权限和参数: ```python AGENT_PRESETS = { "explore": { "skills": ["planning", "research"], "tools": ["read_file", "glob_files", "grep_content", "search_posts"], "max_iterations": 50, }, "analyst": { "skills": ["planning", "research"], "tools": ["read_file", "bash_command", "write_file"], "max_iterations": 100, }, "browser": { "skills": ["browser"], "tools": ["browser_*"], # 所有浏览器工具 "max_iterations": 200, }, } ``` **使用方式**: ```python config = RunConfig(agent_type="explore", ...) ``` ### 4.3 知识管理 **位置**: `agent/tools/builtin/knowledge.py` 支持知识的保存、检索和注入: #### 知识保存 ```python @tool(description="保存知识条目") async def knowledge_save( title: str, content: str, tags: List[str], context: Optional[ToolContext] = None ) -> ToolResult: """保存知识到 KnowHub""" knowledge_id = await knowhub_client.save( title=title, content=content, tags=tags, scope=f"user:{context.uid}" ) return ToolResult(output=f"已保存知识: {knowledge_id}") ``` #### 知识检索 ```python @tool(description="搜索知识库") async def knowledge_search( query: str, limit: int = 5, context: Optional[ToolContext] = None ) -> ToolResult: """搜索相关知识""" results = await knowhub_client.search( query=query, scope=f"user:{context.uid}", limit=limit ) return ToolResult(output=format_knowledge(results)) ``` #### 自动注入 ```python # 在 Goal 创建时自动检索相关知识 async def _inject_knowledge_to_goal(goal, context): if context.knowledge_config.auto_inject: results = await knowledge_search( query=goal.description, limit=3 ) goal.knowledge = results ``` ### 4.4 多模态支持 **位置**: `agent/docs/multimodal.md` 支持图片和 PDF 处理: #### 图片处理 ```python # 工具返回图片 @tool(description="截图") async def browser_screenshot(...) -> ToolResult: screenshot_data = await browser.screenshot() return ToolResult( output="截图已生成", base64_image=screenshot_data ) # 框架自动转换为 LLM 格式 tool_content = [ {"type": "text", "text": "截图已生成"}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}} ] ``` #### PDF 处理 ```python @tool(description="读取 PDF") async def read_pdf(file_path: str) -> ToolResult: """提取 PDF 文本和图片""" import fitz # PyMuPDF doc = fitz.open(file_path) text = "" images = [] for page in doc: text += page.get_text() for img in page.get_images(): images.append(extract_image(img)) return ToolResult( output=text, base64_image=images[0] if images else None ) ``` --- ## 五、数据存储 ### 5.1 文件系统存储 **位置**: `agent/trace/store.py` ``` .trace/ ├── {trace_id}/ │ ├── trace.json # Trace 元数据 │ ├── goal_tree.json # GoalTree │ ├── messages/ │ │ ├── 1.json # Message 1 │ │ ├── 2.json # Message 2 │ │ ├── 3.json │ │ └── 3.png # 截图(与 message 同名) │ └── model_usage.jsonl # 模型使用记录 ``` ### 5.2 数据持久化 ```python class FileSystemTraceStore(TraceStore): async def add_message(self, message: Message): """保存消息""" msg_file = self._get_messages_dir(message.trace_id) / f"{message.sequence}.json" msg_file.write_text(json.dumps(message.to_dict(), ensure_ascii=False, indent=2)) async def get_messages(self, trace_id: str, after_sequence: int = 0): """加载消息""" msg_dir = self._get_messages_dir(trace_id) messages = [] for file in sorted(msg_dir.glob("*.json")): seq = int(file.stem) if seq > after_sequence: data = json.loads(file.read_text()) messages.append(Message.from_dict(data)) return messages ``` --- ## 六、总结 ### 核心优势 1. **统一抽象**:所有 Agent 都是 Trace,简化了架构 2. **灵活计划**:GoalTree 支持动态调整和回溯 3. **可扩展工具**:`@tool` 装饰器简化工具开发 4. **智能压缩**:两级策略平衡性能和语义 5. **跨会话记忆**:Experience 和 Skill 支持持续学习 ### 适配业务场景的关键 1. **自定义 Skills**:编写领域专属的 Markdown 文档 2. **注册工具**:使用 `@tool` 装饰器添加业务工具 3. **配置预设**:定义不同场景的 Agent 类型 4. **知识管理**:利用 KnowHub 存储和检索领域知识 5. **多模态处理**:支持图片、PDF 等富媒体输入 ### 执行流程总结 ``` 用户输入 ↓ Phase 1: 准备 Trace(新建/续跑/回溯) ↓ Phase 2: 构建历史(加载消息 + 注入 Skills) ↓ Phase 3: Agent 循环 ├─ 上下文压缩(过滤 + 总结) ├─ 周期性注入(GoalTree + 协作者) ├─ 调用 LLM ├─ 执行工具(可能启动子 Agent) ├─ 更新 GoalTree └─ 检查终止条件 ↓ 完成反思(生成总结) ↓ 返回结果 ``` 这个框架通过清晰的数据结构和执行流程,实现了高度可扩展的 Agent 系统,能够适配各种复杂的业务场景。