""" 交互式控制器 提供暂停/继续、交互式菜单、经验总结等功能。 """ import sys import asyncio from typing import Optional, Dict, Any from pathlib import Path from agent.core.runner import AgentRunner from agent.trace import TraceStore # ===== 非阻塞 stdin 检测 ===== if sys.platform == 'win32': import msvcrt def check_stdin() -> Optional[str]: """ 跨平台非阻塞检查 stdin 输入。 Windows: 使用 msvcrt.kbhit() macOS/Linux: 使用 select.select() Returns: 'pause' | 'quit' | None """ if sys.platform == 'win32': # Windows: 检查是否有按键按下 if msvcrt.kbhit(): ch = msvcrt.getwch().lower() if ch == 'p': return 'pause' if ch == 'q': return 'quit' return None else: # Unix/Mac: 使用 select import select ready, _, _ = select.select([sys.stdin], [], [], 0) if ready: line = sys.stdin.readline().strip().lower() if line in ('p', 'pause'): return 'pause' if line in ('q', 'quit'): return 'quit' return None def read_multiline() -> str: """ 读取多行输入,以连续两次回车(空行)结束。 Returns: 用户输入的多行文本 """ print("\n请输入干预消息(连续输入两次回车结束):") lines = [] blank_count = 0 while True: line = input() if line == "": blank_count += 1 if blank_count >= 2: break lines.append("") # 保留单个空行 else: blank_count = 0 lines.append(line) # 去掉尾部多余空行 while lines and lines[-1] == "": lines.pop() return "\n".join(lines) # ===== 交互式控制器 ===== class InteractiveController: """ 交互式控制器 管理暂停/继续、交互式菜单、经验总结等交互功能。 """ def __init__( self, runner: AgentRunner, store: TraceStore, enable_stdin_check: bool = True ): """ 初始化交互式控制器 Args: runner: Agent Runner 实例 store: Trace Store 实例 enable_stdin_check: 是否启用 stdin 检查 """ self.runner = runner self.store = store self.enable_stdin_check = enable_stdin_check def check_stdin(self) -> Optional[str]: """ 检查 stdin 输入 Returns: 'pause' | 'quit' | None """ if not self.enable_stdin_check: return None return check_stdin() async def show_menu( self, trace_id: str, current_sequence: int ) -> Dict[str, Any]: """ 显示交互式菜单 Args: trace_id: Trace ID current_sequence: 当前消息序号 Returns: 用户选择的操作 """ print("\n" + "=" * 60) print(" 执行已暂停") print("=" * 60) print("请选择操作:") print(" 1. 插入干预消息并继续") print(" 2. 触发经验总结(reflect)") print(" 3. 查看当前 GoalTree") print(" 4. 手动压缩上下文(compact)") print(" 5. 继续执行") print(" 6. 停止执行") print("=" * 60) while True: choice = input("请输入选项 (1-6): ").strip() if choice == "1": # 插入干预消息 text = read_multiline() if not text: print("未输入任何内容,取消操作") continue print(f"\n将插入干预消息并继续执行...") # 从 store 读取实际的 last_sequence live_trace = await self.store.get_trace(trace_id) actual_sequence = live_trace.last_sequence if live_trace and live_trace.last_sequence else current_sequence return { "action": "continue", "messages": [{"role": "user", "content": text}], "after_sequence": actual_sequence, } elif choice == "2": # 触发经验总结 print("\n触发经验总结...") focus = input("请输入反思重点(可选,直接回车跳过): ").strip() await self.perform_reflection(trace_id, focus=focus) continue elif choice == "3": # 查看 GoalTree goal_tree = await self.store.get_goal_tree(trace_id) if goal_tree and goal_tree.goals: print("\n当前 GoalTree:") print(goal_tree.to_prompt()) else: print("\n当前没有 Goal") continue elif choice == "4": # 手动压缩上下文 await self.manual_compact(trace_id) continue elif choice == "5": # 继续执行 print("\n继续执行...") return {"action": "continue"} elif choice == "6": # 停止执行 print("\n停止执行...") return {"action": "stop"} else: print("无效选项,请重新输入") async def perform_reflection( self, trace_id: str, focus: str = "" ): """ 执行经验总结 Args: trace_id: Trace ID focus: 反思重点(可选) """ from agent.core.prompts.knowledge import build_reflect_prompt from agent.core.runner import RunConfig trace = await self.store.get_trace(trace_id) if not trace: print("未找到 Trace") return saved_head = trace.head_sequence # 构建反思 prompt prompt = build_reflect_prompt() if focus: prompt += f"\n\n请特别关注:{focus}" print("正在生成反思...") reflect_cfg = RunConfig(trace_id=trace_id, max_iterations=1, tools=[]) try: result = await self.runner.run_result( messages=[{"role": "user", "content": prompt}], config=reflect_cfg, ) reflection_text = result.get("summary", "") if reflection_text: print("\n--- 反思内容 ---") print(reflection_text) print("--- 结束 ---\n") else: print("未生成反思内容") finally: # 恢复 head_sequence(反思消息成为侧枝,不污染主对话) await self.store.update_trace(trace_id, head_sequence=saved_head) async def manual_compact(self, trace_id: str): """ 手动压缩上下文 Args: trace_id: Trace ID """ from agent.core.runner import RunConfig print("\n正在执行上下文压缩(compact)...") try: goal_tree = await self.store.get_goal_tree(trace_id) trace = await self.store.get_trace(trace_id) if not trace: print("未找到 Trace,无法压缩") return # 重建当前 history main_path = await self.store.get_main_path_messages(trace_id, trace.head_sequence) history = [msg.to_llm_dict() for msg in main_path] head_seq = main_path[-1].sequence if main_path else 0 next_seq = head_seq + 1 compact_config = RunConfig(trace_id=trace_id) new_history, new_head, new_seq = await self.runner._compress_history( trace_id=trace_id, history=history, goal_tree=goal_tree, config=compact_config, sequence=next_seq, head_seq=head_seq, ) print(f"\n✅ 压缩完成: {len(history)} 条消息 → {len(new_history)} 条") except Exception as e: print(f"\n❌ 压缩失败: {e}")