architecture.md 52 KB

Agent Core 架构设计

本文档描述 Agent Core 模块的完整架构设计。

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现;除非改动较小、不被文档涵盖
  2. 文档分层,链接代码 - 重要或复杂设计可以另有详细文档;关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录最重要的、与代码准确对应的或者明确的已完成的设计的信息,避免推测、建议,或大量代码;决策依据或修改日志若有必要,可在 decisions.md 另行记录

系统概览

核心理念:所有 Agent 都是 Trace

类型 创建方式 父子关系 状态
主 Agent 直接调用 runner.run() 无 parent 正常执行
子 Agent 通过 agent 工具 parent_trace_id / parent_goal_id 指向父 正常执行
人类协助 通过 ask_human 工具 parent_trace_id 指向父 阻塞等待

核心架构

模块结构

agent/
├── core/                  # 核心引擎
│   ├── runner.py          # AgentRunner + 运行时配置
│   └── presets.py         # Agent 预设(explore、analyst 等)
│
├── trace/                 # 执行追踪(含计划管理)
│   ├── models.py          # Trace, Message
│   ├── goal_models.py     # Goal, GoalTree, GoalStats
│   ├── protocols.py       # TraceStore 接口
│   ├── store.py           # FileSystemTraceStore 实现
│   ├── goal_tool.py       # goal 工具(计划管理)
│   ├── compaction.py      # Context 压缩
│   ├── api.py             # REST API
│   ├── websocket.py       # WebSocket API
│   └── trace_id.py        # Trace ID 生成工具
│
├── tools/                 # 外部交互工具
│   ├── registry.py        # 工具注册表
│   ├── schema.py          # Schema 生成器
│   ├── models.py          # ToolResult, ToolContext
│   └── builtin/
│       ├── file/          # 文件操作(read, write, edit, glob, grep)
│       ├── browser/       # 浏览器自动化
│       ├── bash.py        # 命令执行
│       ├── sandbox.py     # 沙箱环境
│       ├── search.py      # 网络搜索
│       ├── webfetch.py    # 网页抓取
│       ├── skill.py       # 技能加载
│       └── subagent.py    # agent / evaluate 工具(子 Agent 创建与评估)
│
├── skill/                 # 技能系统
│   ├── models.py          # Skill
│   ├── skill_loader.py    # Skill 加载器
│   └── skills/            # 内置 Skills(自动注入 system prompt)
│       ├── planning.md    # 计划与 Goal 工具使用
│       ├── research.md    # 搜索与内容研究
│       └── browser.md     # 浏览器自动化
│
├── llm/                   # LLM 集成
│   ├── gemini.py          # Gemini Provider
│   ├── openrouter.py      # OpenRouter Provider(OpenAI 兼容格式)
│   ├── yescode.py         # Yescode Provider(Anthropic 原生 Messages API)
│   └── prompts/           # Prompt 工具

职责划分

模块 职责
core/ Agent 执行引擎 + 预设配置
trace/ 执行追踪 + 计划管理
tools/ 与外部世界交互(文件、命令、网络、浏览器)
skill/ 技能系统(Skills)
llm/ LLM Provider 适配

三层记忆模型

┌─────────────────────────────────────────────────────────────┐
│ Layer 3: Skills(技能库)                                     │
│ - Markdown 文件,存储领域知识和能力描述                        │
│ - 通过 skill 工具按需加载到对话历史                            │
└─────────────────────────────────────────────────────────────┘
                              ▲
                              │ 归纳
┌─────────────────────────────────────────────────────────────┐
│ Layer 2: Experience(经验库)                                 │
│ - 数据库存储,条件 + 规则 + 证据                              │
│ - 向量检索,注入到 system prompt                              │
└─────────────────────────────────────────────────────────────┘
                              ▲
                              │ 提取
┌─────────────────────────────────────────────────────────────┐
│ Layer 1: Trace(任务状态)                                    │
│ - 当前任务的工作记忆                                          │
│ - Trace + Messages 记录执行过程                               │
│ - Goals 管理执行计划                                          │
└─────────────────────────────────────────────────────────────┘

LLM Provider 适配

内部格式

框架内部统一使用 OpenAI 兼容格式(List[Dict])存储和传递消息。各 Provider 负责双向转换:

方向 说明
入(LLM 响应 → 框架) 提取 content、tool_calls、usage,转换为统一 Dict
出(框架 → LLM 请求) OpenAI 格式消息列表 → 各 API 原生格式

工具消息分组

存储层每个 tool result 独立一条 Message(OpenAI 格式最大公约数)。各 Provider 在出方向按 API 要求自行分组:

Provider 分组方式
OpenRouter 无需分组(OpenAI 原生支持独立 tool 消息)
Yescode _convert_messages_to_anthropic 合并连续 tool 消息为单个 user message
Gemini _convert_messages_to_gemini 通过 buffer 合并连续 tool 消息

跨 Provider 续跑:tool_call_id 规范化

不同 Provider 生成的 tool_call_id 格式不同(OpenAI: call_xxx,Anthropic: toolu_xxx,Gemini: 合成 call_0)。存储层按原样保存,不做规范化。

跨 Provider 续跑时,出方向转换前检测历史中的 tool_call_id 格式,不兼容时统一重写为目标格式(保持 tool_use / tool_result 配对一致)。同格式跳过,零开销。Gemini 按 function name 匹配,无需重写。

实现agent/llm/openrouter.py:_normalize_tool_call_ids, agent/llm/yescode.py:_normalize_tool_call_ids


核心流程:Agent Loop

参数分层

Layer 1: Infrastructure(基础设施,AgentRunner 构造时设置)
  trace_store, memory_store, tool_registry, llm_call, skills_dir, utility_llm_call

Layer 2: RunConfig(运行参数,每次 run 时指定)
  ├─ 模型层:model, temperature, max_iterations, tools
  └─ 框架层:trace_id, agent_type, uid, system_prompt, parent_trace_id, ...

Layer 3: Messages(任务消息,OpenAI SDK 格式 List[Dict])
  [{"role": "user", "content": "分析这张图的构图"}]

RunConfig

