Quellcode durchsuchen

refactor: compact & reflect in API & CLI

Talegorithm vor 7 Stunden
Ursprung
Commit
7feeb57fcc
5 geänderte Dateien mit 182 neuen und 197 gelöschten Zeilen
  1. 44 60
      agent/cli/interactive.py
  2. 69 62
      agent/core/runner.py
  3. 5 1
      agent/docs/architecture.md
  4. 24 4
      agent/docs/decisions.md
  5. 40 70
      agent/trace/run_api.py

+ 44 - 60
agent/cli/interactive.py

@@ -209,82 +209,66 @@ class InteractiveController:
         """
         """
         执行经验总结
         执行经验总结
 
 
+        通过调用 API 端点触发反思侧分支。
+
         Args:
         Args:
             trace_id: Trace ID
             trace_id: Trace ID
             focus: 反思重点(可选)
             focus: 反思重点(可选)
         """
         """
-        from agent.core.prompts.knowledge import build_reflect_prompt
-        from agent.core.runner import RunConfig
-
-        trace = await self.store.get_trace(trace_id)
-        if not trace:
-            print("未找到 Trace")
-            return
-
-        saved_head = trace.head_sequence
+        import httpx
 
 
-        # 构建反思 prompt
-        prompt = build_reflect_prompt()
-        if focus:
-            prompt += f"\n\n请特别关注:{focus}"
-
-        print("正在生成反思...")
-        reflect_cfg = RunConfig(trace_id=trace_id, max_iterations=10, tools=[])
+        print("正在启动反思任务...")
 
 
         try:
         try:
-            result = await self.runner.run_result(
-                messages=[{"role": "user", "content": prompt}],
-                config=reflect_cfg,
-            )
-            reflection_text = result.get("summary", "")
-
-            if reflection_text:
-                print("\n--- 反思内容 ---")
-                print(reflection_text)
-                print("--- 结束 ---\n")
-            else:
-                print("未生成反思内容")
-
-        finally:
-            # 恢复 head_sequence(反思消息成为侧枝,不污染主对话)
-            await self.store.update_trace(trace_id, head_sequence=saved_head)
+            # 调用 reflect API 端点
+            async with httpx.AsyncClient() as client:
+                payload = {}
+                if focus:
+                    payload["focus"] = focus
+
+                response = await client.post(
+                    f"http://localhost:8000/api/traces/{trace_id}/reflect",
+                    json=payload,
+                    timeout=10.0
+                )
+                response.raise_for_status()
+                result = response.json()
+
+            print(f"✅ 反思任务已启动: {result.get('message', '')}")
+            print("提示:可通过 WebSocket 监听实时进度")
+
+        except httpx.HTTPError as e:
+            print(f"❌ 反思任务启动失败: {e}")
+        except Exception as e:
+            print(f"❌ 发生错误: {e}")
 
 
     async def manual_compact(self, trace_id: str):
     async def manual_compact(self, trace_id: str):
         """
         """
         手动压缩上下文
         手动压缩上下文
 
 
+        通过调用 API 端点触发压缩侧分支。
+
         Args:
         Args:
             trace_id: Trace ID
             trace_id: Trace ID
         """
         """
-        from agent.core.runner import RunConfig
+        import httpx
 
 
-        print("\n正在执行上下文压缩(compact)...")
+        print("\n正在启动上下文压缩任务...")
 
 
         try:
         try:
-            goal_tree = await self.store.get_goal_tree(trace_id)
-            trace = await self.store.get_trace(trace_id)
-
-            if not trace:
-                print("未找到 Trace,无法压缩")
-                return
-
-            # 重建当前 history
-            main_path = await self.store.get_main_path_messages(trace_id, trace.head_sequence)
-            history = [msg.to_llm_dict() for msg in main_path]
-            head_seq = main_path[-1].sequence if main_path else 0
-            next_seq = head_seq + 1
-
-            compact_config = RunConfig(trace_id=trace_id)
-            new_history, new_head, new_seq = await self.runner._compress_history(
-                trace_id=trace_id,
-                history=history,
-                goal_tree=goal_tree,
-                config=compact_config,
-                sequence=next_seq,
-                head_seq=head_seq,
-            )
-
-            print(f"\n✅ 压缩完成: {len(history)} 条消息 → {len(new_history)} 条")
-
+            # 调用 compact API 端点
+            async with httpx.AsyncClient() as client:
+                response = await client.post(
+                    f"http://localhost:8000/api/traces/{trace_id}/compact",
+                    timeout=10.0
+                )
+                response.raise_for_status()
+                result = response.json()
+
+            print(f"✅ 压缩任务已启动: {result.get('message', '')}")
+            print("提示:可通过 WebSocket 监听实时进度")
+
+        except httpx.HTTPError as e:
+            print(f"❌ 压缩任务启动失败: {e}")
         except Exception as e:
         except Exception as e:
