run_pipeline.py 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470
  1. """
  2. V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
  3. """
  4. import argparse
  5. import asyncio
  6. import json
  7. import os
  8. import sys
  9. import time
  10. from datetime import datetime
  11. from pathlib import Path
  12. PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
  13. # Force runtime working directory to project root so relative trace/cache paths
  14. # always land in the repo root no matter where this script is launched from.
  15. os.chdir(PROJECT_ROOT)
  16. # Add project root to path
  17. sys.path.insert(0, str(PROJECT_ROOT))
  18. from dotenv import load_dotenv
  19. load_dotenv()
  20. from agent.llm.prompts import SimplePrompt
  21. from agent.core.runner import AgentRunner, RunConfig
  22. from agent.tools.builtin.knowledge import KnowledgeConfig
  23. from agent.trace import FileSystemTraceStore, Trace, Message
  24. from agent.llm import create_qwen_llm_call
  25. from agent.llm.openrouter import create_openrouter_llm_call
  26. from agent.llm.claude import create_claude_llm_call
  27. # config from existing setup
  28. from examples.process_research.config import (
  29. OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
  30. BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
  31. )
  32. from agent.utils import setup_logging
async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
    """Run one pipeline stage through AgentRunner with retry / fix / recovery.

    Loads ``prompts/<prompt_name>.prompt``, launches the agent, then:
      1. validates the JSON artifact instantly on every write_file/write_json
         tool call, and again after the run finishes;
      2. on validation failure, "continues" the existing trace with fix
         instructions so the same agent repairs its own output (falling back
         to a full restart when no prior trace / error is available);
      3. if the output file is missing entirely, forces a wrap-up
         continuation ordering the agent to write the file immediately.

    Args:
        runner: shared AgentRunner executing the agent.
        prompt_name: basename of the .prompt file; also used as agent_type
            and as the key into the tool-group permission map.
        kwargs: template variables for the prompt; ``output_file`` (if
            present) is the JSON artifact this stage must produce.
        task_name: human-readable label used in logs and RunConfig names.
        model_name: fallback model when the prompt config declares none.
        skip_validation: when True, skip schema validation as soon as the
            output file exists (single-step / debug mode).

    Returns:
        ``(total_task_cost, task_errors, trace_id)`` — accumulated cost,
        list of error strings, and the trace id of the last (on success:
        final successful) run; ``trace_id`` may be None if the agent never
        yielded a Trace.
    """
    from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
    base_dir = Path(__file__).parent
    prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
    prompt = SimplePrompt(prompt_path)
    messages = prompt.build_messages(**kwargs)
    # Extra (non-group) tools; only the capability-extraction agent needs them.
    target_tools = []
    if prompt_name == "extract_capabilities":
        target_tools = ["capability_search", "capability_list", "tool_search"]
    # Tool-group permissions per agent type.
    tool_groups_map = {
        "researcher": ["core", "content"],  # search + files, no browser
        "filter_and_blueprint": ["core"],   # file read/write only
        "extract_capabilities": ["core"],   # file read/write only (extras supplied via target_tools)
        "assemble_strategy": ["core"],      # file read/write only
    }
    total_task_cost = 0.0
    task_errors = []
    out_file = kwargs.get("output_file")
    max_retries = 3
    last_trace_id = None          # trace of the most recent run, used for continuation
    last_validation_error = None  # last JSON/schema error, fed back to the agent
    final_trace_id = None         # trace id returned on the successful exit

    def _instant_validate():
        """Validate the output file immediately after a write; return error string or None.

        Parses the JSON (attempting an automatic syntax repair via
        fix_json_quotes when plain parsing fails, and persisting the repaired
        content), then runs the schema validator selected by filename.
        Records any problem in ``last_validation_error``.
        """
        nonlocal last_validation_error
        if not out_file or not Path(out_file).exists():
            return None
        try:
            with open(out_file, "r", encoding="utf-8") as f:
                raw = f.read()
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
                ok, data, desc = try_fix_and_parse(raw)
                if ok:
                    # Write the repaired JSON back so later readers see valid content.
                    with open(out_file, "w", encoding="utf-8") as f:
                        json.dump(data, f, ensure_ascii=False, indent=2)
                    print(f" 🔧 [Instant Fix] {desc}")
                else:
                    last_validation_error = "JSON parse failed, auto-fix unsuccessful"
                    return last_validation_error
            # Select the schema validator by output filename.
            filename = Path(out_file).name
            err = None
            if filename.startswith("case_"):
                err = validate_case(data)
            elif filename == "blueprint.json":
                err = validate_blueprint(data)
            elif filename == "capabilities_extracted.json":
                err = validate_capabilities(data)
            elif filename == "strategy.json":
                err = validate_strategy(data)
            if err:
                last_validation_error = err
                print(f" ⚠️ [Instant Validation] {err}")
                return err
            else:
                # Unknown filenames fall through with err=None and pass trivially.
                print(f" ✅ [Instant Validation] (unknown) OK")
                return None
        except Exception as e:
            last_validation_error = str(e)
            return str(e)

    for attempt in range(max_retries):
        if attempt > 0 and last_trace_id and last_validation_error:
            # Continuation mode: report the validation error into the existing
            # trace so the same agent can fix its own output in place.
            print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
            fix_messages = [{
                "role": "user",
                "content": (
                    f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
                    f"错误详情:{last_validation_error}\n\n"
                    f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
                    f"只修复有问题的部分,不要丢弃已有的正确内容。"
                )
            }]
            fix_config = RunConfig(
                model=prompt.config.get("model") or model_name,
                temperature=0.1,  # low temperature for deterministic repair
                name=f"{task_name}_Fix{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups_map.get(prompt_name, ["core"]),
                trace_id=last_trace_id,  # resume the prior conversation
                knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
            )
            try:
                async for item in runner.run(messages=fix_messages, config=fix_config):
                    if isinstance(item, Trace):
                        last_trace_id = item.trace_id
                        if item.status == "completed":
                            total_task_cost += item.total_cost
                        elif item.status == "failed":
                            task_errors.append(f"{task_name} Fix Failed: {item.error_message}")
                    if isinstance(item, Message) and item.role == "tool":
                        content = item.content if isinstance(item.content, dict) else {}
                        if content.get("tool_name") in ("write_file", "write_json"):
                            print(f" 💾 [Fix File Written by {task_name}]")
                            _instant_validate()
            except Exception as e:
                err_msg = f"{type(e).__name__}: {e}"
                print(f"❌ [Exception Fix] {task_name} crashed: {err_msg}")
                task_errors.append(f"{task_name} fix crashed: {err_msg}")
        elif attempt > 0:
            # No trace_id or no validation error to continue from: the only
            # option is a full restart from the original messages.
            print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
            if out_file and Path(out_file).exists():
                Path(out_file).unlink()  # drop stale output before rerunning
            run_config = RunConfig(
                model=prompt.config.get("model") or model_name,
                # NOTE(review): `or 0.3` also overrides an explicit 0 / 0.0 — confirm intended.
                temperature=prompt.config.get("temperature") or 0.3,
                name=f"{task_name}_A{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups_map.get(prompt_name, ["core"]),
                knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
            )
            print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
            try:
                async for item in runner.run(messages=messages, config=run_config):
                    if isinstance(item, Trace):
                        last_trace_id = item.trace_id
                        if item.status == "completed":
                            total_task_cost += item.total_cost
                        elif item.status == "failed":
                            task_errors.append(f"{task_name} Failed: {item.error_message}")
                    if isinstance(item, Message):
                        if item.role == "tool":
                            content = item.content if isinstance(item.content, dict) else {}
                            t_name = content.get("tool_name", "unknown")
                            if t_name in ("write_file", "write_json"):
                                print(f" 💾 [File Written by {task_name}]")
                                _instant_validate()
            except Exception as e:
                err_msg = f"{type(e).__name__}: {e}"
                print(f"❌ [Exception] {task_name} crashed: {err_msg}")
                task_errors.append(f"{task_name} crashed: {err_msg}")
        else:
            # First execution.
            run_config = RunConfig(
                model=prompt.config.get("model") or model_name,
                # NOTE(review): `or 0.3` also overrides an explicit 0 / 0.0 — confirm intended.
                temperature=prompt.config.get("temperature") or 0.3,
                name=f"{task_name}_A{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups_map.get(prompt_name, ["core"]),
                knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
            )
            print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
            try:
                async for item in runner.run(messages=messages, config=run_config):
                    if isinstance(item, Trace):
                        last_trace_id = item.trace_id
                        if item.status == "completed":
                            total_task_cost += item.total_cost
                        elif item.status == "failed":
                            task_errors.append(f"{task_name} Failed: {item.error_message}")
                    if isinstance(item, Message):
                        if item.role == "tool":
                            content = item.content if isinstance(item.content, dict) else {}
                            t_name = content.get("tool_name", "unknown")
                            if t_name in ("write_file", "write_json"):
                                print(f" 💾 [File Written by {task_name}]")
                                _instant_validate()
            except Exception as e:
                err_msg = f"{type(e).__name__}: {e}"
                print(f"❌ [Exception] {task_name} crashed: {err_msg}")
                task_errors.append(f"{task_name} crashed: {err_msg}")
        # Verification & Recovery block: if the run ended without producing the
        # expected file, continue the trace with a forced write instruction.
        if out_file and not Path(out_file).exists() and last_trace_id:
            print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
            recovery_messages = [{
                "role": "user",
                "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
            }]
            rec_config = RunConfig(
                model=model_name,
                temperature=0.1,
                name=task_name + "_Rec",
                agent_type=prompt_name,
                trace_id=last_trace_id,  # continue the same conversation
                knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
            )
            try:
                async for r_item in runner.run(messages=recovery_messages, config=rec_config):
                    if isinstance(r_item, Trace):
                        if r_item.status == "completed":
                            total_task_cost += r_item.total_cost
                        elif r_item.status == "failed":
                            task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
                    if isinstance(r_item, Message) and r_item.role == "tool":
                        content = r_item.content if isinstance(r_item.content, dict) else {}
                        if content.get("tool_name") in ("write_file", "write_json"):
                            print(f" 💾 [Recovery File Written by {task_name}]")
                            _instant_validate()
            except Exception as e:
                err_msg = f"{type(e).__name__}: {e}"
                print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
                task_errors.append(f"{task_name} recovery crashed: {err_msg}")
        # Schema Validation (with auto-fix layer).
        if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
            if skip_validation:
                print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
                return total_task_cost, task_errors, last_trace_id
            try:
                with open(out_file, "r", encoding="utf-8") as f:
                    raw_content = f.read()
                # Layer 1: try to parse the JSON directly.
                try:
                    data = json.loads(raw_content)
                except json.JSONDecodeError as parse_err:
                    # Layer 2: attempt automatic repair of JSON syntax errors.
                    print(f" 🔧 [Auto-Fix] {Path(out_file).name} has JSON syntax error, attempting fix...")
                    try:
                        from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
                        success, data, fix_desc = try_fix_and_parse(raw_content)
                        if success:
                            # Repair succeeded: write it back to the file.
                            with open(out_file, "w", encoding="utf-8") as f:
                                json.dump(data, f, ensure_ascii=False, indent=2)
                            print(f" 🔧 [Auto-Fix] Fixed: {fix_desc}")
                        else:
                            raise parse_err  # repair failed, re-raise the original error
                    except ImportError:
                        raise parse_err  # fix_json_quotes unavailable, re-raise the original error
                # Select the schema validator by output filename.
                filename = Path(out_file).name
                err = None
                if filename.startswith("case_"):
                    err = validate_case(data)
                elif filename == "blueprint.json":
                    err = validate_blueprint(data)
                elif filename == "capabilities_extracted.json":
                    err = validate_capabilities(data)
                elif filename == "strategy.json":
                    err = validate_strategy(data)
                if err:
                    raise ValueError(f"Schema Validation Failed: {err}")
                print(f" ✅ [Schema Validated] {Path(out_file).name}")
                final_trace_id = last_trace_id
                return total_task_cost, task_errors, final_trace_id # Success! Exit retry loop.
            except Exception as e:
                err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
                print(f"❌ [Validation Error] {task_name}: {err_msg}")
                task_errors.append(f"{task_name} Error: {e}")
                # Remember the error so the next attempt can run in continuation mode.
                last_validation_error = str(e)
                if attempt == max_retries - 1:
                    print(f"❌ [Retry Limit] {task_name} exhausted retries.")
                    return total_task_cost, task_errors, last_trace_id
        else:
            print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
            # No file means no validation error to continue from: next attempt restarts fully.
            last_validation_error = None
            if attempt == max_retries - 1:
                return total_task_cost, task_errors, last_trace_id
    return total_task_cost, task_errors, last_trace_id
