Просмотр исходного кода

fix: infinite reflect with turns overflow

Talegorithm 3 часов назад
Родитель
Сommit
0896186009

+ 1 - 0
README.md

@@ -243,6 +243,7 @@ RunConfig(
     agent_type="default",     # 预设类型:default / explore / analyst
     agent_type="default",     # 预设类型:default / explore / analyst
     trace_id=None,            # 续跑/回溯时传入已有 trace ID
     trace_id=None,            # 续跑/回溯时传入已有 trace ID
     after_sequence=None,      # 从哪条消息后续跑(message sequence)
     after_sequence=None,      # 从哪条消息后续跑(message sequence)
+    goal_compression="on_overflow",  # Goal 压缩模式:none / on_complete / on_overflow
     knowledge=KnowledgeConfig(),  # 知识管理配置
     knowledge=KnowledgeConfig(),  # 知识管理配置
 )
 )
 ```
 ```

+ 2 - 0
agent/core/prompts/compression.py

@@ -21,6 +21,8 @@ COMPRESSION_PROMPT_TEMPLATE = """请对以上对话历史进行压缩总结。
 格式要求:
 格式要求:
 [[SUMMARY]]
 [[SUMMARY]]
 (此处填写结构化的摘要内容)
 (此处填写结构化的摘要内容)
+
+**生成摘要后立即停止,不要继续执行原有任务。**
 """
 """
 
 
 # 保留旧名以兼容 compaction.py 的调用
 # 保留旧名以兼容 compaction.py 的调用

+ 2 - 0
agent/core/prompts/knowledge.py

@@ -50,6 +50,7 @@ REFLECT_PROMPT = """请回顾以上执行过程,将值得沉淀的经验直接
 - 只保存最有价值的经验,宁少勿滥;一次就成功或比较简单的经验就不要记录了,记录反复尝试或被用户指导后才成功的经验、或者是调研之后的收获。
 - 只保存最有价值的经验,宁少勿滥;一次就成功或比较简单的经验就不要记录了,记录反复尝试或被用户指导后才成功的经验、或者是调研之后的收获。
 - 不需要输出任何文字,直接调用工具即可
 - 不需要输出任何文字,直接调用工具即可
 - 如果没有值得保存的经验,不调用任何工具
 - 如果没有值得保存的经验,不调用任何工具
+- **完成经验保存后立即停止,不要继续执行原有任务**
 """
 """
 
 
 
 