@dataclass
class RunConfig:
    # 模型层参数
    model: str = "gpt-4o"
    temperature: float = 0.3
    max_iterations: int = 200
    tools: Optional[List[str]] = None          # None = 全部已注册工具

    # 框架层参数
    agent_type: str = "default"
    uid: Optional[str] = None
    system_prompt: Optional[str] = None        # None = 从 skills 自动构建
    skills: Optional[List[str]] = None         # 注入 system prompt 的 skill 名称列表;None = 按 preset 决定
    enable_memory: bool = True
    auto_execute_tools: bool = True
    name: Optional[str] = None                 # 显示名称(空则由 utility_llm 自动生成)

    # Trace 控制
    trace_id: Optional[str] = None             # None = 新建
    parent_trace_id: Optional[str] = None      # 子 Agent 专用
    parent_goal_id: Optional[str] = None

    # 续跑控制
    after_sequence: Optional[int] = None       # 从哪条消息后续跑(message sequence)

实现agent/core/runner.py:RunConfig

三种运行模式

通过 RunConfig 参数自然区分,统一入口 run(messages, config)

模式 trace_id after_sequence messages 含义 API 端点
新建 None - 初始任务消息 POST /api/traces
续跑 已有 ID None 或 == head 追加到末尾的新消息 POST /api/traces/{id}/run
回溯 已有 ID 主路径上 < head 在插入点之后追加的新消息 POST /api/traces/{id}/run

Runner 根据 after_sequence 与当前 head_sequence 的关系自动判断行为,前端无需指定模式。

执行流程

async def run(messages: List[Dict], config: RunConfig = None) -> AsyncIterator[Union[Trace, Message]]:
    # Phase 1: PREPARE TRACE
    #   无 trace_id → 创建新 Trace(生成 name,初始化 GoalTree)
    #   有 trace_id + after_sequence 为 None 或 == head → 加载已有 Trace,状态置为 running
    #   有 trace_id + after_sequence < head → 加载 Trace,执行 rewind(快照 GoalTree,重建,设 parent_sequence)
    trace = await _prepare_trace(config)
    yield trace

    # Phase 2: BUILD HISTORY
    #   从 head_sequence 沿 parent chain 回溯构建主路径消息
    #   构建 system prompt(新建时注入 skills/experiences;续跑时复用已有)
    #   追加 input messages(设置 parent_sequence 指向当前 head)
    history, sequence = await _build_history(trace, messages, config)

    # Phase 3: AGENT LOOP
    for iteration in range(config.max_iterations):
        # 周期性注入 GoalTree + Active Collaborators(每 10 轮)
        if iteration % 10 == 0:
            inject_context(goal_tree, collaborators)

        response = await llm_call(messages=history, model=config.model, tools=tool_schemas)

        # 按需自动创建 root goal(兜底)
        # 记录 assistant Message
        # 执行工具,记录 tool Messages
        # 无 tool_calls 则 break

    # Phase 4: COMPLETE
    #   更新 Trace 状态 (completed/failed)
    trace.status = "completed"
    yield trace

实现agent/core/runner.py:AgentRunner

回溯(Rewind)

回溯通过 RunConfig(trace_id=..., after_sequence=N) 触发(N 在主路径上且 < head_sequence),在 Phase 1 中执行:

  1. 验证插入点:确保不截断在 assistant(tool_calls) 和 tool response 之间
  2. 快照 GoalTree:将当前完整 GoalTree 存入 events.jsonl(rewind 事件的 goal_tree_snapshot 字段)
  3. 按时间重建 GoalTree:以截断点消息的 created_at 为界,保留 created_at <= cutoff_time 的所有 goals(无论状态),丢弃 cutoff 之后创建的 goals,清空 current_id。将被保留的 in_progress goal 重置为 pending
  4. 设置 parent_sequence:新消息的 parent_sequence 指向 rewind 点,旧消息自动脱离主路径
  5. 更新 Tracehead_sequence 更新为新消息的 sequence,status 改回 running

新消息的 sequence 从 last_sequence + 1 开始(全局递增,不复用)。旧消息无需标记 abandoned,通过消息树结构自然隔离。

调用接口

三种模式共享同一入口 run(messages, config)

# 新建
async for item in runner.run(
    messages=[{"role": "user", "content": "分析项目架构"}],
    config=RunConfig(model="gpt-4o"),
):
    ...

# 续跑:在已有 trace 末尾追加消息继续执行
async for item in runner.run(
    messages=[{"role": "user", "content": "继续"}],
    config=RunConfig(trace_id="existing-trace-id"),
):
    ...

# 回溯:从指定 sequence 处切断,插入新消息重新执行
# after_sequence=5 表示新消息的 parent_sequence=5,从此处开始
async for item in runner.run(
    messages=[{"role": "user", "content": "换一个方案试试"}],
    config=RunConfig(trace_id="existing-trace-id", after_sequence=5),
):
    ...

# 重新生成:回溯后不插入新消息,直接基于已有消息重跑
async for item in runner.run(
    messages=[],
    config=RunConfig(trace_id="existing-trace-id", after_sequence=5),
):
    ...

after_sequence 的值是 message 的 sequence 号,可通过 GET /api/traces/{trace_id}/messages 查看。如果指定的 sequence 是一条带 tool_calls 的 assistant 消息,系统会自动将截断点扩展到其所有对应的 tool response 之后(安全截断)。

停止运行

# 停止正在运行的 Trace
await runner.stop(trace_id)

调用后 agent loop 在下一个检查点退出,Trace 状态置为 stopped,同时保存当前 head_sequence(确保续跑时能正确加载完整历史)。

消息完整性保护(orphaned tool_call 修复):续跑加载历史时,_build_history 自动检测并修复 orphaned tool_calls(_heal_orphaned_tool_calls)。当 agent 被 stop/crash 中断时,可能存在 assistant 的 tool_calls 没有对应的 tool results(包括部分完成的情况:3 个 tool_call 只有 1 个 tool_result)。直接发给 LLM 会导致 400 错误。

修复策略:为每个缺失的 tool_result 插入合成的中断通知(而非裁剪 assistant 消息):

工具类型 合成 tool_result 内容
普通工具 简短中断提示,建议重新调用
agent/evaluate 结构化中断信息,包含 sub_trace_id、执行统计、continue_from 用法指引

