本文档描述 Agent Core 模块的完整架构设计。
module/file.py:function_namedecisions.md 另行记录核心理念:所有 Agent 都是 Trace
| 类型 | 创建方式 | 父子关系 | 状态 |
|---|---|---|---|
| 主 Agent | 直接调用 runner.run() |
无 parent | 正常执行 |
| 子 Agent | 通过 agent 工具 |
parent_trace_id / parent_goal_id 指向父 |
正常执行 |
| 人类协助 | 通过 ask_human 工具 |
parent_trace_id 指向父 |
阻塞等待 |
agent/
├── core/ # 核心引擎
│ ├── runner.py # AgentRunner + 运行时配置
│ └── presets.py # Agent 预设(explore、analyst 等)
│
├── trace/ # 执行追踪(含计划管理)
│ ├── models.py # Trace, Message
│ ├── goal_models.py # Goal, GoalTree, GoalStats
│ ├── protocols.py # TraceStore 接口
│ ├── store.py # FileSystemTraceStore 实现
│ ├── goal_tool.py # goal 工具(计划管理)
│ ├── compaction.py # Context 压缩
│ ├── api.py # REST API
│ ├── websocket.py # WebSocket API
│ └── trace_id.py # Trace ID 生成工具
│
├── tools/ # 外部交互工具
│ ├── registry.py # 工具注册表
│ ├── schema.py # Schema 生成器
│ ├── models.py # ToolResult, ToolContext
│ └── builtin/
│ ├── file/ # 文件操作(read, write, edit, glob, grep)
│ ├── browser/ # 浏览器自动化
│ ├── bash.py # 命令执行
│ ├── sandbox.py # 沙箱环境
│ ├── search.py # 网络搜索
│ ├── webfetch.py # 网页抓取
│ ├── skill.py # 技能加载
│ └── subagent.py # agent / evaluate 工具(子 Agent 创建与评估)
│
├── memory/ # 跨会话记忆
│ ├── models.py # Experience, Skill
│ ├── protocols.py # MemoryStore 接口
│ ├── stores.py # 存储实现
│ ├── skill_loader.py # Skill 加载器
│ └── skills/ # 内置 Skills(自动注入 system prompt)
│ ├── planning.md # 计划与 Goal 工具使用
│ ├── research.md # 搜索与内容研究
│ └── browser.md # 浏览器自动化
│
├── llm/ # LLM 集成
│ ├── gemini.py # Gemini Provider
│ ├── openrouter.py # OpenRouter Provider(OpenAI 兼容格式)
│ ├── yescode.py # Yescode Provider(Anthropic 原生 Messages API)
│ └── prompts/ # Prompt 工具
| 模块 | 职责 |
|---|---|
| core/ | Agent 执行引擎 + 预设配置 |
| trace/ | 执行追踪 + 计划管理 |
| tools/ | 与外部世界交互(文件、命令、网络、浏览器) |
| memory/ | 跨会话知识(Skills、Experiences) |
| llm/ | LLM Provider 适配 |
┌─────────────────────────────────────────────────────────────┐
│ Layer 3: Skills(技能库) │
│ - Markdown 文件,存储领域知识和能力描述 │
│ - 通过 skill 工具按需加载到对话历史 │
└─────────────────────────────────────────────────────────────┘
▲
│ 归纳
┌─────────────────────────────────────────────────────────────┐
│ Layer 2: Experience(经验库) │
│ - 数据库存储,条件 + 规则 + 证据 │
│ - 向量检索,注入到 system prompt │
└─────────────────────────────────────────────────────────────┘
▲
│ 提取
┌─────────────────────────────────────────────────────────────┐
│ Layer 1: Trace(任务状态) │
│ - 当前任务的工作记忆 │
│ - Trace + Messages 记录执行过程 │
│ - Goals 管理执行计划 │
└─────────────────────────────────────────────────────────────┘
框架内部统一使用 OpenAI 兼容格式(List[Dict])存储和传递消息。各 Provider 负责双向转换:
| 方向 | 说明 |
|---|---|
| 入(LLM 响应 → 框架) | 提取 content、tool_calls、usage,转换为统一 Dict |
| 出(框架 → LLM 请求) | OpenAI 格式消息列表 → 各 API 原生格式 |
存储层每个 tool result 独立一条 Message(OpenAI 格式最大公约数)。各 Provider 在出方向按 API 要求自行分组:
| Provider | 分组方式 |
|---|---|
| OpenRouter | 无需分组(OpenAI 原生支持独立 tool 消息) |
| Yescode | _convert_messages_to_anthropic 合并连续 tool 消息为单个 user message |
| Gemini | _convert_messages_to_gemini 通过 buffer 合并连续 tool 消息 |
不同 Provider 生成的 tool_call_id 格式不同(OpenAI: call_xxx,Anthropic: toolu_xxx,Gemini: 合成 call_0)。存储层按原样保存,不做规范化。
跨 Provider 续跑时,出方向转换前检测历史中的 tool_call_id 格式,不兼容时统一重写为目标格式(保持 tool_use / tool_result 配对一致)。同格式跳过,零开销。Gemini 按 function name 匹配,无需重写。
实现:agent/llm/openrouter.py:_normalize_tool_call_ids, agent/llm/yescode.py:_normalize_tool_call_ids
Layer 1: Infrastructure(基础设施,AgentRunner 构造时设置)
trace_store, memory_store, tool_registry, llm_call, skills_dir, utility_llm_call
Layer 2: RunConfig(运行参数,每次 run 时指定)
├─ 模型层:model, temperature, max_iterations, tools
└─ 框架层:trace_id, agent_type, uid, system_prompt, parent_trace_id, ...
Layer 3: Messages(任务消息,OpenAI SDK 格式 List[Dict])
[{"role": "user", "content": "分析这张图的构图"}]
@dataclass
class RunConfig:
# 模型层参数
model: str = "gpt-4o"
temperature: float = 0.3
max_iterations: int = 200
tools: Optional[List[str]] = None # None = 全部已注册工具
# 框架层参数
agent_type: str = "default"
uid: Optional[str] = None
system_prompt: Optional[str] = None # None = 从 skills 自动构建
skills: Optional[List[str]] = None # 注入 system prompt 的 skill 名称列表;None = 按 preset 决定
enable_memory: bool = True
auto_execute_tools: bool = True
name: Optional[str] = None # 显示名称(空则由 utility_llm 自动生成)
# Trace 控制
trace_id: Optional[str] = None # None = 新建
parent_trace_id: Optional[str] = None # 子 Agent 专用
parent_goal_id: Optional[str] = None
# 续跑控制
after_sequence: Optional[int] = None # 从哪条消息后续跑(message sequence)
实现:agent/core/runner.py:RunConfig
通过 RunConfig 参数自然区分,统一入口 run(messages, config):
| 模式 | trace_id | after_sequence | messages 含义 | API 端点 |
|---|---|---|---|---|
| 新建 | None | - | 初始任务消息 | POST /api/traces |
| 续跑 | 已有 ID | None 或 == head | 追加到末尾的新消息 | POST /api/traces/{id}/run |
| 回溯 | 已有 ID | 主路径上 < head | 在插入点之后追加的新消息 | POST /api/traces/{id}/run |
Runner 根据 after_sequence 与当前 head_sequence 的关系自动判断行为,前端无需指定模式。
async def run(messages: List[Dict], config: RunConfig = None) -> AsyncIterator[Union[Trace, Message]]:
# Phase 1: PREPARE TRACE
# 无 trace_id → 创建新 Trace(生成 name,初始化 GoalTree)
# 有 trace_id + after_sequence 为 None 或 == head → 加载已有 Trace,状态置为 running
# 有 trace_id + after_sequence < head → 加载 Trace,执行 rewind(快照 GoalTree,重建,设 parent_sequence)
trace = await _prepare_trace(config)
yield trace
# Phase 2: BUILD HISTORY
# 从 head_sequence 沿 parent chain 回溯构建主路径消息
# 构建 system prompt(新建时注入 skills/experiences;续跑时复用已有)
# 追加 input messages(设置 parent_sequence 指向当前 head)
history, sequence = await _build_history(trace, messages, config)
# Phase 3: AGENT LOOP
for iteration in range(config.max_iterations):
# 周期性注入 GoalTree + Active Collaborators(每 10 轮)
if iteration % 10 == 0:
inject_context(goal_tree, collaborators)
response = await llm_call(messages=history, model=config.model, tools=tool_schemas)
# 按需自动创建 root goal(兜底)
# 记录 assistant Message
# 执行工具,记录 tool Messages
# 无 tool_calls 则 break
# Phase 4: COMPLETE
# 更新 Trace 状态 (completed/failed)
trace.status = "completed"
yield trace
实现:agent/core/runner.py:AgentRunner
回溯通过 RunConfig(trace_id=..., after_sequence=N) 触发(N 在主路径上且 < head_sequence),在 Phase 1 中执行:
events.jsonl(rewind 事件的 goal_tree_snapshot 字段)created_at 为界,保留 created_at <= cutoff_time 的所有 goals(无论状态),丢弃 cutoff 之后创建的 goals,清空 current_id。将被保留的 in_progress goal 重置为 pendingparent_sequence 指向 rewind 点,旧消息自动脱离主路径head_sequence 更新为新消息的 sequence,status 改回 running新消息的 sequence 从 last_sequence + 1 开始(全局递增,不复用)。旧消息无需标记 abandoned,通过消息树结构自然隔离。
三种模式共享同一入口 run(messages, config):
# 新建
async for item in runner.run(
messages=[{"role": "user", "content": "分析项目架构"}],
config=RunConfig(model="gpt-4o"),
):
...
# 续跑:在已有 trace 末尾追加消息继续执行
async for item in runner.run(
messages=[{"role": "user", "content": "继续"}],
config=RunConfig(trace_id="existing-trace-id"),
):
...
# 回溯:从指定 sequence 处切断,插入新消息重新执行
# after_sequence=5 表示新消息的 parent_sequence=5,从此处开始
async for item in runner.run(
messages=[{"role": "user", "content": "换一个方案试试"}],
config=RunConfig(trace_id="existing-trace-id", after_sequence=5),
):
...
# 重新生成:回溯后不插入新消息,直接基于已有消息重跑
async for item in runner.run(
messages=[],
config=RunConfig(trace_id="existing-trace-id", after_sequence=5),
):
...
after_sequence 的值是 message 的 sequence 号,可通过 GET /api/traces/{trace_id}/messages 查看。如果指定的 sequence 是一条带 tool_calls 的 assistant 消息,系统会自动将截断点扩展到其所有对应的 tool response 之后(安全截断)。
停止运行:
# 停止正在运行的 Trace
await runner.stop(trace_id)
调用后 agent loop 在下一个检查点退出,Trace 状态置为 stopped,同时保存当前 head_sequence(确保续跑时能正确加载完整历史)。
消息完整性保护(orphaned tool_call 修复):续跑加载历史时,_build_history 自动检测并修复 orphaned tool_calls(_heal_orphaned_tool_calls)。当 agent 被 stop/crash 中断时,可能存在 assistant 的 tool_calls 没有对应的 tool results(包括部分完成的情况:3 个 tool_call 只有 1 个 tool_result)。直接发给 LLM 会导致 400 错误。
修复策略:为每个缺失的 tool_result 插入合成的中断通知(而非裁剪 assistant 消息):
| 工具类型 | 合成 tool_result 内容 |
|---|---|
| 普通工具 | 简短中断提示,建议重新调用 |
| agent/evaluate | 结构化中断信息,包含 sub_trace_id、执行统计、continue_from 用法指引 |
agent 工具的合成结果对齐正常返回值格式(含 sub_trace_id 字段),主 Agent 可直接使用 agent(task=..., continue_from=sub_trace_id) 续跑被中断的子 Agent。合成消息持久化存储,确保幂等。
实现:agent/core/runner.py:AgentRunner._heal_orphaned_tool_calls
run(messages, config):核心方法,流式返回 AsyncIterator[Union[Trace, Message]]run_result(messages, config, on_event=None):便利方法,内部消费 run(),返回结构化结果。on_event 回调可实时接收每个 Trace/Message 事件(用于调试时输出子 Agent 执行过程)。主要用于 agent/evaluate 工具内部| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/traces |
列出 Traces |
| GET | /api/traces/{id} |
获取 Trace 详情(含 GoalTree、Sub-Traces) |
| GET | /api/traces/{id}/messages |
获取 Messages(支持 mode=main_path/all) |
| GET | /api/traces/running |
列出正在运行的 Trace |
| WS | /api/traces/{id}/watch |
实时事件推送 |
实现:agent/trace/api.py, agent/trace/websocket.py
需在 api_server.py 中配置 Runner。执行在后台异步进行,通过 WebSocket 监听进度。
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/traces |
新建 Trace 并执行 |
| POST | /api/traces/{id}/run |
运行(统一续跑 + 回溯) |
| POST | /api/traces/{id}/stop |
停止运行中的 Trace |
| POST | /api/traces/{id}/reflect |
触发反思,从执行历史中提取经验 |
# 新建
curl -X POST http://localhost:8000/api/traces \
-H "Content-Type: application/json" \
-d '{"messages": [{"role": "user", "content": "分析项目架构"}], "model": "gpt-4o"}'
# 续跑(after_sequence 为 null 或省略)
curl -X POST http://localhost:8000/api/traces/{trace_id}/run \
-d '{"messages": [{"role": "user", "content": "继续深入分析"}]}'
# 回溯:从 sequence 5 处截断,插入新消息重新执行
curl -X POST http://localhost:8000/api/traces/{trace_id}/run \
-d '{"after_sequence": 5, "messages": [{"role": "user", "content": "换一个方案"}]}'
# 重新生成:回溯到 sequence 5,不插入新消息,直接重跑
curl -X POST http://localhost:8000/api/traces/{trace_id}/run \
-d '{"after_sequence": 5, "messages": []}'
# 停止
curl -X POST http://localhost:8000/api/traces/{trace_id}/stop
# 反思:追加反思 prompt 运行,结果追加到 experiences 文件
curl -X POST http://localhost:8000/api/traces/{trace_id}/reflect \
-d '{"focus": "为什么第三步选择了错误的方案"}'
响应立即返回 {"trace_id": "...", "status": "started"},通过 WS /api/traces/{trace_id}/watch 监听实时事件。
实现:agent/trace/run_api.py
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/experiences |
读取经验文件内容 |
实现:agent/trace/run_api.py
一次完整的 Agent 执行。所有 Agent(主、子、人类协助)都是 Trace。
@dataclass
class Trace:
trace_id: str
mode: Literal["call", "agent"] # 单次调用 or Agent 模式
# Prompt 标识
prompt_name: Optional[str] = None
# Agent 模式特有
task: Optional[str] = None
agent_type: Optional[str] = None
# 父子关系(Sub-Trace 特有)
parent_trace_id: Optional[str] = None # 父 Trace ID
parent_goal_id: Optional[str] = None # 哪个 Goal 启动的
# 状态
status: Literal["running", "completed", "failed", "stopped"] = "running"
# 统计
total_messages: int = 0
total_tokens: int = 0 # 总 tokens(prompt + completion)
total_prompt_tokens: int = 0
total_completion_tokens: int = 0
total_cost: float = 0.0
total_duration_ms: int = 0
# 进度追踪
last_sequence: int = 0 # 最新 message 的 sequence(全局递增,不复用)
head_sequence: int = 0 # 当前主路径的头节点 sequence(用于 build_llm_messages)
last_event_id: int = 0 # 最新事件 ID(用于 WS 续传)
# 配置
uid: Optional[str] = None
model: Optional[str] = None # 默认模型
tools: Optional[List[Dict]] = None # 工具定义(OpenAI 格式)
llm_params: Dict[str, Any] = {} # LLM 参数(temperature 等)
context: Dict[str, Any] = {} # 元数据(含 collaborators 列表)
# 当前焦点
current_goal_id: Optional[str] = None
# 结果
result_summary: Optional[str] = None
error_message: Optional[str] = None
# 时间
created_at: datetime
completed_at: Optional[datetime] = None
实现:agent/trace/models.py
计划中的一个目标,支持层级结构。单独存储于 goal.json。
@dataclass
class Goal:
id: str # 内部 ID("1", "2"...)
description: str
reason: str = "" # 创建理由
parent_id: Optional[str] = None # 父 Goal ID
type: GoalType = "normal" # normal | agent_call
status: GoalStatus = "pending" # pending | in_progress | completed | abandoned
summary: Optional[str] = None # 完成/放弃时的总结
# agent_call 特有(启动 Sub-Trace)
sub_trace_ids: Optional[List[str]] = None
agent_call_mode: Optional[str] = None # explore | delegate | evaluate
sub_trace_metadata: Optional[Dict] = None
# 统计
self_stats: GoalStats # 自身 Messages 统计
cumulative_stats: GoalStats # 包含子孙的累计统计
created_at: datetime
Goal 类型:
normal - 普通目标,由 Agent 直接执行agent_call - 通过 agent/evaluate 工具创建的目标,会启动 Sub-Traceagent_call 类型的 Goal:
agent/evaluate 工具时自动设置agent_call_mode 记录使用的模式(explore/delegate/evaluate)sub_trace_ids 记录创建的所有 Sub-Trace IDsummary 包含格式化的汇总结果(explore 模式会汇总所有分支)Goal 操作(通过 goal 工具):
add - 添加顶层目标under - 在指定目标下添加子目标after - 在指定目标后添加兄弟目标focus - 切换焦点到指定目标done - 完成当前目标(附带 summary)abandon - 放弃当前目标(附带原因)实现:agent/trace/goal_models.py, agent/trace/goal_tool.py
对应 LLM API 的消息,每条 Message 关联一个 Goal。消息通过 parent_sequence 形成树结构。
@dataclass
class Message:
message_id: str # 格式:{trace_id}-{sequence:04d}
trace_id: str
role: Literal["system", "user", "assistant", "tool"]
sequence: int # 全局顺序(递增,不复用)
parent_sequence: Optional[int] = None # 父消息的 sequence(构成消息树)
goal_id: Optional[str] = None # 关联的 Goal ID(初始消息为 None,系统会按需自动创建 root goal 兜底)
description: str = "" # 系统自动生成的摘要
tool_call_id: Optional[str] = None
content: Any = None
# 统计
prompt_tokens: Optional[int] = None
completion_tokens: Optional[int] = None
cost: Optional[float] = None
duration_ms: Optional[int] = None
# LLM 响应信息(仅 role="assistant")
finish_reason: Optional[str] = None
created_at: datetime
# [已弃用] 由 parent_sequence 树结构替代
status: Literal["active", "abandoned"] = "active"
abandoned_at: Optional[datetime] = None
消息树(Message Tree):
消息通过 parent_sequence 形成树。主路径 = 从 trace.head_sequence 沿 parent chain 回溯到 root。
正常对话:1 → 2 → 3 → 4 → 5 (每条的 parent 指向前一条)
Rewind 到 3:3 → 6(parent=3) → 7 (新主路径,4-5 自动脱离)
压缩 1-3: 8(summary, parent=None) → 6 → 7 (summary 跳过被压缩的消息)
反思分支: 5 → 9(reflect, parent=5) → 10 (侧枝,不在主路径上)
build_llm_messages = 从 head 沿 parent_sequence 链回溯到 root,反转后返回。
Message 提供格式转换方法:
to_llm_dict() → OpenAI 格式 Dict(用于 LLM 调用)from_llm_dict(d, trace_id, sequence, goal_id) → 从 OpenAI 格式创建 Message实现:agent/trace/models.py
不同类型 Agent 的配置模板,控制工具权限和参数。
@dataclass
class AgentPreset:
allowed_tools: Optional[List[str]] = None # None 表示允许全部
denied_tools: Optional[List[str]] = None # 黑名单
max_iterations: int = 30
temperature: Optional[float] = None
skills: Optional[List[str]] = None # 注入 system prompt 的 skill 名称列表;None = 加载全部
description: Optional[str] = None
_DEFAULT_SKILLS = ["planning", "research", "browser"]
AGENT_PRESETS = {
"default": AgentPreset(
allowed_tools=None,
max_iterations=30,
skills=_DEFAULT_SKILLS,
description="默认 Agent,拥有全部工具权限",
),
"explore": AgentPreset(
allowed_tools=["read", "glob", "grep", "list_files"],
denied_tools=["write", "edit", "bash", "task"],
max_iterations=15,
skills=["planning"],
description="探索型 Agent,只读权限,用于代码分析",
),
"analyst": AgentPreset(
allowed_tools=["read", "glob", "grep", "web_search", "webfetch"],
denied_tools=["write", "edit", "bash", "task"],
temperature=0.3,
max_iterations=25,
skills=["planning", "research"],
description="分析型 Agent,用于深度分析和研究",
),
}
实现:agent/core/presets.py
用户自定义:项目级配置文件(如 examples/how/presets.json)可通过 register_preset() 注册额外预设。项目专用的 Agent 类型建议放在项目目录下,而非内置预设。
通过 agent 工具创建子 Agent 执行任务。task 参数为字符串时为单任务(delegate),为列表时并行执行多任务(explore)。支持通过 messages 参数预置消息,通过 continue_from 参数续跑已有 Sub-Trace。
agent 工具负责创建 Sub-Trace 和初始化 GoalTree(因为需要设置自定义 context 元数据和命名规则),创建完成后将 trace_id 传给 RunConfig,由 Runner 接管后续执行。工具同时维护父 Trace 的 context["collaborators"] 列表。
支持跨设备的 Agent 间持续对话,通过远程 Trace ID 实现:
Trace ID 格式:
abc-123agent://terminal-agent-456/abc-123(协议 + Agent 地址 + 本地 ID)使用方式:
# 调用远程 Agent
result = agent(task="分析本地项目", agent_url="https://terminal-agent.local")
# 返回: {"sub_trace_id": "agent://terminal-agent.local/abc-123"}
# 续跑远程 Trace(持续对话)
result2 = agent(
task="重点分析core模块",
continue_from="agent://terminal-agent.local/abc-123",
agent_url="https://terminal-agent.local"
)
实现:HybridTraceStore 自动路由到本地或远程存储,远程访问通过 HTTP API 实现。
实现位置:agent/trace/hybrid_store.py(规划中)
@tool(description="创建 Agent 执行任务")
async def agent(
task: Union[str, List[str]],
messages: Optional[Union[Messages, List[Messages]]] = None,
continue_from: Optional[str] = None,
agent_type: Optional[str] = None,
skills: Optional[List[str]] = None,
agent_url: Optional[str] = None, # 远程 Agent 地址(跨设备)
context: Optional[dict] = None,
) -> Dict[str, Any]:
参数:
agent_type: 子 Agent 类型,决定工具权限和默认 skills(对应 AgentPreset 名称)skills: 覆盖 preset 默认值,显式指定注入 system prompt 的 skill 列表agent_url: 远程 Agent 地址,用于跨设备调用(返回远程 Trace ID)continue_from: 支持本地或远程 Trace ID单任务(delegate):task: str
continue_from 续跑已有 Sub-Trace(本地或远程)messages 预置上下文消息多任务(explore):task: List[str]
asyncio.gather() 并行执行所有任务messages 支持 1D(共享)或 2D(per-agent)continue_from@tool(description="评估目标执行结果是否满足要求")
async def evaluate(
messages: Optional[Messages] = None,
target_goal_id: Optional[str] = None,
continue_from: Optional[str] = None,
context: Optional[dict] = None,
) -> Dict[str, Any]:
messages 中target_goal_id 默认为当前 goal_id定义在 agent/trace/models.py,用于工具参数和 runner/LLM API 接口:
ChatMessage = Dict[str, Any] # 单条 OpenAI 格式消息
Messages = List[ChatMessage] # 消息列表
MessageContent = Union[str, List[Dict[str, str]]] # content 字段(文本或多模态)
实现位置:agent/tools/builtin/subagent.py
创建阻塞式 Trace,等待人类通过 IM/邮件等渠道回复。
注意:此功能规划中,暂未实现。
任务执行中与模型密切协作的实体(子 Agent 或人类),按 与当前任务的关系 分类,而非按 human/agent 分类:
| 持久存在(外部可查) | 任务内活跃(需要注入) | |
|---|---|---|
| Agent | 专用 Agent(代码审查等) | 当前任务创建的子 Agent |
| Human | 飞书通讯录 | 当前任务中正在对接的人 |
活跃协作者存储在 trace.context["collaborators"]:
{
"name": "researcher", # 名称(模型可见)
"type": "agent", # agent | human
"trace_id": "abc-@delegate-001", # trace_id(agent 场景)
"status": "completed", # running | waiting | completed | failed
"summary": "方案A最优", # 最近状态摘要
}
与 GoalTree 一同周期性注入(每 10 轮),渲染为 Markdown:
## Active Collaborators
- researcher [agent, completed]: 方案A最优
- 谭景玉 [human, waiting]: 已发送方案确认,等待回复
- coder [agent, running]: 正在实现特征提取模块
列表为空时不注入。
各工具负责更新 collaborators 列表(通过 context["store"] 写入 trace.context):
agent 工具:创建/续跑子 Agent 时更新feishu 工具:发送消息/收到回复时更新持久联系人/Agent:通过工具按需查询(如 feishu_get_contact_list),不随任务注入。
实现:agent/core/runner.py:AgentRunner._build_context_injection, agent/tools/builtin/subagent.py
Context Injection Hooks 是一个可扩展机制,允许外部模块(如 A2A IM、监控系统)向 Agent 的周期性上下文注入中添加自定义内容。
Runner Loop (每 10 轮)
↓
_build_context_injection()
├─ GoalTree (内置)
├─ Active Collaborators (内置)
└─ Context Hooks (可扩展)
├─ A2A IM Hook → "💬 3 条新消息"
├─ Monitor Hook → "⚠️ 内存使用 85%"
└─ Custom Hook → 自定义内容
↓
注入为 system message
↓
LLM 看到提醒 → 决定是否调用工具
# Hook 函数签名
def context_hook(trace: Trace, goal_tree: Optional[GoalTree]) -> Optional[str]:
"""
生成要注入的上下文内容
Args:
trace: 当前 Trace
goal_tree: 当前 GoalTree
Returns:
要注入的 Markdown 内容,None 表示无内容
"""
return "## Custom Section\n\n内容..."
# 创建 Runner 时注册
runner = AgentRunner(
llm_call=llm_call,
trace_store=trace_store,
context_hooks=[hook1, hook2, hook3] # 按顺序注入
)
Runner 修改:
# agent/core/runner.py
class AgentRunner:
def __init__(
self,
# ... 现有参数
context_hooks: Optional[List[Callable]] = None
):
self.context_hooks = context_hooks or []
def _build_context_injection(
self,
trace: Trace,
goal_tree: Optional[GoalTree],
) -> str:
"""构建周期性注入的上下文(GoalTree + Active Collaborators + Hooks)"""
parts = []
# GoalTree(现有)
if goal_tree and goal_tree.goals:
parts.append(f"## Current Plan\n\n{goal_tree.to_prompt()}")
# ... focus 提醒
# Active Collaborators(现有)
collaborators = trace.context.get("collaborators", [])
if collaborators:
lines = ["## Active Collaborators"]
for c in collaborators:
# ... 现有逻辑
parts.append("\n".join(lines))
# Context Hooks(新增)
for hook in self.context_hooks:
try:
hook_content = hook(trace, goal_tree)
if hook_content:
parts.append(hook_content)
except Exception as e:
logger.error(f"Context hook error: {e}")
return "\n\n".join(parts)
实现位置:agent/core/runner.py:AgentRunner._build_context_injection(待实现)
# agent/tools/builtin/a2a_im.py
class A2AMessageQueue:
"""A2A IM 消息队列"""
def __init__(self):
self._messages: List[Dict] = []
def push(self, message: Dict):
"""Gateway 推送消息时调用"""
self._messages.append(message)
def pop_all(self) -> List[Dict]:
"""check_messages 工具调用时清空"""
messages = self._messages
self._messages = []
return messages
def get_summary(self) -> Optional[str]:
"""获取消息摘要(用于 context injection)"""
if not self._messages:
return None
count = len(self._messages)
latest = self._messages[-1]
from_agent = latest.get("from_agent_id", "unknown")
if count == 1:
return f"💬 来自 {from_agent} 的 1 条新消息(使用 check_messages 工具查看)"
else:
return f"💬 {count} 条新消息,最新来自 {from_agent}(使用 check_messages 工具查看)"
def create_a2a_context_hook(message_queue: A2AMessageQueue):
"""创建 A2A IM 的 context hook"""
def a2a_context_hook(trace: Trace, goal_tree: Optional[GoalTree]) -> Optional[str]:
"""注入 A2A IM 消息提醒"""
summary = message_queue.get_summary()
if not summary:
return None
return f"## Messages\n\n{summary}"
return a2a_context_hook
@tool(description="检查来自其他 Agent 的新消息")
async def check_messages(ctx: ToolContext) -> ToolResult:
"""检查并获取来自其他 Agent 的新消息"""
message_queue: A2AMessageQueue = ctx.context.get("a2a_message_queue")
if not message_queue:
return ToolResult(title="消息队列未初始化", output="")
messages = message_queue.pop_all()
if not messages:
return ToolResult(title="无新消息", output="")
# 格式化消息
lines = [f"收到 {len(messages)} 条新消息:\n"]
for i, msg in enumerate(messages, 1):
from_agent = msg.get("from_agent_id", "unknown")
content = msg.get("content", "")
conv_id = msg.get("conversation_id", "")
lines.append(f"{i}. 来自 {from_agent}")
lines.append(f" 对话 ID: {conv_id}")
lines.append(f" 内容: {content}")
lines.append("")
return ToolResult(
title=f"收到 {len(messages)} 条新消息",
output="\n".join(lines)
)
实现位置:agent/tools/builtin/a2a_im.py(待实现)
# api_server.py
from agent.tools.builtin.a2a_im import (
A2AMessageQueue,
create_a2a_context_hook,
check_messages
)
# 创建消息队列
message_queue = A2AMessageQueue()
# 创建 context hook
a2a_hook = create_a2a_context_hook(message_queue)
# 创建 Runner 时注入 hook
runner = AgentRunner(
llm_call=llm_call,
trace_store=trace_store,
context_hooks=[a2a_hook]
)
# 注册 check_messages 工具
tool_registry.register(check_messages)
# 启动 Gateway webhook 端点
@app.post("/webhook/a2a-messages")
async def receive_a2a_message(message: dict):
"""接收来自 Gateway 的消息"""
message_queue.push(message)
return {"status": "received"}
## Current Plan
1. [in_progress] 分析代码架构
1.1. [completed] 读取项目结构
1.2. [in_progress] 分析核心模块
## Active Collaborators
- researcher [agent, completed]: 已完成调研
## Messages
💬 来自 code-reviewer 的 1 条新消息(使用 check_messages 工具查看)
监控告警:
def create_monitor_hook(monitor):
def monitor_hook(trace, goal_tree):
alerts = monitor.get_alerts()
if not alerts:
return None
return f"## System Alerts\n\n⚠️ {len(alerts)} 条告警(使用 check_alerts 工具查看)"
return monitor_hook
定时提醒:
def create_timer_hook(timer):
def timer_hook(trace, goal_tree):
if timer.should_remind():
return "## Reminder\n\n⏰ 任务已执行 30 分钟,建议检查进度"
return None
return timer_hook
实现位置:各模块自行实现 hook 函数
任务执行中与模型密切协作的实体(子 Agent 或人类),按 与当前任务的关系 分类,而非按 human/agent 分类:
| 持久存在(外部可查) | 任务内活跃(需要注入) | |
|---|---|---|
| Agent | 专用 Agent(代码审查等) | 当前任务创建的子 Agent |
| Human | 飞书通讯录 | 当前任务中正在对接的人 |
活跃协作者存储在 trace.context["collaborators"]:
{
"name": "researcher", # 名称(模型可见)
"type": "agent", # agent | human
"trace_id": "abc-@delegate-001", # trace_id(agent 场景)
"status": "completed", # running | waiting | completed | failed
"summary": "方案A最优", # 最近状态摘要
}
与 GoalTree 一同周期性注入(每 10 轮),渲染为 Markdown:
## Active Collaborators
- researcher [agent, completed]: 方案A最优
- 谭景玉 [human, waiting]: 已发送方案确认,等待回复
- coder [agent, running]: 正在实现特征提取模块
列表为空时不注入。
各工具负责更新 collaborators 列表(通过 context["store"] 写入 trace.context):
agent 工具:创建/续跑子 Agent 时更新feishu 工具:发送消息/收到回复时更新持久联系人/Agent:通过工具按需查询(如 feishu_get_contact_list),不随任务注入。
实现:agent/core/runner.py:AgentRunner._build_context_injection, agent/tools/builtin/subagent.py
@tool()
async def my_tool(arg: str, ctx: ToolContext) -> ToolResult:
return ToolResult(
title="Success",
output="Result content",
long_term_memory="Short summary" # 可选:压缩后保留的摘要
)
| 类型 | 作用 |
|---|---|
@tool |
装饰器,自动注册工具并生成 Schema |
ToolResult |
工具执行结果,支持双层记忆 |
ToolContext |
工具执行上下文,依赖注入 |
| 目录 | 工具 | 说明 |
|---|---|---|
trace/ |
goal | Agent 内部计划管理 |
builtin/ |
agent, evaluate | 子 Agent 创建与评估 |
builtin/file/ |
read, write, edit, glob, grep | 文件操作 |
builtin/browser/ |
browser actions | 浏览器自动化 |
builtin/ |
bash, sandbox, search, webfetch, skill, ask_human | 其他工具 |
大输出(如网页抓取)只传给 LLM 一次,之后用摘要替代:
ToolResult(
output="<10K tokens 的完整内容>",
long_term_memory="Extracted 10000 chars from amazon.com",
include_output_only_once=True
)
详细文档:工具系统
| 类型 | 加载位置 | 加载时机 |
|---|---|---|
| 内置 Skill | System Prompt | Agent 启动时自动注入 |
| 项目 Skill | System Prompt | Agent 启动时按 preset/call-site 过滤后注入 |
| 普通 Skill | 对话消息 | 模型调用 skill 工具时 |
agent/memory/skills/ # 内置 Skills(始终加载)
├── planning.md # 计划与 Goal 工具使用
├── research.md # 搜索与内容研究
└── browser.md # 浏览器自动化
./skills/ # 项目自定义 Skills
不同 Agent 类型所需的 skills 不同。过滤优先级:
agent() 工具的 skills 参数(显式指定,最高优先级)AgentPreset.skills(preset 默认值)None(加载全部,向后兼容)示例:调用子 Agent 时只注入解构相关 skill:
agent(task="...", agent_type="deconstruct", skills=["planning", "deconstruct"])
实现:agent/memory/skill_loader.py
详细文档:Skills 使用指南
从执行历史中提取的经验规则,用于指导未来任务。
经验以 Markdown 文件存储(默认 ./.cache/experiences.md),人类可读、可编辑、可版本控制。
文件格式:
---
id: ex_001
trace_id: trace-xxx
category: tool_usage
tags: {state: ["large_file", "dirty_repo"], intent: ["batch_edit", "safe_modify"]}
metrics: {helpful: 12, harmful: 0}
created_at: 2026-02-12 15:30
---
---
id: ex_002
...
通过 POST /api/traces/{id}/reflect 触发,旨在将原始执行历史提炼为可复用的知识。
1. 分叉反思:在 trace 末尾追加 user message(含反思与打标 Prompt),作为侧枝执行。
2. 结构化生成:
·归类:将经验分配至 tool_usage(工具)、logic_flow(逻辑)、environment(环境)等。
·打标:提取 state(环境状态)与 intent(用户意图)语义标签。
·量化:初始 helpful 设为 1。
3. 持久化:将带有元数据的 Markdown 块追加至 experiences.md。
实现:agent/trace/run_api.py:reflect_trace
新建 Trace 时,Runner 采用“分析-检索-注入”三阶段策略,实现精准经验推荐。
1. 意图预分析
Runner 调用 utility_llm 对初始任务进行语义提取:
-输入:"优化这个项目的 Docker 构建速度"
-输出:{state: ["docker", "ci"], intent: ["optimization"]}
2. 语义检索
在 _load_experiences 中根据标签进行语义匹配(优先匹配 intent,其次是 state),筛选出相关度最高的 Top-K 条经验。
3. 精准注入
将匹配到的经验注入第一条 user message 末尾:
# _build_history 中(仅新建模式):
if not config.trace_id:
relevant_ex = self.experience_retriever.search(task_tags)
if relevant_ex:
formatted_ex = "\n".join([f"- [{e.id}] {e.content} (Helpful: {e.helpful})" for e in relevant_ex])
first_user_msg["content"] += f"\n\n## 参考经验\n\n{formatted_ex}"
实现:agent/core/runner.py:AgentRunner._build_history
不再仅限于启动时自动注入,而是通过内置工具供 Agent 在需要时主动调用。当执行结果不符合预期或进入未知领域时,Agent 应优先使用此工具。 工具定义:
@tool(description="根据当前任务状态和意图,从经验库中检索相关的历史经验")
async def get_experience(
intent: Optional[str] = None,
state: Optional[str] = None
) -> Dict[str, Any]:
"""
参数:
intent: 想要达成的目标意图 (如 "optimization", "debug")
state: 当前环境或遇到的问题状态 (如 "docker_build_fail", "permission_denied")
"""
实现: agent/tools/builtin/experience.py
每轮 agent loop 构建 llm_messages 时自动执行:
大多数情况下 Level 1 足够。
触发条件:Level 1 之后 token 数仍超过阈值(默认 max_tokens × 0.8)。
流程:
./.cache/experiences.md。反思消息为侧枝(parent_sequence 分叉,不在主路径上)parent_sequence 跳过被压缩的范围to_prompt() 支持两种模式:
include_summary=False(默认):精简视图,用于日常周期性注入include_summary=True:含所有 completed goals 的 summary,用于 Level 2 压缩时提供上下文messages/parent_sequence 树结构实现跳过,无需 compression events 或 skip list实现:agent/trace/compaction.py, agent/trace/goal_models.py
详细文档:Context 管理
class TraceStore(Protocol):
async def create_trace(self, trace: Trace) -> None: ...
async def get_trace(self, trace_id: str) -> Trace: ...
async def update_trace(self, trace_id: str, **updates) -> None: ...
async def add_message(self, message: Message) -> None: ...
async def get_trace_messages(self, trace_id: str) -> List[Message]: ...
async def get_main_path_messages(self, trace_id: str, head_sequence: int) -> List[Message]: ...
async def get_messages_by_goal(self, trace_id: str, goal_id: str) -> List[Message]: ...
async def append_event(self, trace_id: str, event_type: str, payload: Dict) -> int: ...
get_main_path_messages 从 head_sequence 沿 parent_sequence 链回溯,返回主路径上的有序消息列表。
实现:
agent/trace/protocols.pyagent/trace/store.py:FileSystemTraceStoreagent/trace/remote_store.py:RemoteTraceStore(规划中)agent/trace/hybrid_store.py:HybridTraceStore(规划中)HybridTraceStore 根据 Trace ID 自动路由到本地或远程存储:
| Trace ID 格式 | 存储位置 | 访问方式 |
|---|---|---|
abc-123 |
本地文件系统 | FileSystemTraceStore |
agent://host/abc-123 |
远程 Agent | HTTP API(RemoteTraceStore) |
RemoteTraceStore 通过 HTTP API 访问远程 Trace:
GET /api/traces/{trace_id} - 获取 Trace 元数据GET /api/traces/{trace_id}/messages - 获取消息历史POST /api/traces/{trace_id}/run - 续跑(追加消息并执行)认证:通过 API Key 认证,配置在 config/agents.yaml。
实现位置:agent/trace/hybrid_store.py, agent/trace/remote_store.py(规划中)
.trace/
├── {trace_id}/
│ ├── meta.json # Trace 元数据(含 tools 定义)
│ ├── goal.json # GoalTree(mission + goals 列表)
│ ├── events.jsonl # 事件流(goal 变更、sub_trace 生命周期等)
│ └── messages/ # Messages
│ ├── {trace_id}-0001.json
│ └── ...
│
└── {trace_id}@explore-{序号}-{timestamp}-001/ # 子 Trace
└── ...
events.jsonl 说明:
Sub-Trace 目录命名:
{parent}@explore-{序号:03d}-{timestamp}-001{parent}@delegate-{timestamp}-001{parent}@evaluate-{timestamp}-001meta.json 示例:
{
"trace_id": "0415dc38-...",
"mode": "agent",
"task": "分析代码结构",
"agent_type": "default",
"status": "running",
"model": "google/gemini-2.5-flash",
"tools": [...],
"llm_params": {"temperature": 0.3},
"context": {
"collaborators": [
{"name": "researcher", "type": "agent", "trace_id": "...", "status": "completed", "summary": "方案A最优"}
]
},
"current_goal_id": "3"
}
详见 设计决策文档
核心决策:
所有 Agent 都是 Trace - 主 Agent、子 Agent、人类协助统一为 Trace,通过 parent_trace_id 和 spawn_tool 区分
trace/ 模块统一管理执行状态 - 合并原 execution/ 和 goal/,包含计划管理和 Agent 内部控制工具
tools/ 专注外部交互 - 文件、命令、网络、浏览器等与外部世界的交互
Agent 预设替代 Sub-Agent 配置 - 通过 core/presets.py 定义不同类型 Agent 的工具权限和参数
| 文档 | 内容 |
|---|---|
| Context 管理 | Goals、压缩、Plan 注入策略 |
| 工具系统 | 工具定义、注册、双层记忆 |
| Skills 指南 | Skill 分类、编写、加载 |
| 多模态支持 | 图片、PDF 处理 |
| 知识管理 | 知识结构、检索、提取机制 |
| Scope 设计 | 知识可见性和权限控制 |
| Agent 设计决策 | Agent Core 架构决策记录 |
| Gateway 设计决策 | Gateway 架构决策记录 |
| 组织级概览 | 组织级 Agent 系统架构和规划 |
| Enterprise 实现 | 认证、审计、多租户技术实现 |
| 测试指南 | 测试策略和命令 |
| A2A 协议调研 | 行业 A2A 通信协议和框架对比 |
| A2A 跨设备通信 | 跨设备 Agent 通信方案(内部) |
| A2A Trace 存储 | 跨设备 Trace 存储方案详细设计 |
| MAMP 协议 | 与外部 Agent 系统的通用交互协议 |
| A2A IM 系统 | Agent 即时通讯系统架构和实现 |
| Gateway 架构 | Gateway 三层架构和设计决策 |
| Gateway 部署 | Gateway 部署模式和配置 |
| Gateway API | Gateway API 完整参考 |