"""
Process Research Pipeline v3: a single agent performs search, capability
extraction and strategy induction.

Each requirement is written to its own directory under the output root,
named ``{N:03d}_parallel`` or ``{N:03d}_sequential`` (N is 1-based):
- case.json      process cases (with step-level capability details)
- strategy.json  strategy x case association index
- process.json   strategy x capability pipeline details

Usage:
    python run.py                           # run every requirement
    python run.py --from 2                  # resume from the 3rd requirement (0-based index)
    python run.py --only 4                  # run only the requirement at index 4 (overrides --from)
    python run.py --requirements req.json   # use a specific requirements file
    python run.py --remote                  # force remote mode (overrides config.USE_REMOTE_RESEARCH)
    python run.py --local                   # force local mode
    python run.py --parallel                # force parallel tool execution (cloud browser)
    python run.py --sequential              # force sequential tool execution

Environment variables:
    KNOWHUB_API   address of the online KnowHub service (default http://localhost:9999)
"""

import argparse
import json
import os
import sys
import asyncio
from datetime import datetime
from pathlib import Path

# Make the repository root importable before pulling in the ``agent`` package.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from dotenv import load_dotenv

load_dotenv()

from agent.llm.prompts import SimplePrompt
from agent.core.runner import AgentRunner, RunConfig
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_qwen_llm_call
from agent.cli import InteractiveController
from agent.utils import setup_logging
from agent.tools.builtin.subagent import _run_remote_agent
from agent.tools.builtin.browser.baseClass import init_browser_session, kill_browser_session

from config import (
    COORDINATOR_RUN_CONFIG,
    OUTPUT_DIR,
    SKILLS_DIR,
    TRACE_STORE_PATH,
    DEBUG,
    LOG_LEVEL,
    LOG_FILE,
    BROWSER_TYPE,
    HEADLESS,
    IM_ENABLED,
    IM_CONTACT_ID,
    IM_SERVER_URL,
    IM_WINDOW_MODE,
    IM_NOTIFY_INTERVAL,
    USE_REMOTE_RESEARCH,
)


# ─────────────────────────────────────────────
# Small pure helpers
# ─────────────────────────────────────────────