agent 工具的合成结果对齐正常返回值格式(含 sub_trace_id 字段),主 Agent 可直接使用 agent(task=..., continue_from=sub_trace_id) 续跑被中断的子 Agent。合成消息持久化存储,确保幂等。

实现agent/core/runner.py:AgentRunner._heal_orphaned_tool_calls

  • run(messages, config)核心方法,流式返回 AsyncIterator[Union[Trace, Message]]
  • run_result(messages, config, on_event=None):便利方法,内部消费 run(),返回结构化结果。on_event 回调可实时接收每个 Trace/Message 事件(用于调试时输出子 Agent 执行过程)。主要用于 agent/evaluate 工具内部

REST API

查询端点

方法 路径 说明
GET /api/traces 列出 Traces
GET /api/traces/{id} 获取 Trace 详情(含 GoalTree、Sub-Traces)
GET /api/traces/{id}/messages 获取 Messages(支持 mode=main_path/all)
GET /api/traces/running 列出正在运行的 Trace
WS /api/traces/{id}/watch 实时事件推送

实现agent/trace/api.py, agent/trace/websocket.py

控制端点

需在 api_server.py 中配置 Runner。执行在后台异步进行,通过 WebSocket 监听进度。

方法 路径 说明
POST /api/traces 新建 Trace 并执行
POST /api/traces/{id}/run 运行(统一续跑 + 回溯)
POST /api/traces/{id}/stop 停止运行中的 Trace
POST /api/traces/{id}/reflect 触发反思,从执行历史中提取经验
POST /api/traces/{id}/compact 触发压缩,通过侧分支多轮 agent 模式压缩上下文
# 新建
curl -X POST http://localhost:8000/api/traces \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "分析项目架构"}], "model": "gpt-4o"}'

# 续跑(after_sequence 为 null 或省略)
curl -X POST http://localhost:8000/api/traces/{trace_id}/run \
  -d '{"messages": [{"role": "user", "content": "继续深入分析"}]}'

# 回溯:从 sequence 5 处截断,插入新消息重新执行
curl -X POST http://localhost:8000/api/traces/{trace_id}/run \
  -d '{"after_sequence": 5, "messages": [{"role": "user", "content": "换一个方案"}]}'

# 重新生成:回溯到 sequence 5,不插入新消息,直接重跑
curl -X POST http://localhost:8000/api/traces/{trace_id}/run \
  -d '{"after_sequence": 5, "messages": []}'

# 停止
curl -X POST http://localhost:8000/api/traces/{trace_id}/stop

# 反思:通过侧分支多轮 agent 模式提取经验
curl -X POST http://localhost:8000/api/traces/{trace_id}/reflect \
  -d '{"focus": "为什么第三步选择了错误的方案"}'

# 压缩:通过侧分支多轮 agent 模式压缩上下文
curl -X POST http://localhost:8000/api/traces/{trace_id}/compact

响应立即返回 {"trace_id": "...", "status": "started"},通过 WS /api/traces/{trace_id}/watch 监听实时事件。

实现agent/trace/run_api.py

经验端点

方法 路径 说明
GET /api/experiences 读取经验文件内容

实现agent/trace/run_api.py


数据模型

Trace(任务执行)

一次完整的 Agent 执行。所有 Agent(主、子、人类协助)都是 Trace。

@dataclass
class Trace:
    trace_id: str
    mode: Literal["call", "agent"]           # 单次调用 or Agent 模式

    # Prompt 标识
    prompt_name: Optional[str] = None

    # Agent 模式特有
    task: Optional[str] = None
    agent_type: Optional[str] = None

    # 父子关系(Sub-Trace 特有)
    parent_trace_id: Optional[str] = None    # 父 Trace ID
    parent_goal_id: Optional[str] = None     # 哪个 Goal 启动的

    # 状态
    status: Literal["running", "completed", "failed", "stopped"] = "running"

    # 统计
    total_messages: int = 0
    total_tokens: int = 0                    # 总 tokens(prompt + completion)
    total_prompt_tokens: int = 0
    total_completion_tokens: int = 0
    total_cost: float = 0.0
    total_duration_ms: int = 0

    # 进度追踪
    last_sequence: int = 0                   # 最新 message 的 sequence(全局递增,不复用)
    head_sequence: int = 0                   # 当前主路径的头节点 sequence(用于 build_llm_messages)
    last_event_id: int = 0                   # 最新事件 ID(用于 WS 续传)

    # 配置
    uid: Optional[str] = None
    model: Optional[str] = None              # 默认模型
    tools: Optional[List[Dict]] = None       # 工具定义(OpenAI 格式)
    llm_params: Dict[str, Any] = {}          # LLM 参数(temperature 等)
    context: Dict[str, Any] = {}             # 元数据(含 collaborators 列表)

    # 当前焦点
    current_goal_id: Optional[str] = None

    # 结果
    result_summary: Optional[str] = None
    error_message: Optional[str] = None

    # 时间
    created_at: datetime
    completed_at: Optional[datetime] = None

实现agent/trace/models.py

Goal(目标节点)

计划中的一个目标,支持层级结构。单独存储于 goal.json

@dataclass
class Goal:
    id: str                                  # 内部 ID("1", "2"...)
    description: str
    reason: str = ""                         # 创建理由
    parent_id: Optional[str] = None          # 父 Goal ID
    type: GoalType = "normal"                # normal | agent_call
    status: GoalStatus = "pending"           # pending | in_progress | completed | abandoned
    summary: Optional[str] = None            # 完成/放弃时的总结

    # agent_call 特有(启动 Sub-Trace)
    sub_trace_ids: Optional[List[str]] = None
    agent_call_mode: Optional[str] = None    # explore | delegate | evaluate
    sub_trace_metadata: Optional[Dict] = None

    # 统计
    self_stats: GoalStats                    # 自身 Messages 统计
    cumulative_stats: GoalStats              # 包含子孙的累计统计

    created_at: datetime

Goal 类型

  • normal - 普通目标,由 Agent 直接执行
  • agent_call - 通过 agent/evaluate 工具创建的目标,会启动 Sub-Trace

agent_call 类型的 Goal

  • 调用 agent/evaluate 工具时自动设置
  • agent_call_mode 记录使用的模式(explore/delegate/evaluate)
  • sub_trace_ids 记录创建的所有 Sub-Trace ID
  • 状态转换:pending → in_progress(Sub-Trace 启动)→ completed(Sub-Trace 完成)
  • summary 包含格式化的汇总结果(explore 模式会汇总所有分支)

