"""
Process Research Pipeline v3: a single agent performs search + capability
extraction + strategy induction.

Each requirement writes its output to output/{N}/:
- case.json      process-step cases (including per-step capability details)
- strategy.json  strategy x case association index
- process.json   strategy x capability pipeline details

Usage:
    python run.py                          # run all requirements
    python run.py --from 2                 # resume from the 3rd requirement
    python run.py --requirements req.json  # use a specific requirements file
    python run.py --remote                 # force remote mode (overrides config.USE_REMOTE_RESEARCH)
    python run.py --local                  # force local mode

Environment variables:
    KNOWHUB_API  online KnowHub address (default http://localhost:9999)
"""
import argparse
import json
import os
import sys
import asyncio
from datetime import datetime
from pathlib import Path

# Make the project root importable before pulling in project-local modules.
# NOTE(review): `os` and `datetime` appear unused in this file — confirm before removing.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from dotenv import load_dotenv

# Load .env early so that `config` and the agent modules below already see
# the environment variables when they are imported.
load_dotenv()

from agent.llm.prompts import SimplePrompt
from agent.core.runner import AgentRunner, RunConfig
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_qwen_llm_call
from agent.cli import InteractiveController
from agent.utils import setup_logging
from agent.tools.builtin.subagent import _run_remote_agent
from agent.tools.builtin.browser.baseClass import init_browser_session, kill_browser_session
from config import (
    COORDINATOR_RUN_CONFIG,
    OUTPUT_DIR,
    SKILLS_DIR, TRACE_STORE_PATH, DEBUG, LOG_LEVEL, LOG_FILE,
    BROWSER_TYPE, HEADLESS,
    IM_ENABLED, IM_CONTACT_ID, IM_SERVER_URL, IM_WINDOW_MODE, IM_NOTIFY_INTERVAL,
    USE_REMOTE_RESEARCH,
)
# ─────────────────────────────────────────────
# Local mode: execute a single requirement
# ─────────────────────────────────────────────
async def run_single_local(
    runner: AgentRunner,
    interactive: InteractiveController,
    store: FileSystemTraceStore,
    prompt: SimplePrompt,
    requirement: str,
    output_dir: Path,
    req_index: int,
) -> tuple[str, bool]:
    """Run a single requirement through the local AgentRunner.

    Streams the runner's Trace/Message events to stdout, honors interactive
    pause ('p') / quit ('q') commands from stdin, and collects the agent's
    final text reply.

    Args:
        runner: the local AgentRunner executing the coordinator agent.
        interactive: stdin-driven controller for pause/stop/resume menus.
        store: trace store (passed along for consistency; not read directly here).
        prompt: prompt template used to build the initial message list.
        requirement: natural-language requirement text to research.
        output_dir: per-requirement output directory (created if missing).
        req_index: 0-based requirement index, used only for display names.

    Returns:
        (final_response, should_exit): the agent's last standalone text reply
        (empty string if none), and whether the user asked to stop the whole run.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    messages = prompt.build_messages(
        requirement=requirement,
        output_dir=str(output_dir),
    )
    # The prompt file may pin a model; fall back to the coordinator default.
    prompt_model = prompt.config.get("model", None)
    run_config = RunConfig(
        model=prompt_model or COORDINATOR_RUN_CONFIG.model,
        temperature=COORDINATOR_RUN_CONFIG.temperature,
        max_iterations=COORDINATOR_RUN_CONFIG.max_iterations,
        extra_llm_params=COORDINATOR_RUN_CONFIG.extra_llm_params,
        agent_type=COORDINATOR_RUN_CONFIG.agent_type,
        name=f"工序调研:需求{req_index+1:03d}",
        knowledge=COORDINATOR_RUN_CONFIG.knowledge,
    )
    print(f"\n{'=' * 60}")
    print(f"[{req_index+1:03d}] 开始调研 【本地模式】")
    print(f"需求:{requirement[:80]}{'...' if len(requirement) > 80 else ''}")
    print(f"输出:{output_dir}")
    print(f"{'=' * 60}")
    current_trace_id = None   # trace id of the in-flight run, once known
    current_sequence = 0      # sequence of the last message seen (for resume)
    final_response = ""
    should_exit = False
    try:
        async for item in runner.run(messages=messages, config=run_config):
            # Poll stdin between events for interactive commands.
            cmd = interactive.check_stdin()
            if cmd == 'pause':
                print("\n⏸️ 正在暂停执行...")
                if current_trace_id:
                    await runner.stop(current_trace_id)
                    # Give the runner a moment to settle after stop().
                    # NOTE(review): source indentation was mangled — the sleep
                    # may originally have been outside this guard; confirm.
                    await asyncio.sleep(0.5)
                menu_result = await interactive.show_menu(current_trace_id, current_sequence)
                if menu_result["action"] == "stop":
                    should_exit = True
                    break
                elif menu_result["action"] == "continue":
                    # Resume: optionally replace the message list and continue
                    # after the chosen sequence number.
                    new_messages = menu_result.get("messages", [])
                    run_config.after_sequence = menu_result.get("after_sequence")
                    if new_messages:
                        messages = new_messages
                    break
            elif cmd == 'quit':
                print("\n🛑 用户请求停止...")
                if current_trace_id:
                    await runner.stop(current_trace_id)
                should_exit = True
                break
            if isinstance(item, Trace):
                # Trace lifecycle events: remember the id, report status changes.
                current_trace_id = item.trace_id
                if item.status == "running":
                    print(f"[Trace] 开始: {item.trace_id[:8]}...")
                elif item.status == "completed":
                    print(f"\n[Trace] ✅ 完成 messages={item.total_messages} cost=${item.total_cost:.4f}")
                elif item.status == "failed":
                    print(f"\n[Trace] ❌ 失败: {item.error_message}")
                elif item.status == "stopped":
                    print(f"\n[Trace] ⏸️ 已停止")
            elif isinstance(item, Message):
                current_sequence = item.sequence
                if item.role == "assistant":
                    content = item.content
                    if isinstance(content, dict):
                        text = content.get("text", "")
                        tool_calls = content.get("tool_calls")
                        if text and not tool_calls:
                            # Text with no tool calls => the agent's final reply.
                            final_response = text
                            print(f"\n[Response] Agent 回复:")
                            print(text)
                        elif text:
                            # Intermediate assistant text: show a short preview.
                            preview = text[:150] + "..." if len(text) > 150 else text
                            print(f"[Assistant] {preview}")
                elif item.role == "tool":
                    content = item.content
                    tool_name = "unknown"
                    if isinstance(content, dict):
                        tool_name = content.get("tool_name", "unknown")
                    if item.description and item.description != tool_name:
                        desc = item.description[:80]
                        print(f"[Tool] ✅ {tool_name}: {desc}...")
                    else:
                        print(f"[Tool] ✅ {tool_name}")
    except Exception as e:
        # Best-effort reporting: a failed requirement must not kill the batch.
        print(f"\n执行出错: {e}")
        import traceback
        traceback.print_exc()
        if current_trace_id:
            print(f" Trace ID: {current_trace_id}")
    return final_response, should_exit
# ─────────────────────────────────────────────
# Remote mode: execute a single requirement
# ─────────────────────────────────────────────
async def run_single_remote(
    requirement: str,
    output_dir: Path,
    req_index: int,
) -> tuple[str, bool]:
    """Run a single requirement via the online KnowHub ``remote_research`` agent.

    Issues one HTTP call through ``_run_remote_agent`` and reports the outcome.
    Remote runs are not interactive, so the "should exit" flag is always False.

    Args:
        requirement: natural-language requirement text to research.
        output_dir: per-requirement output directory (created here; the remote
            agent is expected to produce the artifacts — not verified locally).
        req_index: 0-based requirement index, used only for display.

    Returns:
        (summary, False): the remote agent's summary text (empty string if none).
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"\n{'=' * 60}")
    print(f"[{req_index+1:03d}] 开始调研 【远端模式 → KnowHub remote_research】")
    print(f"需求:{requirement[:80]}{'...' if len(requirement) > 80 else ''}")
    print(f"输出:{output_dir}")
    print(f"{'=' * 60}")
    result = await _run_remote_agent(
        agent_type="remote_research",
        task=requirement,
        messages=None,       # fresh conversation, no prior context
        continue_from=None,  # not resuming an earlier remote trace
        skills=None,
    )
    status = result.get("status", "unknown")
    summary = result.get("summary", "")
    error = result.get("error")
    stats = result.get("stats", {})
    if status == "completed":
        print(f"\n[Remote] ✅ 完成 tokens={stats.get('total_tokens', 0)} cost=${stats.get('total_cost', 0.0):.4f}")
    else:
        print(f"\n[Remote] ❌ 失败: {error}")
    # Surface the remote trace id for debugging regardless of outcome.
    # NOTE(review): source indentation was mangled — this may originally have
    # been inside the failure branch only; confirm against the true original.
    if result.get("sub_trace_id"):
        print(f" Remote Trace ID: {result['sub_trace_id']}")
    return summary or "", False
