"""
V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
"""

import argparse
import asyncio
import json
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent

# Force runtime working directory to project root so relative trace/cache paths
# always land in the repo root no matter where this script is launched from.
os.chdir(PROJECT_ROOT)

# Add project root to path
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv
load_dotenv()

from agent.llm.prompts import SimplePrompt
from agent.core.runner import AgentRunner, RunConfig
from agent.tools.builtin.knowledge import KnowledgeConfig
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_qwen_llm_call
from agent.llm.openrouter import create_openrouter_llm_call
from agent.llm.claude import create_claude_llm_call

# config from existing setup
from examples.process_research.config import (
    OUTPUT_DIR,
    TRACE_STORE_PATH,
    SKILLS_DIR,
    LOG_LEVEL,
    LOG_FILE,
    BROWSER_TYPE,
    HEADLESS,
    COORDINATOR_RUN_CONFIG
)
from agent.utils import setup_logging


# Platform identifiers the router may pick; the order is significant because
# the selection is truncated to the number of platforms still needed.
VALID_PLATFORMS = ("xhs", "youtube", "bili", "x", "zhihu", "gzh")

# Tool-group permissions per agent type (unknown types fall back to ["core"]).
TOOL_GROUPS_BY_AGENT = {
    "researcher": ["core", "content"],   # search + files, no browser
    "filter_and_blueprint": ["core"],    # file read/write only
    "extract_capabilities": ["core"],    # file read/write (extra tools come from target_tools)
    "assemble_strategy": ["core"],       # file read/write only
}

# Tool names whose invocation means "the agent wrote a file".
_WRITE_TOOLS = ("write_file", "write_json")

# Extra tools granted to the capability-extraction agent.
_CAPABILITY_TOOLS = ["capability_search", "capability_list", "tool_search"]

# Attempts per task (first run + retries).
_MAX_RETRIES = 3


def _knowledge_disabled():
    """Return a fresh KnowledgeConfig with extraction and injection switched off."""
    return KnowledgeConfig(
        enable_completion_extraction=False,
        enable_extraction=False,
        enable_injection=False,
    )


def _load_schema_validators():
    """Import the schema validators and return ``(case_validator, by_filename)``.

    The import is done lazily (as in the task functions that call this) but at
    task start, so an import problem surfaces immediately rather than being
    mistaken for a validation failure.
    """
    from examples.process_pipeline.script.validate_schema import (
        validate_case,
        validate_blueprint,
        validate_capabilities,
        validate_strategy,
    )
    by_filename = {
        "blueprint.json": validate_blueprint,
        "capabilities_extracted.json": validate_capabilities,
        "strategy.json": validate_strategy,
    }
    return validate_case, by_filename