Goal 操作(通过 goal 工具):

  • add - 添加顶层目标
  • under - 在指定目标下添加子目标
  • after - 在指定目标后添加兄弟目标
  • focus - 切换焦点到指定目标
  • done - 完成当前目标(附带 summary)
  • abandon - 放弃当前目标(附带原因)

实现agent/trace/goal_models.py, agent/trace/goal_tool.py

Message(执行消息)

对应 LLM API 的消息,每条 Message 关联一个 Goal。消息通过 parent_sequence 形成树结构。

@dataclass
class Message:
    message_id: str                          # 格式:{trace_id}-{sequence:04d}
    trace_id: str
    role: Literal["system", "user", "assistant", "tool"]
    sequence: int                            # 全局顺序(递增,不复用)
    parent_sequence: Optional[int] = None    # 父消息的 sequence(构成消息树)
    goal_id: Optional[str] = None            # 关联的 Goal ID(初始消息为 None,系统会按需自动创建 root goal 兜底)
    description: str = ""                    # 系统自动生成的摘要
    tool_call_id: Optional[str] = None
    content: Any = None

    # 统计
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None
    cost: Optional[float] = None
    duration_ms: Optional[int] = None

    # LLM 响应信息(仅 role="assistant")
    finish_reason: Optional[str] = None

    created_at: datetime

    # [已弃用] 由 parent_sequence 树结构替代
    status: Literal["active", "abandoned"] = "active"
    abandoned_at: Optional[datetime] = None

消息树(Message Tree)

消息通过 parent_sequence 形成树。主路径 = 从 trace.head_sequence 沿 parent chain 回溯到 root。

正常对话:1 → 2 → 3 → 4 → 5       (每条的 parent 指向前一条)
Rewind 到 3:3 → 6(parent=3) → 7   (新主路径,4-5 自动脱离)
压缩 1-3:   8(summary, parent=None) → 6 → 7  (summary 跳过被压缩的消息)
侧分支:     5 → 6(branch_type="compression", parent=5) → 7(parent=6)
            5 → 8(summary, parent=5, 主路径)
            (侧分支消息 6-7 通过 parent_sequence 自然脱离主路径)

build_llm_messages = 从 trace.head_sequence 沿 parent_sequence 链回溯到 root,反转后返回。

关键设计:只要 trace.head_sequence 管理正确(始终指向主路径),get_main_path_messages() 自然返回主路径消息,侧分支消息通过 parent_sequence 链自动被跳过,无需额外过滤。

Message 提供格式转换方法:

  • to_llm_dict() → OpenAI 格式 Dict(用于 LLM 调用)
  • from_llm_dict(d, trace_id, sequence, goal_id) → 从 OpenAI 格式创建 Message

侧分支字段

  • branch_type: "compression" | "reflection" | None(主路径)
  • branch_id: 同一侧分支的消息共享 branch_id

实现agent/trace/models.py


Agent 预设

不同类型 Agent 的配置模板,控制工具权限和参数。

@dataclass
class AgentPreset:
    allowed_tools: Optional[List[str]] = None  # None 表示允许全部
    denied_tools: Optional[List[str]] = None   # 黑名单
    max_iterations: int = 30
    temperature: Optional[float] = None
    skills: Optional[List[str]] = None         # 注入 system prompt 的 skill 名称列表;None = 加载全部
    description: Optional[str] = None


_DEFAULT_SKILLS = ["planning", "research", "browser"]

AGENT_PRESETS = {
    "default": AgentPreset(
        allowed_tools=None,
        max_iterations=30,
        skills=_DEFAULT_SKILLS,
        description="默认 Agent,拥有全部工具权限",
    ),
    "explore": AgentPreset(
        allowed_tools=["read", "glob", "grep", "list_files"],
        denied_tools=["write", "edit", "bash", "task"],
        max_iterations=15,
        skills=["planning"],
        description="探索型 Agent,只读权限,用于代码分析",
    ),
    "analyst": AgentPreset(
        allowed_tools=["read", "glob", "grep", "web_search", "webfetch"],
        denied_tools=["write", "edit", "bash", "task"],
        temperature=0.3,
        max_iterations=25,
        skills=["planning", "research"],
        description="分析型 Agent,用于深度分析和研究",
    ),
}

实现agent/core/presets.py

用户自定义:项目级配置文件(如 examples/how/presets.json)可通过 register_preset() 注册额外预设。项目专用的 Agent 类型建议放在项目目录下,而非内置预设。


子 Trace 机制

通过 agent 工具创建子 Agent 执行任务。task 参数为字符串时为单任务(delegate),为列表时并行执行多任务(explore)。支持通过 messages 参数预置消息,通过 continue_from 参数续跑已有 Sub-Trace。

agent 工具负责创建 Sub-Trace 和初始化 GoalTree(因为需要设置自定义 context 元数据和命名规则),创建完成后将 trace_id 传给 RunConfig,由 Runner 接管后续执行。工具同时维护父 Trace 的 context["collaborators"] 列表。

跨设备 Agent 通信

支持跨设备的 Agent 间持续对话,通过远程 Trace ID 实现:

Trace ID 格式

  • 本地 Trace:abc-123
  • 远程 Trace:agent://terminal-agent-456/abc-123(协议 + Agent 地址 + 本地 ID)

使用方式

# 调用远程 Agent
result = agent(task="分析本地项目", agent_url="https://terminal-agent.local")
# 返回: {"sub_trace_id": "agent://terminal-agent.local/abc-123"}

# 续跑远程 Trace(持续对话)
result2 = agent(
    task="重点分析core模块",
    continue_from="agent://terminal-agent.local/abc-123",
    agent_url="https://terminal-agent.local"
)

实现HybridTraceStore 自动路由到本地或远程存储,远程访问通过 HTTP API 实现。

实现位置agent/trace/hybrid_store.py(规划中)

agent 工具