def _preview(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, adding "..." only if it was cut."""
    return text[:limit] + "..." if len(text) > limit else text


def _select_indices(total: int, start: int, only: int | None) -> list[int]:
    """Return the 0-based requirement indices that should be executed.

    ``only`` takes precedence over ``start``. Out-of-range values select
    nothing rather than raising, and a negative ``start`` is treated as 0.
    """
    if only is not None:
        return [only] if 0 <= only < total else []
    return list(range(max(start, 0), total))


# ─────────────────────────────────────────────
# Local mode: run a single requirement
# ─────────────────────────────────────────────

async def run_single_local(
    runner: AgentRunner,
    interactive: InteractiveController,
    store: FileSystemTraceStore,
    prompt: SimplePrompt,
    requirement: str,
    output_dir: Path,
    req_index: int,
) -> tuple[str, bool]:
    """Run one requirement through the local AgentRunner.

    Args:
        runner: Agent runner used to execute the conversation.
        interactive: Controller polled for 'pause' / 'quit' commands on stdin.
        store: Trace store (not used directly here; kept for interface
            compatibility with callers).
        prompt: Prompt template used to build the initial messages.
        requirement: Requirement text handed to the agent.
        output_dir: Directory the agent is told to write into; created if missing.
        req_index: 0-based index of the requirement (displayed 1-based).

    Returns:
        ``(final_response, should_exit)`` where ``final_response`` is the last
        assistant text that carried no tool calls, and ``should_exit`` is True
        when the user asked to stop the whole pipeline.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    messages = prompt.build_messages(
        requirement=requirement,
        output_dir=str(output_dir),
    )

    # A model declared in the prompt file wins over the configured default.
    prompt_model = prompt.config.get("model", None)
    run_config = RunConfig(
        model=prompt_model or COORDINATOR_RUN_CONFIG.model,
        temperature=COORDINATOR_RUN_CONFIG.temperature,
        max_iterations=COORDINATOR_RUN_CONFIG.max_iterations,
        extra_llm_params=COORDINATOR_RUN_CONFIG.extra_llm_params,
        agent_type=COORDINATOR_RUN_CONFIG.agent_type,
        name=f"工序调研:需求{req_index+1:03d}",
        knowledge=COORDINATOR_RUN_CONFIG.knowledge,
    )
    # main() applies --parallel / --sequential by mutating
    # COORDINATOR_RUN_CONFIG.parallel_tool_execution; copy it over so the flag
    # actually reaches this run. Guarded with hasattr because RunConfig's
    # definition is not visible from this file.
    parallel = getattr(COORDINATOR_RUN_CONFIG, "parallel_tool_execution", None)
    if parallel is not None and hasattr(run_config, "parallel_tool_execution"):
        run_config.parallel_tool_execution = parallel

    print(f"\n{'=' * 60}")
    print(f"[{req_index+1:03d}] 开始调研 【本地模式】")
    print(f"需求:{_preview(requirement, 80)}")
    print(f"输出:{output_dir}")
    print(f"{'=' * 60}")

    current_trace_id = None
    current_sequence = 0
    final_response = ""
    should_exit = False

    try:
        async for item in runner.run(messages=messages, config=run_config):
            # Poll stdin between streamed items so the user can interrupt.
            cmd = interactive.check_stdin()
            if cmd == 'pause':
                print("\n⏸️ 正在暂停执行...")
                if current_trace_id:
                    await runner.stop(current_trace_id)
                await asyncio.sleep(0.5)
                menu_result = await interactive.show_menu(current_trace_id, current_sequence)
                if menu_result["action"] == "stop":
                    should_exit = True
                    break
                elif menu_result["action"] == "continue":
                    # NOTE(review): the updated messages / after_sequence are
                    # stored but runner.run() is never re-entered, so
                    # "continue" currently just ends this requirement.
                    # Confirm whether a resume loop is intended.
                    new_messages = menu_result.get("messages", [])
                    run_config.after_sequence = menu_result.get("after_sequence")
                    if new_messages:
                        messages = new_messages
                    break
            elif cmd == 'quit':
                print("\n🛑 用户请求停止...")
                if current_trace_id:
                    await runner.stop(current_trace_id)
                should_exit = True
                break

            if isinstance(item, Trace):
                current_trace_id = item.trace_id
                if item.status == "running":
                    print(f"[Trace] 开始: {item.trace_id[:8]}...")
                elif item.status == "completed":
                    print(f"\n[Trace] ✅ 完成 messages={item.total_messages} cost=${item.total_cost:.4f}")
                elif item.status == "failed":
                    print(f"\n[Trace] ❌ 失败: {item.error_message}")
                elif item.status == "stopped":
                    print(f"\n[Trace] ⏸️ 已停止")
            elif isinstance(item, Message):
                current_sequence = item.sequence
                if item.role == "assistant":
                    content = item.content
                    if isinstance(content, dict):
                        text = content.get("text", "")
                        tool_calls = content.get("tool_calls")
                        if text and not tool_calls:
                            # Text without tool calls is treated as the answer.
                            final_response = text
                            print(f"\n[Response] Agent 回复:")
                            print(text)
                        elif text:
                            print(f"[Assistant] {_preview(text, 150)}")
                elif item.role == "tool":
                    content = item.content
                    tool_name = "unknown"
                    if isinstance(content, dict):
                        tool_name = content.get("tool_name", "unknown")
                    if item.description and item.description != tool_name:
                        # Ellipsis only when the description was really cut.
                        print(f"[Tool] ✅ {tool_name}: {_preview(item.description, 80)}")
                    else:
                        print(f"[Tool] ✅ {tool_name}")
    except Exception as e:
        # Best-effort: report the failure and let the pipeline move on.
        print(f"\n执行出错: {e}")
        import traceback
        traceback.print_exc()

    if current_trace_id:
        print(f" Trace ID: {current_trace_id}")

    return final_response, should_exit


# ─────────────────────────────────────────────
# Remote mode: run a single requirement
# ─────────────────────────────────────────────

async def run_single_remote(
    requirement: str,
    output_dir: Path,
    req_index: int,
) -> tuple[str, bool]:
    """Run one requirement via the remote ``remote_research`` agent.

    Args:
        requirement: Requirement text sent as the remote task.
        output_dir: Local output directory; created if missing.
        req_index: 0-based index of the requirement (displayed 1-based).

    Returns:
        ``(summary, False)``. The summary is an empty string when the remote
        result has none; the second element is always False because remote
        runs cannot be interrupted interactively.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"\n{'=' * 60}")
    print(f"[{req_index+1:03d}] 开始调研 【远端模式 → KnowHub remote_research】")
    print(f"需求:{_preview(requirement, 80)}")
    print(f"输出:{output_dir}")
    print(f"{'=' * 60}")

    result = await _run_remote_agent(
        agent_type="remote_research",
        task=requirement,
        messages=None,
        continue_from=None,
        skills=None,
    )

    status = result.get("status", "unknown")
    summary = result.get("summary", "")
    error = result.get("error")
    # Tolerate an explicit null for "stats" or its fields.
    stats = result.get("stats") or {}

    if status == "completed":
        tokens = stats.get("total_tokens") or 0
        cost = stats.get("total_cost") or 0.0
        print(f"\n[Remote] ✅ 完成 tokens={tokens} cost=${cost:.4f}")
    else:
        print(f"\n[Remote] ❌ 失败: {error}")

    if result.get("sub_trace_id"):
        print(f" Remote Trace ID: {result['sub_trace_id']}")

    return summary or "", False


# ─────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────

async def main():
    """Parse CLI options, set up the environment and run the selected requirements."""
    parser = argparse.ArgumentParser(description="Process Research Pipeline v3")
    parser.add_argument(
        "--from", dest="from_index", type=int, default=0,
        help="从第几个需求开始(0-based)",
    )
    parser.add_argument(
        "--only", dest="only_index", type=int, default=None,
        help="只执行指定的第几个需求(0-based),这会覆盖 --from",
    )
    parser.add_argument(
        "--requirements", type=str, default=None,
        help="需求列表 JSON 文件路径(默认 db_requirements.json)",
    )
    parser.add_argument(
        "--remote", action="store_true",
        help="强制使用远端模式(覆盖 config.USE_REMOTE_RESEARCH)",
    )
    parser.add_argument(
        "--local", action="store_true",
        help="强制使用本地模式(覆盖 config.USE_REMOTE_RESEARCH)",
    )
    parser.add_argument(
        "--parallel", action="store_true",
        help="强制开启并发多浏览器机制(覆盖 config.PARALLEL_TOOL_EXECUTION)",
    )
    parser.add_argument(
        "--sequential", action="store_true",
        help="强制开启单步串行机制(覆盖 config.PARALLEL_TOOL_EXECUTION)",
    )
    args = parser.parse_args()

    # Remote vs. local: flags override config; --local wins if both are given.
    use_remote = USE_REMOTE_RESEARCH
    if args.remote:
        use_remote = True
    if args.local:
        use_remote = False

    # Parallel vs. sequential: --sequential wins if both are given.
    is_parallel = COORDINATOR_RUN_CONFIG.parallel_tool_execution
    if args.parallel:
        is_parallel = True
    if args.sequential:
        is_parallel = False
    COORDINATOR_RUN_CONFIG.parallel_tool_execution = is_parallel

    # Parallel execution must use the cloud browser to avoid conflicts;
    # an explicit --sequential may fall back to the local browser.
    browser_type = BROWSER_TYPE
    if is_parallel:
        browser_type = "cloud"
    elif args.sequential:
        browser_type = "local"

    base_dir = Path(__file__).parent
    project_root = base_dir.parent.parent
    output_root = project_root / OUTPUT_DIR

    setup_logging(level=LOG_LEVEL, file=LOG_FILE)

    # Load presets when a presets file sits next to this script.
    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        from agent.core.presets import load_presets_from_json
        load_presets_from_json(str(presets_path))
        print("已加载 presets")

    # Read the requirement list.
    req_path = Path(args.requirements) if args.requirements else base_dir / "db_requirements.json"
    if not req_path.exists():
        print(f"错误: 需求文件不存在: {req_path}")
        sys.exit(1)
    with open(req_path, encoding='utf-8') as f:
        requirements = json.load(f)
    if not isinstance(requirements, list) or not requirements:
        print("错误: 需求文件必须是非空 JSON 数组")
        sys.exit(1)

    output_root.mkdir(parents=True, exist_ok=True)
    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)

    total = len(requirements)
    start = args.from_index
    # Resolve the work list up front so progress is reported against the
    # number of requirements actually selected (correct for --only too).
    indices = _select_indices(total, start, args.only_index)
    planned = len(indices)

    print("=" * 60)
    print(f"Process Research Pipeline v3")
    print(f"执行引擎:{'并发多云并发 (Parallel)' if is_parallel else '单步串行序列 (Sequential)'}")
    print(f"模式:{'远端 KnowHub' if use_remote else '本地'}")
    if args.only_index is not None:
        print(f"模式:仅执行第 {args.only_index} 个需求")
    else:
        print(f"共 {total} 个需求,从第 {start} 个开始")
    print("=" * 60)

    # Optional IM integration (local mode only).
    if IM_ENABLED and not use_remote:
        from agent.tools.builtin.im.chat import im_setup, im_open_window
        result = await im_setup(
            contact_id=IM_CONTACT_ID,
            server_url=IM_SERVER_URL,
            notify_interval=IM_NOTIFY_INTERVAL,
        )
        print(f"IM: {result.output}")
        if IM_WINDOW_MODE:
            window_result = await im_open_window(contact_id=IM_CONTACT_ID)
            print(f"IM: {window_result.output}")

    # Local mode needs a browser session.
    if not use_remote:
        print(f"正在初始化浏览器环境 ({browser_type})...")
        await init_browser_session(
            browser_type=browser_type,
            headless=HEADLESS,
            url="https://www.google.com/",
            profile_name="",
        )

    runner = None
    interactive = None
    prompt = None
    completed = 0
    # The browser session is live from here on, so runner setup sits inside
    # the try block: a failure there must still release the browser.
    try:
        if not use_remote:
            prompt_path = base_dir / "prompts" / "coordinator.prompt"
            prompt = SimplePrompt(prompt_path)
            prompt_model = prompt.config.get("model", None) or COORDINATOR_RUN_CONFIG.model
            runner = AgentRunner(
                trace_store=store,
                llm_call=create_qwen_llm_call(model=prompt_model),
                skills_dir=SKILLS_DIR,
                debug=DEBUG,
            )
            interactive = InteractiveController(runner=runner, store=store, enable_stdin_check=True)
            runner.stdin_check = interactive.check_stdin
            print("💡 输入 'p' 暂停,'q' 退出")
            print("=" * 60)

        # Suffix the directory with the mode so the two modes never collide.
        mode_suffix = "_parallel" if is_parallel else "_sequential"

        for i in indices:
            requirement = requirements[i]
            req_output_dir = output_root / f"{(i+1):03d}{mode_suffix}"

            if use_remote:
                await run_single_remote(
                    requirement=requirement,
                    output_dir=req_output_dir,
                    req_index=i,
                )
            else:
                _, should_exit = await run_single_local(
                    runner=runner,
                    interactive=interactive,
                    store=store,
                    prompt=prompt,
                    requirement=requirement,
                    output_dir=req_output_dir,
                    req_index=i,
                )
                if should_exit:
                    print(f"\n🛑 用户中止,已完成 {completed}/{planned} 个需求")
                    break

            # NOTE(review): a run that reported failure is still counted as
            # completed here, because neither runner signals failure through
            # its return value.
            completed += 1
            print(f"\n✓ [{(i+1):03d}] 完成,输出:{req_output_dir}")
            print(f" - case.json")
            print(f" - strategy.json")
    except KeyboardInterrupt:
        print(f"\n\n用户中断 (Ctrl+C),已完成 {completed}/{planned} 个需求")
    finally:
        if not use_remote:
            try:
                await kill_browser_session()
            except Exception as e:
                # Cleanup stays best-effort, but the failure is surfaced.
                print(f"警告: 关闭浏览器会话失败: {e}")

    print()
    print("=" * 60)
    print(f"完成:{completed}/{planned} 个需求")
    print(f"输出根目录:{output_root}")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())