"""
Two-stage pipeline: tool research + workflow analysis.

Stage 1: batch research (qwen3.5-plus); each requirement writes to output/research/NN/
Stage 2: workflow analysis (claude-sonnet); reads the Stage 1 output and
         generates output/analysis/result.json

Usage:
  python run.py                              # both stages (default)
  python run.py --stage research             # research only
  python run.py --stage analysis             # analysis only (reuses existing research output)
  python run.py --stage research --from 2    # resume from the third requirement
"""

import argparse
import json
import os
import sys
import asyncio
import traceback
import urllib.request
from pathlib import Path

# Must run before the project imports below so that `agent` and `config` resolve.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from dotenv import load_dotenv
load_dotenv()

from agent.llm.prompts import SimplePrompt
from agent.core.runner import AgentRunner, RunConfig
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_qwen_llm_call, create_openrouter_llm_call
from agent.cli import InteractiveController
from agent.utils import setup_logging

from config import (
    RESEARCH_RUN_CONFIG,
    ANALYSIS_RUN_CONFIG,
    RESEARCH_OUTPUT_DIR,
    ANALYSIS_OUTPUT_DIR,
    SKILLS_DIR,
    TRACE_STORE_PATH,
    DEBUG,
    LOG_LEVEL,
    LOG_FILE,
    IM_ENABLED,
    IM_CONTACT_ID,
    IM_SERVER_URL,
    IM_WINDOW_MODE,
    IM_NOTIFY_INTERVAL,
)


# ─────────────────────────────────────────────
# Shared helpers (used by both stages)
# ─────────────────────────────────────────────

def _clip(text: str, limit: int) -> str:
    """Return ``text`` cut to ``limit`` characters, adding "..." only when it was cut."""
    return text[:limit] + "..." if len(text) > limit else text


def _print_trace_status(trace: Trace) -> None:
    """Print a one-line progress note for a Trace event, keyed on its status."""
    if trace.status == "running":
        print(f"[Trace] 开始: {trace.trace_id[:8]}...")
    elif trace.status == "completed":
        print(f"\n[Trace] ✅ 完成 messages={trace.total_messages} cost=${trace.total_cost:.4f}")
    elif trace.status == "failed":
        print(f"\n[Trace] ❌ 失败: {trace.error_message}")
    elif trace.status == "stopped":
        print("\n[Trace] ⏸️ 已停止")


def _print_message(message: Message, *, response_header: str, tool_label: str) -> str:
    """Echo an assistant or tool Message to stdout.

    Args:
        message: The message yielded by the runner.
        response_header: Line printed before a final assistant reply.
        tool_label: Prefix used for tool-result lines (e.g. "[Tool]").

    Returns:
        The reply text when ``message`` is an assistant message carrying text
        and no tool calls; otherwise an empty string.
    """
    content = message.content

    if message.role == "assistant":
        if not isinstance(content, dict):
            return ""
        text = content.get("text", "")
        if not text:
            return ""
        if content.get("tool_calls"):
            # Intermediate reasoning alongside tool calls: show a short preview only.
            print(f"[Assistant] {_clip(text, 150)}")
            return ""
        print(response_header)
        print(text)
        return text

    if message.role == "tool":
        tool_name = "unknown"
        if isinstance(content, dict):
            tool_name = content.get("tool_name", "unknown")
        description = message.description
        if description and description != tool_name:
            # Ellipsis is appended only when the description is actually truncated.
            print(f"{tool_label} ✅ {tool_name}: {_clip(description, 80)}")
        else:
            print(f"{tool_label} ✅ {tool_name}")

    return ""


async def _handle_user_command(
    runner: AgentRunner,
    interactive: InteractiveController,
    run_config: RunConfig,
    trace_id,
    sequence: int,
    *,
    pause_notice: str,
    quit_notice: str,
) -> tuple[bool, bool]:
    """Poll stdin for a pause/quit command and act on it.

    Returns:
        ``(leave_loop, exit_requested)``. ``leave_loop`` tells the caller to
        stop consuming runner events; ``exit_requested`` is True when the user
        chose to stop or quit.
    """
    cmd = interactive.check_stdin()

    if cmd == 'pause':
        print(pause_notice)
        if trace_id:
            await runner.stop(trace_id)
        # Short wait after requesting the stop, before showing the menu.
        await asyncio.sleep(0.5)
        menu_result = await interactive.show_menu(trace_id, sequence)
        action = menu_result["action"]
        if action == "stop":
            return True, True
        if action == "continue":
            # NOTE(review): the event loop is left here without re-invoking
            # runner.run(), so "continue" currently ends this run rather than
            # resuming it. Confirm the intended resume API before changing.
            run_config.after_sequence = menu_result.get("after_sequence")
            return True, False
        return False, False

    if cmd == 'quit':
        print(quit_notice)
        if trace_id:
            await runner.stop(trace_id)
        return True, True

    return False, False