def _validate_json_file(out_file, validators):
    """Parse ``out_file`` as JSON and run the validator matching its file name.

    Files named ``case_*`` use the case validator; other names are looked up
    exactly. Files with no matching validator only need to be valid JSON.

    Raises:
        ValueError: if the validator reports a problem (also raised by the
            JSON parser, whose ``JSONDecodeError`` is a ``ValueError``).
    """
    case_validator, by_filename = validators
    with open(out_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    filename = Path(out_file).name
    if filename.startswith("case_"):
        validator = case_validator
    else:
        validator = by_filename.get(filename)

    err = validator(data) if validator else None
    if err:
        raise ValueError(f"Schema Validation Failed: {err}")


def _check_output(out_file, task_name, validators, skip_validation, task_errors):
    """Inspect a task's output file and classify the outcome.

    Returns:
        ``(status, error)`` where status is ``"ok"``, ``"missing"`` or
        ``"invalid"``; ``error`` is the validation message for ``"invalid"``
        and ``None`` otherwise. Validation failures are also appended to
        ``task_errors``.
    """
    if not out_file:
        # Nothing to verify: re-running the agent could never change that.
        return "ok", None

    if not Path(out_file).exists():
        print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
        return "missing", None

    if not str(out_file).endswith(".json"):
        # An existing non-JSON output has no schema; it is not "missing".
        return "ok", None

    if skip_validation:
        print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
        return "ok", None

    try:
        _validate_json_file(out_file, validators)
    except Exception as e:
        err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
        print(f"❌ [Validation Error] {task_name}: {err_msg}")
        task_errors.append(f"{task_name} Error: {e}")
        return "invalid", str(e)

    print(f" ✅ [Schema Validated] {Path(out_file).name}")
    return "ok", None


async def _stream_agent_run(runner, messages, config, task_name, task_errors, stage=""):
    """Drive one ``runner.run`` stream, collecting cost, trace id and errors.

    Args:
        runner: the AgentRunner to execute with.
        messages: messages passed to ``runner.run``.
        config: RunConfig passed to ``runner.run``.
        task_name: label used in log lines and error entries.
        task_errors: list that failure descriptions are appended to.
        stage: ``""`` for a normal run, ``"Fix"`` or ``"Recovery"`` for
            continuation runs; only affects the wording of logs/errors.

    Returns:
        ``(cost, trace_id)``: the summed ``total_cost`` of completed Trace
        items and the ``trace_id`` of the last Trace item seen (``None`` if
        no Trace was yielded). Exceptions from the stream are caught, logged
        and recorded in ``task_errors``.
    """
    label = f"{stage} " if stage else ""
    cost = 0.0
    trace_id = None
    try:
        async for item in runner.run(messages=messages, config=config):
            if isinstance(item, Trace):
                trace_id = item.trace_id
                if item.status == "completed":
                    cost += item.total_cost
                elif item.status == "failed":
                    task_errors.append(f"{task_name} {label}Failed: {item.error_message}")
            if isinstance(item, Message) and item.role == "tool":
                content = item.content if isinstance(item.content, dict) else {}
                if content.get("tool_name") in _WRITE_TOOLS:
                    print(f" 💾 [{label}File Written by {task_name}]")
    except Exception as e:
        err_msg = f"{type(e).__name__}: {e}"
        exc_tag = f"Exception {stage}".rstrip()
        print(f"❌ [{exc_tag}] {task_name} crashed: {err_msg}")
        task_errors.append(f"{task_name} {label.lower()}crashed: {err_msg}")
    return cost, trace_id


async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
    """Run one prompt-driven agent task with retry, recovery and validation.

    Each attempt runs the agent, then (if the output file is missing and a
    trace exists) sends a forced "write your result now" continuation, then
    checks/validates the output file. After a validation failure the next
    attempt continues the previous trace with fix instructions; after a
    missing file the next attempt deletes any output and restarts from scratch.

    Args:
        runner: AgentRunner used for every run of this task.
        prompt_name: stem of the ``prompts/<name>.prompt`` file; also used as
            the agent type and to pick tools / tool groups.
        kwargs: template variables for the prompt; ``output_file`` (optional)
            is the path the agent is expected to write.
        task_name: label for logs, errors and run names.
        model_name: fallback model when the prompt config names none.
        skip_validation: when True, an existing JSON output is accepted
            without schema validation.

    Returns:
        ``(total_cost, errors, trace_id)`` where ``trace_id`` is the last
        trace id observed (``None`` if no run yielded a Trace).
    """
    validators = _load_schema_validators()

    base_dir = Path(__file__).parent
    prompt = SimplePrompt(base_dir / "prompts" / f"{prompt_name}.prompt")
    messages = prompt.build_messages(**kwargs)

    target_tools = list(_CAPABILITY_TOOLS) if prompt_name == "extract_capabilities" else []
    tool_groups = TOOL_GROUPS_BY_AGENT.get(prompt_name, ["core"])
    model = prompt.config.get("model") or model_name
    # Only fall back to 0.3 when the prompt sets no temperature at all; an
    # explicit 0 is a legitimate setting and must not be overridden.
    temperature = prompt.config.get("temperature")
    if temperature is None:
        temperature = 0.3

    total_task_cost = 0.0
    task_errors = []
    out_file = kwargs.get("output_file")
    max_retries = _MAX_RETRIES
    last_trace_id = None
    last_validation_error = None

    for attempt in range(max_retries):
        if attempt > 0 and last_trace_id and last_validation_error:
            # Continue mode: tell the previous agent what failed and let it repair the file.
            print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
            run_messages = [{
                "role": "user",
                "content": (
                    f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
                    f"错误详情:{last_validation_error}\n\n"
                    f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
                    f"只修复有问题的部分,不要丢弃已有的正确内容。"
                )
            }]
            run_config = RunConfig(
                model=model,
                temperature=0.1,
                name=f"{task_name}_Fix{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups,
                trace_id=last_trace_id,
                knowledge=_knowledge_disabled()
            )
            stage = "Fix"
        else:
            if attempt > 0:
                # No trace id or no validation error to act on: full restart.
                print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
                if out_file and Path(out_file).exists():
                    Path(out_file).unlink()
            run_messages = messages
            run_config = RunConfig(
                model=model,
                temperature=temperature,
                name=f"{task_name}_A{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups,
                knowledge=_knowledge_disabled()
            )
            stage = ""
            print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")

        run_cost, seen_trace_id = await _stream_agent_run(
            runner, run_messages, run_config, task_name, task_errors, stage=stage
        )
        total_task_cost += run_cost
        if seen_trace_id is not None:
            last_trace_id = seen_trace_id

        # Verification & Recovery block
        if out_file and not Path(out_file).exists() and last_trace_id:
            print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
            recovery_messages = [{
                "role": "user",
                "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
            }]
            rec_config = RunConfig(
                model=model_name,
                temperature=0.1,
                name=task_name + "_Rec",
                agent_type=prompt_name,
                trace_id=last_trace_id,
                knowledge=_knowledge_disabled()
            )
            # The recovery run continues the same trace, so its trace id is not recorded again.
            rec_cost, _ = await _stream_agent_run(
                runner, recovery_messages, rec_config, task_name, task_errors, stage="Recovery"
            )
            total_task_cost += rec_cost

        # Schema Validation
        status, last_validation_error = _check_output(
            out_file, task_name, validators, skip_validation, task_errors
        )
        if status == "ok":
            return total_task_cost, task_errors, last_trace_id  # Success! Exit retry loop.
        if status == "invalid" and attempt == max_retries - 1:
            print(f"❌ [Retry Limit] {task_name} exhausted retries.")

    return total_task_cost, task_errors, last_trace_id


async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
    """
    Fallback driver that uses the official Anthropic SDK directly.

    It bypasses the internal trace tracking of the agent framework while still
    executing the project's existing Python tools (e.g. write_file/glob_files)
    through the tool registry.

    Args:
        prompt_name: stem of the ``prompts/<name>.prompt`` file.
        kwargs: template variables for the prompt; ``output_file`` (optional)
            is the path the model is expected to write.
        task_name: label for logs and errors.
        model_name: model identifier; any ``provider/`` prefix is stripped and
            names containing ``claude`` are mapped to ``claude-sonnet-4-5``.
        skip_validation: when True, an existing JSON output is accepted
            without schema validation.

    Returns:
        ``(total_cost, errors, None)``. The third element is always ``None``
        because this path records no trace; it exists so the result has the
        same shape as ``run_agent_task`` and callers can unpack both alike.
        The cost is a rough estimate computed from token usage, not billing.
    """
    from anthropic import AsyncAnthropic
    from agent.tools.registry import get_tool_registry

    validators = _load_schema_validators()

    base_dir = Path(__file__).parent
    prompt = SimplePrompt(base_dir / "prompts" / f"{prompt_name}.prompt")

    # 1. Assemble the input messages: system parts are concatenated into the
    #    system prompt, everything else becomes the conversation.
    raw_messages = prompt.build_messages(**kwargs)
    system_prompt = ""
    messages = []
    for msg in raw_messages:
        if msg["role"] == "system":
            system_prompt += msg["content"] + "\n\n"
        else:
            messages.append({"role": msg["role"], "content": msg["content"]})

    # 2. Map the target tools to Anthropic tool definitions.
    target_tools = ["write_file", "write_json", "read_file", "glob_files"]
    if prompt_name == "extract_capabilities":
        target_tools.extend(_CAPABILITY_TOOLS)

    registry = get_tool_registry()
    anthropic_tools = [
        {
            "name": schema["function"]["name"],
            "description": schema["function"].get("description", ""),
            "input_schema": schema["function"]["parameters"],
        }
        for schema in registry.get_schemas(target_tools)
    ]

    # 3. Initialise the client and start the loop.
    #    NOTE: the ANTHROPIC_API_KEY environment variable must be configured.
    client = AsyncAnthropic()

    # Strip a provider prefix (e.g. names passed in OpenRouter style) and map
    # Claude models onto the alias that is actually requested.
    clean_model = model_name.split("/")[-1]
    target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model

    total_task_cost = 0.0
    task_errors = []
    out_file = kwargs.get("output_file")
    max_retries = _MAX_RETRIES

    for attempt in range(max_retries):
        if attempt > 0:
            print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
            if out_file and Path(out_file).exists():
                Path(out_file).unlink()

        print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")

        # Reset messages for retry
        messages_copy = list(messages)

        max_loops = 50
        for _ in range(max_loops):
            try:
                response = await client.messages.create(
                    model=target_model,
                    max_tokens=4096,
                    temperature=0.2,
                    system=system_prompt,
                    messages=messages_copy,
                    tools=anthropic_tools
                )

                # Rough estimate only; does not represent the real official cost.
                if hasattr(response, 'usage'):
                    step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
                    total_task_cost += step_cost

                # Record the assistant reply.
                assistant_content = []
                tool_uses = []
                for content_block in response.content:
                    if content_block.type == "text":
                        text_val = content_block.text
                        if text_val:
                            assistant_content.append({"type": "text", "text": text_val})
                            print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
                    elif content_block.type == "tool_use":
                        assistant_content.append({
                            "type": "tool_use",
                            "id": content_block.id,
                            "name": content_block.name,
                            "input": content_block.input
                        })
                        tool_uses.append(content_block)

                if not assistant_content:
                    assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})

                messages_copy.append({"role": "assistant", "content": assistant_content})

                # Exit: no tool call means the task is finished.
                if not tool_uses:
                    print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
                    break

                # Execute the tools locally and send the results back.
                tool_results = []
                for tu in tool_uses:
                    if tu.name in _WRITE_TOOLS:
                        print(f" 💾 [File Written by SDK] {task_name}")
                    print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")

                    result_str = await registry.execute(tu.name, tu.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tu.id,
                        "content": result_str
                    })

                messages_copy.append({"role": "user", "content": tool_results})

            except Exception as e:
                err_msg = str(e)
                print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
                task_errors.append(err_msg)
                break

        # Verification & Recovery block for SDK (porting from AgentRunner)
        if out_file and not Path(out_file).exists():
            print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
            messages_copy.append({
                "role": "user",
                "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
            })
            try:
                rec_response = await client.messages.create(
                    model=target_model,
                    max_tokens=4096,
                    temperature=0.1,
                    system=system_prompt,
                    messages=messages_copy,
                    tools=anthropic_tools,
                    tool_choice={"type": "any"}
                )
                for content_block in rec_response.content:
                    if content_block.type == "tool_use":
                        if content_block.name in _WRITE_TOOLS:
                            print(f" 💾 [Recovery File Written by SDK] {task_name}")
                        await registry.execute(content_block.name, content_block.input)
            except Exception as e:
                print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
                task_errors.append(str(e))

        # Schema Validation
        status, _ = _check_output(out_file, task_name, validators, skip_validation, task_errors)
        if status == "ok":
            return total_task_cost, task_errors, None  # Success! Exit retry loop.
        if status == "invalid" and attempt == max_retries - 1:
            print(f"❌ [Retry Limit] {task_name} exhausted retries.")

    return total_task_cost, task_errors, None


