run_pipeline.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. """
  2. V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
  3. """
  4. import argparse
  5. import asyncio
  6. import json
  7. import sys
  8. import time
  9. from datetime import datetime
  10. from pathlib import Path
  11. # Add project root to path
  12. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  13. from dotenv import load_dotenv
  14. load_dotenv()
  15. from agent.llm.prompts import SimplePrompt
  16. from agent.core.runner import AgentRunner, RunConfig
  17. from agent.tools.builtin.knowledge import KnowledgeConfig
  18. from agent.trace import FileSystemTraceStore, Trace, Message
  19. from agent.llm import create_qwen_llm_call
  20. from agent.llm.openrouter import create_openrouter_llm_call
  21. from agent.llm.claude import create_claude_llm_call
  22. # config from existing setup
  23. from examples.process_research.config import (
  24. OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
  25. BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
  26. )
  27. from agent.utils import setup_logging
  28. async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str):
  29. base_dir = Path(__file__).parent
  30. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  31. prompt = SimplePrompt(prompt_path)
  32. messages = prompt.build_messages(**kwargs)
  33. target_tools = []
  34. if prompt_name == "extract_capabilities":
  35. target_tools = ["capability_search", "capability_list", "tool_search"]
  36. # 按 agent 类型配置工具组权限
  37. tool_groups_map = {
  38. "researcher": ["core", "content"], # 搜索+文件,无浏览器
  39. "filter_and_blueprint": ["core"], # 只需文件读写
  40. "extract_capabilities": ["core"], # 只需文件读写(额外工具由 target_tools 补充)
  41. "assemble_strategy": ["core"], # 只需文件读写
  42. }
  43. run_config = RunConfig(
  44. model=prompt.config.get("model") or model_name,
  45. temperature=prompt.config.get("temperature") or 0.3,
  46. name=task_name,
  47. agent_type=prompt_name,
  48. tools=target_tools,
  49. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  50. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  51. )
  52. task_cost = 0.0
  53. task_errors = []
  54. last_trace_id = None
  55. print(f"🚀 [Launch] {task_name}")
  56. try:
  57. async for item in runner.run(messages=messages, config=run_config):
  58. if isinstance(item, Trace):
  59. last_trace_id = item.trace_id
  60. if item.status == "completed":
  61. print(f"✅ [Done] {task_name} (Cost: ${item.total_cost:.4f})")
  62. task_cost = item.total_cost
  63. elif item.status == "failed":
  64. print(f"❌ [Fail] {task_name}: {item.error_message}")
  65. task_errors.append(f"{task_name} Failed: {item.error_message}")
  66. if isinstance(item, Message):
  67. if item.role == "tool":
  68. content = item.content if isinstance(item.content, dict) else {}
  69. t_name = content.get("tool_name", "unknown")
  70. if t_name in ("write_file", "write_json"):
  71. print(f" 💾 [File Written by {task_name}]")
  72. except Exception as e:
  73. err_msg = f"{type(e).__name__}: {e}"
  74. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  75. task_errors.append(f"{task_name} crashed: {err_msg}")
  76. # Verification & Recovery block
  77. out_file = kwargs.get("output_file")
  78. if out_file and not Path(out_file).exists() and last_trace_id:
  79. print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
  80. recovery_messages = [{
  81. "role": "user",
  82. "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
  83. }]
  84. rec_config = RunConfig(
  85. model=model_name,
  86. temperature=0.1,
  87. name=task_name + "_Rec",
  88. agent_type=prompt_name,
  89. trace_id=last_trace_id,
  90. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  91. )
  92. try:
  93. async for r_item in runner.run(messages=recovery_messages, config=rec_config):
  94. if isinstance(r_item, Trace):
  95. if r_item.status == "completed":
  96. task_cost += r_item.total_cost
  97. elif r_item.status == "failed":
  98. task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
  99. if isinstance(r_item, Message) and r_item.role == "tool":
  100. content = r_item.content if isinstance(r_item.content, dict) else {}
  101. if content.get("tool_name") in ("write_file", "write_json"):
  102. print(f" 💾 [Recovery File Written by {task_name}]")
  103. except Exception as e:
  104. err_msg = f"{type(e).__name__}: {e}"
  105. print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
  106. task_errors.append(f"{task_name} recovery crashed: {err_msg}")
  107. return task_cost, task_errors
  108. async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str):
  109. """
  110. 备用:使用纯净的官方 Anthropic SDK 驱动。
  111. 跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。
  112. """
  113. from anthropic import AsyncAnthropic
  114. from agent.tools.registry import get_tool_registry
  115. base_dir = Path(__file__).parent
  116. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  117. prompt = SimplePrompt(prompt_path)
  118. # 1. 组装输入 Message
  119. raw_messages = prompt.build_messages(**kwargs)
  120. system_prompt = ""
  121. messages = []
  122. for msg in raw_messages:
  123. if msg["role"] == "system":
  124. system_prompt += msg["content"] + "\n\n"
  125. else:
  126. messages.append({"role": msg["role"], "content": msg["content"]})
  127. # 2. 映射目标工具
  128. target_tools = ["write_file", "write_json", "read_file", "glob_files"]
  129. if prompt_name == "extract_capabilities":
  130. target_tools.extend(["capability_search", "capability_list", "tool_search"])
  131. registry = get_tool_registry()
  132. schemas = registry.get_schemas(target_tools)
  133. anthropic_tools = []
  134. for s in schemas:
  135. anthropic_tools.append({
  136. "name": s["function"]["name"],
  137. "description": s["function"].get("description", ""),
  138. "input_schema": s["function"]["parameters"]
  139. })
  140. # 3. 初始化并开启 Loop
  141. # 提示:你需要在你的终端中配置好 ANTHROPIC_API_KEY 环境变量
  142. client = AsyncAnthropic()
  143. task_cost = 0.0
  144. task_errors = []
  145. print(f"🚀 [Launch Anthropic SDK] {task_name}")
  146. max_loops = 50
  147. for loop_idx in range(max_loops):
  148. try:
  149. # 去除前缀(兼容比如 openrouter 传入的名字)
  150. clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
  151. # 这里专门将实际请求映射到其可用的特殊别名 claude-sonnet-4-5
  152. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  153. response = await client.messages.create(
  154. model=target_model,
  155. max_tokens=4096,
  156. temperature=0.2,
  157. system=system_prompt,
  158. messages=messages,
  159. tools=anthropic_tools
  160. )
  161. # (简略预估,不代表真实官方开销)
  162. if hasattr(response, 'usage'):
  163. step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
  164. task_cost += step_cost
  165. # 加入助手回复
  166. assistant_content = []
  167. tool_uses = []
  168. for content_block in response.content:
  169. if content_block.type == "text":
  170. text_val = content_block.text
  171. if text_val:
  172. assistant_content.append({"type": "text", "text": text_val})
  173. print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
  174. elif content_block.type == "tool_use":
  175. assistant_content.append({
  176. "type": "tool_use",
  177. "id": content_block.id,
  178. "name": content_block.name,
  179. "input": content_block.input
  180. })
  181. tool_uses.append(content_block)
  182. if not assistant_content:
  183. assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
  184. messages.append({"role": "assistant", "content": assistant_content})
  185. # 出口:没有调用工具说明任务结束
  186. if not tool_uses:
  187. print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${task_cost:.4f})")
  188. break
  189. # 工具执行与回传
  190. tool_results = []
  191. for tu in tool_uses:
  192. if tu.name in ("write_file", "write_json"):
  193. print(f" 💾 [File Written by SDK] {task_name}")
  194. print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
  195. # 执行本地环境的函数
  196. result_str = await registry.execute(tu.name, tu.input)
  197. tool_results.append({
  198. "type": "tool_result",
  199. "tool_use_id": tu.id,
  200. "content": result_str
  201. })
  202. messages.append({"role": "user", "content": tool_results})
  203. except Exception as e:
  204. err_msg = str(e)
  205. print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
  206. task_errors.append(err_msg)
  207. break
  208. # Verification & Recovery block for SDK (porting from AgentRunner)
  209. out_file = kwargs.get("output_file")
  210. if out_file and not Path(out_file).exists():
  211. print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
  212. messages.append({
  213. "role": "user",
  214. "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
  215. })
  216. try:
  217. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  218. rec_response = await client.messages.create(
  219. model=target_model,
  220. max_tokens=4096,
  221. temperature=0.1,
  222. system=system_prompt,
  223. messages=messages,
  224. tools=anthropic_tools,
  225. tool_choice={"type": "any"}
  226. )
  227. for content_block in rec_response.content:
  228. if content_block.type == "tool_use":
  229. if content_block.name in ("write_file", "write_json"):
  230. print(f" 💾 [Recovery File Written by SDK] {task_name}")
  231. await registry.execute(content_block.name, content_block.input)
  232. except Exception as e:
  233. print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
  234. task_errors.append(str(e))
  235. return task_cost, task_errors
  236. async def main():
  237. parser = argparse.ArgumentParser()
  238. parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
  239. parser.add_argument("--skip-research", action="store_true", help="Skip Phase 1 and use existing raw cases")
  240. parser.add_argument("--research-only", action="store_true", help="Only run research phases, skip Phase 2 and 3")
  241. parser.add_argument("--platforms", type=str, default="xhs,youtube,bili,x", help="Comma-separated list of platforms to search")
  242. parser.add_argument("--use-claude-sdk", action="store_true", help="Use pure Anthropic SDK (run_anthropic_sdk_task) instead of internal AgentRunner for Phase 2/3")
  243. args = parser.parse_args()
  244. base_dir = Path(__file__).parent
  245. # Load requirements locally
  246. req_path = base_dir / "db_requirements.json"
  247. with open(req_path, encoding='utf-8') as f:
  248. reqs = json.load(f)
  249. if args.index < 0 or args.index >= len(reqs):
  250. print("Index out of bounds")
  251. sys.exit(1)
  252. requirement = reqs[args.index]
  253. # 0. Setup directories
  254. output_dir = base_dir / "output" / f"{(args.index+1):03d}"
  255. output_dir.mkdir(parents=True, exist_ok=True)
  256. raw_cases_dir = output_dir / "raw_cases"
  257. raw_cases_dir.mkdir(parents=True, exist_ok=True)
  258. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  259. print("=" * 60)
  260. print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
  261. print("=" * 60)
  262. # Load presets
  263. presets_path = base_dir / "presets.json"
  264. if presets_path.exists():
  265. from agent.core.presets import load_presets_from_json
  266. load_presets_from_json(str(presets_path))
  267. print("✅ Configured Agent Presets (Skills Boundaries)")
  268. # Browser initialization removed to save resources
  269. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  270. # Instantiate two distinct LLM orchestrators
  271. qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
  272. # 当前使用原生 Claude 接口 (走 ANTHROPIC_API_KEY),而非 OpenRouter
  273. claude_model = "claude-sonnet-4-5" # 切换回 4-5 模型别名
  274. from agent.llm.claude import create_claude_llm_call
  275. claude_llm_call = create_claude_llm_call(model=claude_model)
  276. runner_qwen = AgentRunner(
  277. trace_store=store,
  278. llm_call=create_qwen_llm_call(model=qwen_model),
  279. skills_dir=SKILLS_DIR
  280. )
  281. runner_claude = AgentRunner(
  282. trace_store=store,
  283. llm_call=claude_llm_call,
  284. skills_dir=SKILLS_DIR
  285. )
  286. try:
  287. start_time = time.time()
  288. total_cost = 0.0
  289. costs_breakdown = {}
  290. global_errors = []
  291. strategy_file = None
  292. existing_platforms = []
  293. if raw_cases_dir.exists():
  294. for f in raw_cases_dir.glob("case_*.json"):
  295. plat = f.stem.replace("case_", "")
  296. if plat in ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]:
  297. existing_platforms.append(plat)
  298. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  299. needed_count = max(0, 4 - len(existing_platforms))
  300. # Phase 0: Dynamic Routing
  301. if not args.skip_research:
  302. if needed_count == 0:
  303. print(f"\n--- Phase 0: Skipping Routing (Already have {len(existing_platforms)} existing cases: {existing_platforms}) ---")
  304. platforms = []
  305. else:
  306. print(f"\n--- Phase 0: Dynamic Platform Routing ({qwen_model}) ---")
  307. print(f"📡 Found existing cases: {existing_platforms}. Requesting {needed_count} new platforms...")
  308. try:
  309. router_prompt = SimplePrompt(base_dir / "prompts" / "router.prompt")
  310. rmessages = router_prompt.build_messages(
  311. requirement=requirement,
  312. existing_platforms=",".join(existing_platforms) if existing_platforms else "无",
  313. needed_count=str(needed_count)
  314. )
  315. rconfig = RunConfig(
  316. model=qwen_model,
  317. temperature=0.1,
  318. name="P0_Router",
  319. agent_type="router",
  320. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  321. )
  322. print(f"🚀 [Launch] P0_Router calculating optimal platforms...")
  323. router_response = ""
  324. async for item in runner_qwen.run(messages=rmessages, config=rconfig):
  325. if isinstance(item, Message) and item.role == "assistant" and isinstance(item.content, dict):
  326. text = item.content.get("text", "")
  327. if text and not item.content.get("tool_calls"):
  328. router_response = text
  329. if isinstance(item, Trace) and item.status == "completed":
  330. total_cost += item.total_cost
  331. costs_breakdown["P0_Router"] = round(item.total_cost, 4)
  332. if router_response:
  333. import re
  334. # Extract all alphabetic words from the response to handle extra text or markdown
  335. words = set(re.findall(r'\b[a-z]+\b', router_response.lower()))
  336. valid_platforms = ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]
  337. # Intersect words with valid platforms (direct exact word matching, exclude existing)
  338. final_platforms = [p for p in valid_platforms if p in words and p not in existing_platforms]
  339. if final_platforms:
  340. platforms = final_platforms[:needed_count]
  341. print(f"🎯 [Router Decision] Selected {len(platforms)} new platforms: {platforms}")
  342. else:
  343. platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
  344. print(f"⚠️ [Router Fallback] Invalid output '{router_response}'. Using delta default: {platforms}")
  345. except Exception as e:
  346. platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
  347. print(f"⚠️ [Router Logic Failed] Using delta default platforms ({platforms}). Error: {e}")
  348. # Phase 1: MAP (Parallel Search) uses Qwen
  349. if not args.skip_research:
  350. print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
  351. phase1_tasks = []
  352. for p in platforms:
  353. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
  354. out_file = str(raw_cases_dir / f"case_{p}.json")
  355. kwargs = {
  356. "task": task_desc,
  357. "output_file": out_file
  358. }
  359. phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model))
  360. phase1_results = await asyncio.gather(*phase1_tasks)
  361. for (task_cost, task_errors), p in zip(phase1_results, platforms):
  362. total_cost += task_cost
  363. costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
  364. global_errors.extend(task_errors)
  365. # Check if cases actually got written
  366. expected_file = Path(raw_cases_dir / f"case_{p}.json")
  367. if not expected_file.exists():
  368. err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
  369. print(f"⚠️ [Warning] {err_msg}")
  370. global_errors.append(err_msg)
  371. else:
  372. print("\n⏭️ [Skip] Phase 1 Skipped via --skip-research. Using existing cases...")
  373. # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
  374. if not args.research_only:
  375. # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
  376. print(f"\n--- Phase 2: Parallel Distillation ({claude_model}) ---")
  377. blueprint_file = str(output_dir / "blueprint.json")
  378. capabilities_file = str(output_dir / "capabilities_extracted.json")
  379. raw_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
  380. task_a = None
  381. task_b = None
  382. if Path(blueprint_file).exists():
  383. print(f"⏭️ [Skip P2] blueprint.json already exists. Skipping P2_FilterBlueprint.")
  384. else:
  385. if args.use_claude_sdk:
  386. print(" > Using [Anthropic SDK Core] for P2_FilterBlueprint")
  387. task_a = run_anthropic_sdk_task("filter_and_blueprint", {
  388. "requirement": requirement,
  389. "raw_files_glob": raw_glob,
  390. "output_file": blueprint_file
  391. }, "P2_FilterBlueprint", claude_model)
  392. else:
  393. print(" > Using [AgentRunner Core] for P2_FilterBlueprint")
  394. task_a = run_agent_task(runner_claude, "filter_and_blueprint", {
  395. "requirement": requirement,
  396. "raw_files_glob": raw_glob,
  397. "output_file": blueprint_file
  398. }, "P2_FilterBlueprint", claude_model)
  399. if Path(capabilities_file).exists():
  400. print(f"⏭️ [Skip P2] capabilities_extracted.json already exists. Skipping P2_ExtractCaps.")
  401. else:
  402. if args.use_claude_sdk:
  403. print(" > Using [Anthropic SDK Core] for P2_ExtractCaps")
  404. task_b = run_anthropic_sdk_task("extract_capabilities", {
  405. "requirement": requirement,
  406. "raw_files_glob": raw_glob,
  407. "output_file": capabilities_file
  408. }, "P2_ExtractCaps", claude_model)
  409. else:
  410. print(" > Using [AgentRunner Core] for P2_ExtractCaps")
  411. task_b = run_agent_task(runner_claude, "extract_capabilities", {
  412. "requirement": requirement,
  413. "raw_files_glob": raw_glob,
  414. "output_file": capabilities_file
  415. }, "P2_ExtractCaps", claude_model)
  416. to_await = []
  417. names_await = []
  418. if task_a:
  419. to_await.append(task_a)
  420. names_await.append("P2_FilterBlueprint")
  421. if task_b:
  422. to_await.append(task_b)
  423. names_await.append("P2_ExtractCaps")
  424. if to_await:
  425. phase2_results = await asyncio.gather(*to_await)
  426. for (cost, errs), t_name in zip(phase2_results, names_await):
  427. total_cost += cost
  428. costs_breakdown[t_name] = round(cost, 4)
  429. global_errors.extend(errs)
  430. # Phase 3: REDUCE 2 (Final Assembly) uses Claude
  431. print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
  432. strategy_file = str(output_dir / "strategy.json")
  433. if Path(strategy_file).exists():
  434. print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
  435. else:
  436. if args.use_claude_sdk:
  437. print(" > Using [Anthropic SDK Core]")
  438. phase3_cost, phase3_errs = await run_anthropic_sdk_task("assemble_strategy", {
  439. "requirement": requirement,
  440. "blueprint_file": blueprint_file,
  441. "capabilities_file": capabilities_file,
  442. "output_file": strategy_file
  443. }, "P3_Assembler", claude_model)
  444. else:
  445. print(" > Using [AgentRunner Core]")
  446. phase3_cost, phase3_errs = await run_agent_task(runner_claude, "assemble_strategy", {
  447. "requirement": requirement,
  448. "blueprint_file": blueprint_file,
  449. "capabilities_file": capabilities_file,
  450. "output_file": strategy_file
  451. }, "P3_Assembler", claude_model)
  452. total_cost += phase3_cost
  453. costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
  454. global_errors.extend(phase3_errs)
  455. else:
  456. print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
  457. end_time = time.time()
  458. elapsed_sec = end_time - start_time
  459. # Save Metrics
  460. metrics_file = base_dir / "run_metrics.json"
  461. metrics_data = []
  462. if metrics_file.exists():
  463. with open(metrics_file, "r", encoding="utf-8") as f:
  464. try:
  465. metrics_data = json.load(f)
  466. except json.JSONDecodeError:
  467. pass
  468. metrics_data.append({
  469. "index": args.index,
  470. "requirement": requirement[:80] + "...",
  471. "duration_seconds": round(elapsed_sec, 2),
  472. "total_cost_usd": round(total_cost, 4),
  473. "costs_breakdown": costs_breakdown,
  474. "errors": global_errors,
  475. "timestamp": datetime.now().isoformat()
  476. })
  477. with open(metrics_file, "w", encoding="utf-8") as f:
  478. json.dump(metrics_data, f, indent=2, ensure_ascii=False)
  479. print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
  480. finally:
  481. pass
  482. print("✅ Pipeline run finished.")
  483. if strategy_file:
  484. print("✅ Strategy saved to:", strategy_file)
  485. if __name__ == "__main__":
  486. asyncio.run(main())