-            print(f"\n❌ 压缩失败: {e}")
+            print(f"❌ 发生错误: {e}")

+ 69 - 62
agent/core/runner.py

@@ -73,9 +73,8 @@ class SideBranchContext:
     start_head_seq: int          # 侧分支起点的 head_seq
     start_head_seq: int          # 侧分支起点的 head_seq
     start_sequence: int          # 侧分支第一条消息的 sequence
     start_sequence: int          # 侧分支第一条消息的 sequence
     start_history_length: int    # 侧分支起点的 history 长度
     start_history_length: int    # 侧分支起点的 history 长度
-    side_messages: List[Message] # 侧分支产生的消息
+    start_iteration: int         # 侧分支开始时的 iteration
     max_turns: int = 5           # 最大轮次
     max_turns: int = 5           # 最大轮次
-    current_turn: int = 0        # 当前轮次
 
 
     def to_dict(self) -> Dict[str, Any]:
     def to_dict(self) -> Dict[str, Any]:
         """转换为字典(用于持久化和传递给工具)"""
         """转换为字典(用于持久化和传递给工具)"""
@@ -84,8 +83,8 @@ class SideBranchContext:
             "branch_id": self.branch_id,
             "branch_id": self.branch_id,
             "start_head_seq": self.start_head_seq,
             "start_head_seq": self.start_head_seq,
             "start_sequence": self.start_sequence,
             "start_sequence": self.start_sequence,
+            "start_iteration": self.start_iteration,
             "max_turns": self.max_turns,
             "max_turns": self.max_turns,
-            "current_turn": self.current_turn,
             "is_side_branch": True,
             "is_side_branch": True,
             "started_at": datetime.now().isoformat(),
             "started_at": datetime.now().isoformat(),
         }
         }
@@ -107,6 +106,9 @@ class RunConfig:
     tools: Optional[List[str]] = None          # None = 全部已注册工具
     tools: Optional[List[str]] = None          # None = 全部已注册工具
     side_branch_max_turns: int = 5             # 侧分支最大轮次(压缩/反思)
     side_branch_max_turns: int = 5             # 侧分支最大轮次(压缩/反思)
 
 
+    # --- 强制侧分支(用于 API 手动触发)---
+    force_side_branch: Optional[Literal["compression", "reflection"]] = None
+
     # --- 框架层参数 ---
     # --- 框架层参数 ---
     agent_type: str = "default"
     agent_type: str = "default"
     uid: Optional[str] = None
     uid: Optional[str] = None
@@ -310,27 +312,17 @@ class AgentRunner:
             side_branch_ctx_for_build: Optional[SideBranchContext] = None
             side_branch_ctx_for_build: Optional[SideBranchContext] = None
             if trace.context.get("active_side_branch") and messages:
             if trace.context.get("active_side_branch") and messages:
                 side_branch_data = trace.context["active_side_branch"]
                 side_branch_data = trace.context["active_side_branch"]
