""" 交互式控制器 提供暂停/继续、交互式菜单、经验总结等功能。 """ import sys import asyncio from typing import Optional, Dict, Any from pathlib import Path from agent.core.runner import AgentRunner from agent.trace import TraceStore # ===== 非阻塞 stdin 检测 ===== if sys.platform == 'win32': import msvcrt def check_stdin() -> Optional[str]: """ 跨平台非阻塞检查 stdin 输入。 Windows: 使用 msvcrt.kbhit() macOS/Linux: 使用 select.select() Returns: 'pause' | 'quit' | None """ if sys.platform == 'win32': # Windows: 检查是否有按键按下 if msvcrt.kbhit(): ch = msvcrt.getwch().lower() if ch == 'p': return 'pause' if ch == 'q': return 'quit' return None else: # Unix/Mac: 使用 select import select ready, _, _ = select.select([sys.stdin], [], [], 0) if ready: line = sys.stdin.readline().strip().lower() if line in ('p', 'pause'): return 'pause' if line in ('q', 'quit'): return 'quit' return None def read_multiline() -> str: """ 读取多行输入,以连续两次回车(空行)结束。 Returns: 用户输入的多行文本 """ print("\n请输入干预消息(连续输入两次回车结束):") lines = [] blank_count = 0 while True: line = input() if line == "": blank_count += 1 if blank_count >= 2: break lines.append("") # 保留单个空行 else: blank_count = 0 lines.append(line) # 去掉尾部多余空行 while lines and lines[-1] == "": lines.pop() return "\n".join(lines) # ===== 交互式控制器 ===== class InteractiveController: """ 交互式控制器 管理暂停/继续、交互式菜单、经验总结等交互功能。 """ def __init__( self, runner: AgentRunner, store: TraceStore, enable_stdin_check: bool = True ): """ 初始化交互式控制器 Args: runner: Agent Runner 实例 store: Trace Store 实例 enable_stdin_check: 是否启用 stdin 检查 """ self.runner = runner self.store = store self.enable_stdin_check = enable_stdin_check def check_stdin(self) -> Optional[str]: """ 检查 stdin 输入 Returns: 'pause' | 'quit' | None """ if not self.enable_stdin_check: return None return check_stdin() async def show_menu( self, trace_id: str, current_sequence: int ) -> Dict[str, Any]: """ 显示交互式菜单 Args: trace_id: Trace ID current_sequence: 当前消息序号 Returns: 用户选择的操作 """ print("\n" + "=" * 60) print(" 执行已暂停") print("=" * 60) print("请选择操作:") print(" 1. 插入干预消息并继续") print(" 2. 触发经验总结(reflect)") print(" 3. 查看当前 GoalTree") print(" 4. 手动压缩上下文(compact)") print(" 5. 继续执行") print(" 6. 停止执行") print("=" * 60) while True: choice = input("请输入选项 (1-6): ").strip() if choice == "1": # 插入干预消息 text = read_multiline() if not text: print("未输入任何内容,取消操作") continue print(f"\n将插入干预消息并继续执行...") # 从 store 读取实际的 last_sequence live_trace = await self.store.get_trace(trace_id) actual_sequence = live_trace.last_sequence if live_trace and live_trace.last_sequence else current_sequence return { "action": "continue", "messages": [{"role": "user", "content": text}], "after_sequence": actual_sequence, } elif choice == "2": # 触发经验总结 print("\n触发经验总结...") focus = input("请输入反思重点(可选,直接回车跳过): ").strip() await self.perform_reflection(trace_id, focus=focus) continue elif choice == "3": # 查看 GoalTree goal_tree = await self.store.get_goal_tree(trace_id) if goal_tree and goal_tree.goals: print("\n当前 GoalTree:") print(goal_tree.to_prompt()) else: print("\n当前没有 Goal") continue elif choice == "4": # 手动压缩上下文 await self.manual_compact(trace_id) continue elif choice == "5": # 继续执行 print("\n继续执行...") return {"action": "continue"} elif choice == "6": # 停止执行 print("\n停止执行...") return {"action": "stop"} else: print("无效选项,请重新输入") async def perform_reflection( self, trace_id: str, focus: str = "" ): """ 执行经验总结 通过调用 API 端点触发反思侧分支。 Args: trace_id: Trace ID focus: 反思重点(可选) """ import httpx print("正在启动反思任务...") try: # 调用 reflect API 端点 async with httpx.AsyncClient() as client: payload = {} if focus: payload["focus"] = focus response = await client.post( f"http://localhost:8000/api/traces/{trace_id}/reflect", json=payload, timeout=10.0 ) response.raise_for_status() result = response.json() print(f"✅ 反思任务已启动: {result.get('message', '')}") print("提示:可通过 WebSocket 监听实时进度") except httpx.HTTPError as e: print(f"❌ 反思任务启动失败: {e}") except Exception as e: print(f"❌ 发生错误: {e}") async def manual_compact(self, trace_id: str): """ 手动压缩上下文 通过调用 API 端点触发压缩侧分支。 Args: trace_id: Trace ID """ import httpx print("\n正在启动上下文压缩任务...") try: # 调用 compact API 端点 async with httpx.AsyncClient() as client: response = await client.post( f"http://localhost:8000/api/traces/{trace_id}/compact", timeout=10.0 ) response.raise_for_status() result = response.json() print(f"✅ 压缩任务已启动: {result.get('message', '')}") print("提示:可通过 WebSocket 监听实时进度") except httpx.HTTPError as e: print(f"❌ 压缩任务启动失败: {e}") except Exception as e: print(f"❌ 发生错误: {e}")