run_pipeline.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. """
  2. V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
  3. """
  4. import argparse
  5. import asyncio
  6. import json
  7. import sys
  8. import time
  9. from datetime import datetime
  10. from pathlib import Path
  11. # Add project root to path
  12. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  13. from dotenv import load_dotenv
  14. load_dotenv()
  15. from agent.llm.prompts import SimplePrompt
  16. from agent.core.runner import AgentRunner, RunConfig
  17. from agent.tools.builtin.knowledge import KnowledgeConfig
  18. from agent.trace import FileSystemTraceStore, Trace, Message
  19. from agent.llm import create_qwen_llm_call
  20. from agent.llm.openrouter import create_openrouter_llm_call
  21. from agent.llm.claude import create_claude_llm_call
  22. # config from existing setup
  23. from examples.process_research.config import (
  24. OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
  25. BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
  26. )
  27. from agent.utils import setup_logging
  28. async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str):
  29. from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
  30. base_dir = Path(__file__).parent
  31. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  32. prompt = SimplePrompt(prompt_path)
  33. messages = prompt.build_messages(**kwargs)
  34. target_tools = []
  35. if prompt_name == "extract_capabilities":
  36. target_tools = ["capability_search", "capability_list", "tool_search"]
  37. # 按 agent 类型配置工具组权限
  38. tool_groups_map = {
  39. "researcher": ["core", "content"], # 搜索+文件,无浏览器
  40. "filter_and_blueprint": ["core"], # 只需文件读写
  41. "extract_capabilities": ["core"], # 只需文件读写(额外工具由 target_tools 补充)
  42. "assemble_strategy": ["core"], # 只需文件读写
  43. }
  44. total_task_cost = 0.0
  45. task_errors = []
  46. out_file = kwargs.get("output_file")
  47. max_retries = 3
  48. last_trace_id = None
  49. last_validation_error = None
  50. final_trace_id = None # 用于返回最终成功的 trace_id
  51. for attempt in range(max_retries):
  52. if attempt > 0 and last_trace_id and last_validation_error:
  53. # 续跑模式:把错误信息告诉之前的 agent,让它修复
  54. print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
  55. fix_messages = [{
  56. "role": "user",
  57. "content": (
  58. f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
  59. f"错误详情:{last_validation_error}\n\n"
  60. f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
  61. f"只修复有问题的部分,不要丢弃已有的正确内容。"
  62. )
  63. }]
  64. fix_config = RunConfig(
  65. model=prompt.config.get("model") or model_name,
  66. temperature=0.1,
  67. name=f"{task_name}_Fix{attempt}",
  68. agent_type=prompt_name,
  69. tools=target_tools,
  70. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  71. trace_id=last_trace_id,
  72. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  73. )
  74. try:
  75. async for item in runner.run(messages=fix_messages, config=fix_config):
  76. if isinstance(item, Trace):
  77. last_trace_id = item.trace_id
  78. if item.status == "completed":
  79. total_task_cost += item.total_cost
  80. elif item.status == "failed":
  81. task_errors.append(f"{task_name} Fix Failed: {item.error_message}")
  82. if isinstance(item, Message) and item.role == "tool":
  83. content = item.content if isinstance(item.content, dict) else {}
  84. if content.get("tool_name") in ("write_file", "write_json"):
  85. print(f" 💾 [Fix File Written by {task_name}]")
  86. except Exception as e:
  87. err_msg = f"{type(e).__name__}: {e}"
  88. print(f"❌ [Exception Fix] {task_name} crashed: {err_msg}")
  89. task_errors.append(f"{task_name} fix crashed: {err_msg}")
  90. elif attempt > 0:
  91. # 没有 trace_id 或没有 validation error,只能完全重跑
  92. print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
  93. if out_file and Path(out_file).exists():
  94. Path(out_file).unlink()
  95. run_config = RunConfig(
  96. model=prompt.config.get("model") or model_name,
  97. temperature=prompt.config.get("temperature") or 0.3,
  98. name=f"{task_name}_A{attempt}",
  99. agent_type=prompt_name,
  100. tools=target_tools,
  101. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  102. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  103. )
  104. print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
  105. try:
  106. async for item in runner.run(messages=messages, config=run_config):
  107. if isinstance(item, Trace):
  108. last_trace_id = item.trace_id
  109. if item.status == "completed":
  110. total_task_cost += item.total_cost
  111. elif item.status == "failed":
  112. task_errors.append(f"{task_name} Failed: {item.error_message}")
  113. if isinstance(item, Message):
  114. if item.role == "tool":
  115. content = item.content if isinstance(item.content, dict) else {}
  116. t_name = content.get("tool_name", "unknown")
  117. if t_name in ("write_file", "write_json"):
  118. print(f" 💾 [File Written by {task_name}]")
  119. except Exception as e:
  120. err_msg = f"{type(e).__name__}: {e}"
  121. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  122. task_errors.append(f"{task_name} crashed: {err_msg}")
  123. else:
  124. # 首次执行
  125. run_config = RunConfig(
  126. model=prompt.config.get("model") or model_name,
  127. temperature=prompt.config.get("temperature") or 0.3,
  128. name=f"{task_name}_A{attempt}",
  129. agent_type=prompt_name,
  130. tools=target_tools,
  131. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  132. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  133. )
  134. print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
  135. try:
  136. async for item in runner.run(messages=messages, config=run_config):
  137. if isinstance(item, Trace):
  138. last_trace_id = item.trace_id
  139. if item.status == "completed":
  140. total_task_cost += item.total_cost
  141. elif item.status == "failed":
  142. task_errors.append(f"{task_name} Failed: {item.error_message}")
  143. if isinstance(item, Message):
  144. if item.role == "tool":
  145. content = item.content if isinstance(item.content, dict) else {}
  146. t_name = content.get("tool_name", "unknown")
  147. if t_name in ("write_file", "write_json"):
  148. print(f" 💾 [File Written by {task_name}]")
  149. except Exception as e:
  150. err_msg = f"{type(e).__name__}: {e}"
  151. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  152. task_errors.append(f"{task_name} crashed: {err_msg}")
  153. # Verification & Recovery block
  154. if out_file and not Path(out_file).exists() and last_trace_id:
  155. print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
  156. recovery_messages = [{
  157. "role": "user",
  158. "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
  159. }]
  160. rec_config = RunConfig(
  161. model=model_name,
  162. temperature=0.1,
  163. name=task_name + "_Rec",
  164. agent_type=prompt_name,
  165. trace_id=last_trace_id,
  166. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  167. )
  168. try:
  169. async for r_item in runner.run(messages=recovery_messages, config=rec_config):
  170. if isinstance(r_item, Trace):
  171. if r_item.status == "completed":
  172. total_task_cost += r_item.total_cost
  173. elif r_item.status == "failed":
  174. task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
  175. if isinstance(r_item, Message) and r_item.role == "tool":
  176. content = r_item.content if isinstance(r_item.content, dict) else {}
  177. if content.get("tool_name") in ("write_file", "write_json"):
  178. print(f" 💾 [Recovery File Written by {task_name}]")
  179. except Exception as e:
  180. err_msg = f"{type(e).__name__}: {e}"
  181. print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
  182. task_errors.append(f"{task_name} recovery crashed: {err_msg}")
  183. # Schema Validation
  184. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  185. try:
  186. with open(out_file, "r", encoding="utf-8") as f:
  187. data = json.loads(f.read())
  188. filename = Path(out_file).name
  189. err = None
  190. if filename.startswith("case_"):
  191. err = validate_case(data)
  192. elif filename == "blueprint.json":
  193. err = validate_blueprint(data)
  194. elif filename == "capabilities_extracted.json":
  195. err = validate_capabilities(data)
  196. elif filename == "strategy.json":
  197. err = validate_strategy(data)
  198. if err:
  199. raise ValueError(f"Schema Validation Failed: {err}")
  200. print(f" ✅ [Schema Validated] {Path(out_file).name}")
  201. final_trace_id = last_trace_id
  202. return total_task_cost, task_errors, final_trace_id # Success! Exit retry loop.
  203. except Exception as e:
  204. err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
  205. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  206. task_errors.append(f"{task_name} Error: {e}")
  207. last_validation_error = str(e)
  208. if attempt == max_retries - 1:
  209. print(f"❌ [Retry Limit] {task_name} exhausted retries.")
  210. return total_task_cost, task_errors, last_trace_id
  211. else:
  212. print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
  213. last_validation_error = None
  214. if attempt == max_retries - 1:
  215. return total_task_cost, task_errors, last_trace_id
  216. return total_task_cost, task_errors, last_trace_id
  217. async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str):
  218. """
  219. 备用:使用纯净的官方 Anthropic SDK 驱动。
  220. 跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。
  221. """
  222. from anthropic import AsyncAnthropic
  223. from agent.tools.registry import get_tool_registry
  224. base_dir = Path(__file__).parent
  225. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  226. prompt = SimplePrompt(prompt_path)
  227. # 1. 组装输入 Message
  228. raw_messages = prompt.build_messages(**kwargs)
  229. system_prompt = ""
  230. messages = []
  231. for msg in raw_messages:
  232. if msg["role"] == "system":
  233. system_prompt += msg["content"] + "\n\n"
  234. else:
  235. messages.append({"role": msg["role"], "content": msg["content"]})
  236. # 2. 映射目标工具
  237. target_tools = ["write_file", "write_json", "read_file", "glob_files"]
  238. if prompt_name == "extract_capabilities":
  239. target_tools.extend(["capability_search", "capability_list", "tool_search"])
  240. registry = get_tool_registry()
  241. schemas = registry.get_schemas(target_tools)
  242. anthropic_tools = []
  243. for s in schemas:
  244. anthropic_tools.append({
  245. "name": s["function"]["name"],
  246. "description": s["function"].get("description", ""),
  247. "input_schema": s["function"]["parameters"]
  248. })
  249. # 3. 初始化并开启 Loop
  250. # 提示:你需要在你的终端中配置好 ANTHROPIC_API_KEY 环境变量
  251. client = AsyncAnthropic()
  252. total_task_cost = 0.0
  253. task_errors = []
  254. sdk_trace_id = None # 用于记录 Anthropic SDK 的 response ID
  255. from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
  256. out_file = kwargs.get("output_file")
  257. max_retries = 3
  258. for attempt in range(max_retries):
  259. if attempt > 0:
  260. print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
  261. if out_file and Path(out_file).exists():
  262. Path(out_file).unlink()
  263. print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")
  264. # Reset messages for retry
  265. messages_copy = list(messages)
  266. max_loops = 50
  267. for loop_idx in range(max_loops):
  268. try:
  269. # 去除前缀(兼容比如 openrouter 传入的名字)
  270. clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
  271. # 这里专门将实际请求映射到其可用的特殊别名 claude-sonnet-4-5
  272. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  273. response = await client.messages.create(
  274. model=target_model,
  275. max_tokens=4096,
  276. temperature=0.2,
  277. system=system_prompt,
  278. messages=messages_copy,
  279. tools=anthropic_tools
  280. )
  281. # (简略预估,不代表真实官方开销)
  282. if hasattr(response, 'usage'):
  283. step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
  284. total_task_cost += step_cost
  285. # 加入助手回复
  286. assistant_content = []
  287. tool_uses = []
  288. for content_block in response.content:
  289. if content_block.type == "text":
  290. text_val = content_block.text
  291. if text_val:
  292. assistant_content.append({"type": "text", "text": text_val})
  293. print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
  294. elif content_block.type == "tool_use":
  295. assistant_content.append({
  296. "type": "tool_use",
  297. "id": content_block.id,
  298. "name": content_block.name,
  299. "input": content_block.input
  300. })
  301. tool_uses.append(content_block)
  302. if not assistant_content:
  303. assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
  304. messages_copy.append({"role": "assistant", "content": assistant_content})
  305. # 出口:没有调用工具说明任务结束
  306. if not tool_uses:
  307. print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
  308. break
  309. # 工具执行与回传
  310. tool_results = []
  311. for tu in tool_uses:
  312. if tu.name in ("write_file", "write_json"):
  313. print(f" 💾 [File Written by SDK] {task_name}")
  314. print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
  315. # 执行本地环境的函数
  316. result_str = await registry.execute(tu.name, tu.input)
  317. tool_results.append({
  318. "type": "tool_result",
  319. "tool_use_id": tu.id,
  320. "content": result_str
  321. })
  322. messages_copy.append({"role": "user", "content": tool_results})
  323. except Exception as e:
  324. err_msg = str(e)
  325. print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
  326. task_errors.append(err_msg)
  327. break
  328. # Verification & Recovery block for SDK (porting from AgentRunner)
  329. if out_file and not Path(out_file).exists():
  330. print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
  331. messages_copy.append({
  332. "role": "user",
  333. "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
  334. })
  335. try:
  336. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  337. rec_response = await client.messages.create(
  338. model=target_model,
  339. max_tokens=4096,
  340. temperature=0.1,
  341. system=system_prompt,
  342. messages=messages_copy,
  343. tools=anthropic_tools,
  344. tool_choice={"type": "any"}
  345. )
  346. for content_block in rec_response.content:
  347. if content_block.type == "tool_use":
  348. if content_block.name in ("write_file", "write_json"):
  349. print(f" 💾 [Recovery File Written by SDK] {task_name}")
  350. await registry.execute(content_block.name, content_block.input)
  351. except Exception as e:
  352. print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
  353. task_errors.append(str(e))
  354. # Schema Validation
  355. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  356. try:
  357. with open(out_file, "r", encoding="utf-8") as f:
  358. data = json.loads(f.read())
  359. filename = Path(out_file).name
  360. err = None
  361. if filename.startswith("case_"):
  362. err = validate_case(data)
  363. elif filename == "blueprint.json":
  364. err = validate_blueprint(data)
  365. elif filename == "capabilities_extracted.json":
  366. err = validate_capabilities(data)
  367. elif filename == "strategy.json":
  368. err = validate_strategy(data)
  369. if err:
  370. raise ValueError(f"Schema Validation Failed: {err}")
  371. print(f" ✅ [Schema Validated] {Path(out_file).name}")
  372. return total_task_cost, task_errors # Success! Exit retry loop.
  373. except Exception as e:
  374. err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
  375. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  376. task_errors.append(f"{task_name} Error: {e}")
  377. if attempt == max_retries - 1:
  378. print(f"❌ [Retry Limit] {task_name} exhausted retries.")
  379. return total_task_cost, task_errors
  380. else:
  381. print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
  382. if attempt == max_retries - 1:
  383. return total_task_cost, task_errors
  384. return total_task_cost, task_errors
  385. async def main():
  386. parser = argparse.ArgumentParser()
  387. parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
  388. parser.add_argument("--skip-research", action="store_true", help="Skip Phase 1 and use existing raw cases")
  389. parser.add_argument("--research-only", action="store_true", help="Only run research phases, skip Phase 2 and 3")
  390. parser.add_argument("--platforms", type=str, default="xhs,youtube,bili,x", help="Comma-separated list of platforms to search")
  391. parser.add_argument("--use-claude-sdk", action="store_true", help="Use pure Anthropic SDK (run_anthropic_sdk_task) instead of internal AgentRunner for Phase 2/3")
  392. parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
  393. args = parser.parse_args()
  394. base_dir = Path(__file__).parent
  395. # Load requirements locally
  396. req_path = base_dir / "db_requirements.json"
  397. with open(req_path, encoding='utf-8') as f:
  398. reqs = json.load(f)
  399. if args.index < 0 or args.index >= len(reqs):
  400. print("Index out of bounds")
  401. sys.exit(1)
  402. requirement = reqs[args.index]
  403. # 0. Setup directories
  404. output_dir = base_dir / "output" / f"{(args.index+1):03d}"
  405. output_dir.mkdir(parents=True, exist_ok=True)
  406. raw_cases_dir = output_dir / "raw_cases"
  407. raw_cases_dir.mkdir(parents=True, exist_ok=True)
  408. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  409. print("=" * 60)
  410. print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
  411. print("=" * 60)
  412. # Load presets
  413. presets_path = base_dir / "presets.json"
  414. if presets_path.exists():
  415. from agent.core.presets import load_presets_from_json
  416. load_presets_from_json(str(presets_path))
  417. print("✅ Configured Agent Presets (Skills Boundaries)")
  418. # Browser initialization removed to save resources
  419. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  420. # Instantiate two distinct LLM orchestrators
  421. qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
  422. # 用户指示使用 OpenRouter 代理的 Claude 4.5
  423. claude_model = "anthropic/claude-4.5-sonnet"
  424. args.use_claude_sdk = False # 禁用纯 Native SDK 模式,走内部通用 AgentRunner (即可对接 OpenRouter)
  425. from agent.llm.openrouter import create_openrouter_llm_call
  426. claude_llm_call = create_openrouter_llm_call(model=claude_model)
  427. runner_qwen = AgentRunner(
  428. trace_store=store,
  429. llm_call=create_qwen_llm_call(model=qwen_model),
  430. skills_dir=SKILLS_DIR
  431. )
  432. runner_claude = AgentRunner(
  433. trace_store=store,
  434. llm_call=claude_llm_call,
  435. skills_dir=SKILLS_DIR
  436. )
  437. try:
  438. start_time = time.time()
  439. total_cost = 0.0
  440. costs_breakdown = {}
  441. global_errors = []
  442. strategy_file = None
  443. existing_platforms = []
  444. if raw_cases_dir.exists():
  445. for f in raw_cases_dir.glob("case_*.json"):
  446. plat = f.stem.replace("case_", "")
  447. if plat in ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]:
  448. existing_platforms.append(plat)
  449. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  450. needed_count = max(0, 4 - len(existing_platforms))
  451. # Phase 0: Dynamic Routing
  452. if not args.skip_research:
  453. if needed_count == 0:
  454. print(f"\n--- Phase 0: Skipping Routing (Already have {len(existing_platforms)} existing cases: {existing_platforms}) ---")
  455. platforms = []
  456. else:
  457. print(f"\n--- Phase 0: Dynamic Platform Routing ({qwen_model}) ---")
  458. print(f"📡 Found existing cases: {existing_platforms}. Requesting {needed_count} new platforms...")
  459. try:
  460. router_prompt = SimplePrompt(base_dir / "prompts" / "router.prompt")
  461. rmessages = router_prompt.build_messages(
  462. requirement=requirement,
  463. existing_platforms=",".join(existing_platforms) if existing_platforms else "无",
  464. needed_count=str(needed_count)
  465. )
  466. rconfig = RunConfig(
  467. model=qwen_model,
  468. temperature=0.1,
  469. name="P0_Router",
  470. agent_type="router",
  471. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  472. )
  473. print(f"🚀 [Launch] P0_Router calculating optimal platforms...")
  474. router_response = ""
  475. async for item in runner_qwen.run(messages=rmessages, config=rconfig):
  476. if isinstance(item, Message) and item.role == "assistant" and isinstance(item.content, dict):
  477. text = item.content.get("text", "")
  478. if text and not item.content.get("tool_calls"):
  479. router_response = text
  480. if isinstance(item, Trace) and item.status == "completed":
  481. total_cost += item.total_cost
  482. costs_breakdown["P0_Router"] = round(item.total_cost, 4)
  483. if router_response:
  484. import re
  485. # Extract all alphabetic words from the response to handle extra text or markdown
  486. words = set(re.findall(r'\b[a-z]+\b', router_response.lower()))
  487. valid_platforms = ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]
  488. # Intersect words with valid platforms (direct exact word matching, exclude existing)
  489. final_platforms = [p for p in valid_platforms if p in words and p not in existing_platforms]
  490. if final_platforms:
  491. platforms = final_platforms[:needed_count]
  492. print(f"🎯 [Router Decision] Selected {len(platforms)} new platforms: {platforms}")
  493. else:
  494. platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
  495. print(f"⚠️ [Router Fallback] Invalid output '{router_response}'. Using delta default: {platforms}")
  496. except Exception as e:
  497. platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
  498. print(f"⚠️ [Router Logic Failed] Using delta default platforms ({platforms}). Error: {e}")
  499. # Phase 1: MAP (Parallel Search) uses Qwen
  500. if not args.skip_research:
  501. print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
  502. phase1_tasks = []
  503. for p in platforms:
  504. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
  505. out_file = str(raw_cases_dir / f"case_{p}.json")
  506. kwargs = {
  507. "task": task_desc,
  508. "output_file": out_file
  509. }
  510. phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model))
  511. phase1_results = await asyncio.gather(*phase1_tasks)
  512. phase1_trace_ids = {}
  513. for (task_cost, task_errors, trace_id), p in zip(phase1_results, platforms):
  514. total_cost += task_cost
  515. costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
  516. phase1_trace_ids[f"P1_Research_{p}"] = trace_id
  517. global_errors.extend(task_errors)
  518. # Check if cases actually got written
  519. expected_file = Path(raw_cases_dir / f"case_{p}.json")
  520. if not expected_file.exists():
  521. err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
  522. print(f"⚠️ [Warning] {err_msg}")
  523. global_errors.append(err_msg)
  524. else:
  525. print("\n⏭️ [Skip] Phase 1 Skipped via --skip-research. Using existing cases...")
  526. # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
  527. if not args.research_only:
  528. # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
  529. print(f"\n--- Phase 2: Parallel Distillation ({claude_model}) ---")
  530. blueprint_file = str(output_dir / "blueprint.json")
  531. capabilities_file = str(output_dir / "capabilities_extracted.json")
  532. raw_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
  533. task_a = None
  534. task_b = None
  535. force_strategy_rerun = False
  536. if Path(blueprint_file).exists():
  537. print(f"⏭️ [Skip P2] blueprint.json already exists. Skipping P2_FilterBlueprint.")
  538. else:
  539. force_strategy_rerun = True
  540. if args.use_claude_sdk:
  541. print(" > Using [Anthropic SDK Core] for P2_FilterBlueprint")
  542. task_a = run_anthropic_sdk_task("filter_and_blueprint", {
  543. "requirement": requirement,
  544. "raw_files_glob": raw_glob,
  545. "output_file": blueprint_file
  546. }, "P2_FilterBlueprint", claude_model)
  547. else:
  548. print(" > Using [AgentRunner Core] for P2_FilterBlueprint")
  549. task_a = run_agent_task(runner_claude, "filter_and_blueprint", {
  550. "requirement": requirement,
  551. "raw_files_glob": raw_glob,
  552. "output_file": blueprint_file
  553. }, "P2_FilterBlueprint", claude_model)
  554. if Path(capabilities_file).exists():
  555. print(f"⏭️ [Skip P2] capabilities_extracted.json already exists. Skipping P2_ExtractCaps.")
  556. else:
  557. force_strategy_rerun = True
  558. if args.use_claude_sdk:
  559. print(" > Using [Anthropic SDK Core] for P2_ExtractCaps")
  560. task_b = run_anthropic_sdk_task("extract_capabilities", {
  561. "requirement": requirement,
  562. "raw_files_glob": raw_glob,
  563. "output_file": capabilities_file
  564. }, "P2_ExtractCaps", claude_model)
  565. else:
  566. print(" > Using [AgentRunner Core] for P2_ExtractCaps")
  567. task_b = run_agent_task(runner_claude, "extract_capabilities", {
  568. "requirement": requirement,
  569. "raw_files_glob": raw_glob,
  570. "output_file": capabilities_file
  571. }, "P2_ExtractCaps", claude_model)
  572. to_await = []
  573. names_await = []
  574. if task_a:
  575. to_await.append(task_a)
  576. names_await.append("P2_FilterBlueprint")
  577. if task_b:
  578. to_await.append(task_b)
  579. names_await.append("P2_ExtractCaps")
  580. if to_await:
  581. phase2_results = await asyncio.gather(*to_await)
  582. phase2_trace_ids = {}
  583. for (cost, errs, trace_id), t_name in zip(phase2_results, names_await):
  584. total_cost += cost
  585. costs_breakdown[t_name] = round(cost, 4)
  586. phase2_trace_ids[t_name] = trace_id
  587. global_errors.extend(errs)
  588. # Phase 3: REDUCE 2 (Final Assembly) uses Claude
  589. print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
  590. strategy_file = str(output_dir / "strategy.json")
  591. if Path(strategy_file).exists() and not force_strategy_rerun:
  592. print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
  593. else:
  594. if Path(strategy_file).exists():
  595. print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
  596. if args.use_claude_sdk:
  597. print(" > Using [Anthropic SDK Core]")
  598. phase3_cost, phase3_errs, phase3_trace_id = await run_anthropic_sdk_task("assemble_strategy", {
  599. "requirement": requirement,
  600. "blueprint_file": blueprint_file,
  601. "capabilities_file": capabilities_file,
  602. "output_file": strategy_file
  603. }, "P3_Assembler", claude_model)
  604. else:
  605. print(" > Using [AgentRunner Core]")
  606. phase3_cost, phase3_errs, phase3_trace_id = await run_agent_task(runner_claude, "assemble_strategy", {
  607. "requirement": requirement,
  608. "blueprint_file": blueprint_file,
  609. "capabilities_file": capabilities_file,
  610. "output_file": strategy_file
  611. }, "P3_Assembler", claude_model)
  612. total_cost += phase3_cost
  613. costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
  614. global_errors.extend(phase3_errs)
  615. else:
  616. print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
  617. end_time = time.time()
  618. elapsed_sec = end_time - start_time
  619. # Save Metrics
  620. metrics_file = base_dir / "run_metrics.json"
  621. metrics_data = []
  622. if metrics_file.exists():
  623. with open(metrics_file, "r", encoding="utf-8") as f:
  624. try:
  625. metrics_data = json.load(f)
  626. except json.JSONDecodeError:
  627. pass
  628. # Collect trace_ids from all phases
  629. trace_ids = {}
  630. if not args.skip_research and 'phase1_trace_ids' in dir():
  631. trace_ids.update(phase1_trace_ids)
  632. if not args.research_only:
  633. if 'phase2_trace_ids' in dir():
  634. trace_ids.update(phase2_trace_ids)
  635. if 'phase3_trace_id' in dir() and phase3_trace_id:
  636. trace_ids["P3_Assembler"] = phase3_trace_id
  637. metrics_data.append({
  638. "index": args.index,
  639. "requirement": requirement[:80] + "...",
  640. "duration_seconds": round(elapsed_sec, 2),
  641. "total_cost_usd": round(total_cost, 4),
  642. "costs_breakdown": costs_breakdown,
  643. "trace_ids": trace_ids,
  644. "errors": global_errors,
  645. "timestamp": datetime.now().isoformat()
  646. })
  647. with open(metrics_file, "w", encoding="utf-8") as f:
  648. json.dump(metrics_data, f, indent=2, ensure_ascii=False)
  649. print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
  650. finally:
  651. pass
  652. print("✅ Pipeline run finished.")
  653. if strategy_file:
  654. print("✅ Strategy saved to:", strategy_file)
# Script entry point: run the async pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())