# ─────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────
async def main() -> None:
    """Entry point: parse CLI flags, set up the environment, run the batch.

    Pipeline: parse args -> resolve remote/local and parallel/sequential modes
    -> load presets and the requirements JSON -> (local mode only) set up IM,
    browser, and the AgentRunner -> loop over requirements, dispatching each
    to :func:`run_single_remote` or :func:`run_single_local`.
    """
    parser = argparse.ArgumentParser(description="Process Research Pipeline v3")
    parser.add_argument(
        "--from", dest="from_index", type=int, default=0,
        help="从第几个需求开始(0-based)",
    )
    parser.add_argument(
        "--only", dest="only_index", type=int, default=None,
        help="只执行指定的第几个需求(0-based),这会覆盖 --from",
    )
    parser.add_argument(
        "--requirements", type=str, default=None,
        help="需求列表 JSON 文件路径(默认 db_requirements.json)",
    )
    parser.add_argument(
        "--remote", action="store_true",
        help="强制使用远端模式(覆盖 config.USE_REMOTE_RESEARCH)",
    )
    parser.add_argument(
        "--local", action="store_true",
        help="强制使用本地模式(覆盖 config.USE_REMOTE_RESEARCH)",
    )
    parser.add_argument(
        "--parallel", action="store_true",
        help="强制开启并发多浏览器机制(覆盖 config.PARALLEL_TOOL_EXECUTION)",
    )
    parser.add_argument(
        "--sequential", action="store_true",
        help="强制开启单步串行机制(覆盖 config.PARALLEL_TOOL_EXECUTION)",
    )
    args = parser.parse_args()
    # Decide remote vs. local. If both flags are given, --local wins because
    # it is applied last.
    use_remote = USE_REMOTE_RESEARCH
    if args.remote:
        use_remote = True
    if args.local:
        use_remote = False
    # Decide parallel vs. sequential tool execution (same last-wins rule),
    # then write the result back into the shared coordinator config.
    is_parallel = COORDINATOR_RUN_CONFIG.parallel_tool_execution
    if args.parallel:
        is_parallel = True
    if args.sequential:
        is_parallel = False
    COORDINATOR_RUN_CONFIG.parallel_tool_execution = is_parallel

    # Override the browser type based on concurrency mode: parallel runs must
    # use the cloud browser to avoid session conflicts; an explicit
    # --sequential opts back into the local browser.
    browser_type = BROWSER_TYPE
    if is_parallel:
        browser_type = "cloud"
    elif args.sequential:
        browser_type = "local"
    base_dir = Path(__file__).parent
    project_root = base_dir.parent.parent
    output_root = project_root / OUTPUT_DIR
    setup_logging(level=LOG_LEVEL, file=LOG_FILE)
    # Load presets (optional; skipped when presets.json is absent).
    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        from agent.core.presets import load_presets_from_json
        load_presets_from_json(str(presets_path))
        print("已加载 presets")
    # Load the requirements list (must be a non-empty JSON array of strings).
    req_path = Path(args.requirements) if args.requirements else base_dir / "db_requirements.json"
    if not req_path.exists():
        print(f"错误: 需求文件不存在: {req_path}")
        sys.exit(1)
    with open(req_path, encoding='utf-8') as f:
        requirements = json.load(f)
    if not isinstance(requirements, list) or len(requirements) == 0:
        print("错误: 需求文件必须是非空 JSON 数组")
        sys.exit(1)
    output_root.mkdir(parents=True, exist_ok=True)
    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
    total = len(requirements)
    start = args.from_index
    print("=" * 60)
    print(f"Process Research Pipeline v3")
    print(f"执行引擎:{'并发多云并发 (Parallel)' if is_parallel else '单步串行序列 (Sequential)'}")
    print(f"模式:{'远端 KnowHub' if use_remote else '本地'}")
    if args.only_index is not None:
        print(f"模式:仅执行第 {args.only_index} 个需求")
    else:
        print(f"共 {total} 个需求,从第 {start} 个开始")
    print("=" * 60)
    # Optional IM notifications (local mode only).
    if IM_ENABLED and not use_remote:
        from agent.tools.builtin.im.chat import im_setup, im_open_window
        result = await im_setup(
            contact_id=IM_CONTACT_ID,
            server_url=IM_SERVER_URL,
            notify_interval=IM_NOTIFY_INTERVAL,
        )
        print(f"IM: {result.output}")
        if IM_WINDOW_MODE:
            window_result = await im_open_window(contact_id=IM_CONTACT_ID)
            print(f"IM: {window_result.output}")
    # Initialize the browser session (local mode only).
    if not use_remote:
        print(f"正在初始化浏览器环境 ({browser_type})...")
        await init_browser_session(
            browser_type=browser_type,
            headless=HEADLESS,
            url="https://www.google.com/",
            profile_name=""
        )
    # Local mode: build the runner, prompt, and interactive controller.
    runner = None
    interactive = None
    prompt = None
    if not use_remote:
        prompt_path = base_dir / "prompts" / "coordinator.prompt"
        prompt = SimplePrompt(prompt_path)
        prompt_model = prompt.config.get("model", None) or COORDINATOR_RUN_CONFIG.model
        runner = AgentRunner(
            trace_store=store,
            llm_call=create_qwen_llm_call(model=prompt_model),
            skills_dir=SKILLS_DIR,
            debug=DEBUG,
        )
        interactive = InteractiveController(runner=runner, store=store, enable_stdin_check=True)
        # Let the runner poll stdin between steps too, not just this loop.
        runner.stdin_check = interactive.check_stdin
        print("💡 输入 'p' 暂停,'q' 退出")
    print("=" * 60)
    completed = 0
    try:
        for i, requirement in enumerate(requirements):
            # Selection: --only runs exactly one index; otherwise skip
            # everything before --from.
            if args.only_index is not None:
                if i != args.only_index:
                    continue
            else:
                if i < start:
                    continue
            # Suffix the output dir by mode so parallel and sequential runs
            # of the same requirement never collide.
            mode_suffix = "_parallel" if is_parallel else "_sequential"
            req_output_dir = output_root / f"{(i+1):03d}{mode_suffix}"
            if use_remote:
                _, _ = await run_single_remote(
                    requirement=requirement,
                    output_dir=req_output_dir,
                    req_index=i,
                )
            else:
                _, should_exit = await run_single_local(
                    runner=runner,
                    interactive=interactive,
                    store=store,
                    prompt=prompt,
                    requirement=requirement,
                    output_dir=req_output_dir,
                    req_index=i,
                )
                if should_exit:
                    print(f"\n🛑 用户中止,已完成 {completed}/{total - start} 个需求")
                    break
            completed += 1
            print(f"\n✓ [{(i+1):03d}] 完成,输出:{req_output_dir}")
            print(f" - case.json")
            print(f" - strategy.json")
    except KeyboardInterrupt:
        print(f"\n\n用户中断 (Ctrl+C),已完成 {completed}/{total - start} 个需求")
    finally:
        # Always tear down the browser session in local mode, even on errors.
        if not use_remote:
            try:
                await kill_browser_session()
            except Exception:
                pass  # best-effort cleanup; never mask the original outcome
    # NOTE(review): the {completed}/{total - start} denominator is misleading
    # when --only is used (at most 1 requirement runs) — confirm intent.
    print()
    print("=" * 60)
    print(f"完成:{completed}/{total - start} 个需求")
    print(f"输出根目录:{output_root}")
    print("=" * 60)
if __name__ == "__main__":
    # Script entry point: drive the async pipeline on a fresh event loop.
    asyncio.run(main())