Agent Framework Analysis.md 22 KB

Reson Agent 框架执行流程深度分析

一、框架概览

这是一个基于 LLM 的可扩展 Agent 框架,核心理念是 "所有 Agent 都是 Trace"。框架支持多步工具调用、计划管理、子 Agent 协作、回溯重跑和上下文压缩。

核心特点

  • 统一的执行追踪:主 Agent、子 Agent、人类协助都统一为 Trace
  • 计划驱动:通过 GoalTree 管理执行计划
  • 工具生态:支持文件操作、浏览器自动化、命令执行、知识管理等
  • 记忆系统:跨会话的 Experience 和 Skill 管理
  • 上下文管理:两级压缩策略(过滤 + LLM 总结)

二、核心数据结构

2.1 感知层(Perception)- Message

定义位置: agent/trace/models.py

@dataclass
class Message:
    """LLM 交互中的单条消息"""
    message_id: str                    # 唯一标识
    trace_id: str                      # 所属 Trace
    role: Literal["user", "assistant", "tool", "system"]
    sequence: int                      # 消息序号(全局递增)
    parent_sequence: Optional[int]     # 父消息序号(构建消息树)
    goal_id: Optional[str]             # 关联的 Goal ID

    # 内容
    content: Union[str, Dict, List]    # 消息内容(支持多模态)
    tool_call_id: Optional[str]        # 工具调用 ID

    # 统计
    tokens: int = 0
    cost: float = 0.0
    duration_ms: int = 0

    created_at: datetime

关键特性

  • 通过 sequenceparent_sequence 构建消息树(支持回溯)
  • 通过 goal_id 关联到执行计划
  • 支持多模态内容(文本、图片、工具调用结果)

2.2 推理层(Reasoning)- Trace

定义位置: agent/trace/models.py

@dataclass
class Trace:
    """一次完整的 LLM 交互轨迹"""
    trace_id: str
    mode: Literal["call", "agent"]     # 单次调用 or Agent 模式

    # Agent 模式特有
    task: Optional[str]                # 任务描述
    agent_type: Optional[str]          # Agent 类型(explore/analyst等)

    # 父子关系(Sub-Trace)
    parent_trace_id: Optional[str]     # 父 Trace ID
    parent_goal_id: Optional[str]      # 启动该 Trace 的 Goal

    # 状态
    status: Literal["running", "completed", "failed", "stopped"]

    # 统计
    total_messages: int = 0
    total_tokens: int = 0
    total_prompt_tokens: int = 0
    total_completion_tokens: int = 0
    total_reasoning_tokens: int = 0    # o1/DeepSeek R1 推理 tokens
    total_cache_creation_tokens: int = 0
    total_cache_read_tokens: int = 0
    total_cost: float = 0.0
    total_duration_ms: int = 0

    # 进度追踪
    last_sequence: int = 0             # 最新消息序号
    head_sequence: int = 0             # 当前主路径头节点(用于续跑)

    # 配置
    model: Optional[str]
    tools: Optional[List[Dict]]        # 工具定义
    llm_params: Dict[str, Any]

    # 当前焦点 Goal
    current_goal_id: Optional[str]

关键特性

  • 通过 parent_trace_id 构建 Agent 层级关系
  • head_sequence 支持回溯重跑(rewind)
  • 详细的 token 和成本统计

2.3 计划层(Planning)- Goal & GoalTree

定义位置: agent/trace/goal_models.py

@dataclass
class Goal:
    """执行目标节点"""
    id: str                            # 纯自增 ID("1", "2", "3")
    description: str                   # 目标描述
    reason: str                        # 创建理由
    parent_id: Optional[str]           # 父 Goal(构建层级)

    type: GoalType = "normal"          # "normal" | "agent_call"
    status: GoalStatus = "pending"     # "pending" | "in_progress" | "completed" | "abandoned"
    summary: Optional[str]             # 完成/放弃时的总结

    # agent_call 特有
    sub_trace_ids: Optional[List[Dict[str, str]]]  # 启动的子 Trace
    agent_call_mode: Optional[str]     # "explore" | "delegate" | "sequential"

    # 统计
    self_stats: GoalStats              # 自身统计
    cumulative_stats: GoalStats        # 累计统计(含子孙)

    # 知识注入
    knowledge: Optional[List[Dict]]    # 相关知识列表