@tool(description="创建 Agent 执行任务")
async def agent(
    task: Union[str, List[str]],
    messages: Optional[Union[Messages, List[Messages]]] = None,
    continue_from: Optional[str] = None,
    agent_type: Optional[str] = None,
    skills: Optional[List[str]] = None,
    agent_url: Optional[str] = None,  # 远程 Agent 地址(跨设备)
    context: Optional[dict] = None,
) -> Dict[str, Any]:

参数

  • agent_type: 子 Agent 类型,决定工具权限和默认 skills(对应 AgentPreset 名称)
  • skills: 覆盖 preset 默认值,显式指定注入 system prompt 的 skill 列表
  • agent_url: 远程 Agent 地址,用于跨设备调用(返回远程 Trace ID)
  • continue_from: 支持本地或远程 Trace ID

单任务(delegate)task: str

  • 创建单个 Sub-Trace
  • 完整工具权限(除 agent/evaluate 外,防止递归)
  • 支持 continue_from 续跑已有 Sub-Trace(本地或远程)
  • 支持 messages 预置上下文消息

多任务(explore)task: List[str]

  • 使用 asyncio.gather() 并行执行所有任务
  • 每个任务创建独立的 Sub-Trace
  • 只读工具权限(read_file, grep_content, glob_files, goal)
  • messages 支持 1D(共享)或 2D(per-agent)
  • 不支持 continue_from
  • 汇总所有分支结果返回

evaluate 工具

@tool(description="评估目标执行结果是否满足要求")
async def evaluate(
    messages: Optional[Messages] = None,
    target_goal_id: Optional[str] = None,
    continue_from: Optional[str] = None,
    context: Optional[dict] = None,
) -> Dict[str, Any]:
  • 代码自动从 GoalTree 注入目标描述(无需 criteria 参数)
  • 模型把执行结果和上下文放在 messages
  • target_goal_id 默认为当前 goal_id
  • 只读工具权限
  • 返回评估结论和改进建议

消息类型别名

定义在 agent/trace/models.py,用于工具参数和 runner/LLM API 接口:

ChatMessage = Dict[str, Any]                          # 单条 OpenAI 格式消息
Messages = List[ChatMessage]                          # 消息列表
MessageContent = Union[str, List[Dict[str, str]]]     # content 字段(文本或多模态)

实现位置agent/tools/builtin/subagent.py

详细文档工具系统 - Agent/Evaluate 工具

ask_human 工具

创建阻塞式 Trace,等待人类通过 IM/邮件等渠道回复。

注意:此功能规划中,暂未实现。


Active Collaborators(活跃协作者)

任务执行中与模型密切协作的实体(子 Agent 或人类),按 与当前任务的关系 分类,而非按 human/agent 分类:

持久存在(外部可查) 任务内活跃(需要注入)
Agent 专用 Agent(代码审查等) 当前任务创建的子 Agent
Human 飞书通讯录 当前任务中正在对接的人

数据模型

活跃协作者存储在 trace.context["collaborators"]

{
    "name": "researcher",            # 名称(模型可见)
    "type": "agent",                 # agent | human
    "trace_id": "abc-@delegate-001", # trace_id(agent 场景)
    "status": "completed",           # running | waiting | completed | failed
    "summary": "方案A最优",          # 最近状态摘要
}

注入方式

与 GoalTree 一同周期性注入(每 10 轮),渲染为 Markdown:

## Active Collaborators
- researcher [agent, completed]: 方案A最优
- 谭景玉 [human, waiting]: 已发送方案确认,等待回复
- coder [agent, running]: 正在实现特征提取模块

列表为空时不注入。

维护

各工具负责更新 collaborators 列表(通过 context["store"] 写入 trace.context):

  • agent 工具:创建/续跑子 Agent 时更新
  • feishu 工具:发送消息/收到回复时更新
  • Runner 只负责读取和注入

持久联系人/Agent:通过工具按需查询(如 feishu_get_contact_list),不随任务注入。

实现agent/core/runner.py:AgentRunner._build_context_injection, agent/tools/builtin/subagent.py


Context Injection Hooks(上下文注入钩子)

概述

Context Injection Hooks 是一个可扩展机制,允许外部模块(如 A2A IM、监控系统)向 Agent 的周期性上下文注入中添加自定义内容。

设计理念

  • 周期性注入:每 10 轮自动注入,不打断执行
  • 可扩展:通过 hook 函数注册,无需修改 Runner 核心代码
  • 轻量提醒:只注入摘要/提醒,详细内容通过工具获取
  • LLM 自主决策:由 LLM 决定何时响应提醒

架构

Runner Loop (每 10 轮)
    ↓
_build_context_injection()
    ├─ GoalTree (内置)
    ├─ Active Collaborators (内置)
    └─ Context Hooks (可扩展)
         ├─ A2A IM Hook → "💬 3 条新消息"
         ├─ Monitor Hook → "⚠️ 内存使用 85%"
         └─ Custom Hook → 自定义内容
    ↓
注入为 system message
    ↓
LLM 看到提醒 → 决定是否调用工具

Hook 接口

# Hook 函数签名
def context_hook(trace: Trace, goal_tree: Optional[GoalTree]) -> Optional[str]:
    """
    生成要注入的上下文内容

    Args:
        trace: 当前 Trace
        goal_tree: 当前 GoalTree

    Returns:
        要注入的 Markdown 内容,None 表示无内容
    """
    return "## Custom Section\n\n内容..."

注册 Hook

# 创建 Runner 时注册
runner = AgentRunner(
    llm_call=llm_call,
    trace_store=trace_store,
    context_hooks=[hook1, hook2, hook3]  # 按顺序注入
)

实现

Runner 修改

# agent/core/runner.py

class AgentRunner:
    def __init__(
        self,
        # ... 现有参数
        context_hooks: Optional[List[Callable]] = None
    ):
        self.context_hooks = context_hooks or []

    def _build_context_injection(
        self,
        trace: Trace,
        goal_tree: Optional[GoalTree],
    ) -> str:
        """构建周期性注入的上下文(GoalTree + Active Collaborators + Hooks)"""
        parts = []

        # GoalTree(现有)
        if goal_tree and goal_tree.goals:
            parts.append(f"## Current Plan\n\n{goal_tree.to_prompt()}")
            # ... focus 提醒

        # Active Collaborators(现有)
        collaborators = trace.context.get("collaborators", [])
        if collaborators:
            lines = ["## Active Collaborators"]
            for c in collaborators:
                # ... 现有逻辑
            parts.append("\n".join(lines))

        # Context Hooks(新增)
        for hook in self.context_hooks:
            try:
                hook_content = hook(trace, goal_tree)
                if hook_content:
                    parts.append(hook_content)
            except Exception as e:
                logger.error(f"Context hook error: {e}")

        return "\n\n".join(parts)

