run.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
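"""
Process Research Pipeline v3: a single agent performs search, capability
extraction, and strategy induction.

Each requirement writes to its own directory under the output root, named
{N:03d}_parallel or {N:03d}_sequential (N is the 1-based requirement number):
- case.json      process cases (with step-level capability details)
- strategy.json  strategy x case association index
- process.json   strategy x capability pipeline details

Usage:
python run.py                           # run every requirement
python run.py --from 2                  # resume from the 3rd requirement (index is 0-based)
python run.py --only 2                  # run only the 3rd requirement (overrides --from)
python run.py --requirements req.json   # use a specific requirements file
python run.py --remote                  # force remote mode (overrides config.USE_REMOTE_RESEARCH)
python run.py --local                   # force local mode
python run.py --parallel                # force parallel tool execution (uses the cloud browser)
python run.py --sequential              # force sequential tool execution (uses the local browser)

Environment variables:
KNOWHUB_API   address of the online KnowHub service (default http://localhost:9999)
"""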
  16. import argparse
  17. import json
  18. import os
  19. import sys
  20. import asyncio
  21. from datetime import datetime
  22. from pathlib import Path
  23. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  24. from dotenv import load_dotenv
  25. load_dotenv()
  26. from agent.llm.prompts import SimplePrompt
  27. from agent.core.runner import AgentRunner, RunConfig
  28. from agent.trace import FileSystemTraceStore, Trace, Message
  29. from agent.llm import create_qwen_llm_call
  30. from agent.cli import InteractiveController
  31. from agent.utils import setup_logging
  32. from agent.tools.builtin.subagent import _run_remote_agent
  33. from agent.tools.builtin.browser.baseClass import init_browser_session, kill_browser_session
  34. from config import (
  35. COORDINATOR_RUN_CONFIG,
  36. OUTPUT_DIR,
  37. SKILLS_DIR, TRACE_STORE_PATH, DEBUG, LOG_LEVEL, LOG_FILE,
  38. BROWSER_TYPE, HEADLESS,
  39. IM_ENABLED, IM_CONTACT_ID, IM_SERVER_URL, IM_WINDOW_MODE, IM_NOTIFY_INTERVAL,
  40. USE_REMOTE_RESEARCH,
  41. )
  42. # ─────────────────────────────────────────────
  43. # 本地模式:单需求执行
  44. # ─────────────────────────────────────────────
  45. async def run_single_local(
  46. runner: AgentRunner,
  47. interactive: InteractiveController,
  48. store: FileSystemTraceStore,
  49. prompt: SimplePrompt,
  50. requirement: str,
  51. output_dir: Path,
  52. req_index: int,
  53. ) -> tuple[str, bool]:
  54. """本地 AgentRunner 模式执行单个需求,返回 (最终回复, 是否应退出)。"""
  55. output_dir.mkdir(parents=True, exist_ok=True)
  56. messages = prompt.build_messages(
  57. requirement=requirement,
  58. output_dir=str(output_dir),
  59. )
  60. prompt_model = prompt.config.get("model", None)
  61. run_config = RunConfig(
  62. model=prompt_model or COORDINATOR_RUN_CONFIG.model,
  63. temperature=COORDINATOR_RUN_CONFIG.temperature,
  64. max_iterations=COORDINATOR_RUN_CONFIG.max_iterations,
  65. extra_llm_params=COORDINATOR_RUN_CONFIG.extra_llm_params,
  66. agent_type=COORDINATOR_RUN_CONFIG.agent_type,
  67. name=f"工序调研:需求{req_index+1:03d}",
  68. knowledge=COORDINATOR_RUN_CONFIG.knowledge,
  69. )
  70. print(f"\n{'=' * 60}")
  71. print(f"[{req_index+1:03d}] 开始调研 【本地模式】")
  72. print(f"需求:{requirement[:80]}{'...' if len(requirement) > 80 else ''}")
  73. print(f"输出:{output_dir}")
  74. print(f"{'=' * 60}")
  75. current_trace_id = None
  76. current_sequence = 0
  77. final_response = ""
  78. should_exit = False
  79. try:
  80. async for item in runner.run(messages=messages, config=run_config):
  81. cmd = interactive.check_stdin()
  82. if cmd == 'pause':
  83. print("\n⏸️ 正在暂停执行...")
  84. if current_trace_id:
  85. await runner.stop(current_trace_id)
  86. await asyncio.sleep(0.5)
  87. menu_result = await interactive.show_menu(current_trace_id, current_sequence)
  88. if menu_result["action"] == "stop":
  89. should_exit = True
  90. break
  91. elif menu_result["action"] == "continue":
  92. new_messages = menu_result.get("messages", [])
  93. run_config.after_sequence = menu_result.get("after_sequence")
  94. if new_messages:
  95. messages = new_messages
  96. break
  97. elif cmd == 'quit':
  98. print("\n🛑 用户请求停止...")
  99. if current_trace_id:
  100. await runner.stop(current_trace_id)
  101. should_exit = True
  102. break
  103. if isinstance(item, Trace):
  104. current_trace_id = item.trace_id
  105. if item.status == "running":
  106. print(f"[Trace] 开始: {item.trace_id[:8]}...")
  107. elif item.status == "completed":
  108. print(f"\n[Trace] ✅ 完成 messages={item.total_messages} cost=${item.total_cost:.4f}")
  109. elif item.status == "failed":
  110. print(f"\n[Trace] ❌ 失败: {item.error_message}")
  111. elif item.status == "stopped":
  112. print(f"\n[Trace] ⏸️ 已停止")
  113. elif isinstance(item, Message):
  114. current_sequence = item.sequence
  115. if item.role == "assistant":
  116. content = item.content
  117. if isinstance(content, dict):
  118. text = content.get("text", "")
  119. tool_calls = content.get("tool_calls")
  120. if text and not tool_calls:
  121. final_response = text
  122. print(f"\n[Response] Agent 回复:")
  123. print(text)
  124. elif text:
  125. preview = text[:150] + "..." if len(text) > 150 else text
  126. print(f"[Assistant] {preview}")
  127. elif item.role == "tool":
  128. content = item.content
  129. tool_name = "unknown"
  130. if isinstance(content, dict):
  131. tool_name = content.get("tool_name", "unknown")
  132. if item.description and item.description != tool_name:
  133. desc = item.description[:80]
  134. print(f"[Tool] ✅ {tool_name}: {desc}...")
  135. else:
  136. print(f"[Tool] ✅ {tool_name}")
  137. except Exception as e:
  138. print(f"\n执行出错: {e}")
  139. import traceback
  140. traceback.print_exc()
  141. if current_trace_id:
  142. print(f" Trace ID: {current_trace_id}")
  143. return final_response, should_exit
  144. # ─────────────────────────────────────────────
  145. # 远端模式:单需求执行
  146. # ─────────────────────────────────────────────
  147. async def run_single_remote(
  148. requirement: str,
  149. output_dir: Path,
  150. req_index: int,
  151. ) -> tuple[str, bool]:
  152. """HTTP 调用线上 KnowHub remote_research agent,返回 (摘要, False)。"""
  153. output_dir.mkdir(parents=True, exist_ok=True)
  154. print(f"\n{'=' * 60}")
  155. print(f"[{req_index+1:03d}] 开始调研 【远端模式 → KnowHub remote_research】")
  156. print(f"需求:{requirement[:80]}{'...' if len(requirement) > 80 else ''}")
  157. print(f"输出:{output_dir}")
  158. print(f"{'=' * 60}")
  159. result = await _run_remote_agent(
  160. agent_type="remote_research",
  161. task=requirement,
  162. messages=None,
  163. continue_from=None,
  164. skills=None,
  165. )
  166. status = result.get("status", "unknown")
  167. summary = result.get("summary", "")
  168. error = result.get("error")
  169. stats = result.get("stats", {})
  170. if status == "completed":
  171. print(f"\n[Remote] ✅ 完成 tokens={stats.get('total_tokens', 0)} cost=${stats.get('total_cost', 0.0):.4f}")
  172. else:
  173. print(f"\n[Remote] ❌ 失败: {error}")
  174. if result.get("sub_trace_id"):
  175. print(f" Remote Trace ID: {result['sub_trace_id']}")
  176. return summary or "", False
  177. # ─────────────────────────────────────────────
  178. # Main
  179. # ─────────────────────────────────────────────
  180. async def main():
  181. parser = argparse.ArgumentParser(description="Process Research Pipeline v3")
  182. parser.add_argument(
  183. "--from", dest="from_index", type=int, default=0,
  184. help="从第几个需求开始(0-based)",
  185. )
  186. parser.add_argument(
  187. "--only", dest="only_index", type=int, default=None,
  188. help="只执行指定的第几个需求(0-based),这会覆盖 --from",
  189. )
  190. parser.add_argument(
  191. "--requirements", type=str, default=None,
  192. help="需求列表 JSON 文件路径(默认 db_requirements.json)",
  193. )
  194. parser.add_argument(
  195. "--remote", action="store_true",
  196. help="强制使用远端模式(覆盖 config.USE_REMOTE_RESEARCH)",
  197. )
  198. parser.add_argument(
  199. "--local", action="store_true",
  200. help="强制使用本地模式(覆盖 config.USE_REMOTE_RESEARCH)",
  201. )
  202. parser.add_argument(
  203. "--parallel", action="store_true",
  204. help="强制开启并发多浏览器机制(覆盖 config.PARALLEL_TOOL_EXECUTION)",
  205. )
  206. parser.add_argument(
  207. "--sequential", action="store_true",
  208. help="强制开启单步串行机制(覆盖 config.PARALLEL_TOOL_EXECUTION)",
  209. )
  210. args = parser.parse_args()
  211. # 决定是否用远端
  212. use_remote = USE_REMOTE_RESEARCH
  213. if args.remote:
  214. use_remote = True
  215. if args.local:
  216. use_remote = False
  217. # 决定并发模式
  218. is_parallel = COORDINATOR_RUN_CONFIG.parallel_tool_execution
  219. if args.parallel:
  220. is_parallel = True
  221. if args.sequential:
  222. is_parallel = False
  223. COORDINATOR_RUN_CONFIG.parallel_tool_execution = is_parallel
  224. # 根据并发模式覆盖浏览器类型(并发必须用云浏览器防止冲突)
  225. browser_type = BROWSER_TYPE
  226. if is_parallel:
  227. browser_type = "cloud"
  228. elif args.sequential:
  229. browser_type = "local" # 串行可以选择回本地
  230. base_dir = Path(__file__).parent
  231. project_root = base_dir.parent.parent
  232. output_root = project_root / OUTPUT_DIR
  233. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  234. # 加载 presets
  235. presets_path = base_dir / "presets.json"
  236. if presets_path.exists():
  237. from agent.core.presets import load_presets_from_json
  238. load_presets_from_json(str(presets_path))
  239. print("已加载 presets")
  240. # 读取需求
  241. req_path = Path(args.requirements) if args.requirements else base_dir / "db_requirements.json"
  242. if not req_path.exists():
  243. print(f"错误: 需求文件不存在: {req_path}")
  244. sys.exit(1)
  245. with open(req_path, encoding='utf-8') as f:
  246. requirements = json.load(f)
  247. if not isinstance(requirements, list) or len(requirements) == 0:
  248. print("错误: 需求文件必须是非空 JSON 数组")
  249. sys.exit(1)
  250. output_root.mkdir(parents=True, exist_ok=True)
  251. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  252. total = len(requirements)
  253. start = args.from_index
  254. print("=" * 60)
  255. print(f"Process Research Pipeline v3")
  256. print(f"执行引擎:{'并发多云并发 (Parallel)' if is_parallel else '单步串行序列 (Sequential)'}")
  257. print(f"模式:{'远端 KnowHub' if use_remote else '本地'}")
  258. if args.only_index is not None:
  259. print(f"模式:仅执行第 {args.only_index} 个需求")
  260. else:
  261. print(f"共 {total} 个需求,从第 {start} 个开始")
  262. print("=" * 60)
  263. # IM 初始化(可选)
  264. if IM_ENABLED and not use_remote:
  265. from agent.tools.builtin.im.chat import im_setup, im_open_window
  266. result = await im_setup(
  267. contact_id=IM_CONTACT_ID,
  268. server_url=IM_SERVER_URL,
  269. notify_interval=IM_NOTIFY_INTERVAL,
  270. )
  271. print(f"IM: {result.output}")
  272. if IM_WINDOW_MODE:
  273. window_result = await im_open_window(contact_id=IM_CONTACT_ID)
  274. print(f"IM: {window_result.output}")
  275. # 初始化本地浏览器
  276. if not use_remote:
  277. print(f"正在初始化浏览器环境 ({browser_type})...")
  278. await init_browser_session(
  279. browser_type=browser_type,
  280. headless=HEADLESS,
  281. url="https://www.google.com/",
  282. profile_name=""
  283. )
  284. # 本地模式:初始化 runner
  285. runner = None
  286. interactive = None
  287. prompt = None
  288. if not use_remote:
  289. prompt_path = base_dir / "prompts" / "coordinator.prompt"
  290. prompt = SimplePrompt(prompt_path)
  291. prompt_model = prompt.config.get("model", None) or COORDINATOR_RUN_CONFIG.model
  292. runner = AgentRunner(
  293. trace_store=store,
  294. llm_call=create_qwen_llm_call(model=prompt_model),
  295. skills_dir=SKILLS_DIR,
  296. debug=DEBUG,
  297. )
  298. interactive = InteractiveController(runner=runner, store=store, enable_stdin_check=True)
  299. runner.stdin_check = interactive.check_stdin
  300. print("💡 输入 'p' 暂停,'q' 退出")
  301. print("=" * 60)
  302. completed = 0
  303. try:
  304. for i, requirement in enumerate(requirements):
  305. if args.only_index is not None:
  306. if i != args.only_index:
  307. continue
  308. else:
  309. if i < start:
  310. continue
  311. # 为防止两个模式同时跑起冲突,动态附加模式后缀
  312. mode_suffix = "_parallel" if is_parallel else "_sequential"
  313. req_output_dir = output_root / f"{(i+1):03d}{mode_suffix}"
  314. if use_remote:
  315. _, _ = await run_single_remote(
  316. requirement=requirement,
  317. output_dir=req_output_dir,
  318. req_index=i,
  319. )
  320. else:
  321. _, should_exit = await run_single_local(
  322. runner=runner,
  323. interactive=interactive,
  324. store=store,
  325. prompt=prompt,
  326. requirement=requirement,
  327. output_dir=req_output_dir,
  328. req_index=i,
  329. )
  330. if should_exit:
  331. print(f"\n🛑 用户中止,已完成 {completed}/{total - start} 个需求")
  332. break
  333. completed += 1
  334. print(f"\n✓ [{(i+1):03d}] 完成,输出:{req_output_dir}")
  335. print(f" - case.json")
  336. print(f" - strategy.json")
  337. except KeyboardInterrupt:
  338. print(f"\n\n用户中断 (Ctrl+C),已完成 {completed}/{total - start} 个需求")
  339. finally:
  340. if not use_remote:
  341. try:
  342. await kill_browser_session()
  343. except Exception:
  344. pass
  345. print()
  346. print("=" * 60)
  347. print(f"完成:{completed}/{total - start} 个需求")
  348. print(f"输出根目录:{output_root}")
  349. print("=" * 60)
# Script entry point: run the async pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())