-                branch_id = side_branch_data["branch_id"]
 
 
-                # 从数据库查询侧分支消息
-                if self.trace_store:
-                    all_messages = await self.trace_store.get_trace_messages(trace.trace_id)
-                    side_messages = [
-                        m for m in all_messages
-                        if m.branch_id == branch_id
-                    ]
-
-                    # 创建侧分支上下文(用于标记用户追加的消息)
-                    side_branch_ctx_for_build = SideBranchContext(
-                        type=side_branch_data["type"],
-                        branch_id=branch_id,
-                        start_head_seq=side_branch_data["start_head_seq"],
-                        start_sequence=side_branch_data["start_sequence"],
-                        start_history_length=0,
-                        side_messages=side_messages,
-                        max_turns=side_branch_data.get("max_turns", config.side_branch_max_turns),
-                        current_turn=side_branch_data.get("current_turn", 0),
-                    )
+                # 创建侧分支上下文(用于标记用户追加的消息)
+                side_branch_ctx_for_build = SideBranchContext(
+                    type=side_branch_data["type"],
+                    branch_id=side_branch_data["branch_id"],
+                    start_head_seq=side_branch_data["start_head_seq"],
+                    start_sequence=side_branch_data["start_sequence"],
+                    start_history_length=0,
+                    start_iteration=side_branch_data.get("start_iteration", 0),
+                    max_turns=side_branch_data.get("max_turns", config.side_branch_max_turns),
+                )
 
 
             # Phase 2: BUILD HISTORY
             # Phase 2: BUILD HISTORY
             history, sequence, created_messages, head_seq = await self._build_history(
             history, sequence, created_messages, head_seq = await self._build_history(
@@ -698,7 +690,6 @@ class AgentRunner:
                         branch_id=side_branch_ctx.branch_id,
                         branch_id=side_branch_ctx.branch_id,
                         content=msg_dict.get("content"),
                         content=msg_dict.get("content"),
                     )
                     )
-                    side_branch_ctx.side_messages.append(stored_msg)
                     logger.info(f"用户在侧分支 {side_branch_ctx.type} 中追加消息")
                     logger.info(f"用户在侧分支 {side_branch_ctx.type} 中追加消息")
                 else:
                 else:
                     stored_msg = Message.from_llm_dict(
                     stored_msg = Message.from_llm_dict(
@@ -949,14 +940,13 @@ class AgentRunner:
                     start_head_seq=side_branch_data["start_head_seq"],
                     start_head_seq=side_branch_data["start_head_seq"],
                     start_sequence=side_branch_data["start_sequence"],
                     start_sequence=side_branch_data["start_sequence"],
                     start_history_length=0,  # 稍后重新计算
                     start_history_length=0,  # 稍后重新计算
-                    side_messages=side_messages,
+                    start_iteration=side_branch_data.get("start_iteration", 0),
                     max_turns=side_branch_data.get("max_turns", config.side_branch_max_turns),
                     max_turns=side_branch_data.get("max_turns", config.side_branch_max_turns),
-                    current_turn=side_branch_data.get("current_turn", 0),
                 )
                 )
 
 
                 logger.info(
                 logger.info(
                     f"恢复未完成的侧分支: {side_branch_ctx.type}, "
                     f"恢复未完成的侧分支: {side_branch_ctx.type}, "
-                    f"已执行 {side_branch_ctx.current_turn}/{side_branch_ctx.max_turns} 轮"
+                    f"max_turns={side_branch_ctx.max_turns}"
                 )
                 )
 
 
                 # 将侧分支消息追加到 history
                 # 将侧分支消息追加到 history
@@ -999,14 +989,29 @@ class AgentRunner:
             # Context 管理(仅主路径)
             # Context 管理(仅主路径)
             needs_enter_side_branch = False
             needs_enter_side_branch = False
             if not side_branch_ctx:
             if not side_branch_ctx:
-                history, head_seq, sequence, needs_enter_side_branch = await self._manage_context_usage(
-                    trace_id, history, goal_tree, config, sequence, head_seq
-                )
+                # 检查是否强制进入侧分支(API 手动触发)
+                if config.force_side_branch:
+                    needs_enter_side_branch = True
+                    logger.info(f"强制进入侧分支: {config.force_side_branch}")
+                else:
+                    # 正常的 context 管理逻辑
+                    history, head_seq, sequence, needs_enter_side_branch = await self._manage_context_usage(
+                        trace_id, history, goal_tree, config, sequence, head_seq
+                    )
 
 
             # 进入侧分支
             # 进入侧分支
             if needs_enter_side_branch and not side_branch_ctx:
             if needs_enter_side_branch and not side_branch_ctx:
