"""
V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
"""
import argparse
import asyncio
import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
# Force runtime working directory to project root so relative trace/cache paths
# always land in the repo root no matter where this script is launched from.
os.chdir(PROJECT_ROOT)
# Add project root to path so the project packages imported below resolve.
sys.path.insert(0, str(PROJECT_ROOT))

# NOTE: the imports below must stay after the sys.path / chdir setup above.
from dotenv import load_dotenv

load_dotenv()

from agent.llm.prompts import SimplePrompt
from agent.core.runner import AgentRunner, RunConfig
from agent.tools.builtin.knowledge import KnowledgeConfig
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_qwen_llm_call
from agent.llm.openrouter import create_openrouter_llm_call
from agent.llm.claude import create_claude_llm_call

# config from existing setup
from examples.process_research.config import (
    OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
    BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG,
)
from agent.utils import setup_logging
  33. async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
  34. from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
  35. base_dir = Path(__file__).parent
  36. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  37. prompt = SimplePrompt(prompt_path)
  38. messages = prompt.build_messages(**kwargs)
  39. target_tools = []
  40. if prompt_name == "extract_capabilities":
  41. target_tools = ["capability_search", "capability_list", "tool_search"]
  42. # 按 agent 类型配置工具组权限
  43. tool_groups_map = {
  44. "researcher": ["core", "content"], # 搜索+文件,无浏览器
  45. "filter_and_blueprint": ["core"], # 只需文件读写
  46. "extract_capabilities": ["core"], # 只需文件读写(额外工具由 target_tools 补充)
  47. "assemble_strategy": ["core"], # 只需文件读写
  48. }
  49. total_task_cost = 0.0
  50. task_errors = []
  51. out_file = kwargs.get("output_file")
  52. max_retries = 3
  53. last_trace_id = None
  54. last_validation_error = None
  55. final_trace_id = None # 用于返回最终成功的 trace_id
  56. for attempt in range(max_retries):
  57. if attempt > 0 and last_trace_id and last_validation_error:
  58. # 续跑模式:把错误信息告诉之前的 agent,让它修复
  59. print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
  60. fix_messages = [{
  61. "role": "user",
  62. "content": (
  63. f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
  64. f"错误详情:{last_validation_error}\n\n"
  65. f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
  66. f"只修复有问题的部分,不要丢弃已有的正确内容。"
  67. )
  68. }]
  69. fix_config = RunConfig(
  70. model=prompt.config.get("model") or model_name,
  71. temperature=0.1,
  72. name=f"{task_name}_Fix{attempt}",
  73. agent_type=prompt_name,
  74. tools=target_tools,
  75. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  76. trace_id=last_trace_id,
  77. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  78. )
  79. try:
  80. async for item in runner.run(messages=fix_messages, config=fix_config):
  81. if isinstance(item, Trace):
  82. last_trace_id = item.trace_id
  83. if item.status == "completed":
  84. total_task_cost += item.total_cost
  85. elif item.status == "failed":
  86. task_errors.append(f"{task_name} Fix Failed: {item.error_message}")
  87. if isinstance(item, Message) and item.role == "tool":
  88. content = item.content if isinstance(item.content, dict) else {}
  89. if content.get("tool_name") in ("write_file", "write_json"):
  90. print(f" 💾 [Fix File Written by {task_name}]")
  91. except Exception as e:
  92. err_msg = f"{type(e).__name__}: {e}"
  93. print(f"❌ [Exception Fix] {task_name} crashed: {err_msg}")
  94. task_errors.append(f"{task_name} fix crashed: {err_msg}")
  95. elif attempt > 0:
  96. # 没有 trace_id 或没有 validation error,只能完全重跑
  97. print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
  98. if out_file and Path(out_file).exists():
  99. Path(out_file).unlink()
  100. run_config = RunConfig(
  101. model=prompt.config.get("model") or model_name,
  102. temperature=prompt.config.get("temperature") or 0.3,
  103. name=f"{task_name}_A{attempt}",
  104. agent_type=prompt_name,
  105. tools=target_tools,
  106. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  107. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  108. )
  109. print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
  110. try:
  111. async for item in runner.run(messages=messages, config=run_config):
  112. if isinstance(item, Trace):
  113. last_trace_id = item.trace_id
  114. if item.status == "completed":
  115. total_task_cost += item.total_cost
  116. elif item.status == "failed":
  117. task_errors.append(f"{task_name} Failed: {item.error_message}")
  118. if isinstance(item, Message):
  119. if item.role == "tool":
  120. content = item.content if isinstance(item.content, dict) else {}
  121. t_name = content.get("tool_name", "unknown")
  122. if t_name in ("write_file", "write_json"):
  123. print(f" 💾 [File Written by {task_name}]")
  124. except Exception as e:
  125. err_msg = f"{type(e).__name__}: {e}"
  126. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  127. task_errors.append(f"{task_name} crashed: {err_msg}")
  128. else:
  129. # 首次执行
  130. run_config = RunConfig(
  131. model=prompt.config.get("model") or model_name,
  132. temperature=prompt.config.get("temperature") or 0.3,
  133. name=f"{task_name}_A{attempt}",
  134. agent_type=prompt_name,
  135. tools=target_tools,
  136. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  137. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  138. )
  139. print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
  140. try:
  141. async for item in runner.run(messages=messages, config=run_config):
  142. if isinstance(item, Trace):
  143. last_trace_id = item.trace_id
  144. if item.status == "completed":
  145. total_task_cost += item.total_cost
  146. elif item.status == "failed":
  147. task_errors.append(f"{task_name} Failed: {item.error_message}")
  148. if isinstance(item, Message):
  149. if item.role == "tool":
  150. content = item.content if isinstance(item.content, dict) else {}
  151. t_name = content.get("tool_name", "unknown")
  152. if t_name in ("write_file", "write_json"):
  153. print(f" 💾 [File Written by {task_name}]")
  154. except Exception as e:
  155. err_msg = f"{type(e).__name__}: {e}"
  156. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  157. task_errors.append(f"{task_name} crashed: {err_msg}")
  158. # Verification & Recovery block
  159. if out_file and not Path(out_file).exists() and last_trace_id:
  160. print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
  161. recovery_messages = [{
  162. "role": "user",
  163. "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
  164. }]
  165. rec_config = RunConfig(
  166. model=model_name,
  167. temperature=0.1,
  168. name=task_name + "_Rec",
  169. agent_type=prompt_name,
  170. trace_id=last_trace_id,
  171. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  172. )
  173. try:
  174. async for r_item in runner.run(messages=recovery_messages, config=rec_config):
  175. if isinstance(r_item, Trace):
  176. if r_item.status == "completed":
  177. total_task_cost += r_item.total_cost
  178. elif r_item.status == "failed":
  179. task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
  180. if isinstance(r_item, Message) and r_item.role == "tool":
  181. content = r_item.content if isinstance(r_item.content, dict) else {}
  182. if content.get("tool_name") in ("write_file", "write_json"):
  183. print(f" 💾 [Recovery File Written by {task_name}]")
  184. except Exception as e:
  185. err_msg = f"{type(e).__name__}: {e}"
  186. print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
  187. task_errors.append(f"{task_name} recovery crashed: {err_msg}")
  188. # Schema Validation
  189. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  190. if skip_validation:
  191. print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
  192. return total_task_cost, task_errors, last_trace_id
  193. try:
  194. with open(out_file, "r", encoding="utf-8") as f:
  195. data = json.loads(f.read())
  196. filename = Path(out_file).name
  197. err = None
  198. if filename.startswith("case_"):
  199. err = validate_case(data)
  200. elif filename == "blueprint.json":
  201. err = validate_blueprint(data)
  202. elif filename == "capabilities_extracted.json":
  203. err = validate_capabilities(data)
  204. elif filename == "strategy.json":
  205. err = validate_strategy(data)
  206. if err:
  207. raise ValueError(f"Schema Validation Failed: {err}")
  208. print(f" ✅ [Schema Validated] {Path(out_file).name}")
  209. final_trace_id = last_trace_id
  210. return total_task_cost, task_errors, final_trace_id # Success! Exit retry loop.
  211. except Exception as e:
  212. err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
  213. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  214. task_errors.append(f"{task_name} Error: {e}")
  215. last_validation_error = str(e)
  216. if attempt == max_retries - 1:
  217. print(f"❌ [Retry Limit] {task_name} exhausted retries.")
  218. return total_task_cost, task_errors, last_trace_id
  219. else:
  220. print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
  221. last_validation_error = None
  222. if attempt == max_retries - 1:
  223. return total_task_cost, task_errors, last_trace_id
  224. return total_task_cost, task_errors, last_trace_id
  225. async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
  226. """
  227. 备用:使用纯净的官方 Anthropic SDK 驱动。
  228. 跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。
  229. """
  230. from anthropic import AsyncAnthropic
  231. from agent.tools.registry import get_tool_registry
  232. base_dir = Path(__file__).parent
  233. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  234. prompt = SimplePrompt(prompt_path)
  235. # 1. 组装输入 Message
  236. raw_messages = prompt.build_messages(**kwargs)
  237. system_prompt = ""
  238. messages = []
  239. for msg in raw_messages:
  240. if msg["role"] == "system":
  241. system_prompt += msg["content"] + "\n\n"
  242. else:
  243. messages.append({"role": msg["role"], "content": msg["content"]})
  244. # 2. 映射目标工具
  245. target_tools = ["write_file", "write_json", "read_file", "glob_files"]
  246. if prompt_name == "extract_capabilities":
  247. target_tools.extend(["capability_search", "capability_list", "tool_search"])
  248. registry = get_tool_registry()
  249. schemas = registry.get_schemas(target_tools)
  250. anthropic_tools = []
  251. for s in schemas:
  252. anthropic_tools.append({
  253. "name": s["function"]["name"],
  254. "description": s["function"].get("description", ""),
  255. "input_schema": s["function"]["parameters"]
  256. })
  257. # 3. 初始化并开启 Loop
  258. # 提示:你需要在你的终端中配置好 ANTHROPIC_API_KEY 环境变量
  259. client = AsyncAnthropic()
  260. total_task_cost = 0.0
  261. task_errors = []
  262. sdk_trace_id = None # 用于记录 Anthropic SDK 的 response ID
  263. from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
  264. out_file = kwargs.get("output_file")
  265. max_retries = 3
  266. for attempt in range(max_retries):
  267. if attempt > 0:
  268. print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
  269. if out_file and Path(out_file).exists():
  270. Path(out_file).unlink()
  271. print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")
  272. # Reset messages for retry
  273. messages_copy = list(messages)
  274. max_loops = 50
  275. for loop_idx in range(max_loops):
  276. try:
  277. # 去除前缀(兼容比如 openrouter 传入的名字)
  278. clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
  279. # 这里专门将实际请求映射到其可用的特殊别名 claude-sonnet-4-5
  280. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  281. response = await client.messages.create(
  282. model=target_model,
  283. max_tokens=4096,
  284. temperature=0.2,
  285. system=system_prompt,
  286. messages=messages_copy,
  287. tools=anthropic_tools
  288. )
  289. # (简略预估,不代表真实官方开销)
  290. if hasattr(response, 'usage'):
  291. step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
  292. total_task_cost += step_cost
  293. # 加入助手回复
  294. assistant_content = []
  295. tool_uses = []
  296. for content_block in response.content:
  297. if content_block.type == "text":
  298. text_val = content_block.text
  299. if text_val:
  300. assistant_content.append({"type": "text", "text": text_val})
  301. print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
  302. elif content_block.type == "tool_use":
  303. assistant_content.append({
  304. "type": "tool_use",
  305. "id": content_block.id,
  306. "name": content_block.name,
  307. "input": content_block.input
  308. })
  309. tool_uses.append(content_block)
  310. if not assistant_content:
  311. assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
  312. messages_copy.append({"role": "assistant", "content": assistant_content})
  313. # 出口:没有调用工具说明任务结束
  314. if not tool_uses:
  315. print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
  316. break
  317. # 工具执行与回传
  318. tool_results = []
  319. for tu in tool_uses:
  320. if tu.name in ("write_file", "write_json"):
  321. print(f" 💾 [File Written by SDK] {task_name}")
  322. print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
  323. # 执行本地环境的函数
  324. result_str = await registry.execute(tu.name, tu.input)
  325. tool_results.append({
  326. "type": "tool_result",
  327. "tool_use_id": tu.id,
  328. "content": result_str
  329. })
  330. messages_copy.append({"role": "user", "content": tool_results})
  331. except Exception as e:
  332. err_msg = str(e)
  333. print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
  334. task_errors.append(err_msg)
  335. break
  336. # Verification & Recovery block for SDK (porting from AgentRunner)
  337. if out_file and not Path(out_file).exists():
  338. print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
  339. messages_copy.append({
  340. "role": "user",
  341. "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
  342. })
  343. try:
  344. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  345. rec_response = await client.messages.create(
  346. model=target_model,
  347. max_tokens=4096,
  348. temperature=0.1,
  349. system=system_prompt,
  350. messages=messages_copy,
  351. tools=anthropic_tools,
  352. tool_choice={"type": "any"}
  353. )
  354. for content_block in rec_response.content:
  355. if content_block.type == "tool_use":
  356. if content_block.name in ("write_file", "write_json"):
  357. print(f" 💾 [Recovery File Written by SDK] {task_name}")
  358. await registry.execute(content_block.name, content_block.input)
  359. except Exception as e:
  360. print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
  361. task_errors.append(str(e))
  362. # Schema Validation
  363. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  364. if skip_validation:
  365. print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
  366. return total_task_cost, task_errors, None
  367. try:
  368. with open(out_file, "r", encoding="utf-8") as f:
  369. data = json.loads(f.read())
  370. filename = Path(out_file).name
  371. err = None
  372. if filename.startswith("case_"):
  373. err = validate_case(data)
  374. elif filename == "blueprint.json":
  375. err = validate_blueprint(data)
  376. elif filename == "capabilities_extracted.json":
  377. err = validate_capabilities(data)
  378. elif filename == "strategy.json":
  379. err = validate_strategy(data)
  380. if err:
  381. raise ValueError(f"Schema Validation Failed: {err}")
  382. print(f" ✅ [Schema Validated] {Path(out_file).name}")
  383. return total_task_cost, task_errors # Success! Exit retry loop.
  384. except Exception as e:
  385. err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
  386. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  387. task_errors.append(f"{task_name} Error: {e}")
  388. if attempt == max_retries - 1:
  389. print(f"❌ [Retry Limit] {task_name} exhausted retries.")
  390. return total_task_cost, task_errors
  391. else:
  392. print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
  393. if attempt == max_retries - 1:
  394. return total_task_cost, task_errors
  395. return total_task_cost, task_errors
  396. async def main():
  397. parser = argparse.ArgumentParser()
  398. parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
  399. parser.add_argument("--skip-research", action="store_true", help="Skip Phase 1 and use existing raw cases")
  400. parser.add_argument("--research-only", action="store_true", help="Only run research phases, skip Phase 2 and 3")
  401. parser.add_argument("--platforms", type=str, default="xhs,youtube,bili,x", help="Comma-separated list of platforms to search")
  402. parser.add_argument("--use-claude-sdk", action="store_true", help="Use pure Anthropic SDK (run_anthropic_sdk_task) instead of internal AgentRunner for Phase 2/3")
  403. parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
  404. args = parser.parse_args()
  405. base_dir = Path(__file__).parent
  406. # Load requirements locally
  407. req_path = base_dir / "db_requirements.json"
  408. with open(req_path, encoding='utf-8') as f:
  409. reqs = json.load(f)
  410. if args.index < 0 or args.index >= len(reqs):
  411. print("Index out of bounds")
  412. sys.exit(1)
  413. requirement = reqs[args.index]
  414. # 0. Setup directories
  415. output_dir = base_dir / "output" / f"{(args.index+1):03d}"
  416. output_dir.mkdir(parents=True, exist_ok=True)
  417. raw_cases_dir = output_dir / "raw_cases"
  418. raw_cases_dir.mkdir(parents=True, exist_ok=True)
  419. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  420. print("=" * 60)
  421. print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
  422. print("=" * 60)
  423. # Load presets
  424. presets_path = base_dir / "presets.json"
  425. if presets_path.exists():
  426. from agent.core.presets import load_presets_from_json
  427. load_presets_from_json(str(presets_path))
  428. print("✅ Configured Agent Presets (Skills Boundaries)")
  429. # Browser initialization removed to save resources
  430. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  431. # Instantiate two distinct LLM orchestrators
  432. qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
  433. # 用户指示使用 OpenRouter 代理的 Claude 4.5
  434. claude_model = "anthropic/claude-4.5-sonnet"
  435. args.use_claude_sdk = False # 禁用纯 Native SDK 模式,走内部通用 AgentRunner (即可对接 OpenRouter)
  436. from agent.llm.openrouter import create_openrouter_llm_call
  437. claude_llm_call = create_openrouter_llm_call(model=claude_model)
  438. runner_qwen = AgentRunner(
  439. trace_store=store,
  440. llm_call=create_qwen_llm_call(model=qwen_model),
  441. skills_dir=SKILLS_DIR
  442. )
  443. runner_claude = AgentRunner(
  444. trace_store=store,
  445. llm_call=claude_llm_call,
  446. skills_dir=SKILLS_DIR
  447. )
  448. try:
  449. start_time = time.time()
  450. total_cost = 0.0
  451. costs_breakdown = {}
  452. global_errors = []
  453. strategy_file = None
  454. existing_platforms = []
  455. if raw_cases_dir.exists():
  456. for f in raw_cases_dir.glob("case_*.json"):
  457. plat = f.stem.replace("case_", "")
  458. if plat in ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]:
  459. existing_platforms.append(plat)
  460. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  461. needed_count = max(0, 4 - len(existing_platforms))
  462. # Phase 0: Dynamic Routing
  463. if not args.skip_research:
  464. if needed_count == 0:
  465. print(f"\n--- Phase 0: Skipping Routing (Already have {len(existing_platforms)} existing cases: {existing_platforms}) ---")
  466. platforms = []
  467. else:
  468. print(f"\n--- Phase 0: Dynamic Platform Routing ({qwen_model}) ---")
  469. print(f"📡 Found existing cases: {existing_platforms}. Requesting {needed_count} new platforms...")
  470. try:
  471. router_prompt = SimplePrompt(base_dir / "prompts" / "router.prompt")
  472. rmessages = router_prompt.build_messages(
  473. requirement=requirement,
  474. existing_platforms=",".join(existing_platforms) if existing_platforms else "无",
  475. needed_count=str(needed_count)
  476. )
  477. rconfig = RunConfig(
  478. model=qwen_model,
  479. temperature=0.1,
  480. name="P0_Router",
  481. agent_type="router",
  482. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  483. )
  484. print(f"🚀 [Launch] P0_Router calculating optimal platforms...")
  485. router_response = ""
  486. async for item in runner_qwen.run(messages=rmessages, config=rconfig):
  487. if isinstance(item, Message) and item.role == "assistant" and isinstance(item.content, dict):
  488. text = item.content.get("text", "")
  489. if text and not item.content.get("tool_calls"):
  490. router_response = text
  491. if isinstance(item, Trace) and item.status == "completed":
  492. total_cost += item.total_cost
  493. costs_breakdown["P0_Router"] = round(item.total_cost, 4)
  494. if router_response:
  495. import re
  496. # Extract all alphabetic words from the response to handle extra text or markdown
  497. words = set(re.findall(r'\b[a-z]+\b', router_response.lower()))
  498. valid_platforms = ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]
  499. # Intersect words with valid platforms (direct exact word matching, exclude existing)
  500. final_platforms = [p for p in valid_platforms if p in words and p not in existing_platforms]
  501. if final_platforms:
  502. platforms = final_platforms[:needed_count]
  503. print(f"🎯 [Router Decision] Selected {len(platforms)} new platforms: {platforms}")
  504. else:
  505. platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
  506. print(f"⚠️ [Router Fallback] Invalid output '{router_response}'. Using delta default: {platforms}")
  507. except Exception as e:
  508. platforms = [p for p in platforms if p not in existing_platforms][:needed_count]
  509. print(f"⚠️ [Router Logic Failed] Using delta default platforms ({platforms}). Error: {e}")
  510. is_single_step = args.restart_mode.startswith("single_")
  511. # Phase 1: MAP (Parallel Search) uses Qwen
  512. if not args.skip_research:
  513. print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
  514. phase1_tasks = []
  515. for p in platforms:
  516. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
  517. out_file = str(raw_cases_dir / f"case_{p}.json")
  518. kwargs = {
  519. "task": task_desc,
  520. "output_file": out_file
  521. }
  522. phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=is_single_step))
  523. phase1_results = await asyncio.gather(*phase1_tasks)
  524. phase1_trace_ids = {}
  525. for (task_cost, task_errors, trace_id), p in zip(phase1_results, platforms):
  526. total_cost += task_cost
  527. costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
  528. phase1_trace_ids[f"P1_Research_{p}"] = trace_id
  529. global_errors.extend(task_errors)
  530. # Check if cases actually got written
  531. expected_file = Path(raw_cases_dir / f"case_{p}.json")
  532. if not expected_file.exists():
  533. err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
  534. print(f"⚠️ [Warning] {err_msg}")
  535. global_errors.append(err_msg)
  536. # Phase 1.5: Extract raw post data from cache → raw_cases/source.json
  537. try:
  538. from examples.process_pipeline.script.extract_sources import extract_sources_to_json
  539. # 使用本次 pipeline 的 trace_ids 精确定位 cache 文件
  540. trace_id_list = [tid for tid in phase1_trace_ids.values() if tid]
  541. src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
  542. print(
  543. f"📎 [Source Extraction] "
  544. f"existing={src_stats['total_existing']} "
  545. f"new={src_stats['total_matched']} "
  546. f"total={src_stats['total_existing'] + src_stats['total_matched']} "
  547. f"→ {raw_cases_dir / 'source.json'}"
  548. )
  549. except Exception as e:
  550. err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
  551. print(f"⚠️ [Warning] {err_msg}")
  552. global_errors.append(err_msg)
  553. # Phase 1.6: Process sources with Claude → case_detailed.json
  554. source_file = raw_cases_dir / "source.json"
  555. if source_file.exists():
  556. try:
  557. from examples.process_pipeline.script.process_sources import process_sources
  558. print(f"\n--- Phase 1.6: Workflow Extraction ({claude_model}) ---")
  559. detailed_file = raw_cases_dir / "case_detailed.json"
  560. workflow_stats = await process_sources(
  561. source_file,
  562. detailed_file,
  563. claude_llm_call,
  564. model=claude_model,
  565. max_concurrent=3
  566. )
  567. total_cost += workflow_stats.get("total_cost", 0.0)
  568. costs_breakdown["P1.6_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
  569. print(
  570. f"🔍 [Workflow Extraction] "
  571. f"success={workflow_stats['success']} "
  572. f"failed={workflow_stats['failed']} "
  573. f"→ {detailed_file}"
  574. )
  575. except Exception as e:
  576. err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
  577. print(f"⚠️ [Warning] {err_msg}")
  578. global_errors.append(err_msg)
  579. else:
  580. print("\n⏭️ [Skip] Phase 1 Skipped via --skip-research. Using existing cases...")
  581. # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
  582. if not args.research_only:
  583. # Phase 2: REDUCE 1 (Parallel Distillation) uses Claude
  584. print(f"\n--- Phase 2: Parallel Distillation ({claude_model}) ---")
  585. blueprint_file = str(output_dir / "blueprint.json")
  586. capabilities_file = str(output_dir / "capabilities_extracted.json")
  587. raw_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
  588. task_a = None
  589. task_b = None
  590. force_strategy_rerun = False
  591. if Path(blueprint_file).exists():
  592. print(f"⏭️ [Skip P2] blueprint.json already exists. Skipping P2_FilterBlueprint.")
  593. else:
  594. force_strategy_rerun = True
  595. if args.use_claude_sdk:
  596. print(" > Using [Anthropic SDK Core] for P2_FilterBlueprint")
  597. task_a = run_anthropic_sdk_task("filter_and_blueprint", {
  598. "requirement": requirement,
  599. "raw_files_glob": raw_glob,
  600. "output_file": blueprint_file
  601. }, "P2_FilterBlueprint", claude_model, skip_validation=is_single_step)
  602. else:
  603. print(" > Using [AgentRunner Core] for P2_FilterBlueprint")
  604. task_a = run_agent_task(runner_claude, "filter_and_blueprint", {
  605. "requirement": requirement,
  606. "raw_files_glob": raw_glob,
  607. "output_file": blueprint_file
  608. }, "P2_FilterBlueprint", claude_model, skip_validation=is_single_step)
  609. if Path(capabilities_file).exists():
  610. print(f"⏭️ [Skip P2] capabilities_extracted.json already exists. Skipping P2_ExtractCaps.")
  611. else:
  612. force_strategy_rerun = True
  613. if args.use_claude_sdk:
  614. print(" > Using [Anthropic SDK Core] for P2_ExtractCaps")
  615. task_b = run_anthropic_sdk_task("extract_capabilities", {
  616. "requirement": requirement,
  617. "raw_files_glob": raw_glob,
  618. "output_file": capabilities_file
  619. }, "P2_ExtractCaps", claude_model, skip_validation=is_single_step)
  620. else:
  621. print(" > Using [AgentRunner Core] for P2_ExtractCaps")
  622. task_b = run_agent_task(runner_claude, "extract_capabilities", {
  623. "requirement": requirement,
  624. "raw_files_glob": raw_glob,
  625. "output_file": capabilities_file
  626. }, "P2_ExtractCaps", claude_model, skip_validation=is_single_step)
  627. to_await = []
  628. names_await = []
  629. if task_a:
  630. to_await.append(task_a)
  631. names_await.append("P2_FilterBlueprint")
  632. if task_b:
  633. to_await.append(task_b)
  634. names_await.append("P2_ExtractCaps")
  635. if to_await:
  636. phase2_results = await asyncio.gather(*to_await)
  637. phase2_trace_ids = {}
  638. for (cost, errs, trace_id), t_name in zip(phase2_results, names_await):
  639. total_cost += cost
  640. costs_breakdown[t_name] = round(cost, 4)
  641. phase2_trace_ids[t_name] = trace_id
  642. global_errors.extend(errs)
  643. # Phase 3: REDUCE 2 (Final Assembly) uses Claude
  644. print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
  645. strategy_file = str(output_dir / "strategy.json")
  646. if args.restart_mode == "single":
  647. force_strategy_rerun = False
  648. if Path(strategy_file).exists() and not force_strategy_rerun:
  649. print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
  650. else:
  651. if Path(strategy_file).exists():
  652. print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
  653. if args.use_claude_sdk:
  654. print(" > Using [Anthropic SDK Core]")
  655. phase3_cost, phase3_errs, phase3_trace_id = await run_anthropic_sdk_task("assemble_strategy", {
  656. "requirement": requirement,
  657. "blueprint_file": blueprint_file,
  658. "capabilities_file": capabilities_file,
  659. "output_file": strategy_file
  660. }, "P3_Assembler", claude_model, skip_validation=is_single_step)
  661. else:
  662. print(" > Using [AgentRunner Core]")
  663. phase3_cost, phase3_errs, phase3_trace_id = await run_agent_task(runner_claude, "assemble_strategy", {
  664. "requirement": requirement,
  665. "blueprint_file": blueprint_file,
  666. "capabilities_file": capabilities_file,
  667. "output_file": strategy_file
  668. }, "P3_Assembler", claude_model, skip_validation=is_single_step)
  669. total_cost += phase3_cost
  670. costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
  671. global_errors.extend(phase3_errs)
  672. else:
  673. print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
  674. end_time = time.time()
  675. elapsed_sec = end_time - start_time
  676. # Save Metrics
  677. metrics_file = base_dir / "run_metrics.json"
  678. metrics_data = []
  679. if metrics_file.exists():
  680. with open(metrics_file, "r", encoding="utf-8") as f:
  681. try:
  682. metrics_data = json.load(f)
  683. except json.JSONDecodeError:
  684. pass
  685. # Collect trace_ids from all phases
  686. trace_ids = {}
  687. if not args.skip_research and 'phase1_trace_ids' in dir():
  688. trace_ids.update(phase1_trace_ids)
  689. if not args.research_only:
  690. if 'phase2_trace_ids' in dir():
  691. trace_ids.update(phase2_trace_ids)
  692. if 'phase3_trace_id' in dir() and phase3_trace_id:
  693. trace_ids["P3_Assembler"] = phase3_trace_id
  694. metrics_data.append({
  695. "index": args.index,
  696. "requirement": requirement[:80] + "...",
  697. "duration_seconds": round(elapsed_sec, 2),
  698. "total_cost_usd": round(total_cost, 4),
  699. "costs_breakdown": costs_breakdown,
  700. "trace_ids": trace_ids,
  701. "errors": global_errors,
  702. "timestamp": datetime.now().isoformat()
  703. })
  704. with open(metrics_file, "w", encoding="utf-8") as f:
  705. json.dump(metrics_data, f, indent=2, ensure_ascii=False)
  706. print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
  707. finally:
  708. pass
  709. print("✅ Pipeline run finished.")
  710. if strategy_file:
  711. print("✅ Strategy saved to:", strategy_file)
  712. if __name__ == "__main__":
  713. asyncio.run(main())