实现位置agent/core/runner.py:AgentRunner._build_context_injection(待实现)

示例:A2A IM Hook

# agent/tools/builtin/a2a_im.py

class A2AMessageQueue:
    """A2A IM 消息队列"""

    def __init__(self):
        self._messages: List[Dict] = []

    def push(self, message: Dict):
        """Gateway 推送消息时调用"""
        self._messages.append(message)

    def pop_all(self) -> List[Dict]:
        """check_messages 工具调用时清空"""
        messages = self._messages
        self._messages = []
        return messages

    def get_summary(self) -> Optional[str]:
        """获取消息摘要(用于 context injection)"""
        if not self._messages:
            return None

        count = len(self._messages)
        latest = self._messages[-1]
        from_agent = latest.get("from_agent_id", "unknown")

        if count == 1:
            return f"💬 来自 {from_agent} 的 1 条新消息(使用 check_messages 工具查看)"
        else:
            return f"💬 {count} 条新消息,最新来自 {from_agent}(使用 check_messages 工具查看)"


def create_a2a_context_hook(message_queue: A2AMessageQueue):
    """创建 A2A IM 的 context hook"""

    def a2a_context_hook(trace: Trace, goal_tree: Optional[GoalTree]) -> Optional[str]:
        """注入 A2A IM 消息提醒"""
        summary = message_queue.get_summary()
        if not summary:
            return None

        return f"## Messages\n\n{summary}"

    return a2a_context_hook


@tool(description="检查来自其他 Agent 的新消息")
async def check_messages(ctx: ToolContext) -> ToolResult:
    """检查并获取来自其他 Agent 的新消息"""
    message_queue: A2AMessageQueue = ctx.context.get("a2a_message_queue")
    if not message_queue:
        return ToolResult(title="消息队列未初始化", output="")

    messages = message_queue.pop_all()

    if not messages:
        return ToolResult(title="无新消息", output="")

    # 格式化消息
    lines = [f"收到 {len(messages)} 条新消息:\n"]
    for i, msg in enumerate(messages, 1):
        from_agent = msg.get("from_agent_id", "unknown")
        content = msg.get("content", "")
        conv_id = msg.get("conversation_id", "")
        lines.append(f"{i}. 来自 {from_agent}")
        lines.append(f"   对话 ID: {conv_id}")
        lines.append(f"   内容: {content}")
        lines.append("")

    return ToolResult(
        title=f"收到 {len(messages)} 条新消息",
        output="\n".join(lines)
    )

实现位置agent/tools/builtin/a2a_im.py(待实现)

配置示例

# api_server.py

from agent.tools.builtin.a2a_im import (
    A2AMessageQueue,
    create_a2a_context_hook,
    check_messages
)

# 创建消息队列
message_queue = A2AMessageQueue()

# 创建 context hook
a2a_hook = create_a2a_context_hook(message_queue)

# 创建 Runner 时注入 hook
runner = AgentRunner(
    llm_call=llm_call,
    trace_store=trace_store,
    context_hooks=[a2a_hook]
)

# 注册 check_messages 工具
tool_registry.register(check_messages)

# 启动 Gateway webhook 端点
@app.post("/webhook/a2a-messages")
async def receive_a2a_message(message: dict):
    """接收来自 Gateway 的消息"""
    message_queue.push(message)
    return {"status": "received"}

注入效果

## Current Plan
1. [in_progress] 分析代码架构
   1.1. [completed] 读取项目结构
   1.2. [in_progress] 分析核心模块

## Active Collaborators
- researcher [agent, completed]: 已完成调研

## Messages
💬 来自 code-reviewer 的 1 条新消息(使用 check_messages 工具查看)

其他应用场景

监控告警

def create_monitor_hook(monitor):
    def monitor_hook(trace, goal_tree):
        alerts = monitor.get_alerts()
        if not alerts:
            return None
        return f"## System Alerts\n\n⚠️ {len(alerts)} 条告警(使用 check_alerts 工具查看)"
    return monitor_hook

定时提醒

def create_timer_hook(timer):
    def timer_hook(trace, goal_tree):
        if timer.should_remind():
            return "## Reminder\n\n⏰ 任务已执行 30 分钟,建议检查进度"
        return None
    return timer_hook

实现位置:各模块自行实现 hook 函数


Active Collaborators(活跃协作者)

任务执行中与模型密切协作的实体(子 Agent 或人类),按 与当前任务的关系 分类,而非按 human/agent 分类:

持久存在(外部可查) 任务内活跃(需要注入)
Agent 专用 Agent(代码审查等) 当前任务创建的子 Agent
Human 飞书通讯录 当前任务中正在对接的人

数据模型

活跃协作者存储在 trace.context["collaborators"]

{
    "name": "researcher",            # 名称(模型可见)
    "type": "agent",                 # agent | human
    "trace_id": "abc-@delegate-001", # trace_id(agent 场景)
    "status": "completed",           # running | waiting | completed | failed
    "summary": "方案A最优",          # 最近状态摘要
}

注入方式

与 GoalTree 一同周期性注入(每 10 轮),渲染为 Markdown:

## Active Collaborators
- researcher [agent, completed]: 方案A最优
- 谭景玉 [human, waiting]: 已发送方案确认,等待回复
- coder [agent, running]: 正在实现特征提取模块

列表为空时不注入。

维护

各工具负责更新 collaborators 列表(通过 context["store"] 写入 trace.context):

  • agent 工具:创建/续跑子 Agent 时更新
  • feishu 工具:发送消息/收到回复时更新
  • Runner 只负责读取和注入

持久联系人/Agent:通过工具按需查询(如 feishu_get_contact_list),不随任务注入。

实现agent/core/runner.py:AgentRunner._build_context_injection, agent/tools/builtin/subagent.py


工具系统

核心概念