-                # 判断侧分支类型:反思 or 压缩
-                branch_type = "reflection" if config.knowledge.enable_extraction else "compression"
+                # 判断侧分支类型
+                if config.force_side_branch:
+                    # API 强制触发
+                    branch_type = config.force_side_branch
+                elif config.knowledge.enable_extraction:
+                    # 自动触发:反思
+                    branch_type = "reflection"
+                else:
+                    # 自动触发:压缩
+                    branch_type = "compression"
+
                 branch_id = f"{branch_type}_{uuid.uuid4().hex[:8]}"
                 branch_id = f"{branch_type}_{uuid.uuid4().hex[:8]}"
 
 
                 side_branch_ctx = SideBranchContext(
                 side_branch_ctx = SideBranchContext(
@@ -1015,9 +1020,8 @@ class AgentRunner:
                     start_head_seq=head_seq,
                     start_head_seq=head_seq,
                     start_sequence=sequence,
                     start_sequence=sequence,
                     start_history_length=len(history),
                     start_history_length=len(history),
-                    side_messages=[],
+                    start_iteration=iteration,
                     max_turns=config.side_branch_max_turns,
                     max_turns=config.side_branch_max_turns,
-                    current_turn=0,
                 )
                 )
 
 
                 # 持久化侧分支状态
                 # 持久化侧分支状态
@@ -1027,8 +1031,8 @@ class AgentRunner:
                         "branch_id": side_branch_ctx.branch_id,
                         "branch_id": side_branch_ctx.branch_id,
                         "start_head_seq": side_branch_ctx.start_head_seq,
                         "start_head_seq": side_branch_ctx.start_head_seq,
                         "start_sequence": side_branch_ctx.start_sequence,
                         "start_sequence": side_branch_ctx.start_sequence,
+                        "start_iteration": side_branch_ctx.start_iteration,
                         "max_turns": side_branch_ctx.max_turns,
                         "max_turns": side_branch_ctx.max_turns,
-                        "current_turn": 0,
                         "started_at": datetime.now().isoformat(),
                         "started_at": datetime.now().isoformat(),
                     }
                     }
                     await self.trace_store.update_trace(
                     await self.trace_store.update_trace(
@@ -1058,7 +1062,6 @@ class AgentRunner:
                     await self.trace_store.add_message(branch_user_msg)
                     await self.trace_store.add_message(branch_user_msg)
 
 
                 history.append(branch_user_msg.to_llm_dict())
                 history.append(branch_user_msg.to_llm_dict())
-                side_branch_ctx.side_messages.append(branch_user_msg)
                 head_seq = sequence
                 head_seq = sequence
                 sequence += 1
                 sequence += 1
 
 
@@ -1174,9 +1177,7 @@ class AgentRunner:
                     cache_read_tokens=cache_read_tokens or 0,
                     cache_read_tokens=cache_read_tokens or 0,
                 )
                 )
 
 
-            # 如果在侧分支,记录到 side_messages
-            if side_branch_ctx:
-                side_branch_ctx.side_messages.append(assistant_msg)
+            # 如果在侧分支,记录到 assistant_msg(已持久化,不需要额外维护)
 
 
             yield assistant_msg
             yield assistant_msg
             head_seq = sequence
             head_seq = sequence
@@ -1184,18 +1185,11 @@ class AgentRunner:
 
 
             # 检查侧分支是否应该退出
             # 检查侧分支是否应该退出
             if side_branch_ctx:
             if side_branch_ctx:
-                side_branch_ctx.current_turn += 1
-
-                # 更新持久化状态
-                if self.trace_store:
-                    trace.context["active_side_branch"]["current_turn"] = side_branch_ctx.current_turn
-                    await self.trace_store.update_trace(
-                        trace_id,
-                        context=trace.context
-                    )
+                # 计算侧分支已执行的轮次
+                turns_in_branch = iteration - side_branch_ctx.start_iteration
 
 
                 # 检查是否达到最大轮次
                 # 检查是否达到最大轮次
-                if side_branch_ctx.current_turn >= side_branch_ctx.max_turns:
+                if turns_in_branch >= side_branch_ctx.max_turns:
                     logger.warning(
                     logger.warning(
                         f"侧分支 {side_branch_ctx.type} 达到最大轮次 "
                         f"侧分支 {side_branch_ctx.type} 达到最大轮次 "
                         f"{side_branch_ctx.max_turns},强制退出"
                         f"{side_branch_ctx.max_turns},强制退出"
@@ -1225,6 +1219,9 @@ class AgentRunner:
                             side_branch_ctx.start_head_seq
                             side_branch_ctx.start_head_seq
                         )
                         )
 
 