@dataclass
class GoalTree:
    """目标树 - 管理整个执行计划"""
    mission: str                       # 总体任务
    goals: List[Goal]                  # 扁平列表(通过 parent_id 构建层级)
    current_id: Optional[str]          # 当前焦点 Goal
    _next_id: int = 1                  # 下一个 Goal ID

    # 方法
    def add_goal(...)                  # 添加新 Goal
    def update_goal(...)               # 更新 Goal 状态
    def get_goal(...)                  # 获取 Goal
    def get_children(...)              # 获取子 Goals
    def get_pending_goals(...)         # 获取待执行 Goals
    def rewind_to(...)                 # 回溯到指定时间点

关键特性

  • 扁平列表 + parent_id 构建层级(避免递归复杂度)
  • 支持 agent_call 类型标记子 Agent 启动点
  • 统计信息分为自身和累计(用于可视化)

2.4 记忆层(Memory)- Experience & Skill

定义位置: agent/memory/models.py

@dataclass
class Experience:
    """经验规则(条件 + 规则 + 证据)"""
    exp_id: str
    scope: str                         # "agent:{type}" 或 "user:{uid}"

    # 核心三元组
    condition: str                     # 什么情况下适用
    rule: str                          # 应该怎么做
    evidence: List[str]                # 证据(step_ids)

    # 元数据
    source: Literal["execution", "feedback", "manual"]
    confidence: float = 0.5
    usage_count: int = 0
    success_rate: float = 0.0
@dataclass
class Skill:
    """技能 - 从经验归纳的高层知识"""
    skill_id: str
    scope: str
    name: str
    description: str
    category: str                      # "search", "reasoning", "writing"

    # 层次结构
    parent_id: Optional[str]

    # 内容
    content: Optional[str]             # 完整 Markdown 内容
    guidelines: List[str]
    derived_from: List[str]            # experience_ids

    version: int = 1

关键特性

  • Experience 是底层规则,Skill 是高层知识
  • 通过 scope 实现权限隔离(agent 级 / user 级)
  • Skill 支持 Markdown 格式,可直接注入 system prompt

2.5 执行层(Execution)- ToolResult & ToolContext

定义位置: agent/tools/models.py

@dataclass
class ToolContext:
    """工具执行上下文"""
    trace_id: str
    goal_id: Optional[str]
    uid: Optional[str]
    agent_type: Optional[str]

    # 回调
    trace_store: Optional[TraceStore]
    llm_call: Optional[Callable]

    # 知识管理
    knowledge_config: Optional[KnowledgeConfig]
@dataclass
class ToolResult:
    """工具执行结果"""
    output: str                        # 返回给 LLM 的文本
    base64_image: Optional[str]        # 可选图片(如截图)
    error: Optional[str]               # 错误信息

    # 子 Agent 专用
    sub_trace_id: Optional[str]

    # 模型使用(工具内部调用 LLM 时)
    model_usage: Optional[Dict]

三、执行流程详解

3.1 核心执行循环

入口: AgentRunner.run() - agent/core/runner.py:301

执行分为 3 个阶段

Phase 1: PREPARE TRACE(准备阶段)

async def _prepare_trace(messages, config):
    if config.trace_id:
        # 续跑或回溯:加载已有 Trace 和 GoalTree
        trace = await trace_store.get_trace(config.trace_id)
        goal_tree = await trace_store.get_goal_tree(config.trace_id)

        if config.after_sequence < trace.head_sequence:
            # 回溯模式:重置 GoalTree 到指定时间点
            sequence = await _rewind(trace_id, after_sequence, goal_tree)
        else:
            # 续跑模式:从 last_sequence + 1 开始
            sequence = trace.last_sequence + 1
    else:
        # 新建:创建 Trace 和 GoalTree
        trace = Trace.create(...)
        goal_tree = GoalTree(mission=task, goals=[])
        sequence = 1

    return trace, goal_tree, sequence

关键点

  • 通过 config.trace_idconfig.after_sequence 区分新建/续跑/回溯
  • 回溯时调用 goal_tree.rewind_to() 清理未来的 Goals

Phase 2: BUILD HISTORY(构建历史)