async def main():
    """Command-line entry point: run the research/distillation pipeline for one requirement.

    Phases: 0 (platform routing), 1 (parallel research), 1.5 (source
    extraction), 1.6 (workflow extraction), 2 (blueprint + capability
    distillation) and 3 (strategy assembly). A metrics record for the run is
    appended to ``run_metrics.json`` next to this script.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
    parser.add_argument("--skip-research", action="store_true", help="Skip Phase 1 and use existing raw cases")
    parser.add_argument("--research-only", action="store_true", help="Only run research phases, skip Phase 2 and 3")
    parser.add_argument("--platforms", type=str, default="xhs,youtube,bili,x", help="Comma-separated list of platforms to search")
    parser.add_argument("--use-claude-sdk", action="store_true", help="Use pure Anthropic SDK (run_anthropic_sdk_task) instead of internal AgentRunner for Phase 2/3")
    parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
    args = parser.parse_args()

    base_dir = Path(__file__).parent

    # Load requirements locally
    req_path = base_dir / "db_requirements.json"
    with open(req_path, encoding='utf-8') as f:
        reqs = json.load(f)

    if args.index < 0 or args.index >= len(reqs):
        print("Index out of bounds")
        sys.exit(1)

    requirement = reqs[args.index]

    # 0. Setup directories
    output_dir = base_dir / "output" / f"{(args.index+1):03d}"
    output_dir.mkdir(parents=True, exist_ok=True)
    raw_cases_dir = output_dir / "raw_cases"
    raw_cases_dir.mkdir(parents=True, exist_ok=True)

    setup_logging(level=LOG_LEVEL, file=LOG_FILE)

    print("=" * 60)
    print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
    print("=" * 60)

    # Load presets
    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        from agent.core.presets import load_presets_from_json
        load_presets_from_json(str(presets_path))
        print("✅ Configured Agent Presets (Skills Boundaries)")

    # Browser initialization removed to save resources
    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)

    # Instantiate two distinct LLM orchestrators
    qwen_model = "qwen3.5-plus"  # maps to qwen3.5-plus via Qwen interface

    # Per user instruction, Claude 4.5 is reached through the OpenRouter proxy.
    claude_model = "anthropic/claude-4.5-sonnet"
    # The pure native-SDK mode is disabled: Phase 2/3 go through the generic
    # internal AgentRunner (which can talk to OpenRouter), regardless of the
    # --use-claude-sdk flag.
    args.use_claude_sdk = False

    claude_llm_call = create_openrouter_llm_call(model=claude_model)

    runner_qwen = AgentRunner(
        trace_store=store,
        llm_call=create_qwen_llm_call(model=qwen_model),
        skills_dir=SKILLS_DIR
    )
    runner_claude = AgentRunner(
        trace_store=store,
        llm_call=claude_llm_call,
        skills_dir=SKILLS_DIR
    )

    start_time = time.time()
    total_cost = 0.0
    costs_breakdown = {}
    global_errors = []
    strategy_file = None

    # Trace ids per phase; initialised up front so the metrics section can
    # read them no matter which phases actually ran.
    phase1_trace_ids = {}
    phase2_trace_ids = {}
    phase3_trace_id = None

    existing_platforms = []
    if raw_cases_dir.exists():
        for case_path in raw_cases_dir.glob("case_*.json"):
            plat = case_path.stem.replace("case_", "")
            if plat in VALID_PLATFORMS:
                existing_platforms.append(plat)

    platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
    needed_count = max(0, 4 - len(existing_platforms))

    def delta_default_platforms():
        """CLI platforms minus the ones that already have a case, capped at needed_count."""
        return [p for p in platforms if p not in existing_platforms][:needed_count]

    # Phase 0: Dynamic Routing
    if not args.skip_research:
        if needed_count == 0:
            print(f"\n--- Phase 0: Skipping Routing (Already have {len(existing_platforms)} existing cases: {existing_platforms}) ---")
            platforms = []
        else:
            print(f"\n--- Phase 0: Dynamic Platform Routing ({qwen_model}) ---")
            print(f"📡 Found existing cases: {existing_platforms}. Requesting {needed_count} new platforms...")
            try:
                router_prompt = SimplePrompt(base_dir / "prompts" / "router.prompt")
                rmessages = router_prompt.build_messages(
                    requirement=requirement,
                    existing_platforms=",".join(existing_platforms) if existing_platforms else "无",
                    needed_count=str(needed_count)
                )
                rconfig = RunConfig(
                    model=qwen_model,
                    temperature=0.1,
                    name="P0_Router",
                    agent_type="router",
                    knowledge=_knowledge_disabled()
                )
                print(f"🚀 [Launch] P0_Router calculating optimal platforms...")
                router_response = ""
                async for item in runner_qwen.run(messages=rmessages, config=rconfig):
                    if isinstance(item, Message) and item.role == "assistant" and isinstance(item.content, dict):
                        text = item.content.get("text", "")
                        if text and not item.content.get("tool_calls"):
                            router_response = text
                    if isinstance(item, Trace) and item.status == "completed":
                        total_cost += item.total_cost
                        costs_breakdown["P0_Router"] = round(item.total_cost, 4)

                # Extract all alphabetic words from the response to handle extra text or markdown
                words = set(re.findall(r'\b[a-z]+\b', router_response.lower()))
                # Intersect words with valid platforms (direct exact word matching, exclude existing)
                final_platforms = [p for p in VALID_PLATFORMS if p in words and p not in existing_platforms]

                if final_platforms:
                    platforms = final_platforms[:needed_count]
                    print(f"🎯 [Router Decision] Selected {len(platforms)} new platforms: {platforms}")
                else:
                    # Also covers an empty response, so platforms that already
                    # have a case are never researched again by accident.
                    platforms = delta_default_platforms()
                    print(f"⚠️ [Router Fallback] Invalid output '{router_response}'. Using delta default: {platforms}")
            except Exception as e:
                platforms = delta_default_platforms()
                print(f"⚠️ [Router Logic Failed] Using delta default platforms ({platforms}). Error: {e}")

    is_single_step = args.restart_mode.startswith("single_")

    def launch_claude_task(prompt_name, task_kwargs, task_name, label_suffix=""):
        """Create (without awaiting) the Phase 2/3 coroutine on the selected backend."""
        if args.use_claude_sdk:
            print(f" > Using [Anthropic SDK Core]{label_suffix}")
            return run_anthropic_sdk_task(prompt_name, task_kwargs, task_name, claude_model, skip_validation=is_single_step)
        print(f" > Using [AgentRunner Core]{label_suffix}")
        return run_agent_task(runner_claude, prompt_name, task_kwargs, task_name, claude_model, skip_validation=is_single_step)

    # Phase 1: MAP (Parallel Search) uses Qwen
    if not args.skip_research:
        print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
        phase1_tasks = []
        for p in platforms:
            task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
            out_file = str(raw_cases_dir / f"case_{p}.json")
            kwargs = {
                "task": task_desc,
                "output_file": out_file
            }
            phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=is_single_step))

        phase1_results = await asyncio.gather(*phase1_tasks)
        for (task_cost, task_errors, trace_id), p in zip(phase1_results, platforms):
            total_cost += task_cost
            costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
            phase1_trace_ids[f"P1_Research_{p}"] = trace_id
            global_errors.extend(task_errors)

            # Check if cases actually got written
            expected_file = Path(raw_cases_dir / f"case_{p}.json")
            if not expected_file.exists():
                err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
                print(f"⚠️ [Warning] {err_msg}")
                global_errors.append(err_msg)

        # Phase 1.5: Extract raw post data from cache → raw_cases/source.json
        try:
            from examples.process_pipeline.script.extract_sources import extract_sources_to_json
            # Use this run's trace ids to locate the cache files precisely.
            trace_id_list = [tid for tid in phase1_trace_ids.values() if tid]
            src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
            print(
                f"📎 [Source Extraction] "
                f"existing={src_stats['total_existing']} "
                f"new={src_stats['total_matched']} "
                f"total={src_stats['total_existing'] + src_stats['total_matched']} "
                f"→ {raw_cases_dir / 'source.json'}"
            )
        except Exception as e:
            err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
            print(f"⚠️ [Warning] {err_msg}")
            global_errors.append(err_msg)

        # Phase 1.6: Process sources with Claude → case_detailed.json
        source_file = raw_cases_dir / "source.json"
        if source_file.exists():
            try:
                from examples.process_pipeline.script.process_sources import process_sources
                print(f"\n--- Phase 1.6: Workflow Extraction ({claude_model}) ---")
                detailed_file = raw_cases_dir / "case_detailed.json"
                workflow_stats = await process_sources(
                    source_file,
                    detailed_file,
                    claude_llm_call,
                    model=claude_model,
                    max_concurrent=3
                )
                total_cost += workflow_stats.get("total_cost", 0.0)
                costs_breakdown["P1.6_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
                print(
                    f"🔍 [Workflow Extraction] "
                    f"success={workflow_stats['success']} "
                    f"failed={workflow_stats['failed']} "
                    f"→ {detailed_file}"
                )
            except Exception as e:
                err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
                print(f"⚠️ [Warning] {err_msg}")
                global_errors.append(err_msg)
    else:
        print("\n⏭️ [Skip] Phase 1 Skipped via --skip-research. Using existing cases...")

    if not args.research_only:
        # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
        print(f"\n--- Phase 2: Parallel Distillation ({claude_model}) ---")
        blueprint_file = str(output_dir / "blueprint.json")
        capabilities_file = str(output_dir / "capabilities_extracted.json")
        raw_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")

        to_await = []
        names_await = []
        # Set when an upstream artefact is regenerated, so Phase 3 must re-run.
        force_strategy_rerun = False

        if Path(blueprint_file).exists():
            print(f"⏭️ [Skip P2] blueprint.json already exists. Skipping P2_FilterBlueprint.")
        else:
            force_strategy_rerun = True
            to_await.append(launch_claude_task("filter_and_blueprint", {
                "requirement": requirement,
                "raw_files_glob": raw_glob,
                "output_file": blueprint_file
            }, "P2_FilterBlueprint", label_suffix=" for P2_FilterBlueprint"))
            names_await.append("P2_FilterBlueprint")

        if Path(capabilities_file).exists():
            print(f"⏭️ [Skip P2] capabilities_extracted.json already exists. Skipping P2_ExtractCaps.")
        else:
            force_strategy_rerun = True
            to_await.append(launch_claude_task("extract_capabilities", {
                "requirement": requirement,
                "raw_files_glob": raw_glob,
                "output_file": capabilities_file
            }, "P2_ExtractCaps", label_suffix=" for P2_ExtractCaps"))
            names_await.append("P2_ExtractCaps")

        if to_await:
            phase2_results = await asyncio.gather(*to_await)
            for (cost, errs, trace_id), t_name in zip(phase2_results, names_await):
                total_cost += cost
                costs_breakdown[t_name] = round(cost, 4)
                phase2_trace_ids[t_name] = trace_id
                global_errors.extend(errs)

        # Phase 3: REDUCE 2 (Final Assembly) uses Claude
        print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
        strategy_file = str(output_dir / "strategy.json")

        if args.restart_mode == "single":
            force_strategy_rerun = False

        if Path(strategy_file).exists() and not force_strategy_rerun:
            print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
        else:
            if Path(strategy_file).exists():
                print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")

            phase3_cost, phase3_errs, phase3_trace_id = await launch_claude_task("assemble_strategy", {
                "requirement": requirement,
                "blueprint_file": blueprint_file,
                "capabilities_file": capabilities_file,
                "output_file": strategy_file
            }, "P3_Assembler")

            total_cost += phase3_cost
            costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
            global_errors.extend(phase3_errs)
    else:
        print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")

    end_time = time.time()
    elapsed_sec = end_time - start_time

    # Save Metrics
    metrics_file = base_dir / "run_metrics.json"
    metrics_data = []
    if metrics_file.exists():
        with open(metrics_file, "r", encoding="utf-8") as f:
            try:
                metrics_data = json.load(f)
            except json.JSONDecodeError:
                # Unreadable history: start a fresh list rather than abort the run.
                metrics_data = []
        if not isinstance(metrics_data, list):
            # Valid JSON but not a list cannot be appended to; start fresh.
            metrics_data = []

    # Collect trace_ids from all phases
    trace_ids = {}
    trace_ids.update(phase1_trace_ids)
    trace_ids.update(phase2_trace_ids)
    if phase3_trace_id:
        trace_ids["P3_Assembler"] = phase3_trace_id

    metrics_data.append({
        "index": args.index,
        "requirement": requirement[:80] + "...",
        "duration_seconds": round(elapsed_sec, 2),
        "total_cost_usd": round(total_cost, 4),
        "costs_breakdown": costs_breakdown,
        "trace_ids": trace_ids,
        "errors": global_errors,
        "timestamp": datetime.now().isoformat()
    })

    with open(metrics_file, "w", encoding="utf-8") as f:
        json.dump(metrics_data, f, indent=2, ensure_ascii=False)

    print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")

    print("✅ Pipeline run finished.")
    if strategy_file:
        print("✅ Strategy saved to:", strategy_file)


if __name__ == "__main__":
    asyncio.run(main())