+                        # 清除强制侧分支配置
+                        config.force_side_branch = None
+
                         side_branch_ctx = None
                         side_branch_ctx = None
                         continue
                         continue
 
 
@@ -1247,6 +1244,9 @@ class AgentRunner:
                             history = [m.to_llm_dict() for m in main_path_messages]
                             history = [m.to_llm_dict() for m in main_path_messages]
                             head_seq = side_branch_ctx.start_head_seq
                             head_seq = side_branch_ctx.start_head_seq
 
 
+                        # 清除强制侧分支配置
+                        config.force_side_branch = None
+
                         side_branch_ctx = None
                         side_branch_ctx = None
                         continue
                         continue
 
 
@@ -1256,16 +1256,23 @@ class AgentRunner:
 
 
                     # 提取结果
                     # 提取结果
                     if side_branch_ctx.type == "compression":
                     if side_branch_ctx.type == "compression":
-                        # 从侧分支消息中提取 summary
+                        # 从数据库查询侧分支消息并提取 summary
                         summary_text = ""
                         summary_text = ""
-                        for msg in side_branch_ctx.side_messages:
-                            if msg.role == "assistant" and isinstance(msg.content, dict):
-                                text = msg.content.get("text", "")
-                                if "[[SUMMARY]]" in text:
-                                    summary_text = text[text.index("[[SUMMARY]]") + len("[[SUMMARY]]"):].strip()
-                                    break
-                                elif text:
-                                    summary_text = text
+                        if self.trace_store:
+                            all_messages = await self.trace_store.get_trace_messages(trace_id)
+                            side_messages = [
+                                m for m in all_messages
+                                if m.branch_id == side_branch_ctx.branch_id
+                            ]
+
+                            for msg in side_messages:
+                                if msg.role == "assistant" and isinstance(msg.content, dict):
+                                    text = msg.content.get("text", "")
+                                    if "[[SUMMARY]]" in text:
+                                        summary_text = text[text.index("[[SUMMARY]]") + len("[[SUMMARY]]"):].strip()
+                                        break
+                                    elif text:
+                                        summary_text = text
 
 
                         if not summary_text:
                         if not summary_text:
                             logger.warning("侧分支未生成有效 summary,使用默认")
                             logger.warning("侧分支未生成有效 summary,使用默认")
@@ -1317,6 +1324,9 @@ class AgentRunner:
                             head_sequence=head_seq,
                             head_sequence=head_seq,
                         )
                         )
 
 
+                    # 清除强制侧分支配置(避免影响后续续跑)
+                    config.force_side_branch = None
+
                     side_branch_ctx = None
                     side_branch_ctx = None
                     continue
                     continue
 
 
@@ -1382,7 +1392,6 @@ class AgentRunner:
                                 "type": side_branch_ctx.type,
                                 "type": side_branch_ctx.type,
                                 "branch_id": side_branch_ctx.branch_id,
                                 "branch_id": side_branch_ctx.branch_id,
                                 "is_side_branch": True,
                                 "is_side_branch": True,
-                                "current_turn": side_branch_ctx.current_turn,
                                 "max_turns": side_branch_ctx.max_turns,
                                 "max_turns": side_branch_ctx.max_turns,
                             } if side_branch_ctx else None,
                             } if side_branch_ctx else None,
                         },
                         },
@@ -1469,9 +1478,7 @@ class AgentRunner:
                                     print(f"[Runner] 截图已保存: {png_path.name}")
                                     print(f"[Runner] 截图已保存: {png_path.name}")
                                     break  # 只存第一张
                                     break  # 只存第一张
 
 
-                    # 如果在侧分支,记录到 side_messages
-                    if side_branch_ctx:
-                        side_branch_ctx.side_messages.append(tool_msg)
+                    # 如果在侧分支,tool_msg 已持久化(不需要额外维护)
 
 
                     yield tool_msg
                     yield tool_msg
                     head_seq = sequence
                     head_seq = sequence

+ 5 - 1
agent/docs/architecture.md