async def _build_history(trace_id, messages, goal_tree, config, sequence):
    # 1. 加载历史消息(从 head_sequence 开始)
    history_messages = await trace_store.get_messages(
        trace_id,
        after_sequence=trace.head_sequence
    )
    history = [msg.to_llm_format() for msg in history_messages]

    # 2. 构建 system prompt
    system_prompt = await _build_system_prompt(config)
    history.insert(0, {"role": "system", "content": system_prompt})

    # 3. 追加新消息
    for msg in messages:
        message = Message.create(
            trace_id=trace_id,
            role=msg["role"],
            sequence=sequence,
            content=msg["content"],
            parent_sequence=head_seq
        )
        await trace_store.add_message(message)
        history.append(msg)
        head_seq = sequence
        sequence += 1

    return history, sequence, created_messages, head_seq

关键点

  • head_sequence 加载历史(支持回溯后的分支)
  • System prompt 包含 Skills、GoalTree、知识等
  • 新消息通过 parent_sequence 链接到主路径

Phase 3: AGENT LOOP(执行循环)

async def _agent_loop(trace, history, goal_tree, config, sequence):
    for iteration in range(config.max_iterations):
        # 1. 上下文压缩(两级策略)
        if needs_compression(history):
            # Level 1: 过滤已完成的 Goals
            history = filter_by_goal_status(history, goal_tree)

            # Level 2: LLM 总结(如果仍超阈值)
            if still_too_large(history):
                history = await _compress_history(history, goal_tree)

        # 2. 周期性注入上下文(每 10 轮)
        if iteration % 10 == 0:
            context = _build_context_injection(trace, goal_tree)
            history.append({"role": "system", "content": context})

        # 3. 调用 LLM
        result = await llm_call(
            messages=history,
            model=config.model,
            tools=tool_schemas,
            temperature=config.temperature
        )

        # 4. 保存 assistant 消息
        assistant_msg = Message.create(
            role="assistant",
            content=result["content"],
            sequence=sequence,
            ...
        )
        await trace_store.add_message(assistant_msg)
        history.append(assistant_msg.to_llm_format())
        sequence += 1

        # 5. 执行工具调用
        if result.get("tool_calls"):
            for tc in result["tool_calls"]:
                tool_result = await _execute_tool(tc, context)

                # 保存 tool 消息
                tool_msg = Message.create(
                    role="tool",
                    content=tool_result.output,
                    sequence=sequence,
                    ...
                )
                await trace_store.add_message(tool_msg)
                history.append(tool_msg.to_llm_format())
                sequence += 1

        # 6. 检查终止条件
        if no_tool_calls or reached_goal:
            break

    # 7. 完成反思
    if config.enable_reflection:
        reflection = await _generate_reflection(history)
        await trace_store.update_trace(trace_id, summary=reflection)

关键点

  • 两级压缩:先过滤已完成 Goals,再 LLM 总结
  • 周期性上下文注入:避免 Agent 迷失方向
  • 工具执行:支持同步和异步工具
  • 完成反思:生成任务总结

3.2 上下文压缩策略

位置: agent/trace/compaction.py

Level 1: 过滤策略

def filter_by_goal_status(messages, goal_tree):
    """保留 system、user 和活跃 Goal 的消息"""
    active_goal_ids = {
        g.id for g in goal_tree.goals
        if g.status in ["pending", "in_progress"]
    }

    filtered = []
    for msg in messages:
        if msg["role"] in ["system", "user"]:
            filtered.append(msg)
        elif msg.get("goal_id") in active_goal_ids:
            filtered.append(msg)

    return filtered

Level 2: LLM 总结

async def _compress_history(history, goal_tree):
    """使用 LLM 总结已完成的 Goals"""
    completed_goals = [g for g in goal_tree.goals if g.status == "completed"]

    for goal in completed_goals:
        # 提取该 Goal 的所有消息
        goal_messages = [m for m in history if m.get("goal_id") == goal.id]

        # 调用 LLM 总结
        summary_prompt = build_compression_prompt(goal, goal_messages)
        summary = await llm_call(messages=[{"role": "user", "content": summary_prompt}])

        # 替换为单条总结消息
        summary_msg = {
            "role": "system",
            "content": f"[Goal {goal.id} 总结]\n{summary}"
        }

        # 从 history 中移除原消息,插入总结
        history = [m for m in history if m.get("goal_id") != goal.id]
        history.append(summary_msg)

    return history

关键点

  • Level 1 快速过滤,无需 LLM 调用
  • Level 2 保留语义信息,避免信息丢失
  • 压缩后的消息仍保留在 Trace 中(可回溯)