# ─────────────────────────────────────────────
# Stage 1 helpers
# ─────────────────────────────────────────────

async def run_single(
    runner: AgentRunner,
    interactive: InteractiveController,
    store: FileSystemTraceStore,
    prompt: SimplePrompt,
    requirement: str,
    output_dir: Path,
    task_name: str,
    req_index: int,
) -> tuple[str, bool]:
    """Run the research flow for one requirement.

    Creates ``output_dir``, streams runner events to stdout, and writes the
    last final assistant reply (if any) to ``output_dir / "result.txt"``.
    Exceptions raised while running are printed with a traceback and not
    re-raised.

    Args:
        runner: Agent runner used to execute the prompt.
        interactive: Controller polled for pause/quit commands.
        store: Trace store (not used directly in this function).
        prompt: Research prompt template.
        requirement: Requirement text substituted into the prompt.
        output_dir: Directory for this requirement's output.
        task_name: Task label used in the run name.
        req_index: Index of the requirement, used in labels.

    Returns:
        ``(final response text, whether the caller should stop the batch)``.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    messages = prompt.build_messages(
        requirement=requirement,
        output_dir=str(output_dir),
    )

    # A model named in the prompt file takes precedence over the configured one.
    prompt_model = prompt.config.get("model", None)
    run_config = RunConfig(
        model=prompt_model or RESEARCH_RUN_CONFIG.model,
        temperature=RESEARCH_RUN_CONFIG.temperature,
        max_iterations=RESEARCH_RUN_CONFIG.max_iterations,
        extra_llm_params=RESEARCH_RUN_CONFIG.extra_llm_params,
        agent_type=RESEARCH_RUN_CONFIG.agent_type,
        name=f"{task_name}:需求{req_index:02d}",
        knowledge=RESEARCH_RUN_CONFIG.knowledge,
    )

    print(f"\n{'=' * 60}")
    print(f"[{req_index:02d}] 开始调研")
    print(f"需求:{requirement[:80]}{'...' if len(requirement) > 80 else ''}")
    print(f"输出:{output_dir}")
    print(f"{'=' * 60}")

    current_trace_id = None
    current_sequence = 0
    final_response = ""
    should_exit = False

    try:
        async for item in runner.run(messages=messages, config=run_config):
            # Commands are checked before the event is processed, so the event
            # that arrives together with a stop/quit is not echoed.
            leave_loop, exit_requested = await _handle_user_command(
                runner,
                interactive,
                run_config,
                current_trace_id,
                current_sequence,
                pause_notice="\n⏸️ 正在暂停执行...",
                quit_notice="\n🛑 用户请求停止...",
            )
            if leave_loop:
                should_exit = exit_requested
                break

            if isinstance(item, Trace):
                current_trace_id = item.trace_id
                _print_trace_status(item)
            elif isinstance(item, Message):
                current_sequence = item.sequence
                reply = _print_message(
                    item,
                    response_header="\n[Response] Agent 回复:",
                    tool_label="[Tool Result]",
                )
                if reply:
                    final_response = reply

    except Exception as e:
        # Best effort: one failing requirement must not abort the whole batch.
        print(f"\n执行出错: {e}")
        traceback.print_exc()

    if final_response:
        output_file = output_dir / "result.txt"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(final_response)
        print(f"\n✓ 结果已保存到: {output_file}")

    if current_trace_id:
        print(f" Trace ID: {current_trace_id}")

    return final_response, should_exit


# ─────────────────────────────────────────────
# Stage 2 helpers
# ─────────────────────────────────────────────

def load_workflows_from_dir(research_dir: Path) -> list[dict]:
    """Collect workflow records from every subdirectory of ``research_dir``.

    For each subdirectory (sorted by name), ``workflows.json`` is preferred;
    when it is absent or cannot be parsed, every ``*.md`` file in that
    directory is passed through as raw text instead. If ``research_dir`` has
    no subdirectories it is scanned itself.

    Args:
        research_dir: Root directory holding the Stage 1 output.

    Returns:
        A list of workflow dicts with sequential ids ``wf_001``, ``wf_002``, ...
        Empty when ``research_dir`` does not exist or holds no usable data.
    """
    workflows = []
    wf_index = 1

    # A missing directory (e.g. --stage analysis before any research run) is
    # reported as "no data" instead of raising FileNotFoundError from iterdir().
    if not research_dir.is_dir():
        print(f" [警告] 调研目录不存在: {research_dir}")
        return workflows

    subdirs = sorted(
        [d for d in research_dir.iterdir() if d.is_dir()],
        key=lambda d: d.name,
    )
    if not subdirs:
        # Single research output: files live directly in research_dir.
        subdirs = [research_dir]

    for subdir in subdirs:
        workflows_json_path = subdir / "workflows.json"

        # ── Preferred: read workflows.json ──
        if workflows_json_path.exists():
            try:
                with open(workflows_json_path, encoding='utf-8') as f:
                    data = json.load(f)
                if not isinstance(data, dict):
                    raise ValueError("top-level JSON value is not an object")
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                print(f" [警告] workflows.json 解析失败: {subdir.name} ({e}),尝试 Markdown 兜底")
            else:
                discovered = data.get("工序发现") or []
                for item in discovered:
                    if not isinstance(item, dict):
                        # Skip malformed entries rather than aborting the scan.
                        continue
                    wf_id = f"wf_{wf_index:03d}"
                    wf_index += 1
                    workflows.append({
                        "id": wf_id,
                        "name": item.get("方案名称", "未命名工序"),
                        "category": "",
                        "source_channel": item.get("来源渠道", "未知"),
                        "source_file": str(workflows_json_path.relative_to(research_dir)),
                        "steps": item.get("工序步骤", []),
                        "post_links": list(item.get("帖子链接") or []),
                    })
                    print(f" + {wf_id}: {item.get('方案名称', '未命名')[:50]}")
                continue

        # ── Fallback: read *.md files ──
        md_files = sorted(subdir.glob("*.md"))
        if md_files:
            for md_file in md_files:
                try:
                    content = md_file.read_text(encoding='utf-8')
                except (OSError, UnicodeDecodeError) as e:
                    print(f" [警告] 无法读取 {md_file.name}: {e}")
                    continue
                wf_id = f"wf_{wf_index:03d}"
                wf_index += 1
                workflows.append({
                    "id": wf_id,
                    "name": md_file.stem,
                    "category": "",
                    "source_channel": "Markdown报告",
                    "source_file": str(md_file.relative_to(research_dir)),
                    "steps": [],
                    "raw_markdown": content,  # the coordinator can read this directly
                })
                print(f" + {wf_id}: [MD兜底] {md_file.name}")
        else:
            print(f" [跳过] {subdir.name}:无 workflows.json 也无 .md 文件")

    return workflows


async def fetch_atomic_capabilities() -> list[dict]:
    """Fetch the atomic-capability table from the knowhub API.

    The base URL comes from the ``KNOWHUB_API`` environment variable. Any
    failure is reported as a warning and yields an empty list so that the
    analysis can continue without capability matching.

    Returns:
        The ``results`` list of the API response, or ``[]`` on failure.
    """
    knowhub_api = os.getenv("KNOWHUB_API", "http://43.106.118.91:9999")
    url = f"{knowhub_api}/api/capability?limit=500"

    def _download() -> dict:
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.loads(resp.read().decode())

    try:
        # urlopen blocks; run it in a worker thread so the event loop stays responsive.
        data = await asyncio.to_thread(_download)
        capabilities = data.get("results", [])
        print(f" 已获取原子能力表:{len(capabilities)} 条")
        return capabilities
    except Exception as e:
        print(f" [警告] 获取原子能力表失败:{e},将跳过匹配")
        return []


async def run_analysis(
    research_dir: Path,
    analysis_dir: Path,
    store: FileSystemTraceStore,
    prompt_path: Path,
) -> bool:
    """Run the Stage 2 workflow analysis.

    Loads workflows from ``research_dir``, writes the atomic-capability table
    to ``analysis_dir / "atomic_capabilities.json"``, runs the coordinator
    prompt, and then checks for ``analysis_dir / "result.json"``.

    Args:
        research_dir: Directory containing the Stage 1 output.
        analysis_dir: Directory that receives the analysis output.
        store: Trace store handed to the runner.
        prompt_path: Path of the coordinator prompt file.

    Returns:
        True when ``result.json`` exists after the run, False otherwise
        (including when no workflow data was found).
    """
    print(f"\n{'=' * 60}")
    print("Stage 2:工作流分析")
    print(f"输入:{research_dir}")
    print(f"输出:{analysis_dir}")
    print(f"{'=' * 60}")

    # Scan the research output for workflow data.
    print("扫描调研结果...")
    workflows = load_workflows_from_dir(research_dir)
    if not workflows:
        print(" 错误: 未找到任何工序数据,请先运行 Stage 1")
        return False
    print(f" 共加载 {len(workflows)} 条工作流")

    analysis_dir.mkdir(parents=True, exist_ok=True)

    # Fetch the atomic-capability table and write it to disk.
    print("获取原子能力表...")
    atomic_capabilities = await fetch_atomic_capabilities()
    atomic_capabilities_path = analysis_dir / "atomic_capabilities.json"
    atomic_capabilities_path.write_text(
        json.dumps({"atomic_capabilities": atomic_capabilities}, ensure_ascii=False, indent=2),
        encoding='utf-8'
    )
    print(f" 已写入:{atomic_capabilities_path}")

    output_path = analysis_dir / "result.json"

    # Load the coordinator prompt.
    prompt = SimplePrompt(prompt_path)
    workflows_json = json.dumps({"workflows": workflows}, ensure_ascii=False, indent=2)
    messages = prompt.build_messages(
        workflows_json=workflows_json,
        output_dir=str(analysis_dir),
        output_path=str(output_path),
    )

    # Create the runner (OpenRouter / Claude).
    prompt_model = prompt.config.get("model", None) or ANALYSIS_RUN_CONFIG.model
    print(f" 模型: {prompt_model}")

    runner = AgentRunner(
        trace_store=store,
        llm_call=create_openrouter_llm_call(model=prompt_model),
        skills_dir=SKILLS_DIR,
        debug=DEBUG,
    )
    interactive = InteractiveController(runner=runner, store=store, enable_stdin_check=True)
    runner.stdin_check = interactive.check_stdin

    run_config = RunConfig(
        model=prompt_model,
        temperature=ANALYSIS_RUN_CONFIG.temperature,
        max_iterations=ANALYSIS_RUN_CONFIG.max_iterations,
        agent_type=ANALYSIS_RUN_CONFIG.agent_type,
        name=f"工作流分析:{len(workflows)} 条工作流",
    )

    current_trace_id = None
    current_sequence = 0

    try:
        async for item in runner.run(messages=messages, config=run_config):
            leave_loop, _ = await _handle_user_command(
                runner,
                interactive,
                run_config,
                current_trace_id,
                current_sequence,
                pause_notice="\n⏸️ 正在暂停...",
                quit_notice="\n🛑 停止执行...",
            )
            if leave_loop:
                break

            if isinstance(item, Trace):
                current_trace_id = item.trace_id
                _print_trace_status(item)
            elif isinstance(item, Message):
                current_sequence = item.sequence
                _print_message(
                    item,
                    response_header="\n[Response]",
                    tool_label="[Tool]",
                )

    except KeyboardInterrupt:
        print("\n\n用户中断 (Ctrl+C)")
        if current_trace_id:
            await runner.stop(current_trace_id)
    except Exception as e:
        print(f"\n执行出错: {e}")
        traceback.print_exc()

    # Result summary.
    print()
    print("=" * 60)
    if not output_path.exists():
        print("⚠️ 未检测到最终输出文件,分析可能未完成")
        print(f" 期望路径:{output_path}")
        return False

    print(f"✅ 分析完成,结果已写入:{output_path}")
    try:
        with open(output_path, encoding='utf-8') as f:
            result = json.load(f)
        n_modules = len(result.get("capability_modules", []))
        n_coarse = len(result.get("coarse_workflows", []))
        print(f" - 能力模块(细工序):{n_modules} 个")
        print(f" - 粗工序:{n_coarse} 个品类")
    except (OSError, ValueError, AttributeError, TypeError) as e:
        # The summary is informational only; report the problem instead of
        # hiding it, and still treat the run as successful because the file exists.
        print(f" [警告] 无法读取结果摘要: {e}")
    return True


# ─────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────

async def main():
    """Parse the command line and run the requested pipeline stage(s).

    Exits with status 1 when the requirements file is missing or invalid, or
    when the analysis stage does not produce its result file.
    """
    parser = argparse.ArgumentParser(description="两阶段 Pipeline:工具调研 + 工作流分析")
    parser.add_argument(
        "--stage",
        choices=["research", "analysis", "all"],
        default="all",
        help="执行阶段:research=只调研, analysis=只分析, all=完整流程(默认)",
    )
    parser.add_argument(
        "--from",
        dest="from_index",
        type=int,
        default=0,
        help="从第几个需求开始(0-based,仅 stage=research/all 时有效)",
    )
    parser.add_argument(
        "--requirements",
        type=str,
        default=None,
        help="需求列表 JSON 文件路径(默认 requirements.json)",
    )
    args = parser.parse_args()

    base_dir = Path(__file__).parent
    project_root = base_dir.parent.parent
    research_output_dir = project_root / RESEARCH_OUTPUT_DIR
    analysis_output_dir = project_root / ANALYSIS_OUTPUT_DIR

    setup_logging(level=LOG_LEVEL, file=LOG_FILE)

    # Load presets when a presets.json sits next to this script.
    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        from agent.core.presets import load_presets_from_json
        load_presets_from_json(str(presets_path))
        print("已加载 presets")

    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)

    # ── Stage 1: Research ──
    if args.stage in ("all", "research"):
        req_path = Path(args.requirements) if args.requirements else base_dir / "requirements.json"
        if not req_path.exists():
            print(f"错误: 需求文件不存在: {req_path}")
            sys.exit(1)

        try:
            with open(req_path, encoding='utf-8') as f:
                requirements = json.load(f)
        except (OSError, ValueError) as e:
            # Malformed JSON gets a clear message instead of a raw traceback.
            print(f"错误: 需求文件无法解析: {req_path} ({e})")
            sys.exit(1)

        if not isinstance(requirements, list) or not requirements:
            print("错误: 需求文件必须是非空 JSON 数组")
            sys.exit(1)

        research_output_dir.mkdir(parents=True, exist_ok=True)

        prompt_path = base_dir / "prompts" / "tool_research.prompt"
        prompt = SimplePrompt(prompt_path)

        # Optional IM initialisation.
        if IM_ENABLED:
            from agent.tools.builtin.im.chat import im_setup, im_open_window
            result = await im_setup(
                contact_id=IM_CONTACT_ID,
                server_url=IM_SERVER_URL,
                notify_interval=IM_NOTIFY_INTERVAL,
            )
            print(f"IM: {result.output}")
            if IM_WINDOW_MODE:
                window_result = await im_open_window(contact_id=IM_CONTACT_ID)
                print(f"IM: {window_result.output}")

        prompt_model = prompt.config.get("model", None) or RESEARCH_RUN_CONFIG.model
        runner = AgentRunner(
            trace_store=store,
            llm_call=create_qwen_llm_call(model=prompt_model),
            skills_dir=SKILLS_DIR,
            debug=DEBUG,
        )
        interactive = InteractiveController(runner=runner, store=store, enable_stdin_check=True)
        runner.stdin_check = interactive.check_stdin

        task_name = RESEARCH_RUN_CONFIG.name or base_dir.name
        total = len(requirements)
        # Clamp so a negative or oversized --from cannot produce a negative or
        # inflated "x/y" denominator in the progress lines below.
        start = max(args.from_index, 0)
        pending = max(total - start, 0)

        print("=" * 60)
        print(f"Stage 1:{task_name}")
        print(f"共 {total} 个需求,从第 {start} 个开始")
        print("=" * 60)
        print("💡 输入 'p' 暂停,'q' 退出")
        print("=" * 60)

        completed = 0
        try:
            for i, requirement in enumerate(requirements[start:], start=start):
                req_output_dir = research_output_dir / f"{i:02d}"
                _, should_exit = await run_single(
                    runner=runner,
                    interactive=interactive,
                    store=store,
                    prompt=prompt,
                    requirement=requirement,
                    output_dir=req_output_dir,
                    task_name=task_name,
                    req_index=i,
                )

                if should_exit:
                    # The requirement the user aborted is not counted as completed.
                    print(f"\n🛑 用户中止,已完成 {completed}/{pending} 个需求")
                    break
                completed += 1

        except KeyboardInterrupt:
            print(f"\n\n用户中断 (Ctrl+C),已完成 {completed}/{pending} 个需求")

        print()
        print("=" * 60)
        print(f"Stage 1 完成:{completed}/{pending} 个需求")
        print(f"输出根目录:{research_output_dir}")
        print("=" * 60)

        if args.stage == "all":
            # Rough count of directories that produced a workflows.json.
            wf_count = sum(
                1 for d in research_output_dir.iterdir()
                if d.is_dir() and (d / "workflows.json").exists()
            )
            print(f"\n[Stage 1 完成] 共 {wf_count} 个目录含 workflows.json,自动进入 Stage 2 分析...")

    # ── Stage 2: Analysis ──
    if args.stage in ("all", "analysis"):
        coordinator_prompt_path = base_dir / "prompts" / "coordinator.prompt"
        succeeded = await run_analysis(
            research_dir=research_output_dir,
            analysis_dir=analysis_output_dir,
            store=store,
            prompt_path=coordinator_prompt_path,
        )
        if not succeeded:
            # Surface the failure to calling scripts / CI through the exit code.
            sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())