@@ -91,6 +92,7 @@ COMPLETION_REFLECT_PROMPT = """请对整个任务进行复盘,将值得沉淀
 - 只保存最有价值的经验,宁少勿滥;一次就成功或比较简单的经验就不要记录了,记录反复尝试或被用户指导后才成功的经验、或者是调研之后的收获。
 - 只保存最有价值的经验,宁少勿滥;一次就成功或比较简单的经验就不要记录了,记录反复尝试或被用户指导后才成功的经验、或者是调研之后的收获。
 - 不需要输出任何文字,直接调用工具即可
 - 不需要输出任何文字,直接调用工具即可
 - 如果没有值得保存的经验,不调用任何工具
 - 如果没有值得保存的经验,不调用任何工具
+- **完成经验保存后立即停止,不要继续执行原有任务**
 """
 """
 
 
 
 

+ 42 - 12
agent/core/runner.py

@@ -28,7 +28,7 @@ from agent.trace.protocols import TraceStore
 from agent.trace.goal_models import GoalTree
 from agent.trace.goal_models import GoalTree
 from agent.trace.compaction import (
 from agent.trace.compaction import (
     CompressionConfig,
     CompressionConfig,
-    filter_by_goal_status,
+    compress_completed_goals,
     estimate_tokens,
     estimate_tokens,
     needs_level2_compression,
     needs_level2_compression,
     build_compression_prompt,
     build_compression_prompt,
@@ -105,6 +105,7 @@ class RunConfig:
     max_iterations: int = 200
     max_iterations: int = 200
     tools: Optional[List[str]] = None          # None = 全部已注册工具
     tools: Optional[List[str]] = None          # None = 全部已注册工具
     side_branch_max_turns: int = 5             # 侧分支最大轮次(压缩/反思)
     side_branch_max_turns: int = 5             # 侧分支最大轮次(压缩/反思)
+    goal_compression: Literal["none", "on_complete", "on_overflow"] = "on_overflow"  # Goal 压缩模式
 
 
     # --- 强制侧分支(用于 API 手动触发或自动压缩流程)---
     # --- 强制侧分支(用于 API 手动触发或自动压缩流程)---
     # 使用列表作为侧分支队列,每次完成一个侧分支后 pop(0) 取下一个
     # 使用列表作为侧分支队列,每次完成一个侧分支后 pop(0) 取下一个
@@ -787,19 +788,19 @@ class AgentRunner:
             config.force_side_branch = ["reflection", "compression"]
             config.force_side_branch = ["reflection", "compression"]
             return history, head_seq, sequence, True
             return history, head_seq, sequence, True
 
 
-        # Level 1 压缩:GoalTree 过滤
-        if self.trace_store and goal_tree:
+        # Level 1 压缩:Goal 完成压缩
+        if config.goal_compression != "none" and self.trace_store and goal_tree:
             if head_seq > 0:
             if head_seq > 0:
                 main_path_msgs = await self.trace_store.get_main_path_messages(
                 main_path_msgs = await self.trace_store.get_main_path_messages(
                     trace_id, head_seq
                     trace_id, head_seq
                 )
                 )
-                filtered_msgs = filter_by_goal_status(main_path_msgs, goal_tree)
-                if len(filtered_msgs) < len(main_path_msgs):
+                compressed_msgs = compress_completed_goals(main_path_msgs, goal_tree)
+                if len(compressed_msgs) < len(main_path_msgs):
                     logger.info(
                     logger.info(
                         "Level 1 压缩: %d -> %d 条消息",
                         "Level 1 压缩: %d -> %d 条消息",
-                        len(main_path_msgs), len(filtered_msgs),
+                        len(main_path_msgs), len(compressed_msgs),
                     )
                     )
-                    history = [msg.to_llm_dict() for msg in filtered_msgs]
+                    history = [msg.to_llm_dict() for msg in compressed_msgs]
                 else:
                 else:
                     logger.info(
                     logger.info(
                         "Level 1 压缩: 无可过滤消息 (%d 条全部保留)",
                         "Level 1 压缩: 无可过滤消息 (%d 条全部保留)",
@@ -807,7 +808,7 @@ class AgentRunner:
                     )
                     )
         elif needs_compression:
         elif needs_compression:
             logger.warning(
             logger.warning(
-                "消息数 (%d) 或 token 数 (%d) 超过阈值,但无法执行 Level 1 压缩(缺少 store 或 goal_tree)",
+                "消息数 (%d) 或 token 数 (%d) 超过阈值,但无法执行 Level 1 压缩(缺少 store 或 goal_tree,或 goal_compression=none)",
                 msg_count, token_count,
                 msg_count, token_count,
             )
             )
 
 
@@ -1250,7 +1251,7 @@ class AgentRunner:
                         # 清除侧分支状态
                         # 清除侧分支状态
                         trace.context.pop("active_side_branch", None)
                         trace.context.pop("active_side_branch", None)
 
 
-                        # 队列中如果还有侧分支,保持 force_side_branch;否则清空
+                        # 队列中如果还有侧分支(如 compression),保持;否则清空
                         if not config.force_side_branch or len(config.force_side_branch) == 0:
                         if not config.force_side_branch or len(config.force_side_branch) == 0:
                             config.force_side_branch = None
                             config.force_side_branch = None
                             logger.info("反思超时,队列为空")
                             logger.info("反思超时,队列为空")
@@ -1268,9 +1269,6 @@ class AgentRunner:
                             history = [m.to_llm_dict() for m in main_path_messages]
                             history = [m.to_llm_dict() for m in main_path_messages]
                             head_seq = side_branch_ctx.start_head_seq
                             head_seq = side_branch_ctx.start_head_seq
 
 
-                        # 清除强制侧分支配置
-                        config.force_side_branch = None
-
                         side_branch_ctx = None
                         side_branch_ctx = None
                         continue
                         continue
 
 
@@ -1525,6 +1523,38 @@ class AgentRunner:
                         "content": tool_content_for_llm,
                         "content": tool_content_for_llm,
                     })
                     })
 
 
+                # on_complete 模式:goal(done=...) 后立即压缩该 goal 的消息
+                if (
+                    not side_branch_ctx
+                    and config.goal_compression == "on_complete"
+                    and self.trace_store
+                    and goal_tree
+                ):
+                    has_goal_done = False
+                    for tc in tool_calls:
+                        if tc["function"]["name"] != "goal":
+                            continue
+                        try:
+                            raw = tc["function"]["arguments"]
+                            args = json.loads(raw) if isinstance(raw, str) and raw.strip() else {}
+                        except (json.JSONDecodeError, TypeError):
+                            args = {}
+                        if args.get("done") is not None:
+                            has_goal_done = True
+                            break
+
+                    if has_goal_done:
+                        main_path_msgs = await self.trace_store.get_main_path_messages(
+                            trace_id, head_seq
+                        )
+                        compressed_msgs = compress_completed_goals(main_path_msgs, goal_tree)
+                        if len(compressed_msgs) < len(main_path_msgs):
+                            logger.info(
+                                "on_complete 压缩: %d -> %d 条消息",
+                                len(main_path_msgs), len(compressed_msgs),
+                            )
+                            history = [msg.to_llm_dict() for msg in compressed_msgs]
+
                 continue  # 继续循环
                 continue  # 继续循环
 
 
             # 无工具调用
             # 无工具调用

+ 73 - 13
agent/docs/architecture.md

@@ -1242,26 +1242,86 @@ async def get_experience(
 
 
 ## Context 压缩
 ## Context 压缩
 
 
-### 两级压缩策略
+### 压缩策略概述
 
 
-#### Level 1:GoalTree 过滤(确定性,零成本)
+Context 压缩分为两级,通过 `RunConfig` 中的 `goal_compression` 参数控制 Level 1 的行为:
 
 
-每轮 agent loop 构建 `llm_messages` 时自动执行:
-- 始终保留:system prompt、第一条 user message(含 GoalTree 精简视图)、当前 focus goal 的消息
-- 跳过 completed/abandoned goals 的消息(信息已在 GoalTree summary 中)
-- 通过 Message Tree 的 parent_sequence 实现跳过
+| 模式 | 值 | Level 1 行为 | Level 2 行为 |
+|------|-----|-------------|-------------|
+| 不压缩 | `"none"` | 跳过 Level 1 | 超限时直接进入 Level 2 |
+| 完成后压缩 | `"on_complete"` | 每个 goal 完成时立刻压缩该 goal 的消息 | 超限时进入 Level 2 |
+| 超长时压缩 | `"on_overflow"` | 超限时遍历所有 completed goal 逐个压缩 | Level 1 后仍超限则进入 Level 2 |
 
 
-大多数情况下 Level 1 足够。
+默认值:`"on_overflow"`
 
 
-#### Level 2:LLM 总结(仅在 Level 1 后仍超限时触发)
+```python
+RunConfig(
+    goal_compression="on_overflow",  # "none" | "on_complete" | "on_overflow"
+)
+```
+
+### Level 1:Goal 完成压缩(确定性,零 LLM 成本)
+
+对单个 completed goal 的压缩逻辑:
+
+1. **识别目标消息**:找到该 goal 关联的所有消息(`msg.goal_id == goal.id`)
+2. **区分 goal 工具消息和非 goal 消息**:检查 assistant 消息的 tool_calls 中是否调用了 `goal` 工具(实际场景中 goal 调用通常是单独一条 assistant 消息,不考虑混合情况)
+3. **保留 goal 工具消息**:保留所有调用 `goal(...)` 的 assistant 消息及其对应的 tool result(包括 add、focus、under、done 等操作)
+4. **删除非 goal 消息**:从 history 中移除该 goal 的其他 assistant 消息及其 tool result(read_file、bash、search 等中间工具调用)
+5. **替换 done 的 tool result**:将 `goal(done=...)` 的 tool result 内容替换为:"具体执行过程已清理"
+6. **纯内存操作**:压缩仅操作内存中的 history 列表,不涉及新增消息或持久化变更,原始消息永远保留在存储层
+
+压缩后的 history 片段示例:
+
+```
+... (前面的消息)
+[assistant] tool_calls: [goal(focus="1.1")]
+[tool] goal result: "## 更新\n- 焦点切换到: 1.1\n\n## Current Plan\n..."
+[assistant] tool_calls: [goal(done="1.1", summary="前端使用 React...")]
+[tool] goal result: "具体执行过程已清理"
+... (后面的消息)
+```
+
+#### `on_complete` 模式
+
+在 goal 工具执行 `done` 操作后,立刻对该 goal 执行压缩。优点是 history 始终保持精简,缺点是如果后续需要回溯到该 goal 的中间过程,信息已丢失(存储层仍保留原始消息)。
+
+**触发点**:`agent/trace/goal_tool.py` 中 done 操作完成后
+
+#### `on_overflow` 模式
+
+在 `_manage_context_usage` 检测到超限时,遍历所有 completed goal(按完成时间排序),逐个执行压缩,直到 token 数降到阈值以下或所有 completed goal 都已压缩。如果仍超限,进入 Level 2。
+
+**触发点**:`agent/core/runner.py:_manage_context_usage`
+
+**实现**:`agent/trace/compaction.py:compress_completed_goal`, `agent/trace/compaction.py:compress_all_completed_goals`
+
+### Level 2:LLM 总结(仅在 Level 1 后仍超限时触发)
+
+触发条件:Level 1 之后 token 数仍超过阈值(默认 `context_window × 0.5`)。
+
+通过侧分支队列机制执行,`force_side_branch` 为列表类型:
+
+1. **反思**(可选,由 `knowledge.enable_extraction` 控制):进入 `reflection` 侧分支,LLM 可多轮调用 knowledge_search、resource_save、knowledge_save 等工具提取经验
+2. **压缩**:进入 `compression` 侧分支,LLM 生成 summary
+
+侧分支队列示例:
+- 启用知识提取:`force_side_branch = ["reflection", "compression"]`
+- 仅压缩:`force_side_branch = ["compression"]`
+
+压缩完成后重建 history 为:`system prompt + 第一条 user message + summary(含详细 GoalTree)`
+
+**实现**:`agent/core/runner.py:_agent_loop`(侧分支状态机), `agent/core/runner.py:_rebuild_history_after_compression`
+
+### 任务完成后反思
 
 
-触发条件:Level 1 之后 token 数仍超过阈值(默认 `max_tokens × 0.8`)。
+主路径无工具调用(任务完成)时,如果 `knowledge.enable_completion_extraction` 为 True,通过侧分支机制进入反思:
 
 
-流程:
-1. **经验提取**:在消息列表末尾追加反思 prompt,进入侧分支 agent 模式(最多 5 轮),LLM 可调用工具(如 knowledge_search, knowledge_save)进行多轮推理。反思消息标记为 `branch_type="reflection"`,不在主路径上
-2. **压缩**:在消息列表末尾追加压缩 prompt(含 GoalTree 完整视图),进入侧分支 agent 模式(最多 5 轮),LLM 可调用工具(如 goal_status)辅助压缩。压缩消息标记为 `branch_type="compression"`,完成后创建 summary 消息,其 `parent_sequence` 跳过被压缩的范围
+1. 设置 `force_side_branch = ["reflection"]` 和 `break_after_side_branch = True`
+2. 反思侧分支完成后回到主路径
+3. 检测到 `break_after_side_branch` 标志,直接 break 退出循环
 
 
-**侧分支模式**:压缩和反思在同一 agent loop 中通过状态机实现,复用主路径的缓存和工具配置,支持多轮推理。
+**实现**:`agent/core/runner.py:_agent_loop`
 
 
 ### GoalTree 双视图
 ### GoalTree 双视图
 
 

+ 93 - 64
agent/trace/compaction.py

@@ -1,17 +1,19 @@
 """
 """
 Context 压缩 — 两级压缩策略
 Context 压缩 — 两级压缩策略
 
 
-Level 1: GoalTree 过滤(确定性,零成本)
-  - 跳过 completed/abandoned goals 的消息(信息已在 GoalTree summary 中)
-  - 始终保留:system prompt、第一条 user message、当前 focus goal 的消息
+Level 1: Goal 完成压缩(确定性,零 LLM 成本)
+  - 对 completed/abandoned goals:保留 goal 工具消息,移除非 goal 工具消息
+  - 三种模式:none / on_complete / on_overflow
 
 
 Level 2: LLM 总结(仅在 Level 1 后仍超限时触发)
 Level 2: LLM 总结(仅在 Level 1 后仍超限时触发)
-  - 在消息列表末尾追加压缩 prompt → 主模型回复 → summary 存为新消息
-  - summary 的 parent_sequence 跳过被压缩的范围
+  - 通过侧分支多轮 agent 模式压缩
+  - 压缩后重建 history 为:system prompt + 第一条 user message + summary
 
 
-压缩不修改存储:原始消息永远保留在 messages/,通过 parent_sequence 树结构实现跳过
+压缩不修改存储:原始消息永远保留在 messages/,纯内存操作
 """
 """
 
 
+import copy
+import json
 import logging
 import logging
 from dataclasses import dataclass
 from dataclasses import dataclass
 from typing import List, Dict, Any, Optional, Set
 from typing import List, Dict, Any, Optional, Set
@@ -19,7 +21,6 @@ from typing import List, Dict, Any, Optional, Set
 from .goal_models import GoalTree
 from .goal_models import GoalTree
 from .models import Message
 from .models import Message
 from agent.core.prompts import (
 from agent.core.prompts import (
-    COMPRESSION_EVAL_PROMPT_TEMPLATE,
     REFLECT_PROMPT,
     REFLECT_PROMPT,
     build_compression_eval_prompt,
     build_compression_eval_prompt,
 )
 )
@@ -99,84 +100,112 @@ class CompressionConfig:
         return int(window * self.threshold_ratio)
         return int(window * self.threshold_ratio)
 
 
 
 
-# ===== Level 1: GoalTree 过滤 =====
+# ===== Level 1: Goal 完成压缩 =====
 
 
-def filter_by_goal_status(
+def compress_completed_goals(
     messages: List[Message],
     messages: List[Message],
     goal_tree: Optional[GoalTree],
     goal_tree: Optional[GoalTree],
 ) -> List[Message]:
 ) -> List[Message]:
     """
     """
-    Level 1 过滤:跳过 completed/abandoned goals 的消息
+    Level 1 压缩:移除 completed/abandoned goals 的非 goal 工具消息
 
 
-    始终保留:
-    - goal_id 为 None 的消息(system prompt、初始 user message)
-    - 当前 focus goal 及其祖先链上的消息
-    - in_progress 和 pending goals 的消息
+    对每个 completed/abandoned goal:
+    - 保留:所有调用 goal 工具的 assistant 消息及其 tool result
+    - 移除:所有非 goal 工具的 assistant 消息及其 tool result
+    - 替换:goal(done=...) 的 tool result 内容为 "具体执行过程已清理"
+    - goal_id 为 None 的消息始终保留(system prompt、初始 user message)
+    - pending / in_progress goals 的消息不受影响
 
 
-    跳过:
-    - completed 且不在焦点路径上的 goals 的消息
-    - abandoned goals 的消息
+    纯内存操作,不修改原始 Message 对象,不涉及持久化。
 
 
     Args:
     Args:
-        messages: 主路径上的有序消息列表
+        messages: 主路径上的有序消息列表(Message 对象)
         goal_tree: GoalTree 实例
         goal_tree: GoalTree 实例
 
 
     Returns:
     Returns:
-        过滤后的消息列表
+        压缩后的消息列表
     """
     """
     if not goal_tree or not goal_tree.goals:
     if not goal_tree or not goal_tree.goals:
         return messages
         return messages
 
 
-    # 构建焦点路径(当前焦点 + 父链 + 直接子节点)
-    focus_path = _get_focus_path(goal_tree)
+    # 收集 completed/abandoned goal IDs
+    completed_ids: Set[str] = {
+        g.id for g in goal_tree.goals
+        if g.status in ("completed", "abandoned")
+    }
+    if not completed_ids:
+        return messages
 
 
-    # 构建需要跳过的 goal IDs
-    skip_goal_ids: Set[str] = set()
-    for goal in goal_tree.goals:
-        if goal.id in focus_path:
-            continue  # 焦点路径上的 goal 始终保留
-        if goal.status in ("completed", "abandoned"):
-            skip_goal_ids.add(goal.id)
+    # Pass 1: 扫描 assistant 消息,分类 tool_call_ids
+    remove_seqs: Set[int] = set()       # 要移除的 assistant 消息 sequence
+    remove_tc_ids: Set[str] = set()     # 要移除的 tool result 的 tool_call_id
+    done_tc_ids: Set[str] = set()       # goal(done=...) 的 tool_call_id(替换 tool result)
 
 
-    # 过滤消息
-    result = []
     for msg in messages:
     for msg in messages:
-        if msg.goal_id is None:
-            result.append(msg)  # 无 goal 的消息始终保留
-        elif msg.goal_id not in skip_goal_ids:
-            result.append(msg)  # 不在跳过列表中的消息保留
-
-    return result
-
-
-def _get_focus_path(goal_tree: GoalTree) -> Set[str]:
-    """
-    获取焦点路径上需要保留消息的 goal IDs
-
-    保留:焦点自身 + 父链 + 未完成的直接子节点
-    不保留:已完成/已放弃的直接子节点(信息已在 goal.summary 中)
-    """
-    focus_ids: Set[str] = set()
-
-    if not goal_tree.current_id:
-        return focus_ids
-
-    # 焦点自身
-    focus_ids.add(goal_tree.current_id)
-
-    # 父链
-    goal = goal_tree.find(goal_tree.current_id)
-    while goal and goal.parent_id:
-        focus_ids.add(goal.parent_id)
-        goal = goal_tree.find(goal.parent_id)
+        if msg.goal_id not in completed_ids:
+            continue
+        if msg.role != "assistant":
+            continue
+
+        content = msg.content
+        tc_list = []
+        if isinstance(content, dict):
+            tc_list = content.get("tool_calls", [])
+
+        if not tc_list:
+            # 纯文本 assistant 消息(无工具调用),移除
+            remove_seqs.add(msg.sequence)
+            continue
+
+        # 检查是否包含 goal 工具调用
+        has_goal_call = False
+        for tc in tc_list:
+            func_name = tc.get("function", {}).get("name", "")
+            if func_name == "goal":
+                has_goal_call = True
+                # 检查是否为 done 调用
+                args_str = tc.get("function", {}).get("arguments", "{}")
+                try:
+                    args = json.loads(args_str) if isinstance(args_str, str) else (args_str or {})
+                except json.JSONDecodeError:
+                    args = {}
+                if args.get("done") is not None:
+                    tc_id = tc.get("id")
+                    if tc_id:
+                        done_tc_ids.add(tc_id)
+
+        if not has_goal_call:
+            # 不含 goal 工具调用 → 移除整条 assistant 及其所有 tool results
+            remove_seqs.add(msg.sequence)
+            for tc in tc_list:
+                tc_id = tc.get("id")
+                if tc_id:
+                    remove_tc_ids.add(tc_id)
+
+    # 无需压缩
+    if not remove_seqs and not done_tc_ids:
+        return messages
 
 
-    # 直接子节点:仅保留未完成的(completed/abandoned 的信息已在 summary 中)
-    children = goal_tree.get_children(goal_tree.current_id)
-    for child in children:
-        if child.status not in ("completed", "abandoned"):
-            focus_ids.add(child.id)
+    # Pass 2: 构建结果
+    result: List[Message] = []
+    for msg in messages:
+        # 跳过标记移除的 assistant 消息
+        if msg.sequence in remove_seqs:
+            continue
+        # 跳过标记移除的 tool result
+        if msg.role == "tool" and msg.tool_call_id in remove_tc_ids:
+            continue
+
+        # 替换 done 的 tool result 内容
+        if msg.role == "tool" and msg.tool_call_id in done_tc_ids:
+            modified = copy.copy(msg)
+            modified.content = {"tool_name": "goal", "result": "具体执行过程已清理"}
+            result.append(modified)
+            continue
+
+        result.append(msg)
 
 
-    return focus_ids
+    return result
 
 
 
 
 # ===== Token 估算 =====
 # ===== Token 估算 =====

+ 0 - 16
examples/research/knowledge/README.md

@@ -1,16 +0,0 @@
-# 新产品营销推广调研知识库
-
-本目录收集整理新产品面世时的营销推广策略、方法论和最佳实践。
-
-## 调研维度
-
-1. 权威资源和行业专家观点
-2. 成功案例分析
-3. 营销框架和方法论
-4. 不同渠道的推广策略
-
-## 更新日期
-2024年
-
----
-调研进行中...