3.3 工具执行机制

位置: agent/tools/registry.py

工具注册

@tool(
    description="读取文件内容",
    requires_confirmation=False,
    hidden_params=["uid", "context"]
)
async def read_file(
    file_path: str,
    uid: str = "",
    context: Optional[ToolContext] = None
) -> ToolResult:
    """读取指定文件的内容

    Args:
        file_path: 文件路径
    """
    content = Path(file_path).read_text()
    return ToolResult(output=content)

关键特性

  • @tool 装饰器自动生成 OpenAI Tool Schema
  • hidden_params 不暴露给 LLM(由框架注入)
  • inject_params 支持自动注入上下文

工具调用流程

async def _execute_tool(tool_call, context):
    tool_name = tool_call["function"]["name"]
    args = json.loads(tool_call["function"]["arguments"])

    # 1. 获取工具函数
    tool_func = registry.get_tool(tool_name)

    # 2. 注入隐藏参数
    args["uid"] = context.uid
    args["context"] = context

    # 3. 执行工具
    try:
        result = await tool_func(**args)
        return result
    except Exception as e:
        return ToolResult(error=str(e))

3.4 子 Agent 协作

位置: agent/tools/builtin/subagent.py

agent 工具

@tool(description="启动子 Agent 执行任务")
async def agent(
    mission: str,
    agent_type: str = "explore",
    context: Optional[ToolContext] = None
) -> ToolResult:
    """启动子 Agent

    Args:
        mission: 子任务描述
        agent_type: Agent 类型(explore/analyst/delegate)
    """
    # 1. 创建子 Trace
    sub_config = RunConfig(
        agent_type=agent_type,
        parent_trace_id=context.trace_id,
        parent_goal_id=context.goal_id,
        ...
    )

    # 2. 运行子 Agent
    sub_runner = AgentRunner(...)
    result = await sub_runner.run_result(
        messages=[{"role": "user", "content": mission}],
        config=sub_config
    )

    # 3. 返回结果
    return ToolResult(
        output=result["summary"],
        sub_trace_id=result["trace_id"]
    )

关键点

  • 子 Agent 通过 parent_trace_idparent_goal_id 关联父 Agent
  • 支持多种 Agent 类型(explore/analyst/delegate)
  • 子 Agent 的结果作为工具调用结果返回

四、业务场景适配

4.1 Skill 系统

位置: agent/memory/skills/

Skill 是注入到 system prompt 的领域知识,支持:

内置 Skills

  • planning.md: 计划管理和 Goal 工具使用
  • research.md: 搜索和内容研究
  • browser.md: 浏览器自动化
  • core.md: 核心能力

自定义 Skills

# 数据分析 Skill

## 能力描述
你是一个数据分析专家,擅长从数据中提取洞察。

## 工作流程
1. 理解分析目标
2. 探索数据结构
3. 清洗和转换数据
4. 应用统计方法
5. 可视化结果
6. 撰写分析报告

## 工具使用
- 使用 `read_file` 加载数据
- 使用 `bash_command` 运行 Python 脚本
- 使用 `write_file` 保存结果

加载方式

config = RunConfig(
    skills=["planning", "research", "data_analysis"],
    ...
)

4.2 Agent 预设

位置: agent/core/presets.py

预设定义不同类型 Agent 的工具权限和参数:

AGENT_PRESETS = {
    "explore": {
        "skills": ["planning", "research"],
        "tools": ["read_file", "glob_files", "grep_content", "search_posts"],
        "max_iterations": 50,
    },
    "analyst": {
        "skills": ["planning", "research"],
        "tools": ["read_file", "bash_command", "write_file"],
        "max_iterations": 100,
    },
    "browser": {
        "skills": ["browser"],
        "tools": ["browser_*"],  # 所有浏览器工具
        "max_iterations": 200,
    },
}

使用方式

config = RunConfig(agent_type="explore", ...)

4.3 知识管理

位置: agent/tools/builtin/knowledge.py

支持知识的保存、检索和注入:

知识保存

@tool(description="保存知识条目")
async def knowledge_save(
    title: str,
    content: str,
    tags: List[str],
    context: Optional[ToolContext] = None
) -> ToolResult:
    """保存知识到 KnowHub"""
    knowledge_id = await knowhub_client.save(
        title=title,
        content=content,
        tags=tags,
        scope=f"user:{context.uid}"
    )
    return ToolResult(output=f"已保存知识: {knowledge_id}")

