run_pipeline.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589
  1. """
  2. V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
  3. """
  4. import argparse
  5. import asyncio
  6. import json
  7. import sys
  8. import time
  9. from datetime import datetime
  10. from pathlib import Path
  11. # Add project root to path
  12. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  13. from dotenv import load_dotenv
  14. load_dotenv()
  15. from agent.llm.prompts import SimplePrompt
  16. from agent.core.runner import AgentRunner, RunConfig
  17. from agent.tools.builtin.knowledge import KnowledgeConfig
  18. from agent.trace import FileSystemTraceStore, Trace, Message
  19. from agent.llm import create_qwen_llm_call
  20. from agent.llm.openrouter import create_openrouter_llm_call
  21. from agent.llm.claude import create_claude_llm_call
  22. # config from existing setup
  23. from examples.process_research.config import (
  24. OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
  25. BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
  26. )
  27. from agent.utils import setup_logging
  28. async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str):
  29. base_dir = Path(__file__).parent
  30. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  31. prompt = SimplePrompt(prompt_path)
  32. messages = prompt.build_messages(**kwargs)
  33. target_tools = []
  34. if prompt_name == "extract_capabilities":
  35. target_tools = ["capability_search", "capability_list", "tool_search"]
  36. # 按 agent 类型配置工具组权限
  37. tool_groups_map = {
  38. "researcher": ["core", "content"], # 搜索+文件,无浏览器
  39. "filter_and_blueprint": ["core"], # 只需文件读写
  40. "extract_capabilities": ["core"], # 只需文件读写(额外工具由 target_tools 补充)
  41. "assemble_strategy": ["core"], # 只需文件读写
  42. }
  43. run_config = RunConfig(
  44. model=prompt.config.get("model") or model_name,
  45. temperature=prompt.config.get("temperature") or 0.3,
  46. name=task_name,
  47. agent_type=prompt_name,
  48. tools=target_tools,
  49. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  50. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  51. )
  52. task_cost = 0.0
  53. task_errors = []
  54. last_trace_id = None
  55. print(f"🚀 [Launch] {task_name}")
  56. try:
  57. async for item in runner.run(messages=messages, config=run_config):
  58. if isinstance(item, Trace):
  59. last_trace_id = item.trace_id
  60. if item.status == "completed":
  61. print(f"✅ [Done] {task_name} (Cost: ${item.total_cost:.4f})")
  62. task_cost = item.total_cost
  63. elif item.status == "failed":
  64. print(f"❌ [Fail] {task_name}: {item.error_message}")
  65. task_errors.append(f"{task_name} Failed: {item.error_message}")
  66. if isinstance(item, Message):
  67. if item.role == "tool":
  68. content = item.content if isinstance(item.content, dict) else {}
  69. t_name = content.get("tool_name", "unknown")
  70. if t_name in ("write_file", "write_json"):
  71. print(f" 💾 [File Written by {task_name}]")
  72. except Exception as e:
  73. err_msg = f"{type(e).__name__}: {e}"
  74. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  75. task_errors.append(f"{task_name} crashed: {err_msg}")
  76. # Verification & Recovery block
  77. out_file = kwargs.get("output_file")
  78. if out_file and not Path(out_file).exists() and last_trace_id:
  79. print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
  80. recovery_messages = [{
  81. "role": "user",
  82. "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
  83. }]
  84. rec_config = RunConfig(
  85. model=model_name,
  86. temperature=0.1,
  87. name=task_name + "_Rec",
  88. agent_type=prompt_name,
  89. trace_id=last_trace_id,
  90. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  91. )
  92. try:
  93. async for r_item in runner.run(messages=recovery_messages, config=rec_config):
  94. if isinstance(r_item, Trace):
  95. if r_item.status == "completed":
  96. task_cost += r_item.total_cost
  97. elif r_item.status == "failed":
  98. task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
  99. if isinstance(r_item, Message) and r_item.role == "tool":
  100. content = r_item.content if isinstance(r_item.content, dict) else {}
  101. if content.get("tool_name") in ("write_file", "write_json"):
  102. print(f" 💾 [Recovery File Written by {task_name}]")
  103. except Exception as e:
  104. err_msg = f"{type(e).__name__}: {e}"
  105. print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
  106. task_errors.append(f"{task_name} recovery crashed: {err_msg}")
  107. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  108. try:
  109. with open(out_file, "r", encoding="utf-8") as f:
  110. json.loads(f.read())
  111. print(f" ✅ [JSON Validated] {Path(out_file).name}")
  112. except json.JSONDecodeError as e:
  113. err_msg = f"Invalid JSON syntax in {Path(out_file).name}: {e}"
  114. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  115. task_errors.append(f"{task_name} JSON Error: {e}")
  116. return task_cost, task_errors
  117. async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str):
  118. """
  119. 备用:使用纯净的官方 Anthropic SDK 驱动。
  120. 跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。
  121. """
  122. from anthropic import AsyncAnthropic
  123. from agent.tools.registry import get_tool_registry
  124. base_dir = Path(__file__).parent
  125. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  126. prompt = SimplePrompt(prompt_path)
  127. # 1. 组装输入 Message
  128. raw_messages = prompt.build_messages(**kwargs)
  129. system_prompt = ""
  130. messages = []
  131. for msg in raw_messages:
  132. if msg["role"] == "system":
  133. system_prompt += msg["content"] + "\n\n"
  134. else:
  135. messages.append({"role": msg["role"], "content": msg["content"]})
  136. # 2. 映射目标工具
  137. target_tools = ["write_file", "write_json", "read_file", "glob_files"]
  138. if prompt_name == "extract_capabilities":
  139. target_tools.extend(["capability_search", "capability_list", "tool_search"])
  140. registry = get_tool_registry()
  141. schemas = registry.get_schemas(target_tools)
  142. anthropic_tools = []
  143. for s in schemas:
  144. anthropic_tools.append({
  145. "name": s["function"]["name"],
  146. "description": s["function"].get("description", ""),
  147. "input_schema": s["function"]["parameters"]
  148. })
  149. # 3. 初始化并开启 Loop
  150. # 提示:你需要在你的终端中配置好 ANTHROPIC_API_KEY 环境变量
  151. client = AsyncAnthropic()
  152. task_cost = 0.0
  153. task_errors = []
  154. print(f"🚀 [Launch Anthropic SDK] {task_name}")
  155. max_loops = 50
  156. for loop_idx in range(max_loops):
  157. try:
  158. # 去除前缀(兼容比如 openrouter 传入的名字)
  159. clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
  160. # 这里专门将实际请求映射到其可用的特殊别名 claude-sonnet-4-5
  161. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  162. response = await client.messages.create(
  163. model=target_model,
  164. max_tokens=4096,
  165. temperature=0.2,
  166. system=system_prompt,
  167. messages=messages,
  168. tools=anthropic_tools
  169. )
  170. # (简略预估,不代表真实官方开销)
  171. if hasattr(response, 'usage'):
  172. step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
  173. task_cost += step_cost
  174. # 加入助手回复
  175. assistant_content = []
  176. tool_uses = []
  177. for content_block in response.content:
  178. if content_block.type == "text":
  179. text_val = content_block.text
  180. if text_val:
  181. assistant_content.append({"type": "text", "text": text_val})
  182. print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
  183. elif content_block.type == "tool_use":
  184. assistant_content.append({
  185. "type": "tool_use",
  186. "id": content_block.id,
  187. "name": content_block.name,
  188. "input": content_block.input
  189. })
  190. tool_uses.append(content_block)
  191. if not assistant_content:
  192. assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
  193. messages.append({"role": "assistant", "content": assistant_content})
  194. # 出口:没有调用工具说明任务结束
  195. if not tool_uses:
  196. print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${task_cost:.4f})")
  197. break
  198. # 工具执行与回传
  199. tool_results = []
  200. for tu in tool_uses:
  201. if tu.name in ("write_file", "write_json"):
  202. print(f" 💾 [File Written by SDK] {task_name}")
  203. print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
  204. # 执行本地环境的函数
  205. result_str = await registry.execute(tu.name, tu.input)
  206. tool_results.append({
  207. "type": "tool_result",
  208. "tool_use_id": tu.id,
  209. "content": result_str
  210. })
  211. messages.append({"role": "user", "content": tool_results})
  212. except Exception as e:
  213. err_msg = str(e)
  214. print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
  215. task_errors.append(err_msg)
  216. break
  217. # Verification & Recovery block for SDK (porting from AgentRunner)
  218. out_file = kwargs.get("output_file")
  219. if out_file and not Path(out_file).exists():
  220. print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
  221. messages.append({
  222. "role": "user",
  223. "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
  224. })
  225. try:
  226. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  227. rec_response = await client.messages.create(
  228. model=target_model,
  229. max_tokens=4096,
  230. temperature=0.1,
  231. system=system_prompt,
  232. messages=messages,
  233. tools=anthropic_tools,
  234. tool_choice={"type": "any"}
  235. )
  236. for content_block in rec_response.content:
  237. if content_block.type == "tool_use":
  238. if content_block.name in ("write_file", "write_json"):
  239. print(f" 💾 [Recovery File Written by SDK] {task_name}")
  240. await registry.execute(content_block.name, content_block.input)
  241. except Exception as e:
  242. print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
  243. task_errors.append(str(e))
  244. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  245. try:
  246. with open(out_file, "r", encoding="utf-8") as f:
  247. json.loads(f.read())
  248. print(f" ✅ [JSON Validated] {Path(out_file).name}")
  249. except json.JSONDecodeError as e:
  250. err_msg = f"Invalid JSON syntax in {Path(out_file).name}: {e}"
  251. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  252. task_errors.append(f"{task_name} JSON Error: {e}")
  253. return task_cost, task_errors
  254. async def main():
  255. parser = argparse.ArgumentParser()
  256. parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
  257. parser.add_argument("--skip-research", action="store_true", help="Skip Phase 1 and use existing raw cases")
  258. parser.add_argument("--research-only", action="store_true", help="Only run research phases, skip Phase 2 and 3")
  259. parser.add_argument("--platforms", type=str, default="xhs,youtube,bili,x", help="Comma-separated list of platforms to search")
  260. parser.add_argument("--use-claude-sdk", action="store_true", help="Use pure Anthropic SDK (run_anthropic_sdk_task) instead of internal AgentRunner for Phase 2/3")
  261. args = parser.parse_args()
  262. base_dir = Path(__file__).parent
  263. # Load requirements locally
  264. req_path = base_dir / "db_requirements.json"
  265. with open(req_path, encoding='utf-8') as f:
  266. reqs = json.load(f)
  267. if args.index < 0 or args.index >= len(reqs):
  268. print("Index out of bounds")
  269. sys.exit(1)
  270. requirement = reqs[args.index]
  271. # 0. Setup directories
  272. output_dir = base_dir / "output" / f"{(args.index+1):03d}"
  273. output_dir.mkdir(parents=True, exist_ok=True)
  274. raw_cases_dir = output_dir / "raw_cases"
  275. raw_cases_dir.mkdir(parents=True, exist_ok=True)
  276. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  277. print("=" * 60)
  278. print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
  279. print("=" * 60)
  280. # Load presets
  281. presets_path = base_dir / "presets.json"
  282. if presets_path.exists():
  283. from agent.core.presets import load_presets_from_json
  284. load_presets_from_json(str(presets_path))
  285. print("✅ Configured Agent Presets (Skills Boundaries)")
  286. # Browser initialization removed to save resources
  287. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  288. # Instantiate two distinct LLM orchestrators
  289. qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
  290. # 用户指示使用 OpenRouter 代理的 Claude 4.5
  291. claude_model = "anthropic/claude-4.5-sonnet"
  292. args.use_claude_sdk = False # 禁用纯 Native SDK 模式,走内部通用 AgentRunner (即可对接 OpenRouter)
  293. from agent.llm.openrouter import create_openrouter_llm_call
  294. claude_llm_call = create_openrouter_llm_call(model=claude_model)
  295. runner_qwen = AgentRunner(
  296. trace_store=store,
  297. llm_call=create_qwen_llm_call(model=qwen_model),
  298. skills_dir=SKILLS_DIR
  299. )
  300. runner_claude = AgentRunner(
  301. trace_store=store,
  302. llm_call=claude_llm_call,
  303. skills_dir=SKILLS_DIR
  304. )
  305. try:
  306. start_time = time.time()
  307. total_cost = 0.0
  308. costs_breakdown = {}
  309. global_errors = []
  310. strategy_file = None
  311. existing_platforms = []
  312. if raw_cases_dir.exists():
  313. for f in raw_cases_dir.glob("case_*.json"):
  314. plat = f.stem.replace("case_", "")
  315. if plat in ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]:
  316. existing_platforms.append(plat)
  317. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  318. needed_count = max(0, 4 - len(existing_platforms))
  319. # Phase 0: Dynamic Routing
  320. if not args.skip_research:
  321. if needed_count == 0:
  322. print(f"\n--- Phase 0: Skipping Routing (Already have {len(existing_platforms)} existing cases: {existing_platforms}) ---")
  323. platforms = []
  324. else:
  325. print(f"\n--- Phase 0: Dynamic Platform Routing ({qwen_model}) ---")
  326. print(f"📡 Found existing cases: {existing_platforms}. Requesting {needed_count} new platforms...")
  327. try:
  328. router_prompt = SimplePrompt(base_dir / "prompts" / "router.prompt")
  329. rmessages = router_prompt.build_messages(
  330. requirement=requirement,
  331. existing_platforms=",".join(existing_platforms) if existing_platforms else "无",
  332. needed_count=str(needed_count)
  333. )
  334. rconfig = RunConfig(
  335. model=qwen_model,
  336. temperature=0.1,
  337. name="P0_Router",
  338. agent_type="router",
  339. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  340. )
  341. print(f"🚀 [Launch] P0_Router calculating optimal platforms...")
  342. router_response = ""
  343. async for item in runner_qwen.run(messages=rmessages, config=rconfig):
  344. if isinstance(item, Message) and item.role == "assistant" and isinstance(item.content, dict):
  345. text = item.content.get("text", "")
  346. if text and not item.content.get("tool_calls"):
  347. router_response = text
  348. if isinstance(item, Trace) and item.status == "completed":
  349. total_cost += item.total_cost
  350. costs_breakdown["P0_Router"] = round(item.total_cost, 4)
  351. if router_response:
  352. import re
  353. # Extract all alphabetic words from the response to handle extra text or markdown
  354. words = set(re.findall(r'\b[a-z]+\b', router_response.lower()))
  355. valid_platforms = ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]
  356. # Intersect words with valid platforms (direct exact word matching, exclude existing)
  357. final_platforms = [p for p in valid_platforms if p in words and p not in existing_platforms]
  358. if final_platforms:
  359. platforms = final_platforms[:needed_count]
  360. print(f"🎯 [Router Decision] Selected {len(platforms)} new platforms: {platforms}")
  361. else:
  362. platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
  363. print(f"⚠️ [Router Fallback] Invalid output '{router_response}'. Using delta default: {platforms}")
  364. except Exception as e:
  365. platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
  366. print(f"⚠️ [Router Logic Failed] Using delta default platforms ({platforms}). Error: {e}")
  367. # Phase 1: MAP (Parallel Search) uses Qwen
  368. if not args.skip_research:
  369. print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
  370. phase1_tasks = []
  371. for p in platforms:
  372. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
  373. out_file = str(raw_cases_dir / f"case_{p}.json")
  374. kwargs = {
  375. "task": task_desc,
  376. "output_file": out_file
  377. }
  378. phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model))
  379. phase1_results = await asyncio.gather(*phase1_tasks)
  380. for (task_cost, task_errors), p in zip(phase1_results, platforms):
  381. total_cost += task_cost
  382. costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
  383. global_errors.extend(task_errors)
  384. # Check if cases actually got written
  385. expected_file = Path(raw_cases_dir / f"case_{p}.json")
  386. if not expected_file.exists():
  387. err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
  388. print(f"⚠️ [Warning] {err_msg}")
  389. global_errors.append(err_msg)
  390. else:
  391. print("\n⏭️ [Skip] Phase 1 Skipped via --skip-research. Using existing cases...")
  392. # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
  393. if not args.research_only:
  394. # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
  395. print(f"\n--- Phase 2: Parallel Distillation ({claude_model}) ---")
  396. blueprint_file = str(output_dir / "blueprint.json")
  397. capabilities_file = str(output_dir / "capabilities_extracted.json")
  398. raw_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
  399. task_a = None
  400. task_b = None
  401. force_strategy_rerun = False
  402. if Path(blueprint_file).exists():
  403. print(f"⏭️ [Skip P2] blueprint.json already exists. Skipping P2_FilterBlueprint.")
  404. else:
  405. force_strategy_rerun = True
  406. if args.use_claude_sdk:
  407. print(" > Using [Anthropic SDK Core] for P2_FilterBlueprint")
  408. task_a = run_anthropic_sdk_task("filter_and_blueprint", {
  409. "requirement": requirement,
  410. "raw_files_glob": raw_glob,
  411. "output_file": blueprint_file
  412. }, "P2_FilterBlueprint", claude_model)
  413. else:
  414. print(" > Using [AgentRunner Core] for P2_FilterBlueprint")
  415. task_a = run_agent_task(runner_claude, "filter_and_blueprint", {
  416. "requirement": requirement,
  417. "raw_files_glob": raw_glob,
  418. "output_file": blueprint_file
  419. }, "P2_FilterBlueprint", claude_model)
  420. if Path(capabilities_file).exists():
  421. print(f"⏭️ [Skip P2] capabilities_extracted.json already exists. Skipping P2_ExtractCaps.")
  422. else:
  423. force_strategy_rerun = True
  424. if args.use_claude_sdk:
  425. print(" > Using [Anthropic SDK Core] for P2_ExtractCaps")
  426. task_b = run_anthropic_sdk_task("extract_capabilities", {
  427. "requirement": requirement,
  428. "raw_files_glob": raw_glob,
  429. "output_file": capabilities_file
  430. }, "P2_ExtractCaps", claude_model)
  431. else:
  432. print(" > Using [AgentRunner Core] for P2_ExtractCaps")
  433. task_b = run_agent_task(runner_claude, "extract_capabilities", {
  434. "requirement": requirement,
  435. "raw_files_glob": raw_glob,
  436. "output_file": capabilities_file
  437. }, "P2_ExtractCaps", claude_model)
  438. to_await = []
  439. names_await = []
  440. if task_a:
  441. to_await.append(task_a)
  442. names_await.append("P2_FilterBlueprint")
  443. if task_b:
  444. to_await.append(task_b)
  445. names_await.append("P2_ExtractCaps")
  446. if to_await:
  447. phase2_results = await asyncio.gather(*to_await)
  448. for (cost, errs), t_name in zip(phase2_results, names_await):
  449. total_cost += cost
  450. costs_breakdown[t_name] = round(cost, 4)
  451. global_errors.extend(errs)
  452. # Phase 3: REDUCE 2 (Final Assembly) uses Claude
  453. print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
  454. strategy_file = str(output_dir / "strategy.json")
  455. if Path(strategy_file).exists() and not force_strategy_rerun:
  456. print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
  457. else:
  458. if Path(strategy_file).exists():
  459. print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
  460. if args.use_claude_sdk:
  461. print(" > Using [Anthropic SDK Core]")
  462. phase3_cost, phase3_errs = await run_anthropic_sdk_task("assemble_strategy", {
  463. "requirement": requirement,
  464. "blueprint_file": blueprint_file,
  465. "capabilities_file": capabilities_file,
  466. "output_file": strategy_file
  467. }, "P3_Assembler", claude_model)
  468. else:
  469. print(" > Using [AgentRunner Core]")
  470. phase3_cost, phase3_errs = await run_agent_task(runner_claude, "assemble_strategy", {
  471. "requirement": requirement,
  472. "blueprint_file": blueprint_file,
  473. "capabilities_file": capabilities_file,
  474. "output_file": strategy_file
  475. }, "P3_Assembler", claude_model)
  476. total_cost += phase3_cost
  477. costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
  478. global_errors.extend(phase3_errs)
  479. else:
  480. print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
  481. end_time = time.time()
  482. elapsed_sec = end_time - start_time
  483. # Save Metrics
  484. metrics_file = base_dir / "run_metrics.json"
  485. metrics_data = []
  486. if metrics_file.exists():
  487. with open(metrics_file, "r", encoding="utf-8") as f:
  488. try:
  489. metrics_data = json.load(f)
  490. except json.JSONDecodeError:
  491. pass
  492. metrics_data.append({
  493. "index": args.index,
  494. "requirement": requirement[:80] + "...",
  495. "duration_seconds": round(elapsed_sec, 2),
  496. "total_cost_usd": round(total_cost, 4),
  497. "costs_breakdown": costs_breakdown,
  498. "errors": global_errors,
  499. "timestamp": datetime.now().isoformat()
  500. })
  501. with open(metrics_file, "w", encoding="utf-8") as f:
  502. json.dump(metrics_data, f, indent=2, ensure_ascii=False)
  503. print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
  504. finally:
  505. pass
  506. print("✅ Pipeline run finished.")
  507. if strategy_file:
  508. print("✅ Strategy saved to:", strategy_file)
  509. if __name__ == "__main__":
  510. asyncio.run(main())