@@ -333,6 +333,7 @@ agent 工具的合成结果对齐正常返回值格式(含 `sub_trace_id` 字
 | POST | `/api/traces/{id}/run` | 运行(统一续跑 + 回溯) |
 | POST | `/api/traces/{id}/run` | 运行(统一续跑 + 回溯) |
 | POST | `/api/traces/{id}/stop` | 停止运行中的 Trace |
 | POST | `/api/traces/{id}/stop` | 停止运行中的 Trace |
 | POST | `/api/traces/{id}/reflect` | 触发反思,从执行历史中提取经验 |
 | POST | `/api/traces/{id}/reflect` | 触发反思,从执行历史中提取经验 |
+| POST | `/api/traces/{id}/compact` | 触发压缩,通过侧分支多轮 agent 模式压缩上下文 |
 
 
 ```bash
 ```bash
 # 新建
 # 新建
@@ -355,9 +356,12 @@ curl -X POST http://localhost:8000/api/traces/{trace_id}/run \
 # 停止
 # 停止
 curl -X POST http://localhost:8000/api/traces/{trace_id}/stop
 curl -X POST http://localhost:8000/api/traces/{trace_id}/stop
 
 
-# 反思:追加反思 prompt 运行,结果追加到 experiences 文件
+# 反思:通过侧分支多轮 agent 模式提取经验
 curl -X POST http://localhost:8000/api/traces/{trace_id}/reflect \
 curl -X POST http://localhost:8000/api/traces/{trace_id}/reflect \
   -d '{"focus": "为什么第三步选择了错误的方案"}'
   -d '{"focus": "为什么第三步选择了错误的方案"}'
+
+# 压缩:通过侧分支多轮 agent 模式压缩上下文
+curl -X POST http://localhost:8000/api/traces/{trace_id}/compact
 ```
 ```
 
 
 响应立即返回 `{"trace_id": "...", "status": "started"}`,通过 `WS /api/traces/{trace_id}/watch` 监听实时事件。
 响应立即返回 `{"trace_id": "...", "status": "started"}`,通过 `WS /api/traces/{trace_id}/watch` 监听实时事件。

+ 24 - 4
agent/docs/decisions.md

@@ -1194,7 +1194,11 @@ for iteration in range(max_iterations):
 
 
     # 退出侧分支:提取结果,回到起点
     # 退出侧分支:提取结果,回到起点
     if side_branch_ctx and not tool_calls:
     if side_branch_ctx and not tool_calls:
-        summary = extract_summary(side_branch_ctx.side_messages)
+        # 从数据库查询侧分支消息并提取 summary
+        all_messages = await trace_store.get_trace_messages(trace_id)
+        side_messages = [m for m in all_messages if m.branch_id == side_branch_ctx.branch_id]
+        summary = extract_summary(side_messages)
+
         history = history[:side_branch_ctx.start_history_length]
         history = history[:side_branch_ctx.start_history_length]
         # 创建主路径 summary 消息
         # 创建主路径 summary 消息
         side_branch_ctx = None
         side_branch_ctx = None
@@ -1216,11 +1220,14 @@ class SideBranchContext:
     start_head_seq: int  # 起点的 head_seq
     start_head_seq: int  # 起点的 head_seq
     start_sequence: int  # 起点的 sequence
     start_sequence: int  # 起点的 sequence
     start_history_length: int  # 起点的 history 长度
     start_history_length: int  # 起点的 history 长度
-    side_messages: List[Message]  # 侧分支产生的消息
-    max_turns: int = 5  # 侧分支最大轮次
-    current_turn: int = 0  # 当前轮次
+    start_iteration: int  # 侧分支开始时的 iteration
+    max_turns: int = 5  # 最大轮次
 ```
 ```
 
 
+**设计说明**:
+1. **不维护 `side_messages` 列表**:所有侧分支消息已持久化到数据库(标记 `branch_id`),需要时通过查询获取,避免内存中的重复维护
+2. **复用主循环的 `iteration`**:不单独维护 `current_turn`,而是通过 `iteration - start_iteration` 计算侧分支已执行的轮次,简化计数逻辑
+
 #### 24c. 消息标记
 #### 24c. 消息标记
 
 
 侧分支产生的消息通过 `branch_type` 和 `branch_id` 字段标记:
 侧分支产生的消息通过 `branch_type` 和 `branch_id` 字段标记:
@@ -1295,6 +1302,19 @@ context = {
 
 
 新增字段:
 新增字段:
 - `side_branch_max_turns: int = 5` — 侧分支最大轮次,超过后强制退出
 - `side_branch_max_turns: int = 5` — 侧分支最大轮次,超过后强制退出
+- `force_side_branch: Optional[Literal["compression", "reflection"]] = None` — 强制进入侧分支(用于 API 手动触发压缩/反思)
+
+**force_side_branch 说明**:
+- 用于 API 接口手动触发压缩或反思(如 `/api/traces/{id}/compact`、`/api/traces/{id}/reflect`)
+- 设置后,agent loop 会在第一轮就进入指定类型的侧分支,而不是等待 context 超限
+- 侧分支完成后自动清除此配置(`config.force_side_branch = None`),避免影响后续续跑
+
+**API 触发实现**:
+- `/api/traces/{id}/reflect` — 设置 `RunConfig(force_side_branch="reflection")`,启动后台任务
+- `/api/traces/{id}/compact` — 设置 `RunConfig(force_side_branch="compression")`,启动后台任务
+- `agent/cli/interactive.py:manual_compact()` — 同样使用 `force_side_branch="compression"`,消费 `run()` 生成器
+
+**实现位置**:`agent/trace/run_api.py:reflect_trace`, `agent/trace/run_api.py:compact_trace`, `agent/cli/interactive.py:manual_compact`
 
 
 ### 变更范围
 ### 变更范围
 
 

+ 40 - 70
agent/trace/run_api.py

@@ -463,14 +463,11 @@ async def reflect_trace(trace_id: str, req: ReflectRequest):
     """
     """
     触发反思
     触发反思
 
 
-    在 trace 末尾追加一条包含反思 prompt 的 user message,单轮无工具 LLM 调用获取反思结果,
-    将结果追加到 experiences 文件(默认 ./.cache/experiences.md)。
-
-    反思消息作为侧枝(side branch):运行前保存 head_sequence,运行后恢复(try/finally 保证)。
-    使用 max_iterations=1, tools=[] 确保反思不会产生副作用。
+    通过 force_side_branch="reflection" 触发侧分支多轮 agent 模式,
+    LLM 可以调用工具(如 knowledge_search, knowledge_save)进行多轮推理。
+    反思消息标记为侧分支(branch_type="reflection"),不在主路径上。
     """
     """
     from agent.core.runner import RunConfig
     from agent.core.runner import RunConfig
-    from agent.trace.compaction import build_reflect_prompt
 
 
     runner = _get_runner()
     runner = _get_runner()
 
 
@@ -486,28 +483,27 @@ async def reflect_trace(trace_id: str, req: ReflectRequest):
     if trace_id in _running_tasks and not _running_tasks[trace_id].done():
     if trace_id in _running_tasks and not _running_tasks[trace_id].done():
         raise HTTPException(status_code=409, detail="Cannot reflect on a running trace. Stop it first.")
         raise HTTPException(status_code=409, detail="Cannot reflect on a running trace. Stop it first.")
 
 
-    # 1. 构建完整历史消息(沿 parent chain)
-    history, _, _ = await runner._build_history(
+    # 使用 force_side_branch 触发反思侧分支
+    config = RunConfig(
         trace_id=trace_id,
         trace_id=trace_id,
-        new_messages=[],
-        goal_tree=None,
-        config=RunConfig(trace_id=trace_id),
-        sequence=trace.last_sequence
+        model=trace.model or "gpt-4o",
+        force_side_branch="reflection",
+        max_iterations=20,  # 给侧分支足够的轮次
+        enable_prompt_caching=True,
     )
     )
 
 
-    # 2. 直接调用自闭环反思生成(绕过 runner.run_result 及其副作用)
-    from agent.tools.builtin.knowledge import generate_and_save_reflection
-    reflection_raw_text = await generate_and_save_reflection(
-        trace_id=trace_id,
-        messages=history,
-        llm_call_fn=runner.llm_call,
-        model=runner.model or "anthropic/claude-3-5-sonnet",
-        focus=req.focus
-    )
+    # 如果有 focus,可以通过追加消息传递(可选)
+    messages = []
+    if req.focus:
+        messages = [{"role": "user", "content": f"反思重点:{req.focus}"}]
+
+    # 启动反思任务(后台执行)
+    task = asyncio.create_task(_run_trace_background(runner, messages, config))
+    _running_tasks[trace_id] = task
 
 
     return ReflectResponse(
     return ReflectResponse(
         trace_id=trace_id,
         trace_id=trace_id,
-        reflection=reflection_raw_text,
+        reflection="反思任务已启动,通过 WebSocket 监听实时更新",
     )
     )
 
 
 
 
@@ -515,8 +511,10 @@ async def reflect_trace(trace_id: str, req: ReflectRequest):
 async def compact_trace(trace_id: str):
 async def compact_trace(trace_id: str):
     """
     """
     压缩 Trace 的上下文 (Compact)
     压缩 Trace 的上下文 (Compact)
-    
-    调用 runner 的 _compress_history 方法进行 LLM 摘要总结并生成压缩点。
+
+    通过 force_side_branch="compression" 触发侧分支多轮 agent 模式,
+    LLM 可以调用工具(如 goal)进行多轮推理。
+    压缩消息标记为侧分支(branch_type="compression"),不在主路径上。
     """
     """
     from agent.core.runner import RunConfig
     from agent.core.runner import RunConfig
 
 
@@ -524,62 +522,34 @@ async def compact_trace(trace_id: str):
     if not runner.trace_store:
     if not runner.trace_store:
         raise HTTPException(status_code=503, detail="TraceStore not configured")
         raise HTTPException(status_code=503, detail="TraceStore not configured")
 
 
-    # 验证 trace 存在且未在运行
+    # 验证 trace 存在
     trace = await runner.trace_store.get_trace(trace_id)
     trace = await runner.trace_store.get_trace(trace_id)
     if not trace:
     if not trace:
         raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
         raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
-    
+
+    # 检查是否仍在运行
     if trace_id in _running_tasks and not _running_tasks[trace_id].done():
     if trace_id in _running_tasks and not _running_tasks[trace_id].done():
         raise HTTPException(status_code=409, detail="Cannot compact a running trace. Stop it first.")
         raise HTTPException(status_code=409, detail="Cannot compact a running trace. Stop it first.")
 
 
-    # 获取当前主路径历史
-    goal_tree = await runner.trace_store.get_goal_tree(trace_id)
-    main_path = await runner.trace_store.get_main_path_messages(trace_id, trace.head_sequence)
-    if not main_path:
-        return CompactResponse(
-            trace_id=trace_id,
-            previous_count=0,
-            new_count=0,
-            message="No messages to compact."
-        )
-
-    history = [msg.to_llm_dict() for msg in main_path]
-    current_count = len(history)
-    
-    head_seq = main_path[-1].sequence
-    next_seq = trace.last_sequence + 1
-    
-    # 构造配置用于压缩逻辑
+    # 使用 force_side_branch 触发压缩侧分支
     config = RunConfig(
     config = RunConfig(
-        trace_id=trace_id, 
+        trace_id=trace_id,
         model=trace.model or "gpt-4o",
         model=trace.model or "gpt-4o",
-        enable_prompt_caching=True
+        force_side_branch="compression",
+        max_iterations=20,  # 给侧分支足够的轮次
+        enable_prompt_caching=True,
     )
     )
 
 
-    # 调用 runner 的内部压缩逻辑
-    try:
-        new_history, new_head, new_next_seq = await runner._compress_history(
-            trace_id=trace_id,
-            history=history,
-            goal_tree=goal_tree,
-            config=config,
-            sequence=next_seq,
-            head_seq=head_seq,
-        )
-        
-        new_count = len(new_history)
-        
-        return CompactResponse(
-            trace_id=trace_id,
-            previous_count=current_count,
-            new_count=new_count,
-            message=f"Context compacted: {current_count} -> {new_count} messages."
-        )
-    except Exception as e:
-        logger.error(f"Compaction failed for {trace_id}: {e}")
-        import traceback
-        logger.error(traceback.format_exc())
-        raise HTTPException(status_code=500, detail=f"Compaction failed: {str(e)}")
+    # 启动压缩任务(后台执行)
+    task = asyncio.create_task(_run_trace_background(runner, [], config))
+    _running_tasks[trace_id] = task
+
+    return CompactResponse(
+        trace_id=trace_id,
+        previous_count=0,  # 无法立即获取,需通过 WebSocket 监听
+        new_count=0,
+        message="压缩任务已启动,通过 WebSocket 监听实时更新",
+    )
 
 
 
 
 @router.get("/running", tags=["run"])
 @router.get("/running", tags=["run"])