知识检索

@tool(description="搜索知识库")
async def knowledge_search(
    query: str,
    limit: int = 5,
    context: Optional[ToolContext] = None
) -> ToolResult:
    """搜索相关知识"""
    results = await knowhub_client.search(
        query=query,
        scope=f"user:{context.uid}",
        limit=limit
    )
    return ToolResult(output=format_knowledge(results))

自动注入

# 在 Goal 创建时自动检索相关知识
async def _inject_knowledge_to_goal(goal, context):
    if context.knowledge_config.auto_inject:
        results = await knowledge_search(
            query=goal.description,
            limit=3
        )
        goal.knowledge = results

4.4 多模态支持

位置: agent/docs/multimodal.md

支持图片和 PDF 处理:

图片处理

# 工具返回图片
@tool(description="截图")
async def browser_screenshot(...) -> ToolResult:
    screenshot_data = await browser.screenshot()
    return ToolResult(
        output="截图已生成",
        base64_image=screenshot_data
    )

# 框架自动转换为 LLM 格式
tool_content = [
    {"type": "text", "text": "截图已生成"},
    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
]

PDF 处理

@tool(description="读取 PDF")
async def read_pdf(file_path: str) -> ToolResult:
    """提取 PDF 文本和图片"""
    import fitz  # PyMuPDF

    doc = fitz.open(file_path)
    text = ""
    images = []

    for page in doc:
        text += page.get_text()
        for img in page.get_images():
            images.append(extract_image(img))

    return ToolResult(
        output=text,
        base64_image=images[0] if images else None
    )

五、数据存储

5.1 文件系统存储

位置: agent/trace/store.py

.trace/
├── {trace_id}/
│   ├── trace.json          # Trace 元数据
│   ├── goal_tree.json      # GoalTree
│   ├── messages/
│   │   ├── 1.json          # Message 1
│   │   ├── 2.json          # Message 2
│   │   ├── 3.json
│   │   └── 3.png           # 截图(与 message 同名)
│   └── model_usage.jsonl   # 模型使用记录

5.2 数据持久化

class FileSystemTraceStore(TraceStore):
    async def add_message(self, message: Message):
        """保存消息"""
        msg_file = self._get_messages_dir(message.trace_id) / f"{message.sequence}.json"
        msg_file.write_text(json.dumps(message.to_dict(), ensure_ascii=False, indent=2))

    async def get_messages(self, trace_id: str, after_sequence: int = 0):
        """加载消息"""
        msg_dir = self._get_messages_dir(trace_id)
        messages = []
        for file in sorted(msg_dir.glob("*.json")):
            seq = int(file.stem)
            if seq > after_sequence:
                data = json.loads(file.read_text())
                messages.append(Message.from_dict(data))
        return messages

六、总结

核心优势

  1. 统一抽象:所有 Agent 都是 Trace,简化了架构
  2. 灵活计划:GoalTree 支持动态调整和回溯
  3. 可扩展工具@tool 装饰器简化工具开发
  4. 智能压缩:两级策略平衡性能和语义
  5. 跨会话记忆:Experience 和 Skill 支持持续学习

适配业务场景的关键

  1. 自定义 Skills:编写领域专属的 Markdown 文档
  2. 注册工具:使用 @tool 装饰器添加业务工具
  3. 配置预设:定义不同场景的 Agent 类型
  4. 知识管理:利用 KnowHub 存储和检索领域知识
  5. 多模态处理:支持图片、PDF 等富媒体输入

执行流程总结

用户输入
  ↓
Phase 1: 准备 Trace(新建/续跑/回溯)
  ↓
Phase 2: 构建历史(加载消息 + 注入 Skills)
  ↓
Phase 3: Agent 循环
  ├─ 上下文压缩(过滤 + 总结)
  ├─ 周期性注入(GoalTree + 协作者)
  ├─ 调用 LLM
  ├─ 执行工具(可能启动子 Agent)
  ├─ 更新 GoalTree
  └─ 检查终止条件
  ↓
完成反思(生成总结)
  ↓
返回结果

这个框架通过清晰的数据结构和执行流程,实现了高度可扩展的 Agent 系统,能够适配各种复杂的业务场景。