@tool()
async def my_tool(arg: str, ctx: ToolContext) -> ToolResult:
    return ToolResult(
        title="Success",
        output="Result content",
        long_term_memory="Short summary"  # 可选:压缩后保留的摘要
    )
类型 作用
@tool 装饰器,自动注册工具并生成 Schema
ToolResult 工具执行结果,支持双层记忆
ToolContext 工具执行上下文,依赖注入

工具分类

目录 工具 说明
trace/ goal Agent 内部计划管理
builtin/ agent, evaluate 子 Agent 创建与评估
builtin/file/ read, write, edit, glob, grep 文件操作
builtin/browser/ browser actions 浏览器自动化
builtin/ bash, sandbox, search, webfetch, skill, ask_human 其他工具

双层记忆管理

大输出(如网页抓取)只传给 LLM 一次,之后用摘要替代:

ToolResult(
    output="<10K tokens 的完整内容>",
    long_term_memory="Extracted 10000 chars from amazon.com",
    include_output_only_once=True
)

详细文档工具系统


Skills 系统

分类

类型 加载位置 加载时机
内置 Skill System Prompt Agent 启动时自动注入
项目 Skill System Prompt Agent 启动时按 preset/call-site 过滤后注入
普通 Skill 对话消息 模型调用 skill 工具时

目录结构

agent/skill/skills/         # 内置 Skills(始终加载)
├── planning.md              # 计划与 Goal 工具使用
├── research.md              # 搜索与内容研究
└── browser.md               # 浏览器自动化

./skills/                    # 项目自定义 Skills

Skills 过滤(call-site 选择)

不同 Agent 类型所需的 skills 不同。过滤优先级:

  1. agent() 工具的 skills 参数(显式指定,最高优先级)
  2. AgentPreset.skills(preset 默认值)
  3. None(加载全部,向后兼容)

示例:调用子 Agent 时只注入解构相关 skill:

agent(task="...", agent_type="deconstruct", skills=["planning", "deconstruct"])

实现agent/skill/skill_loader.py

详细文档Skills 使用指南


Experiences 系统

从执行历史中提取的经验规则,用于指导未来任务。

存储规范

经验以 Markdown 文件存储(默认 ./.cache/experiences.md),人类可读、可编辑、可版本控制。

文件格式:

---
id: ex_001
trace_id: trace-xxx
category: tool_usage
tags: {state: ["large_file", "dirty_repo"], intent: ["batch_edit", "safe_modify"]}
metrics: {helpful: 12, harmful: 0}
created_at: 2026-02-12 15:30
---

---
id: ex_002
...

反思机制(Reflect)

通过 POST /api/traces/{id}/reflect 触发,旨在将原始执行历史提炼为可复用的知识。

1. 分叉反思:在 trace 末尾追加 user message(含反思与打标 Prompt),作为侧枝执行。
2. 结构化生成:
    ·归类:将经验分配至 tool_usage(工具)、logic_flow(逻辑)、environment(环境)等。
    ·打标:提取 state(环境状态)与 intent(用户意图)语义标签。
    ·量化:初始 helpful 设为 1。
3. 持久化:将带有元数据的 Markdown 块追加至 experiences.md。

实现:agent/trace/run_api.py:reflect_trace

语义注入与匹配流程

新建 Trace 时,Runner 采用“分析-检索-注入”三阶段策略,实现精准经验推荐。

1. 意图预分析
Runner 调用 utility_llm 对初始任务进行语义提取:
    -输入:"优化这个项目的 Docker 构建速度"
    -输出:{state: ["docker", "ci"], intent: ["optimization"]}
2. 语义检索
    在 _load_experiences 中根据标签进行语义匹配(优先匹配 intent,其次是 state),筛选出相关度最高的 Top-K 条经验。
3. 精准注入
    将匹配到的经验注入第一条 user message 末尾:
# _build_history 中(仅新建模式):
if not config.trace_id:
    relevant_ex = self.experience_retriever.search(task_tags)
    if relevant_ex:
        formatted_ex = "\n".join([f"- [{e.id}] {e.content} (Helpful: {e.helpful})" for e in relevant_ex])
        first_user_msg["content"] += f"\n\n## 参考经验\n\n{formatted_ex}"

实现:agent/core/runner.py:AgentRunner._build_history

经验获取工具

不再仅限于启动时自动注入,而是通过内置工具供 Agent 在需要时主动调用。当执行结果不符合预期或进入未知领域时,Agent 应优先使用此工具。 工具定义:

@tool(description="根据当前任务状态和意图,从经验库中检索相关的历史经验")
async def get_experience(
    intent: Optional[str] = None, 
    state: Optional[str] = None
) -> Dict[str, Any]:
    """
    参数:
        intent: 想要达成的目标意图 (如 "optimization", "debug")
        state: 当前环境或遇到的问题状态 (如 "docker_build_fail", "permission_denied")
    """

实现: agent/tools/builtin/experience.py

  • 语义匹配与应用流程 当 Agent 调用 get_experience 时,系统执行以下逻辑:
    1. 语义检索:根据传入的 intent 或 state 标签,在 experiences.md 中进行匹配。匹配权重:intent > state > helpful 评分。
    2. 动态注入:工具返回匹配到的 Top-K 条经验(含 ID 和内容)。
    3. 策略应用:Agent 接收到工具返回的经验后,需在后续 thought 中声明所选用的策略 ID(如 [ex_001]),并据此调整 goal_tree 或工具调用序列。

Context 压缩

两级压缩策略

Level 1:GoalTree 过滤(确定性,零成本)

每轮 agent loop 构建 llm_messages 时自动执行:

  • 始终保留:system prompt、第一条 user message(含 GoalTree 精简视图)、当前 focus goal 的消息
  • 跳过 completed/abandoned goals 的消息(信息已在 GoalTree summary 中)
  • 通过 Message Tree 的 parent_sequence 实现跳过

大多数情况下 Level 1 足够。

Level 2:LLM 总结(仅在 Level 1 后仍超限时触发)

触发条件:Level 1 之后 token 数仍超过阈值(默认 max_tokens × 0.8)。

