interactive.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. """
  2. 交互式控制器
  3. 提供暂停/继续、交互式菜单、经验总结等功能。
  4. """
  5. import sys
  6. import asyncio
  7. from typing import Optional, Dict, Any
  8. from pathlib import Path
  9. from agent.core.runner import AgentRunner
  10. from agent.trace import TraceStore
  11. from agent.trace.models import Message, Trace
  12. # ===== 非阻塞 stdin 检测 =====
  if sys.platform == 'win32':
      import msvcrt


  def _parse_command(raw: str) -> Optional[str]:
      """Map raw user/control-file text to 'pause', 'quit' or None.

      Matching ignores surrounding whitespace and case; the accepted
      spellings are 'p'/'pause' and 'q'/'quit'. Anything else yields None.
      """
      cmd = raw.strip().lower()
      if cmd in ('p', 'pause'):
          return 'pause'
      if cmd in ('q', 'quit'):
          return 'quit'
      return None


  def _read_ready_line() -> Optional[str]:
      """Read one line from stdin only if select() says it is readable.

      Returns None (instead of raising) when nothing is ready, or when stdin
      is missing, closed, or not selectable (e.g. replaced by an object with
      no real file descriptor). On Windows, select() only supports sockets,
      so for a piped stdin this normally ends up returning None.
      """
      import select
      if sys.stdin is None:
          return None
      try:
          ready, _, _ = select.select([sys.stdin], [], [], 0)
          if not ready:
              return None
          return sys.stdin.readline()
      except (OSError, ValueError, TypeError):
          # OSError/ValueError cover io.UnsupportedOperation and closed files;
          # TypeError covers objects that have no fileno() at all.
          return None


  def check_stdin() -> Optional[str]:
      """
      Non-blocking, cross-platform check for a pause/quit command.

      Sources, in priority order:
        1. The control file ``.agent_control`` in the current working
           directory (lets a background process be controlled without a
           terminal). The file is deleted after it is read, so each command
           fires once; unrecognized contents are consumed and ignored.
        2. Terminal / pipe input: on a Windows console a single key press
           ('p' or 'q'); elsewhere a whole line ('p'/'pause', 'q'/'quit').

      Returns:
          'pause' | 'quit' | None
      """
      # 1. Control file first (background-process control).
      control_file = Path.cwd() / ".agent_control"
      if control_file.exists():
          try:
              raw = control_file.read_text(encoding='utf-8')
              control_file.unlink()  # consume the command so it is not re-read
          except (OSError, ValueError):
              # Best effort: a vanished/unreadable/undecodable file is treated
              # as "no command" and we fall through to stdin.
              pass
          else:
              cmd = _parse_command(raw)
              if cmd is not None:
                  return cmd

      # 2. Terminal / pipe input.
      stdin = sys.stdin
      if sys.platform == 'win32' and stdin is not None and stdin.isatty():
          # Windows console: poll the keyboard; no Enter needed.
          if msvcrt.kbhit():
              return _parse_command(msvcrt.getwch())
          return None

      # Unix/Mac (terminals and pipes) and Windows pipes.
      line = _read_ready_line()
      if line is None:
          return None
      return _parse_command(line)
  def read_multiline() -> str:
      """
      Read multi-line text from stdin, terminated by two consecutive blank lines.

      A single blank line inside the text is preserved. End of input (EOF,
      e.g. Ctrl-D / Ctrl-Z or a closed pipe) also terminates the text instead
      of raising EOFError, so whatever was entered so far is returned.

      Returns:
          The entered text joined with "\\n", with trailing blank lines
          removed; "" when nothing was entered.
      """
      print("\n请输入干预消息(连续输入两次回车结束):")
      lines = []
      blank_count = 0
      while blank_count < 2:
          try:
              line = input()
          except EOFError:
              break
          blank_count = blank_count + 1 if line == "" else 0
          lines.append(line)
      # Drop the terminating blank line(s) and any other trailing blanks.
      while lines and lines[-1] == "":
          lines.pop()
      return "\n".join(lines)
  96. # ===== 交互式控制器 =====
  class InteractiveController:
      """
      Interactive controller for a running agent trace.

      Handles pause detection (via check_stdin) and the pause menu. Menu
      actions: insert an intervention message, trigger reflection or context
      compaction through the HTTP API, show the GoalTree, resume from a
      chosen message via the runner, continue, or stop.
      """

      # Default base URL of the HTTP API exposing the reflect/compact endpoints.
      DEFAULT_API_BASE_URL = "http://localhost:8000"

      def __init__(
          self,
          runner: AgentRunner,
          store: TraceStore,
          enable_stdin_check: bool = True,
          api_base_url: str = DEFAULT_API_BASE_URL,
      ):
          """
          Initialise the interactive controller.

          Args:
              runner: AgentRunner used by resume_from_message() to re-run the trace.
              store: TraceStore used to read traces, messages and the GoalTree.
              enable_stdin_check: when False, check_stdin() always returns None.
              api_base_url: base URL of the API serving
                  ``/api/traces/{trace_id}/reflect`` and ``.../compact``;
                  a trailing slash is ignored.
          """
          self.runner = runner
          self.store = store
          self.enable_stdin_check = enable_stdin_check
          # Strip a trailing slash so the endpoint URL never contains "//".
          self.api_base_url = api_base_url.rstrip("/")

      def check_stdin(self) -> Optional[str]:
          """
          Poll for a pause/quit command without blocking.

          Returns:
              'pause' | 'quit' | None; always None when stdin checking is disabled.
          """
          if not self.enable_stdin_check:
              return None
          return check_stdin()

      async def show_menu(
          self,
          trace_id: str,
          current_sequence: int
      ) -> Dict[str, Any]:
          """
          Show the pause menu and loop until the user picks a terminal action.

          Options 2-4 (reflect, show GoalTree, compact) return to the prompt;
          options 1, 5, 6 and 7 end the menu.

          NOTE(review): input() is a blocking call inside a coroutine, so the
          event loop is blocked while the menu waits for the user — confirm
          nothing else needs to run while paused.

          Args:
              trace_id: Trace ID.
              current_sequence: caller's view of the current message sequence;
                  only used when the store provides no ``last_sequence``.

          Returns:
              ``{"action": "continue"}`` or ``{"action": "stop"}``. Option 1 also
              sets ``"messages"`` (the intervention message) and
              ``"after_sequence"``.
          """
          print("\n" + "=" * 60)
          print(" 执行已暂停")
          print("=" * 60)
          print("请选择操作:")
          print(" 1. 插入干预消息并继续")
          print(" 2. 触发经验总结(reflect)")
          print(" 3. 查看当前 GoalTree")
          print(" 4. 手动压缩上下文(compact)")
          print(" 5. 从指定消息续跑")
          print(" 6. 继续执行")
          print(" 7. 停止执行")
          print("=" * 60)
          while True:
              choice = input("请输入选项 (1-7): ").strip()
              if choice == "1":
                  # Insert an intervention message, then continue.
                  text = read_multiline()
                  if not text:
                      print("未输入任何内容,取消操作")
                      continue
                  print("\n将插入干预消息并继续执行...")
                  # Prefer the store's last_sequence over the caller's value.
                  # Compare with None (not truthiness) so a last_sequence of 0
                  # is not replaced by current_sequence.
                  live_trace = await self.store.get_trace(trace_id)
                  actual_sequence = current_sequence
                  if live_trace and live_trace.last_sequence is not None:
                      actual_sequence = live_trace.last_sequence
                  return {
                      "action": "continue",
                      "messages": [{"role": "user", "content": text}],
                      "after_sequence": actual_sequence,
                  }
              elif choice == "2":
                  # Trigger reflection, then show the prompt again.
                  print("\n触发经验总结...")
                  focus = input("请输入反思重点(可选,直接回车跳过): ").strip()
                  await self.perform_reflection(trace_id, focus=focus)
              elif choice == "3":
                  # Show the GoalTree, then show the prompt again.
                  goal_tree = await self.store.get_goal_tree(trace_id)
                  if goal_tree and goal_tree.goals:
                      print("\n当前 GoalTree:")
                      print(goal_tree.to_prompt())
                  else:
                      print("\n当前没有 Goal")
              elif choice == "4":
                  # Manual context compaction, then show the prompt again.
                  await self.manual_compact(trace_id)
              elif choice == "5":
                  # resume_from_message() runs the trace itself; return "stop"
                  # so the caller's loop exits.
                  await self.resume_from_message(trace_id)
                  return {"action": "stop"}
              elif choice == "6":
                  print("\n继续执行...")
                  return {"action": "continue"}
              elif choice == "7":
                  print("\n停止执行...")
                  return {"action": "stop"}
              else:
                  print("无效选项,请重新输入")

      async def _post_side_branch(
          self,
          trace_id: str,
          action: str,
          label: str,
          payload: Optional[Dict[str, Any]] = None,
      ) -> None:
          """
          POST to ``{api_base_url}/api/traces/{trace_id}/{action}`` and print the outcome.

          Failures are reported on stdout, not raised.

          Args:
              trace_id: Trace ID.
              action: endpoint suffix, e.g. "reflect" or "compact".
              label: task name used in the printed status lines.
              payload: JSON body to send; None sends no body.
          """
          import httpx
          try:
              async with httpx.AsyncClient() as client:
                  response = await client.post(
                      f"{self.api_base_url}/api/traces/{trace_id}/{action}",
                      json=payload,
                      timeout=10.0,
                  )
                  response.raise_for_status()
                  result = response.json()
              message = result.get('message', '') if isinstance(result, dict) else ''
              print(f"✅ {label}已启动: {message}")
              print("提示:可通过 WebSocket 监听实时进度")
          except httpx.HTTPError as e:
              print(f"❌ {label}启动失败: {e}")
          except Exception as e:
              print(f"❌ 发生错误: {e}")

      async def perform_reflection(
          self,
          trace_id: str,
          focus: str = ""
      ):
          """
          Trigger experience summarisation (reflection) through the HTTP API.

          Sends a POST to the trace's ``reflect`` endpoint; the outcome is
          printed, not returned or raised.

          Args:
              trace_id: Trace ID.
              focus: optional reflection focus; sent as ``{"focus": ...}``
                  when non-empty, otherwise an empty JSON object is sent.
          """
          print("正在启动反思任务...")
          payload = {"focus": focus} if focus else {}
          await self._post_side_branch(trace_id, "reflect", "反思任务", payload)

      async def manual_compact(self, trace_id: str):
          """
          Trigger context compaction through the HTTP API.

          Sends a body-less POST to the trace's ``compact`` endpoint; the
          outcome is printed, not returned or raised.

          Args:
              trace_id: Trace ID.
          """
          print("\n正在启动上下文压缩任务...")
          await self._post_side_branch(trace_id, "compact", "压缩任务")

      async def resume_from_message(self, trace_id: str):
          """
          Let the user pick a user/assistant message and re-run the trace after it.

          Optionally regenerates the chosen assistant message (by resuming from
          its parent) and/or inserts an intervention message. Runs the trace
          via ``self.runner.run`` and prints progress. Errors raised while
          loading the trace or during the run are printed, not raised.

          Args:
              trace_id: Trace ID.
          """
          print("\n正在加载消息列表...")
          messages = await self.store.get_trace_messages(trace_id)
          if not messages:
              print("❌ 没有找到任何消息")
              return
          # Only user and assistant messages can be selected.
          display_messages = [
              msg for msg in messages
              if msg.role in ("user", "assistant")
          ]
          if not display_messages:
              print("❌ 没有可选择的消息")
              return

          self._print_message_list(display_messages)
          selected_msg = self._choose_message(display_messages)
          if selected_msg is None:
              return

          regenerate_last = False
          if selected_msg.role == "assistant":
              confirm = input("\n是否重新生成这条 Assistant 消息?(y/n): ").strip().lower()
              regenerate_last = (confirm == 'y')

          insert_message = input("\n是否插入一条干预消息?(y/n): ").strip().lower()
          intervention_msg = None
          if insert_message == 'y':
              intervention_msg = read_multiline()
              if not intervention_msg.strip():
                  print("⚠️ 干预消息为空,将不插入")
                  intervention_msg = None

          print(f"\n从消息 {selected_msg.sequence:04d} 之后续跑...")
          if regenerate_last:
              print("将重新生成最后一条 Assistant 消息")
          try:
              trace = await self.store.get_trace(trace_id)
              if not trace:
                  print("❌ Trace 不存在")
                  return
              # To regenerate an assistant message, resume from the message
              # before it: its parent_sequence, else sequence - 1.
              # NOTE(review): `or` also falls back when parent_sequence is 0 —
              # confirm 0 is never a real parent sequence.
              after_seq = selected_msg.sequence
              if regenerate_last and selected_msg.role == "assistant":
                  after_seq = selected_msg.parent_sequence or (selected_msg.sequence - 1)

              from agent.core.runner import RunConfig
              # llm_params may be unset; treat it as empty instead of crashing.
              llm_params = trace.llm_params or {}
              config = RunConfig(
                  trace_id=trace_id,
                  after_sequence=after_seq,
                  model=trace.model,
                  temperature=llm_params.get("temperature", 0.3),
                  max_iterations=200,
              )

              messages_to_add = []
              if intervention_msg:
                  messages_to_add.append({"role": "user", "content": intervention_msg})
                  print("\n✓ 将插入干预消息")

              print("\n开始执行...")
              async for event in self.runner.run(
                  messages=messages_to_add,
                  config=config,
              ):
                  self._print_run_event(event)
              print("\n✅ 执行完成")
          except Exception as e:
              print(f"❌ 执行失败: {e}")
              import traceback
              traceback.print_exc()

      def _print_message_list(self, display_messages: list) -> None:
          """Print the numbered (1-based) list of selectable messages with previews."""
          print("\n" + "=" * 60)
          print(" 消息列表")
          print("=" * 60)
          for i, msg in enumerate(display_messages, 1):
              role_label = "👤 User" if msg.role == "user" else "🤖 Assistant"
              content_preview = self._get_content_preview(msg.content)
              print(f"{i}. [{msg.sequence:04d}] {role_label}: {content_preview}")
          print("=" * 60)

      @staticmethod
      def _choose_message(display_messages: list) -> Any:
          """Prompt until a valid 1-based index is entered; return that message, or None on 'c'."""
          total = len(display_messages)
          while True:
              choice = input(f"\n请选择消息编号 (1-{total}),或输入 'c' 取消: ").strip()
              if choice.lower() == 'c':
                  print("已取消")
                  return None
              try:
                  idx = int(choice) - 1
              except ValueError:
                  print("无效输入,请输入数字或 'c'")
                  continue
              if 0 <= idx < total:
                  return display_messages[idx]
              print(f"无效编号,请输入 1-{total}")

      @staticmethod
      def _print_run_event(event: Any, max_chars: int = 200) -> None:
          """
          Print one runner event.

          For an assistant Message, prints its text truncated to *max_chars*,
          with "..." only when the text was truncated. For a Trace, prints its
          status once it is completed/failed/stopped. Other events are ignored.
          """
          if isinstance(event, Message):
              if event.role != "assistant":
                  return
              content = event.content
              text = content.get("text", "") if isinstance(content, dict) else str(content)
              if not text:
                  return
              # The "text" field is not always a string (see _get_content_preview);
              # stringify so slicing cannot raise and abort the running loop.
              if not isinstance(text, str):
                  text = str(text)
              suffix = "..." if len(text) > max_chars else ""
              print(f"\n🤖 Assistant: {text[:max_chars]}{suffix}")
          elif isinstance(event, Trace):
              if event.status in ("completed", "failed", "stopped"):
                  print(f"\n📊 Trace 状态: {event.status}")

      @staticmethod
      def _tool_call_name(tool_call: Any) -> str:
          """Return ``tool_call["function"]["name"]`` as a string, or "?" when missing or malformed."""
          if isinstance(tool_call, dict):
              function = tool_call.get("function") or {}
              if isinstance(function, dict):
                  return str(function.get("name", "?"))
          return "?"

      def _get_content_preview(self, content: Any, max_length: int = 60) -> str:
          """
          Build a one-line preview of a message's content.

          Handles dict content (its "text" may be a string, a list of
          multimodal parts, or a dict; "tool_calls" is used when there is no
          text), plain strings, and anything else via str(). Runs of
          whitespace, including newlines, are collapsed to single spaces.

          Args:
              content: message content.
              max_length: maximum preview length before "..." is appended.

          Returns:
              The preview string.
          """
          if isinstance(content, dict):
              text = content.get("text", "")
              tool_calls = content.get("tool_calls") or []
              if isinstance(text, str) and text.strip():
                  preview = text.strip()
              elif isinstance(text, list):
                  # Multimodal content: keep only the text parts.
                  text_parts = [
                      str(part.get("text", "")) for part in text
                      if isinstance(part, dict) and part.get("type") == "text"
                  ]
                  preview = " ".join(text_parts).strip() if text_parts else "[多模态内容]"
              elif isinstance(text, dict):
                  preview = str(text.get("value", text.get("text", "[复杂内容]")))
              elif tool_calls:
                  names = ", ".join(self._tool_call_name(tc) for tc in tool_calls)
                  preview = f"[调用工具: {names}]"
              else:
                  preview = "[空消息]"
          elif isinstance(content, str):
              preview = content.strip()
          else:
              preview = str(content)
          # Collapse newlines so each preview fits on its own list line.
          preview = " ".join(preview.split())
          if len(preview) > max_length:
              preview = preview[:max_length] + "..."
          return preview
  404. if len(preview) > max_length:
  405. preview = preview[:max_length] + "..."
  406. return preview