async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
    """
    Fallback driver: run the stage through the plain official Anthropic SDK.

    Skips the internal framework's trace tracking, but keeps seamless access
    to the existing Python tool library (e.g. write_file/glob_files) via the
    tool registry. Mirrors run_agent_task's retry / recovery / validation
    flow in a hand-rolled model<->tool loop.

    NOTE(review): return arity is inconsistent — the skip_validation exit
    returns a 3-tuple (cost, errors, None) like run_agent_task, while every
    other exit returns a 2-tuple (cost, errors); confirm which one callers
    expect. `sdk_trace_id` is assigned but never used or returned.
    """
    from anthropic import AsyncAnthropic
    from agent.tools.registry import get_tool_registry
    base_dir = Path(__file__).parent
    prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
    prompt = SimplePrompt(prompt_path)
    # 1. Assemble input messages: Anthropic takes the system prompt separately
    # from the conversation messages.
    raw_messages = prompt.build_messages(**kwargs)
    system_prompt = ""
    messages = []
    for msg in raw_messages:
        if msg["role"] == "system":
            system_prompt += msg["content"] + "\n\n"
        else:
            messages.append({"role": msg["role"], "content": msg["content"]})
    # 2. Map target tools and convert their OpenAI-style function schemas
    # into Anthropic's tool format.
    target_tools = ["write_file", "write_json", "read_file", "glob_files"]
    if prompt_name == "extract_capabilities":
        target_tools.extend(["capability_search", "capability_list", "tool_search"])
    registry = get_tool_registry()
    schemas = registry.get_schemas(target_tools)
    anthropic_tools = []
    for s in schemas:
        anthropic_tools.append({
            "name": s["function"]["name"],
            "description": s["function"].get("description", ""),
            "input_schema": s["function"]["parameters"]
        })
    # 3. Initialize the client and start the loop.
    # Requires the ANTHROPIC_API_KEY environment variable to be configured.
    client = AsyncAnthropic()
    total_task_cost = 0.0
    task_errors = []
    sdk_trace_id = None  # reserved for the Anthropic SDK response ID (currently unused)
    from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
    out_file = kwargs.get("output_file")
    max_retries = 3
    for attempt in range(max_retries):
        if attempt > 0:
            print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
            if out_file and Path(out_file).exists():
                Path(out_file).unlink()  # drop stale output before retrying
        print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")
        # Reset messages for retry: each attempt gets a fresh copy of the seed.
        messages_copy = list(messages)
        max_loops = 50  # hard cap on model<->tool round-trips per attempt
        for loop_idx in range(max_loops):
            try:
                # Strip provider prefixes (e.g. an OpenRouter-style "vendor/model" name).
                clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
                # Map any claude model onto the available alias claude-sonnet-4-5.
                target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
                response = await client.messages.create(
                    model=target_model,
                    max_tokens=4096,
                    temperature=0.2,
                    system=system_prompt,
                    messages=messages_copy,
                    tools=anthropic_tools
                )
                # Rough cost estimate (not official billing figures).
                if hasattr(response, 'usage'):
                    step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
                    total_task_cost += step_cost
                # Append the assistant turn, collecting any tool_use blocks on the way.
                assistant_content = []
                tool_uses = []
                for content_block in response.content:
                    if content_block.type == "text":
                        text_val = content_block.text
                        if text_val:
                            assistant_content.append({"type": "text", "text": text_val})
                            print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
                    elif content_block.type == "tool_use":
                        assistant_content.append({
                            "type": "tool_use",
                            "id": content_block.id,
                            "name": content_block.name,
                            "input": content_block.input
                        })
                        tool_uses.append(content_block)
                if not assistant_content:
                    # Avoid appending an empty assistant turn; insert a placeholder.
                    assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
                messages_copy.append({"role": "assistant", "content": assistant_content})
                # Exit condition: no tool calls means the task declared itself done.
                if not tool_uses:
                    print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
                    break
                # Execute the tools locally and feed the results back.
                tool_results = []
                for tu in tool_uses:
                    if tu.name in ("write_file", "write_json"):
                        print(f" 💾 [File Written by SDK] {task_name}")
                    print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
                    # Execute against the local tool registry.
                    result_str = await registry.execute(tu.name, tu.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tu.id,
                        "content": result_str
                    })
                messages_copy.append({"role": "user", "content": tool_results})
            except Exception as e:
                err_msg = str(e)
                print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
                task_errors.append(err_msg)
                break
        # Verification & Recovery block for SDK (ported from the AgentRunner
        # path): if no output file exists, force one final write via tool_choice.
        if out_file and not Path(out_file).exists():
            print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
            messages_copy.append({
                "role": "user",
                "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
            })
            try:
                target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
                rec_response = await client.messages.create(
                    model=target_model,
                    max_tokens=4096,
                    temperature=0.1,
                    system=system_prompt,
                    messages=messages_copy,
                    tools=anthropic_tools,
                    tool_choice={"type": "any"}  # force the model to call some tool
                )
                for content_block in rec_response.content:
                    if content_block.type == "tool_use":
                        if content_block.name in ("write_file", "write_json"):
                            print(f" 💾 [Recovery File Written by SDK] {task_name}")
                        await registry.execute(content_block.name, content_block.input)
            except Exception as e:
                print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
                task_errors.append(str(e))
        # Schema Validation of the produced JSON artifact.
        if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
            if skip_validation:
                print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
                return total_task_cost, task_errors, None
            try:
                with open(out_file, "r", encoding="utf-8") as f:
                    data = json.loads(f.read())
                # Select the schema validator by output filename.
                filename = Path(out_file).name
                err = None
                if filename.startswith("case_"):
                    err = validate_case(data)
                elif filename == "blueprint.json":
                    err = validate_blueprint(data)
                elif filename == "capabilities_extracted.json":
                    err = validate_capabilities(data)
                elif filename == "strategy.json":
                    err = validate_strategy(data)
                if err:
                    raise ValueError(f"Schema Validation Failed: {err}")
                print(f" ✅ [Schema Validated] {Path(out_file).name}")
                return total_task_cost, task_errors # Success! Exit retry loop.
            except Exception as e:
                err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
                print(f"❌ [Validation Error] {task_name}: {err_msg}")
                task_errors.append(f"{task_name} Error: {e}")
                if attempt == max_retries - 1:
                    print(f"❌ [Retry Limit] {task_name} exhausted retries.")
                    return total_task_cost, task_errors
        else:
            print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
            if attempt == max_retries - 1:
                return total_task_cost, task_errors
    return total_task_cost, task_errors
  458. async def main():
  459. parser = argparse.ArgumentParser()
  460. parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
  461. STEP_NAMES = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding", "process-cluster", "process-score", "capability-enrich", "strategy"]
  462. parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube", help="Comma-separated list of platforms to search")
  463. parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
  464. parser.add_argument("--only-step", type=str, choices=STEP_NAMES, help="Run only a single step")
  465. parser.add_argument("--phase", type=int, choices=[1, 2, 3], help="Run all steps in a phase (1=research~case-detailed, 2=process~capability, 3=strategy)")
  466. parser.add_argument("--start-from", type=str, choices=STEP_NAMES, help="Start from this step (inclusive)")
  467. parser.add_argument("--end-at", type=str, choices=STEP_NAMES, help="End at this step (inclusive)")
  468. parser.add_argument("--case-index", type=int, help="Rerun extraction for a specific case index (only for workflow-extract, capability-extract, apply-grounding)")
  469. parser.add_argument("--use-claude-sdk", action="store_true", help="Use Claude SDK (CLAUDE_CODE_KEY/URL) instead of OpenRouter")
  470. args = parser.parse_args()
  471. # ── 参数验证 ──
  472. # --case-index 只能与提取步骤一起使用
  473. if args.case_index is not None:
  474. extraction_steps = {"workflow-extract", "capability-extract", "apply-grounding"}
  475. if args.only_step and args.only_step not in extraction_steps:
  476. print(f"❌ Error: --case-index can only be used with extraction steps: {', '.join(extraction_steps)}")
  477. sys.exit(1)
  478. if not args.only_step:
  479. print("❌ Error: --case-index requires --only-step to specify which extraction step to run")
  480. sys.exit(1)
  481. # ── 三种模式互斥检查 ──
  482. mode_count = sum([
  483. args.only_step is not None,
  484. args.phase is not None,
  485. (args.start_from is not None or args.end_at is not None),
  486. ])
  487. if mode_count > 1:
  488. print("❌ Error: --only-step, --phase, and --start-from/--end-at are mutually exclusive.")
  489. sys.exit(1)
  490. # 定义步骤拓扑(非线性,Phase 2 有两条并行分支)
  491. STEP_ORDER = STEP_NAMES # 用于 phase 分组和前置检查
  492. def _step_in_range(step, start, end):
  493. """判断线性前缀中的 step 是否在 [start, end] 范围内"""
  494. all_steps = LINEAR_PREFIX + BRANCH_21 + BRANCH_22 + ["strategy"]
  495. if step not in all_steps or start not in all_steps:
  496. return False
  497. # 对于线性前缀,用索引比较
  498. if step in LINEAR_PREFIX:
  499. s_idx = LINEAR_PREFIX.index(step)
  500. start_idx = LINEAR_PREFIX.index(start) if start in LINEAR_PREFIX else -1
  501. # 如果 start 在 Phase 2 或 strategy 中,线性前缀不需要跑
  502. if start_idx < 0:
  503. return False
  504. return s_idx >= start_idx
  505. return False
  506. PHASE_MAP = {
  507. 1: {"research", "source", "case-detailed"},
  508. 2: {"process-cluster", "process-score", "capability-extract", "capability-enrich"},
  509. 3: {"strategy"},
  510. }
  511. BRANCH_21 = ["process-cluster", "process-score"]
  512. BRANCH_22 = ["capability-enrich"]
  513. # 线性前缀(Phase 2 之前)
  514. LINEAR_PREFIX = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding"]
  515. # 计算需要执行的步骤集合
  516. active_steps = None # None = 全部执行
  517. if args.only_step:
  518. active_steps = {args.only_step}
  519. elif args.phase:
  520. active_steps = PHASE_MAP[args.phase]
  521. elif args.start_from or args.end_at:
  522. start = args.start_from or "research"
  523. end = args.end_at or "strategy"
  524. # 构建 active_steps,考虑并行分支
  525. active = set()
  526. # 1. 线性前缀部分
  527. for s in LINEAR_PREFIX:
  528. if _step_in_range(s, start, end):
  529. active.add(s)
  530. # 2. Phase 2 分支部分
  531. start_in_21 = start in BRANCH_21
  532. start_in_22 = start in BRANCH_22
  533. end_in_21 = end in BRANCH_21
  534. end_in_22 = end in BRANCH_22
  535. end_is_strategy = end == "strategy"
  536. # 如果 end 是 strategy 或覆盖了整个 Phase 2,两条分支都跑
  537. if end_is_strategy or (not end_in_21 and not end_in_22 and end not in LINEAR_PREFIX):
  538. # 两条分支都包含(如果 start 允许的话)
  539. if not start_in_21 and not start_in_22:
  540. # start 在 Phase 2 之前或就是 Phase 2 的开头
  541. active.update(BRANCH_21)
  542. active.update(BRANCH_22)
  543. elif start_in_21:
  544. # start 在 2.1 分支内,但 end 到了 strategy,所以 2.2 也要全跑
  545. idx = BRANCH_21.index(start)
  546. active.update(BRANCH_21[idx:])
  547. active.update(BRANCH_22)
  548. elif start_in_22:
  549. # start 在 2.2 分支内,但 end 到了 strategy,所以 2.1 也要全跑
  550. idx = BRANCH_22.index(start)
  551. active.update(BRANCH_22[idx:])
  552. active.update(BRANCH_21)
  553. elif end_in_21:
  554. # end 在 2.1 分支内,只跑 2.1
  555. end_idx = BRANCH_21.index(end)
  556. start_idx = BRANCH_21.index(start) if start_in_21 else 0
  557. active.update(BRANCH_21[start_idx:end_idx + 1])
  558. elif end_in_22:
  559. # end 在 2.2 分支内,只跑 2.2
  560. end_idx = BRANCH_22.index(end)
  561. start_idx = BRANCH_22.index(start) if start_in_22 else 0
  562. active.update(BRANCH_22[start_idx:end_idx + 1])
  563. # 3. strategy
  564. if end_is_strategy:
  565. active.add("strategy")
  566. active_steps = active
  567. def should_run(step_name: str) -> bool:
  568. if active_steps is None:
  569. return True
  570. return step_name in active_steps
  571. if active_steps is not None:
  572. active_list = [s for s in STEP_ORDER if s in active_steps]
  573. print(f"\n[Selective Mode] Steps: {' -> '.join(active_list)}")
  574. base_dir = Path(__file__).parent
  575. # Load requirements locally
  576. req_path = base_dir / "db_requirements.json"
  577. with open(req_path, encoding='utf-8') as f:
  578. reqs = json.load(f)
  579. if args.index < 0 or args.index >= len(reqs):
  580. print("Index out of bounds")
  581. sys.exit(1)
  582. requirement = reqs[args.index]
  583. # 0. Setup directories
  584. output_dir = base_dir / "output" / f"{(args.index+1):03d}"
  585. output_dir.mkdir(parents=True, exist_ok=True)
  586. raw_cases_dir = output_dir / "raw_cases"
  587. raw_cases_dir.mkdir(parents=True, exist_ok=True)
  588. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  589. print("=" * 60)
  590. print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
  591. print("=" * 60)
  592. # ── 前置文件检查 ──
  593. # 每个 step 需要的前置文件(如果该 step 不在 active_steps 中,则需要预先存在)
  594. STEP_DEPS = {
  595. "research": [],
  596. "source": [("case_*.json", lambda: bool(list(raw_cases_dir.glob("case_*.json"))))],
  597. "generate-case": [("source.json", lambda: (raw_cases_dir / "source.json").exists())],
  598. "workflow-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
  599. "capability-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
  600. "apply-grounding": [("case.json", lambda: (output_dir / "case.json").exists())],
  601. "process-cluster": [
  602. ("case.json", lambda: (output_dir / "case.json").exists()),
  603. ],
  604. "process-score": [("blueprint_temp.json", lambda: (output_dir / "blueprint_temp.json").exists())],
  605. "capability-enrich": [
  606. ("capabilities_temp.json", lambda: (output_dir / "capabilities_temp.json").exists()),
  607. ("case.json", lambda: (output_dir / "case.json").exists()),
  608. ],
  609. "strategy": [
  610. ("process.json", lambda: (output_dir / "process.json").exists()),
  611. ("capabilities.json", lambda: (output_dir / "capabilities.json").exists()),
  612. ],
  613. }
  614. # 每个 step 会生成的文件(用于判断上游 step 是否会在本次运行中生成依赖)
  615. STEP_PRODUCES = {
  616. "research": {"case_*.json"},
  617. "source": {"source.json"},
  618. "generate-case": {"case.json"},
  619. "workflow-extract": {"case.json"}, # 原地更新
  620. "capability-extract": {"case.json"}, # 原地更新
  621. "apply-grounding": {"case.json"}, # 原地更新
  622. "process-cluster": {"blueprint_temp.json"},
  623. "process-score": {"process.json"},
  624. "capability-enrich": {"capabilities.json"},
  625. "strategy": {"strategy.json"},
  626. }
  627. if active_steps is not None:
  628. # 计算本次运行会生成的文件集合
  629. will_produce = set()
  630. for s in STEP_ORDER:
  631. if s in active_steps:
  632. will_produce.update(STEP_PRODUCES.get(s, set()))
  633. # 检查每个 active step 的前置文件
  634. missing = []
  635. for s in STEP_ORDER:
  636. if s not in active_steps:
  637. continue
  638. for dep_name, dep_check in STEP_DEPS.get(s, []):
  639. if dep_name in will_produce:
  640. continue # 上游 step 会在本次运行中生成
  641. if not dep_check():
  642. missing.append((s, dep_name))
  643. if missing:
  644. print(f"\n❌ [Pre-flight Check] Missing prerequisite files:")
  645. for step, dep in missing:
  646. print(f" - Step '{step}' requires '{dep}'")
  647. print(f"\nRun upstream steps first, or use --start-from to include them.")
  648. sys.exit(1)
  649. else:
  650. print(f"✅ [Pre-flight Check] All prerequisites satisfied")
  651. # Load presets
  652. presets_path = base_dir / "presets.json"
  653. if presets_path.exists():
  654. from agent.core.presets import load_presets_from_json
  655. load_presets_from_json(str(presets_path))
  656. print("✅ Configured Agent Presets (Skills Boundaries)")
  657. # Browser initialization removed to save resources
  658. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  659. # Instantiate two distinct LLM orchestrators
  660. qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
  661. # 根据 --use-claude-sdk 参数选择 LLM 提供商
  662. if args.use_claude_sdk:
  663. # 使用 Claude SDK (CLAUDE_CODE_KEY/URL 或 ANTHROPIC_API_KEY/BASE_URL)
  664. claude_model = "claude-sonnet-4-5"
  665. print(f"✅ Using Claude SDK with model: {claude_model}")
  666. print(f" API Key: {os.getenv('CLAUDE_CODE_KEY', 'N/A')[:20]}...")
  667. print(f" Base URL: {os.getenv('CLAUDE_CODE_URL', os.getenv('ANTHROPIC_BASE_URL', 'https://api.anthropic.com'))}")
  668. claude_llm_call = create_claude_llm_call(model=claude_model)
  669. else:
  670. # 使用 OpenRouter 代理的 GPT-5.4(支持结构化输出 strict mode)
  671. claude_model = "openai/gpt-5.4"
  672. print(f"✅ Using OpenRouter with model: {claude_model}")
  673. from agent.llm.openrouter import create_openrouter_llm_call
  674. claude_llm_call = create_openrouter_llm_call(model=claude_model)
  675. runner_qwen = AgentRunner(
  676. trace_store=store,
  677. llm_call=create_qwen_llm_call(model=qwen_model),
  678. skills_dir=SKILLS_DIR
  679. )
  680. runner_claude = AgentRunner(
  681. trace_store=store,
  682. llm_call=claude_llm_call,
  683. skills_dir=SKILLS_DIR
  684. )
  685. try:
  686. start_time = time.time()
  687. total_cost = 0.0
  688. costs_breakdown = {}
  689. global_errors = []
  690. strategy_file = None
  691. # ── --only-step 单步执行模式 ──────────────────────────────
  692. if args.only_step:
  693. step = args.only_step
  694. source_file = raw_cases_dir / "source.json"
  695. detailed_file = raw_cases_dir / "case_detailed.json"
  696. blueprint_temp_file = output_dir / "blueprint_temp.json"
  697. capabilities_temp_file = output_dir / "capabilities_temp.json"
  698. process_file = output_dir / "process.json"
  699. capabilities_file = output_dir / "capabilities.json"
  700. print(f"\n[Single Step Mode] Running only: {step}")
  701. if step == "research":
  702. # Phase 1: 只跑调研,强制按 --platforms 重跑,不受已有 case 文件影响
  703. single_platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  704. if not single_platforms:
  705. print(" ❌ No platforms specified. Use --platforms xhs,zhihu,gzh,youtube")
  706. sys.exit(1)
  707. print(f" 🔍 Research platforms: {single_platforms}")
  708. single_is_single = args.restart_mode.startswith("single_")
  709. phase1_tasks = []
  710. for p in single_platforms:
  711. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
  712. out_file = str(raw_cases_dir / f"case_{p}.json")
  713. kwargs = {
  714. "task": task_desc,
  715. "output_file": out_file
  716. }
  717. phase1_tasks.append(
  718. run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=single_is_single)
  719. )
  720. phase1_results = await asyncio.gather(*phase1_tasks)
  721. for (task_cost, task_errors, trace_id), p in zip(phase1_results, single_platforms):
  722. print(f" ✓ {p}: cost=${task_cost:.4f}, errors={len(task_errors)}")
  723. elif step == "source":
  724. # Phase 1.5: 提取原始 source.json
  725. from examples.process_pipeline.script.extract_sources import extract_sources_to_json
  726. result = extract_sources_to_json(raw_cases_dir)
  727. print(f" ✓ source.json: matched={result['total_matched']}")
  728. elif step == "generate-case":
  729. # Phase 1.5.5: 生成标准化 case.json
  730. from examples.process_pipeline.script.generate_case import generate_case_from_source
  731. result = await generate_case_from_source(raw_cases_dir)
  732. print(f" ✓ case.json: cases={result['total_cases']}")
  733. elif step == "workflow-extract":
  734. # Phase 1.6a: 提取 workflow 到 case.json
  735. case_file = output_dir / "case.json"
  736. if not case_file.exists():
  737. print(f" ❌ case.json not found. Run --only-step generate-case first.")
  738. sys.exit(1)
  739. # 如果指定了 --case-index,先过滤 case.json
  740. if args.case_index is not None:
  741. with open(case_file, "r", encoding="utf-8") as f:
  742. case_data = json.load(f)
  743. original_cases = case_data.get("cases", [])
  744. target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
  745. if not target_case:
  746. print(f" ❌ Case with index {args.case_index} not found in case.json")
  747. sys.exit(1)
  748. # 临时只保留目标 case
  749. case_data["cases"] = [target_case]
  750. temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
  751. with open(temp_case_file, "w", encoding="utf-8") as f:
  752. json.dump(case_data, f, ensure_ascii=False, indent=2)
  753. print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
  754. case_file_to_use = temp_case_file
  755. else:
  756. case_file_to_use = case_file
  757. from examples.process_pipeline.script.extract_workflow import extract_workflow
  758. result = await extract_workflow(
  759. case_file_to_use,
  760. claude_llm_call, model=claude_model
  761. )
  762. # 如果使用了临时文件,需要合并回原始 case.json
  763. if args.case_index is not None:
  764. with open(case_file_to_use, "r", encoding="utf-8") as f:
  765. updated_data = json.load(f)
  766. updated_case = updated_data["cases"][0]
  767. # 更新原始文件中的对应 case
  768. with open(case_file, "r", encoding="utf-8") as f:
  769. original_data = json.load(f)
  770. for i, c in enumerate(original_data["cases"]):
  771. if c.get("index") == args.case_index:
  772. original_data["cases"][i] = updated_case
  773. break
  774. with open(case_file, "w", encoding="utf-8") as f:
  775. json.dump(original_data, f, ensure_ascii=False, indent=2)
  776. temp_case_file.unlink() # 删除临时文件
  777. print(f" ✓ Merged case {args.case_index} back to case.json")
  778. print(f" ✓ case.json + workflow: success={result['success']}, failed={result['failed']}")
  779. elif step == "capability-extract":
  780. # Phase 1.6b: 提取 capabilities 到 case.json
  781. case_file = output_dir / "case.json"
  782. if not case_file.exists():
  783. print(f" ❌ case.json not found. Run --only-step generate-case first.")
  784. sys.exit(1)
  785. # 如果指定了 --case-index,先过滤 case.json
  786. if args.case_index is not None:
  787. with open(case_file, "r", encoding="utf-8") as f:
  788. case_data = json.load(f)
  789. original_cases = case_data.get("cases", [])
  790. target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
  791. if not target_case:
  792. print(f" ❌ Case with index {args.case_index} not found in case.json")
  793. sys.exit(1)
  794. case_data["cases"] = [target_case]
  795. temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
  796. with open(temp_case_file, "w", encoding="utf-8") as f:
  797. json.dump(case_data, f, ensure_ascii=False, indent=2)
  798. print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
  799. case_file_to_use = temp_case_file
  800. else:
  801. case_file_to_use = case_file
  802. from examples.process_pipeline.script.extract_capability import extract_capability
  803. result = await extract_capability(
  804. case_file_to_use,
  805. claude_llm_call, model=claude_model
  806. )
  807. # 如果使用了临时文件,需要合并回原始 case.json
  808. if args.case_index is not None:
  809. with open(case_file_to_use, "r", encoding="utf-8") as f:
  810. updated_data = json.load(f)
  811. updated_case = updated_data["cases"][0]
  812. with open(case_file, "r", encoding="utf-8") as f:
  813. original_data = json.load(f)
  814. for i, c in enumerate(original_data["cases"]):
  815. if c.get("index") == args.case_index:
  816. original_data["cases"][i] = updated_case
  817. break
  818. with open(case_file, "w", encoding="utf-8") as f:
  819. json.dump(original_data, f, ensure_ascii=False, indent=2)
  820. temp_case_file.unlink()
  821. print(f" ✓ Merged case {args.case_index} back to case.json")
  822. print(f" ✓ case.json + capabilities: success={result['success']}, failed={result['failed']}")
  823. elif step == "apply-grounding":
  824. # Phase 1.7: 将 apply_to_draft 映射为正式 apply_to
  825. case_file = output_dir / "case.json"
  826. if not case_file.exists():
  827. print(f" ❌ case.json not found. Run workflow-extract and capability-extract first.")
  828. sys.exit(1)
  829. # 如果指定了 --case-index,先过滤 case.json
  830. if args.case_index is not None:
  831. with open(case_file, "r", encoding="utf-8") as f:
  832. case_data = json.load(f)
  833. original_cases = case_data.get("cases", [])
  834. target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
  835. if not target_case:
  836. print(f" ❌ Case with index {args.case_index} not found in case.json")
  837. sys.exit(1)
  838. case_data["cases"] = [target_case]
  839. temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
  840. with open(temp_case_file, "w", encoding="utf-8") as f:
  841. json.dump(case_data, f, ensure_ascii=False, indent=2)
  842. print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
  843. case_file_to_use = temp_case_file
  844. else:
  845. case_file_to_use = case_file
  846. from examples.process_pipeline.script.apply_to_grounding import apply_grounding
  847. result = await apply_grounding(
  848. case_file_to_use,
  849. claude_llm_call, model=claude_model
  850. )
  851. # 如果使用了临时文件,需要合并回原始 case.json
  852. if args.case_index is not None:
  853. with open(case_file_to_use, "r", encoding="utf-8") as f:
  854. updated_data = json.load(f)
  855. updated_case = updated_data["cases"][0]
  856. with open(case_file, "r", encoding="utf-8") as f:
  857. original_data = json.load(f)
  858. for i, c in enumerate(original_data["cases"]):
  859. if c.get("index") == args.case_index:
  860. original_data["cases"][i] = updated_case
  861. break
  862. with open(case_file, "w", encoding="utf-8") as f:
  863. json.dump(original_data, f, ensure_ascii=False, indent=2)
  864. temp_case_file.unlink()
  865. print(f" ✓ Merged case {args.case_index} back to case.json")
  866. print(f" ✓ case.json + apply_to: grounded={result['grounded']}/{result['total']}, cost=${result['total_cost']:.4f}")
  867. elif step == "process-cluster":
  868. # Phase 2.1.1: 工序聚类
  869. if not detailed_file.exists():
  870. print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
  871. sys.exit(1)
  872. from examples.process_pipeline.script.cluster_processes import cluster_processes
  873. result = await cluster_processes(
  874. source_file=source_file, detailed_file=detailed_file,
  875. output_file=blueprint_temp_file, requirement=requirement,
  876. llm_call=claude_llm_call, model=claude_model,
  877. )
  878. print(f" ✓ blueprint_temp.json: clusters={result['clusters']}")
  879. elif step == "process-score":
  880. # Phase 2.1.2: 工序打分
  881. if not blueprint_temp_file.exists():
  882. print(f" ❌ blueprint_temp.json not found. Run --only-step process-cluster first.")
  883. sys.exit(1)
  884. from examples.process_pipeline.script.score_processes import score_blueprints
  885. result = await score_blueprints(
  886. blueprint_file=blueprint_temp_file, output_file=process_file,
  887. requirement=requirement, llm_call=claude_llm_call, model=claude_model,
  888. )
  889. print(f" ✓ process.json: scored={result['scored']}")