流程:

  1. 经验提取:在消息列表末尾追加反思 prompt,进入侧分支 agent 模式(最多 5 轮),LLM 可调用工具(如 knowledge_search, knowledge_save)进行多轮推理。反思消息标记为 branch_type="reflection",不在主路径上
  2. 压缩:在消息列表末尾追加压缩 prompt(含 GoalTree 完整视图),进入侧分支 agent 模式(最多 5 轮),LLM 可调用工具(如 goal_status)辅助压缩。压缩消息标记为 branch_type="compression",完成后创建 summary 消息,其 parent_sequence 跳过被压缩的范围

侧分支模式:压缩和反思在同一 agent loop 中通过状态机实现,复用主路径的缓存和工具配置,支持多轮推理。

GoalTree 双视图

to_prompt() 支持两种模式:

  • include_summary=False(默认):精简视图,用于日常周期性注入
  • include_summary=True:含所有 completed goals 的 summary,用于 Level 2 压缩时提供上下文

压缩存储

  • 原始消息永远保留在 messages/
  • 压缩 summary 作为普通 Message 存储
  • 侧分支消息通过 branch_typebranch_id 标记,查询主路径时自动过滤
  • 通过 parent_sequence 树结构实现跳过,无需 compression events 或 skip list
  • Rewind 到压缩区域内时,summary 脱离主路径,原始消息自动恢复

实现agent/core/runner.py:_agent_loop, agent/trace/compaction.py, agent/trace/goal_models.py


存储接口

class TraceStore(Protocol):
    async def create_trace(self, trace: Trace) -> None: ...
    async def get_trace(self, trace_id: str) -> Trace: ...
    async def update_trace(self, trace_id: str, **updates) -> None: ...
    async def add_message(self, message: Message) -> None: ...
    async def get_trace_messages(self, trace_id: str) -> List[Message]: ...
    async def get_main_path_messages(self, trace_id: str, head_sequence: int) -> List[Message]: ...
    async def get_messages_by_goal(self, trace_id: str, goal_id: str) -> List[Message]: ...
    async def append_event(self, trace_id: str, event_type: str, payload: Dict) -> int: ...

get_main_path_messageshead_sequence 沿 parent_sequence 链回溯,返回主路径上的有序消息列表。

实现

  • 协议定义:agent/trace/protocols.py
  • 本地存储:agent/trace/store.py:FileSystemTraceStore
  • 远程存储:agent/trace/remote_store.py:RemoteTraceStore(规划中)
  • 混合存储:agent/trace/hybrid_store.py:HybridTraceStore(规划中)

跨设备存储

HybridTraceStore 根据 Trace ID 自动路由到本地或远程存储:

Trace ID 格式 存储位置 访问方式
abc-123 本地文件系统 FileSystemTraceStore
agent://host/abc-123 远程 Agent HTTP API(RemoteTraceStore

RemoteTraceStore 通过 HTTP API 访问远程 Trace:

  • GET /api/traces/{trace_id} - 获取 Trace 元数据
  • GET /api/traces/{trace_id}/messages - 获取消息历史
  • POST /api/traces/{trace_id}/run - 续跑(追加消息并执行)

认证:通过 API Key 认证,配置在 config/agents.yaml

实现位置agent/trace/hybrid_store.py, agent/trace/remote_store.py(规划中)

存储结构

.trace/
├── {trace_id}/
│   ├── meta.json        # Trace 元数据(含 tools 定义)
│   ├── goal.json        # GoalTree(mission + goals 列表)
│   ├── events.jsonl     # 事件流(goal 变更、sub_trace 生命周期等)
│   └── messages/        # Messages
│       ├── {trace_id}-0001.json
│       └── ...
│
└── {trace_id}@explore-{序号}-{timestamp}-001/  # 子 Trace
    └── ...

events.jsonl 说明

  • 记录 Trace 执行过程中的关键事件
  • 每行一个 JSON 对象,包含 event_id、event 类型、时间戳等
  • 主要事件类型:goal_added, goal_updated, sub_trace_started, sub_trace_completed, rewind
  • 用于实时监控和历史回放

Sub-Trace 目录命名

  • Explore: {parent}@explore-{序号:03d}-{timestamp}-001
  • Delegate: {parent}@delegate-{timestamp}-001
  • Evaluate: {parent}@evaluate-{timestamp}-001

meta.json 示例

{
  "trace_id": "0415dc38-...",
  "mode": "agent",
  "task": "分析代码结构",
  "agent_type": "default",
  "status": "running",
  "model": "google/gemini-2.5-flash",
  "tools": [...],
  "llm_params": {"temperature": 0.3},
  "context": {
    "collaborators": [
      {"name": "researcher", "type": "agent", "trace_id": "...", "status": "completed", "summary": "方案A最优"}
    ]
  },
  "current_goal_id": "3"
}

设计决策

详见 设计决策文档

核心决策

  1. 所有 Agent 都是 Trace - 主 Agent、子 Agent、人类协助统一为 Trace,通过 parent_trace_idspawn_tool 区分

  2. trace/ 模块统一管理执行状态 - 合并原 execution/ 和 goal/,包含计划管理和 Agent 内部控制工具

  3. tools/ 专注外部交互 - 文件、命令、网络、浏览器等与外部世界的交互

  4. Agent 预设替代 Sub-Agent 配置 - 通过 core/presets.py 定义不同类型 Agent 的工具权限和参数


相关文档

文档 内容
Context 管理 Goals、压缩、Plan 注入策略
工具系统 工具定义、注册、双层记忆
Skills 指南 Skill 分类、编写、加载
多模态支持 图片、PDF 处理
知识管理 知识结构、检索、提取机制
Scope 设计 知识可见性和权限控制
Agent 设计决策 Agent Core 架构决策记录
Gateway 设计决策 Gateway 架构决策记录
组织级概览 组织级 Agent 系统架构和规划
Enterprise 实现 认证、审计、多租户技术实现
测试指南 测试策略和命令
A2A 协议调研 行业 A2A 通信协议和框架对比
A2A 跨设备通信 跨设备 Agent 通信方案(内部)
A2A Trace 存储 跨设备 Trace 存储方案详细设计
MAMP 协议 与外部 Agent 系统的通用交互协议
A2A IM 系统 Agent 即时通讯系统架构和实现
Gateway 架构 Gateway 三层架构和设计决策
Gateway 部署 Gateway 部署模式和配置
Gateway API Gateway API 完整参考