# NOTE(review): UNREACHABLE branch — an earlier `elif step == "capability-extract"`
# (Phase 1.6b, extract_capability into case.json) matches the same value first,
# so control can never reach this arm. It appears intended as a distinct step
# (Phase 2.2.1, producing capabilities_temp.json — the file `capability-enrich`
# tells the user to create via "--only-step capability-extract"); confirm the
# intended step name (e.g. "capability-cluster") before relying on this path.
elif step == "capability-extract":
    # Phase 2.2.1: initial capability clustering
    if not detailed_file.exists():
        print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
        sys.exit(1)
    from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
    result = await extract_capabilities_workflow(
        detailed_file=detailed_file, source_file=source_file,
        output_file=capabilities_temp_file, requirement=requirement,
        llm_call=claude_llm_call, model=claude_model,
    )
    print(f" ✓ capabilities_temp.json: capabilities={result['capabilities']}")
  902. elif step == "capability-enrich":
  903. # Phase 2.2.2: 能力丰富化
  904. if not capabilities_temp_file.exists():
  905. print(f" ❌ capabilities_temp.json not found. Run --only-step capability-extract first.")
  906. sys.exit(1)
  907. if not source_file.exists():
  908. print(f" ❌ source.json not found. Run --only-step source first.")
  909. sys.exit(1)
  910. from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
  911. result = await enrich_all_capabilities(
  912. capabilities_temp_file=capabilities_temp_file, source_file=source_file,
  913. output_file=capabilities_file, llm_call=claude_llm_call, model=claude_model,
  914. )
  915. print(f" ✓ capabilities.json: enriched={result['enriched']}/{result['total_capabilities']}")
  916. elif step == "strategy":
  917. # Phase 3: 策略组装
  918. if not process_file.exists():
  919. print(f" ❌ process.json not found. Run --only-step process-score first.")
  920. sys.exit(1)
  921. if not capabilities_file.exists():
  922. print(f" ❌ capabilities.json not found. Run --only-step capability-enrich first.")
  923. sys.exit(1)
  924. strategy_file_path = output_dir / "strategy.json"
  925. from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
  926. result = await assemble_strategy(
  927. process_file=process_file, capabilities_file=capabilities_file,
  928. output_file=strategy_file_path, requirement=requirement,
  929. llm_call=claude_llm_call, model=claude_model,
  930. )
  931. print(f" ✓ strategy.json: workflow_steps={result['workflow_steps']}")
  932. elapsed = time.time() - start_time
  933. print(f"\n[Single Step Done] {step} completed in {elapsed:.1f}s")
  934. return
  935. # ── 正常 pipeline 流程 ──────────────────────────────
  936. existing_platforms = []
  937. if raw_cases_dir.exists():
  938. for f in raw_cases_dir.glob("case_*.json"):
  939. plat = f.stem.replace("case_", "")
  940. if plat in ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]:
  941. existing_platforms.append(plat)
  942. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  943. # Phase 0: Platform Selection (controlled by --platforms)
  944. if should_run("research"):
  945. new_platforms = [p for p in platforms if p not in existing_platforms]
  946. if not new_platforms:
  947. print(f"\n--- Phase 0: Skipping Research (All specified platforms already exist: {existing_platforms}) ---")
  948. platforms = []
  949. else:
  950. print(f"\n--- Phase 0: Using specified platforms ---")
  951. print(f"📡 Found existing cases: {existing_platforms}. Will research new platforms: {new_platforms}")
  952. platforms = new_platforms
  953. is_single_step = args.restart_mode.startswith("single_")
  954. # Phase 1: MAP (Parallel Search) uses Qwen
  955. if should_run("research"):
  956. print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
  957. phase1_tasks = []
  958. for p in platforms:
  959. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
  960. out_file = str(raw_cases_dir / f"case_{p}.json")
  961. kwargs = {
  962. "task": task_desc,
  963. "output_file": out_file
  964. }
  965. phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=is_single_step))
  966. phase1_results = await asyncio.gather(*phase1_tasks)
  967. phase1_trace_ids = {}
  968. for (task_cost, task_errors, trace_id), p in zip(phase1_results, platforms):
  969. total_cost += task_cost
  970. costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
  971. phase1_trace_ids[f"P1_Research_{p}"] = trace_id
  972. global_errors.extend(task_errors)
  973. # Check if cases actually got written
  974. expected_file = Path(raw_cases_dir / f"case_{p}.json")
  975. if not expected_file.exists():
  976. err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
  977. print(f"⚠️ [Warning] {err_msg}")
  978. global_errors.append(err_msg)
  979. else:
  980. print("\n⏭️ [Skip] Phase 1 Skipped. Using existing cases...")
  981. # Phase 1.5: Extract raw post data from cache → raw_cases/source.json
  982. if should_run("source"):
  983. try:
  984. from examples.process_pipeline.script.extract_sources import extract_sources_to_json
  985. trace_id_list = [tid for tid in phase1_trace_ids.values() if tid] if 'phase1_trace_ids' in dir() else None
  986. src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
  987. print(
  988. f"📎 [Source Extraction] "
  989. f"matched={src_stats['total_matched']} "
  990. f"→ {raw_cases_dir / 'source.json'}"
  991. )
  992. except Exception as e:
  993. err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
  994. print(f"⚠️ [Warning] {err_msg}")
  995. global_errors.append(err_msg)
  996. # Phase 1.3: Generate case.json from source.json
  997. if should_run("generate-case"):
  998. source_file = raw_cases_dir / "source.json"
  999. case_file = output_dir / "case.json"
  1000. if source_file.exists():
  1001. try:
  1002. from examples.process_pipeline.script.generate_case import generate_case
  1003. print(f"\n--- Phase 1.3: Generate case.json ---")
  1004. case_stats = await generate_case(source_file, case_file)
  1005. print(
  1006. f"📦 [Case Generation] "
  1007. f"generated {case_stats.get('total', 0)} cases "
  1008. f"→ {case_file}"
  1009. )
  1010. except Exception as e:
  1011. err_msg = f"Case generation failed: {type(e).__name__}: {e}"
  1012. print(f"⚠️ [Warning] {err_msg}")
  1013. global_errors.append(err_msg)
  1014. # Phase 1.6: Extract workflow and capabilities sequentially → case.json
  1015. if should_run("workflow-extract"):
  1016. case_file = output_dir / "case.json"
  1017. if case_file.exists():
  1018. try:
  1019. from examples.process_pipeline.script.extract_workflow import extract_workflow
  1020. print(f"\n--- Phase 1.6a: Workflow Extraction ({claude_model}) ---")
  1021. workflow_stats = await extract_workflow(
  1022. case_file,
  1023. claude_llm_call,
  1024. model=claude_model,
  1025. max_concurrent=3
  1026. )
  1027. total_cost += workflow_stats.get("total_cost", 0.0)
  1028. costs_breakdown["P1.6a_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
  1029. print(
  1030. f"🔍 [Workflow Extraction] "
  1031. f"success={workflow_stats['success']} "
  1032. f"failed={workflow_stats['failed']}"
  1033. )
  1034. except Exception as e:
  1035. err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
  1036. print(f"⚠️ [Warning] {err_msg}")
  1037. global_errors.append(err_msg)
  1038. if should_run("capability-extract"):
  1039. case_file = output_dir / "case.json"
  1040. if case_file.exists():
  1041. try:
  1042. from examples.process_pipeline.script.extract_capability import extract_capability
  1043. print(f"\n--- Phase 1.6b: Capability Extraction ({claude_model}) ---")
  1044. capability_stats = await extract_capability(
  1045. case_file,
  1046. claude_llm_call,
  1047. model=claude_model,
  1048. max_concurrent=3
  1049. )
  1050. total_cost += capability_stats.get("total_cost", 0.0)
  1051. costs_breakdown["P1.6b_CapabilityExtraction"] = round(capability_stats.get("total_cost", 0.0), 4)
  1052. print(
  1053. f"🧩 [Capability Extraction] "
  1054. f"success={capability_stats['success']} "
  1055. f"failed={capability_stats['failed']} "
  1056. f"→ {case_file}"
  1057. )
  1058. except Exception as e:
  1059. err_msg = f"Capability extraction failed: {type(e).__name__}: {e}"
  1060. print(f"⚠️ [Warning] {err_msg}")
  1061. global_errors.append(err_msg)
  1062. # Phase 1.7: Apply grounding (map apply_to_draft to apply_to)
  1063. if should_run("apply-grounding"):
  1064. case_file = output_dir / "case.json"
  1065. if case_file.exists():
  1066. try:
  1067. from examples.process_pipeline.script.apply_to_grounding import apply_grounding
  1068. print(f"\n--- Phase 1.7: Apply Grounding ({claude_model}) ---")
  1069. grounding_stats = await apply_grounding(
  1070. case_file,
  1071. claude_llm_call,
  1072. model=claude_model,
  1073. max_concurrent=3
  1074. )
  1075. total_cost += grounding_stats.get("total_cost", 0.0)
  1076. costs_breakdown["P1.7_ApplyGrounding"] = round(grounding_stats.get("total_cost", 0.0), 4)
  1077. print(
  1078. f"🗺️ [Apply Grounding] "
  1079. f"grounded={grounding_stats['grounded']}/{grounding_stats['total']} "
  1080. f"→ {case_file}"
  1081. )
  1082. except Exception as e:
  1083. err_msg = f"Apply grounding failed: {type(e).__name__}: {e}"
  1084. print(f"⚠️ [Warning] {err_msg}")
  1085. global_errors.append(err_msg)
  1086. # Phase 2: Parallel Workflow (Process + Capabilities) uses Claude
  1087. if any(should_run(s) for s in ["process-cluster", "process-score", "capability-enrich", "strategy"]):
  1088. print(f"\n--- Phase 2: Parallel Workflow ({claude_model}) ---")
  1089. # 输出文件
  1090. process_file = str(output_dir / "process.json")
  1091. capabilities_file = str(output_dir / "capabilities.json")
  1092. # 中间文件
  1093. blueprint_temp_file = str(output_dir / "blueprint_temp.json")
  1094. capabilities_temp_file = str(output_dir / "capabilities_temp.json")
  1095. # 优先使用结构化数据:source.json + case_detailed.json
  1096. detailed_file = raw_cases_dir / "case_detailed.json"
  1097. source_file = raw_cases_dir / "source.json"
  1098. if detailed_file.exists():
  1099. input_files_glob = str(raw_cases_dir / "{source,case_detailed}.json").replace("\\", "/")
  1100. print(f" Using structured data: source.json + case_detailed.json")
  1101. else:
  1102. input_files_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
  1103. print(f" Fallback to raw cases: case_*.json")
  1104. force_strategy_rerun = False
  1105. force_active = active_steps is not None
  1106. # ── Step 1: 并行执行 2.1.1 (cluster_processes) 和 2.2.1 (extract_capabilities) ──
  1107. async def run_cluster_processes():
  1108. """2.1.1: 工序聚类 → blueprint_temp.json"""
  1109. if Path(blueprint_temp_file).exists() and not force_active:
  1110. print(f" [2.1.1] ⏭️ blueprint_temp.json exists, skipping")
  1111. return 0.0
  1112. print(f" [2.1.1] Clustering processes...")
  1113. try:
  1114. from examples.process_pipeline.script.cluster_processes import cluster_processes
  1115. result = await cluster_processes(
  1116. source_file=source_file,
  1117. detailed_file=detailed_file,
  1118. output_file=Path(blueprint_temp_file),
  1119. requirement=requirement,
  1120. llm_call=claude_llm_call,
  1121. model=claude_model,
  1122. )
  1123. print(f" [2.1.1] ✓ Distilled {result.get('distilled_cases', 0)} cases, "
  1124. f"generated {result.get('blueprints', 0)} blueprints")
  1125. return result.get("total_cost", 0.0)
  1126. except Exception as e:
  1127. err_msg = f"P2.1.1 ClusterProcesses failed: {type(e).__name__}: {e}"
  1128. print(f" [2.1.1] ⚠️ {err_msg}")
  1129. global_errors.append(err_msg)
  1130. return 0.0
  1131. async def run_extract_capabilities():
  1132. """2.2.1: 能力初步聚类 → capabilities_temp.json"""
  1133. if Path(capabilities_temp_file).exists() and not force_active:
  1134. print(f" [2.2.1] ⏭️ capabilities_temp.json exists, skipping")
  1135. return 0.0
  1136. print(f" [2.2.1] Extracting capabilities...")
  1137. try:
  1138. from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
  1139. result = await extract_capabilities_workflow(
  1140. detailed_file=detailed_file,
  1141. source_file=source_file,
  1142. output_file=Path(capabilities_temp_file),
  1143. requirement=requirement,
  1144. llm_call=claude_llm_call,
  1145. model=claude_model,
  1146. )
  1147. print(f" [2.2.1] ✓ Extracted {result.get('capabilities', 0)} capabilities")
  1148. return result.get("total_cost", 0.0)
  1149. except Exception as e:
  1150. err_msg = f"P2.2.1 ExtractCapabilities failed: {type(e).__name__}: {e}"
  1151. print(f" [2.2.1] ⚠️ {err_msg}")
  1152. global_errors.append(err_msg)
  1153. return 0.0
  1154. if not Path(blueprint_temp_file).exists() or not Path(capabilities_temp_file).exists():
  1155. force_strategy_rerun = True
  1156. step1_costs = await asyncio.gather(run_cluster_processes(), run_extract_capabilities())
  1157. for cost, name in zip(step1_costs, ["P2.1.1_ClusterProcesses", "P2.2.1_ExtractCapabilities"]):
  1158. total_cost += cost
  1159. costs_breakdown[name] = round(cost, 4)
  1160. # ── Step 2: 并行执行 2.1.2 和 2.2.2 ──────────────────────────────
  1161. async def run_score_processes():
  1162. """2.1.2: 工序匹配度打分"""
  1163. if Path(process_file).exists() and not force_active:
  1164. print(f" [2.1.2] ⏭️ process.json exists, skipping")
  1165. return 0.0
  1166. if not Path(blueprint_temp_file).exists():
  1167. print(f" [2.1.2] ⚠️ blueprint_temp.json not found, skipping")
  1168. return 0.0
  1169. print(f" [2.1.2] Scoring processes...")
  1170. try:
  1171. from examples.process_pipeline.script.score_processes import score_blueprints
  1172. score_result = await score_blueprints(
  1173. blueprint_file=Path(blueprint_temp_file),
  1174. output_file=Path(process_file),
  1175. requirement=requirement,
  1176. llm_call=claude_llm_call,
  1177. model=claude_model,
  1178. )
  1179. print(f" [2.1.2] ✓ Scored {score_result.get('scored', 0)} blueprints")
  1180. return score_result.get("total_cost", 0.0)
  1181. except Exception as e:
  1182. err_msg = f"P2.1.2 ScoreProcesses failed: {e}"
  1183. print(f" [2.1.2] ⚠️ {err_msg}")
  1184. global_errors.append(err_msg)
  1185. return 0.0
  1186. async def run_enrich_capabilities():
  1187. """2.2.2: 能力丰富化"""
  1188. if Path(capabilities_file).exists() and not force_active:
  1189. print(f" [2.2.2] ⏭️ capabilities.json exists, skipping")
  1190. return 0.0
  1191. if not Path(capabilities_temp_file).exists():
  1192. print(f" [2.2.2] ⚠️ capabilities_temp.json not found, skipping")
  1193. return 0.0
  1194. if not source_file.exists():
  1195. print(f" [2.2.2] ⚠️ source.json not found, skipping")
  1196. return 0.0
  1197. print(f" [2.2.2] Enriching capabilities...")
  1198. try:
  1199. from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
  1200. enrich_result = await enrich_all_capabilities(
  1201. capabilities_temp_file=Path(capabilities_temp_file),
  1202. source_file=source_file,
  1203. output_file=Path(capabilities_file),
  1204. llm_call=claude_llm_call,
  1205. model=claude_model,
  1206. )
  1207. print(f" [2.2.2] ✓ Enriched {enrich_result.get('enriched', 0)} capabilities")
  1208. return enrich_result.get("total_cost", 0.0)
  1209. except Exception as e:
  1210. err_msg = f"P2.2.2 EnrichCapabilities failed: {e}"
  1211. print(f" [2.2.2] ⚠️ {err_msg}")
  1212. global_errors.append(err_msg)
  1213. return 0.0
  1214. # 并行执行 Step 2
  1215. step2_costs = await asyncio.gather(run_score_processes(), run_enrich_capabilities())
  1216. for cost, name in zip(step2_costs, ["P2.1.2_ScoreProcesses", "P2.2.2_EnrichCaps"]):
  1217. total_cost += cost
  1218. costs_breakdown[name] = round(cost, 4)
  1219. # Phase 3: REDUCE 2 (Final Assembly) uses Claude
  1220. print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
  1221. strategy_file_path = output_dir / "strategy.json"
  1222. if args.restart_mode == "single":
  1223. force_strategy_rerun = False
  1224. if strategy_file_path.exists() and not force_strategy_rerun and not force_active:
  1225. print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
  1226. else:
  1227. if strategy_file_path.exists():
  1228. print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
  1229. print(" > Using [Workflow Core]")
  1230. from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
  1231. try:
  1232. phase3_result = await assemble_strategy(
  1233. process_file=Path(process_file),
  1234. capabilities_file=Path(capabilities_file),
  1235. output_file=strategy_file_path,
  1236. requirement=requirement,
  1237. llm_call=claude_llm_call,
  1238. model=claude_model,
  1239. )
  1240. phase3_cost = phase3_result.get("total_cost", 0.0)
  1241. print(f" ✓ Generated workflow with {phase3_result.get('workflow_steps', 0)} steps")
  1242. except Exception as e:
  1243. err_msg = f"P3_AssembleStrategy failed: {type(e).__name__}: {e}"
  1244. print(f" ⚠️ {err_msg}")
  1245. global_errors.append(err_msg)
  1246. phase3_cost = 0.0
  1247. total_cost += phase3_cost
  1248. costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
  1249. else:
  1250. print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
  1251. end_time = time.time()
  1252. elapsed_sec = end_time - start_time
  1253. # Save Metrics
  1254. metrics_file = base_dir / "run_metrics.json"
  1255. metrics_data = []
  1256. if metrics_file.exists():
  1257. with open(metrics_file, "r", encoding="utf-8") as f:
  1258. try:
  1259. metrics_data = json.load(f)
  1260. except json.JSONDecodeError:
  1261. pass
  1262. # Collect trace_ids from all phases
  1263. trace_ids = {}
  1264. if 'phase1_trace_ids' in dir():
  1265. trace_ids.update(phase1_trace_ids)
  1266. metrics_data.append({
  1267. "index": args.index,
  1268. "requirement": requirement[:80] + "...",
  1269. "duration_seconds": round(elapsed_sec, 2),
  1270. "total_cost_usd": round(total_cost, 4),
  1271. "costs_breakdown": costs_breakdown,
  1272. "trace_ids": trace_ids,
  1273. "errors": global_errors,
  1274. "timestamp": datetime.now().isoformat()
  1275. })
  1276. with open(metrics_file, "w", encoding="utf-8") as f:
  1277. json.dump(metrics_data, f, indent=2, ensure_ascii=False)
  1278. print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
  1279. finally:
  1280. pass
  1281. print("✅ Pipeline run finished.")
  1282. if strategy_file:
  1283. print("✅ Strategy saved to:", strategy_file)
# Script entry point: drive the whole async pipeline on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())