# run_pipeline.py (71 KB)
  1. """
  2. V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
  3. """
  4. import argparse
  5. import asyncio
  6. import json
  7. import os
  8. import sys
  9. import time
  10. from datetime import datetime
  11. from pathlib import Path
  12. PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
  13. # Force runtime working directory to project root so relative trace/cache paths
  14. # always land in the repo root no matter where this script is launched from.
  15. os.chdir(PROJECT_ROOT)
  16. # Add project root to path
  17. sys.path.insert(0, str(PROJECT_ROOT))
  18. from dotenv import load_dotenv
  19. load_dotenv()
  20. from agent.llm.prompts import SimplePrompt
  21. from agent.core.runner import AgentRunner, RunConfig
  22. from agent.tools.builtin.knowledge import KnowledgeConfig
  23. from agent.trace import FileSystemTraceStore, Trace, Message
  24. from agent.llm import create_qwen_llm_call
  25. from agent.llm.openrouter import create_openrouter_llm_call
  26. from agent.llm.claude import create_claude_llm_call
  27. # config from existing setup
  28. from examples.process_research.config import (
  29. OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
  30. BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
  31. )
  32. from agent.utils import setup_logging
  33. async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
  34. from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
  35. base_dir = Path(__file__).parent
  36. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  37. prompt = SimplePrompt(prompt_path)
  38. messages = prompt.build_messages(**kwargs)
  39. target_tools = []
  40. if prompt_name == "extract_capabilities":
  41. target_tools = ["capability_search", "capability_list", "tool_search"]
  42. # 按 agent 类型配置工具组权限
  43. tool_groups_map = {
  44. "researcher": ["core", "content"], # 搜索+文件,无浏览器
  45. "filter_and_blueprint": ["core"], # 只需文件读写
  46. "extract_capabilities": ["core"], # 只需文件读写(额外工具由 target_tools 补充)
  47. "assemble_strategy": ["core"], # 只需文件读写
  48. }
  49. total_task_cost = 0.0
  50. task_errors = []
  51. out_file = kwargs.get("output_file")
  52. max_retries = 3
  53. last_trace_id = None
  54. last_validation_error = None
  55. final_trace_id = None # 用于返回最终成功的 trace_id
  56. def _instant_validate():
  57. """文件写出后立即校验并尝试修复,返回 error string 或 None"""
  58. nonlocal last_validation_error
  59. if not out_file or not Path(out_file).exists():
  60. return None
  61. try:
  62. with open(out_file, "r", encoding="utf-8") as f:
  63. raw = f.read()
  64. try:
  65. data = json.loads(raw)
  66. except json.JSONDecodeError:
  67. from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
  68. ok, data, desc = try_fix_and_parse(raw)
  69. if ok:
  70. with open(out_file, "w", encoding="utf-8") as f:
  71. json.dump(data, f, ensure_ascii=False, indent=2)
  72. print(f" 🔧 [Instant Fix] {desc}")
  73. else:
  74. last_validation_error = "JSON parse failed, auto-fix unsuccessful"
  75. return last_validation_error
  76. filename = Path(out_file).name
  77. err = None
  78. if filename.startswith("case_"):
  79. err = validate_case(data)
  80. elif filename == "blueprint.json":
  81. err = validate_blueprint(data)
  82. elif filename == "capabilities_extracted.json":
  83. err = validate_capabilities(data)
  84. elif filename == "strategy.json":
  85. err = validate_strategy(data)
  86. if err:
  87. last_validation_error = err
  88. print(f" ⚠️ [Instant Validation] {err}")
  89. return err
  90. else:
  91. print(f" ✅ [Instant Validation] {filename} OK")
  92. return None
  93. except Exception as e:
  94. last_validation_error = str(e)
  95. return str(e)
  96. for attempt in range(max_retries):
  97. if attempt > 0 and last_trace_id and last_validation_error:
  98. # 续跑模式:把错误信息告诉之前的 agent,让它修复
  99. print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
  100. fix_messages = [{
  101. "role": "user",
  102. "content": (
  103. f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
  104. f"错误详情:{last_validation_error}\n\n"
  105. f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
  106. f"只修复有问题的部分,不要丢弃已有的正确内容。"
  107. )
  108. }]
  109. fix_config = RunConfig(
  110. model=prompt.config.get("model") or model_name,
  111. temperature=0.1,
  112. name=f"{task_name}_Fix{attempt}",
  113. agent_type=prompt_name,
  114. tools=target_tools,
  115. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  116. trace_id=last_trace_id,
  117. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  118. )
  119. try:
  120. async for item in runner.run(messages=fix_messages, config=fix_config):
  121. if isinstance(item, Trace):
  122. last_trace_id = item.trace_id
  123. if item.status == "completed":
  124. total_task_cost += item.total_cost
  125. elif item.status == "failed":
  126. task_errors.append(f"{task_name} Fix Failed: {item.error_message}")
  127. if isinstance(item, Message) and item.role == "tool":
  128. content = item.content if isinstance(item.content, dict) else {}
  129. if content.get("tool_name") in ("write_file", "write_json"):
  130. print(f" 💾 [Fix File Written by {task_name}]")
  131. _instant_validate()
  132. except Exception as e:
  133. err_msg = f"{type(e).__name__}: {e}"
  134. print(f"❌ [Exception Fix] {task_name} crashed: {err_msg}")
  135. task_errors.append(f"{task_name} fix crashed: {err_msg}")
  136. elif attempt > 0:
  137. # 没有 trace_id 或没有 validation error,只能完全重跑
  138. print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
  139. if out_file and Path(out_file).exists():
  140. Path(out_file).unlink()
  141. run_config = RunConfig(
  142. model=prompt.config.get("model") or model_name,
  143. temperature=prompt.config.get("temperature") or 0.3,
  144. name=f"{task_name}_A{attempt}",
  145. agent_type=prompt_name,
  146. tools=target_tools,
  147. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  148. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  149. )
  150. print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
  151. try:
  152. async for item in runner.run(messages=messages, config=run_config):
  153. if isinstance(item, Trace):
  154. last_trace_id = item.trace_id
  155. if item.status == "completed":
  156. total_task_cost += item.total_cost
  157. elif item.status == "failed":
  158. task_errors.append(f"{task_name} Failed: {item.error_message}")
  159. if isinstance(item, Message):
  160. if item.role == "tool":
  161. content = item.content if isinstance(item.content, dict) else {}
  162. t_name = content.get("tool_name", "unknown")
  163. if t_name in ("write_file", "write_json"):
  164. print(f" 💾 [File Written by {task_name}]")
  165. _instant_validate()
  166. except Exception as e:
  167. err_msg = f"{type(e).__name__}: {e}"
  168. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  169. task_errors.append(f"{task_name} crashed: {err_msg}")
  170. else:
  171. # 首次执行
  172. run_config = RunConfig(
  173. model=prompt.config.get("model") or model_name,
  174. temperature=prompt.config.get("temperature") or 0.3,
  175. name=f"{task_name}_A{attempt}",
  176. agent_type=prompt_name,
  177. tools=target_tools,
  178. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  179. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  180. )
  181. print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
  182. try:
  183. async for item in runner.run(messages=messages, config=run_config):
  184. if isinstance(item, Trace):
  185. last_trace_id = item.trace_id
  186. if item.status == "completed":
  187. total_task_cost += item.total_cost
  188. elif item.status == "failed":
  189. task_errors.append(f"{task_name} Failed: {item.error_message}")
  190. if isinstance(item, Message):
  191. if item.role == "tool":
  192. content = item.content if isinstance(item.content, dict) else {}
  193. t_name = content.get("tool_name", "unknown")
  194. if t_name in ("write_file", "write_json"):
  195. print(f" 💾 [File Written by {task_name}]")
  196. _instant_validate()
  197. except Exception as e:
  198. err_msg = f"{type(e).__name__}: {e}"
  199. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  200. task_errors.append(f"{task_name} crashed: {err_msg}")
  201. # Verification & Recovery block
  202. if out_file and not Path(out_file).exists() and last_trace_id:
  203. print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
  204. recovery_messages = [{
  205. "role": "user",
  206. "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
  207. }]
  208. rec_config = RunConfig(
  209. model=model_name,
  210. temperature=0.1,
  211. name=task_name + "_Rec",
  212. agent_type=prompt_name,
  213. trace_id=last_trace_id,
  214. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  215. )
  216. try:
  217. async for r_item in runner.run(messages=recovery_messages, config=rec_config):
  218. if isinstance(r_item, Trace):
  219. if r_item.status == "completed":
  220. total_task_cost += r_item.total_cost
  221. elif r_item.status == "failed":
  222. task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
  223. if isinstance(r_item, Message) and r_item.role == "tool":
  224. content = r_item.content if isinstance(r_item.content, dict) else {}
  225. if content.get("tool_name") in ("write_file", "write_json"):
  226. print(f" 💾 [Recovery File Written by {task_name}]")
  227. _instant_validate()
  228. except Exception as e:
  229. err_msg = f"{type(e).__name__}: {e}"
  230. print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
  231. task_errors.append(f"{task_name} recovery crashed: {err_msg}")
  232. # Schema Validation (with auto-fix layer)
  233. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  234. if skip_validation:
  235. print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
  236. return total_task_cost, task_errors, last_trace_id
  237. try:
  238. with open(out_file, "r", encoding="utf-8") as f:
  239. raw_content = f.read()
  240. # Layer 1: 尝试直接解析
  241. try:
  242. data = json.loads(raw_content)
  243. except json.JSONDecodeError as parse_err:
  244. # Layer 2: 自动修复 JSON 语法错误
  245. print(f" 🔧 [Auto-Fix] {Path(out_file).name} has JSON syntax error, attempting fix...")
  246. try:
  247. from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
  248. success, data, fix_desc = try_fix_and_parse(raw_content)
  249. if success:
  250. # 修复成功,写回文件
  251. with open(out_file, "w", encoding="utf-8") as f:
  252. json.dump(data, f, ensure_ascii=False, indent=2)
  253. print(f" 🔧 [Auto-Fix] Fixed: {fix_desc}")
  254. else:
  255. raise parse_err # 修复失败,抛出原始错误
  256. except ImportError:
  257. raise parse_err # fix_json_quotes 不可用,抛出原始错误
  258. filename = Path(out_file).name
  259. err = None
  260. if filename.startswith("case_"):
  261. err = validate_case(data)
  262. elif filename == "blueprint.json":
  263. err = validate_blueprint(data)
  264. elif filename == "capabilities_extracted.json":
  265. err = validate_capabilities(data)
  266. elif filename == "strategy.json":
  267. err = validate_strategy(data)
  268. if err:
  269. raise ValueError(f"Schema Validation Failed: {err}")
  270. print(f" ✅ [Schema Validated] {Path(out_file).name}")
  271. final_trace_id = last_trace_id
  272. return total_task_cost, task_errors, final_trace_id # Success! Exit retry loop.
  273. except Exception as e:
  274. err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
  275. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  276. task_errors.append(f"{task_name} Error: {e}")
  277. last_validation_error = str(e)
  278. if attempt == max_retries - 1:
  279. print(f"❌ [Retry Limit] {task_name} exhausted retries.")
  280. return total_task_cost, task_errors, last_trace_id
  281. else:
  282. print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
  283. last_validation_error = None
  284. if attempt == max_retries - 1:
  285. return total_task_cost, task_errors, last_trace_id
  286. return total_task_cost, task_errors, last_trace_id
  287. async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
  288. """
  289. 备用:使用纯净的官方 Anthropic SDK 驱动。
  290. 跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。
  291. """
  292. from anthropic import AsyncAnthropic
  293. from agent.tools.registry import get_tool_registry
  294. base_dir = Path(__file__).parent
  295. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  296. prompt = SimplePrompt(prompt_path)
  297. # 1. 组装输入 Message
  298. raw_messages = prompt.build_messages(**kwargs)
  299. system_prompt = ""
  300. messages = []
  301. for msg in raw_messages:
  302. if msg["role"] == "system":
  303. system_prompt += msg["content"] + "\n\n"
  304. else:
  305. messages.append({"role": msg["role"], "content": msg["content"]})
  306. # 2. 映射目标工具
  307. target_tools = ["write_file", "write_json", "read_file", "glob_files"]
  308. if prompt_name == "extract_capabilities":
  309. target_tools.extend(["capability_search", "capability_list", "tool_search"])
  310. registry = get_tool_registry()
  311. schemas = registry.get_schemas(target_tools)
  312. anthropic_tools = []
  313. for s in schemas:
  314. anthropic_tools.append({
  315. "name": s["function"]["name"],
  316. "description": s["function"].get("description", ""),
  317. "input_schema": s["function"]["parameters"]
  318. })
  319. # 3. 初始化并开启 Loop
  320. # 提示:你需要在你的终端中配置好 ANTHROPIC_API_KEY 环境变量
  321. client = AsyncAnthropic()
  322. total_task_cost = 0.0
  323. task_errors = []
  324. sdk_trace_id = None # 用于记录 Anthropic SDK 的 response ID
  325. from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
  326. out_file = kwargs.get("output_file")
  327. max_retries = 3
  328. for attempt in range(max_retries):
  329. if attempt > 0:
  330. print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
  331. if out_file and Path(out_file).exists():
  332. Path(out_file).unlink()
  333. print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")
  334. # Reset messages for retry
  335. messages_copy = list(messages)
  336. max_loops = 50
  337. for loop_idx in range(max_loops):
  338. try:
  339. # 去除前缀(兼容比如 openrouter 传入的名字)
  340. clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
  341. # 这里专门将实际请求映射到其可用的特殊别名 claude-sonnet-4-5
  342. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  343. response = await client.messages.create(
  344. model=target_model,
  345. max_tokens=4096,
  346. temperature=0.2,
  347. system=system_prompt,
  348. messages=messages_copy,
  349. tools=anthropic_tools
  350. )
  351. # (简略预估,不代表真实官方开销)
  352. if hasattr(response, 'usage'):
  353. step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
  354. total_task_cost += step_cost
  355. # 加入助手回复
  356. assistant_content = []
  357. tool_uses = []
  358. for content_block in response.content:
  359. if content_block.type == "text":
  360. text_val = content_block.text
  361. if text_val:
  362. assistant_content.append({"type": "text", "text": text_val})
  363. print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
  364. elif content_block.type == "tool_use":
  365. assistant_content.append({
  366. "type": "tool_use",
  367. "id": content_block.id,
  368. "name": content_block.name,
  369. "input": content_block.input
  370. })
  371. tool_uses.append(content_block)
  372. if not assistant_content:
  373. assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
  374. messages_copy.append({"role": "assistant", "content": assistant_content})
  375. # 出口:没有调用工具说明任务结束
  376. if not tool_uses:
  377. print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
  378. break
  379. # 工具执行与回传
  380. tool_results = []
  381. for tu in tool_uses:
  382. if tu.name in ("write_file", "write_json"):
  383. print(f" 💾 [File Written by SDK] {task_name}")
  384. print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
  385. # 执行本地环境的函数
  386. result_str = await registry.execute(tu.name, tu.input)
  387. tool_results.append({
  388. "type": "tool_result",
  389. "tool_use_id": tu.id,
  390. "content": result_str
  391. })
  392. messages_copy.append({"role": "user", "content": tool_results})
  393. except Exception as e:
  394. err_msg = str(e)
  395. print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
  396. task_errors.append(err_msg)
  397. break
  398. # Verification & Recovery block for SDK (porting from AgentRunner)
  399. if out_file and not Path(out_file).exists():
  400. print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
  401. messages_copy.append({
  402. "role": "user",
  403. "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
  404. })
  405. try:
  406. target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model
  407. rec_response = await client.messages.create(
  408. model=target_model,
  409. max_tokens=4096,
  410. temperature=0.1,
  411. system=system_prompt,
  412. messages=messages_copy,
  413. tools=anthropic_tools,
  414. tool_choice={"type": "any"}
  415. )
  416. for content_block in rec_response.content:
  417. if content_block.type == "tool_use":
  418. if content_block.name in ("write_file", "write_json"):
  419. print(f" 💾 [Recovery File Written by SDK] {task_name}")
  420. await registry.execute(content_block.name, content_block.input)
  421. except Exception as e:
  422. print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
  423. task_errors.append(str(e))
  424. # Schema Validation
  425. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  426. if skip_validation:
  427. print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
  428. return total_task_cost, task_errors, None
  429. try:
  430. with open(out_file, "r", encoding="utf-8") as f:
  431. data = json.loads(f.read())
  432. filename = Path(out_file).name
  433. err = None
  434. if filename.startswith("case_"):
  435. err = validate_case(data)
  436. elif filename == "blueprint.json":
  437. err = validate_blueprint(data)
  438. elif filename == "capabilities_extracted.json":
  439. err = validate_capabilities(data)
  440. elif filename == "strategy.json":
  441. err = validate_strategy(data)
  442. if err:
  443. raise ValueError(f"Schema Validation Failed: {err}")
  444. print(f" ✅ [Schema Validated] {Path(out_file).name}")
  445. return total_task_cost, task_errors # Success! Exit retry loop.
  446. except Exception as e:
  447. err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
  448. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  449. task_errors.append(f"{task_name} Error: {e}")
  450. if attempt == max_retries - 1:
  451. print(f"❌ [Retry Limit] {task_name} exhausted retries.")
  452. return total_task_cost, task_errors
  453. else:
  454. print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
  455. if attempt == max_retries - 1:
  456. return total_task_cost, task_errors
  457. return total_task_cost, task_errors
  458. async def main():
  459. parser = argparse.ArgumentParser()
  460. parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
  461. STEP_NAMES = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding", "process-cluster", "process-score", "capability-enrich", "strategy"]
  462. parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube", help="Comma-separated list of platforms to search")
  463. parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
  464. parser.add_argument("--only-step", type=str, choices=STEP_NAMES, help="Run only a single step")
  465. parser.add_argument("--phase", type=int, choices=[1, 2, 3], help="Run all steps in a phase (1=research~case-detailed, 2=process~capability, 3=strategy)")
  466. parser.add_argument("--start-from", type=str, choices=STEP_NAMES, help="Start from this step (inclusive)")
  467. parser.add_argument("--end-at", type=str, choices=STEP_NAMES, help="End at this step (inclusive)")
  468. parser.add_argument("--case-index", type=int, help="Rerun extraction for a specific case index (only for workflow-extract, capability-extract, apply-grounding)")
  469. args = parser.parse_args()
  470. # ── 参数验证 ──
  471. # --case-index 只能与提取步骤一起使用
  472. if args.case_index is not None:
  473. extraction_steps = {"workflow-extract", "capability-extract", "apply-grounding"}
  474. if args.only_step and args.only_step not in extraction_steps:
  475. print(f"❌ Error: --case-index can only be used with extraction steps: {', '.join(extraction_steps)}")
  476. sys.exit(1)
  477. if not args.only_step:
  478. print("❌ Error: --case-index requires --only-step to specify which extraction step to run")
  479. sys.exit(1)
  480. # ── 三种模式互斥检查 ──
  481. mode_count = sum([
  482. args.only_step is not None,
  483. args.phase is not None,
  484. (args.start_from is not None or args.end_at is not None),
  485. ])
  486. if mode_count > 1:
  487. print("❌ Error: --only-step, --phase, and --start-from/--end-at are mutually exclusive.")
  488. sys.exit(1)
  489. # 定义步骤拓扑(非线性,Phase 2 有两条并行分支)
  490. STEP_ORDER = STEP_NAMES # 用于 phase 分组和前置检查
  491. def _step_in_range(step, start, end):
  492. """判断线性前缀中的 step 是否在 [start, end] 范围内"""
  493. all_steps = LINEAR_PREFIX + BRANCH_21 + BRANCH_22 + ["strategy"]
  494. if step not in all_steps or start not in all_steps:
  495. return False
  496. # 对于线性前缀,用索引比较
  497. if step in LINEAR_PREFIX:
  498. s_idx = LINEAR_PREFIX.index(step)
  499. start_idx = LINEAR_PREFIX.index(start) if start in LINEAR_PREFIX else -1
  500. # 如果 start 在 Phase 2 或 strategy 中,线性前缀不需要跑
  501. if start_idx < 0:
  502. return False
  503. return s_idx >= start_idx
  504. return False
  505. PHASE_MAP = {
  506. 1: {"research", "source", "case-detailed"},
  507. 2: {"process-cluster", "process-score", "capability-extract", "capability-enrich"},
  508. 3: {"strategy"},
  509. }
  510. BRANCH_21 = ["process-cluster", "process-score"]
  511. BRANCH_22 = ["capability-enrich"]
  512. # 线性前缀(Phase 2 之前)
  513. LINEAR_PREFIX = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding"]
  514. # 计算需要执行的步骤集合
  515. active_steps = None # None = 全部执行
  516. if args.only_step:
  517. active_steps = {args.only_step}
  518. elif args.phase:
  519. active_steps = PHASE_MAP[args.phase]
  520. elif args.start_from or args.end_at:
  521. start = args.start_from or "research"
  522. end = args.end_at or "strategy"
  523. # 构建 active_steps,考虑并行分支
  524. active = set()
  525. # 1. 线性前缀部分
  526. for s in LINEAR_PREFIX:
  527. if _step_in_range(s, start, end):
  528. active.add(s)
  529. # 2. Phase 2 分支部分
  530. start_in_21 = start in BRANCH_21
  531. start_in_22 = start in BRANCH_22
  532. end_in_21 = end in BRANCH_21
  533. end_in_22 = end in BRANCH_22
  534. end_is_strategy = end == "strategy"
  535. # 如果 end 是 strategy 或覆盖了整个 Phase 2,两条分支都跑
  536. if end_is_strategy or (not end_in_21 and not end_in_22 and end not in LINEAR_PREFIX):
  537. # 两条分支都包含(如果 start 允许的话)
  538. if not start_in_21 and not start_in_22:
  539. # start 在 Phase 2 之前或就是 Phase 2 的开头
  540. active.update(BRANCH_21)
  541. active.update(BRANCH_22)
  542. elif start_in_21:
  543. # start 在 2.1 分支内,但 end 到了 strategy,所以 2.2 也要全跑
  544. idx = BRANCH_21.index(start)
  545. active.update(BRANCH_21[idx:])
  546. active.update(BRANCH_22)
  547. elif start_in_22:
  548. # start 在 2.2 分支内,但 end 到了 strategy,所以 2.1 也要全跑
  549. idx = BRANCH_22.index(start)
  550. active.update(BRANCH_22[idx:])
  551. active.update(BRANCH_21)
  552. elif end_in_21:
  553. # end 在 2.1 分支内,只跑 2.1
  554. end_idx = BRANCH_21.index(end)
  555. start_idx = BRANCH_21.index(start) if start_in_21 else 0
  556. active.update(BRANCH_21[start_idx:end_idx + 1])
  557. elif end_in_22:
  558. # end 在 2.2 分支内,只跑 2.2
  559. end_idx = BRANCH_22.index(end)
  560. start_idx = BRANCH_22.index(start) if start_in_22 else 0
  561. active.update(BRANCH_22[start_idx:end_idx + 1])
  562. # 3. strategy
  563. if end_is_strategy:
  564. active.add("strategy")
  565. active_steps = active
  566. def should_run(step_name: str) -> bool:
  567. if active_steps is None:
  568. return True
  569. return step_name in active_steps
  570. if active_steps is not None:
  571. active_list = [s for s in STEP_ORDER if s in active_steps]
  572. print(f"\n[Selective Mode] Steps: {' -> '.join(active_list)}")
  573. base_dir = Path(__file__).parent
  574. # Load requirements locally
  575. req_path = base_dir / "db_requirements.json"
  576. with open(req_path, encoding='utf-8') as f:
  577. reqs = json.load(f)
  578. if args.index < 0 or args.index >= len(reqs):
  579. print("Index out of bounds")
  580. sys.exit(1)
  581. requirement = reqs[args.index]
  582. # 0. Setup directories
  583. output_dir = base_dir / "output" / f"{(args.index+1):03d}"
  584. output_dir.mkdir(parents=True, exist_ok=True)
  585. raw_cases_dir = output_dir / "raw_cases"
  586. raw_cases_dir.mkdir(parents=True, exist_ok=True)
  587. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  588. print("=" * 60)
  589. print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
  590. print("=" * 60)
  591. # ── 前置文件检查 ──
  592. # 每个 step 需要的前置文件(如果该 step 不在 active_steps 中,则需要预先存在)
  593. STEP_DEPS = {
  594. "research": [],
  595. "source": [("case_*.json", lambda: bool(list(raw_cases_dir.glob("case_*.json"))))],
  596. "generate-case": [("source.json", lambda: (raw_cases_dir / "source.json").exists())],
  597. "workflow-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
  598. "capability-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
  599. "apply-grounding": [("case.json", lambda: (output_dir / "case.json").exists())],
  600. "process-cluster": [
  601. ("case.json", lambda: (output_dir / "case.json").exists()),
  602. ],
  603. "process-score": [("blueprint_temp.json", lambda: (output_dir / "blueprint_temp.json").exists())],
  604. "capability-enrich": [
  605. ("capabilities_temp.json", lambda: (output_dir / "capabilities_temp.json").exists()),
  606. ("case.json", lambda: (output_dir / "case.json").exists()),
  607. ],
  608. "strategy": [
  609. ("process.json", lambda: (output_dir / "process.json").exists()),
  610. ("capabilities.json", lambda: (output_dir / "capabilities.json").exists()),
  611. ],
  612. }
  613. # 每个 step 会生成的文件(用于判断上游 step 是否会在本次运行中生成依赖)
  614. STEP_PRODUCES = {
  615. "research": {"case_*.json"},
  616. "source": {"source.json"},
  617. "generate-case": {"case.json"},
  618. "workflow-extract": {"case.json"}, # 原地更新
  619. "capability-extract": {"case.json"}, # 原地更新
  620. "apply-grounding": {"case.json"}, # 原地更新
  621. "process-cluster": {"blueprint_temp.json"},
  622. "process-score": {"process.json"},
  623. "capability-enrich": {"capabilities.json"},
  624. "strategy": {"strategy.json"},
  625. }
  626. if active_steps is not None:
  627. # 计算本次运行会生成的文件集合
  628. will_produce = set()
  629. for s in STEP_ORDER:
  630. if s in active_steps:
  631. will_produce.update(STEP_PRODUCES.get(s, set()))
  632. # 检查每个 active step 的前置文件
  633. missing = []
  634. for s in STEP_ORDER:
  635. if s not in active_steps:
  636. continue
  637. for dep_name, dep_check in STEP_DEPS.get(s, []):
  638. if dep_name in will_produce:
  639. continue # 上游 step 会在本次运行中生成
  640. if not dep_check():
  641. missing.append((s, dep_name))
  642. if missing:
  643. print(f"\n❌ [Pre-flight Check] Missing prerequisite files:")
  644. for step, dep in missing:
  645. print(f" - Step '{step}' requires '{dep}'")
  646. print(f"\nRun upstream steps first, or use --start-from to include them.")
  647. sys.exit(1)
  648. else:
  649. print(f"✅ [Pre-flight Check] All prerequisites satisfied")
  650. # Load presets
  651. presets_path = base_dir / "presets.json"
  652. if presets_path.exists():
  653. from agent.core.presets import load_presets_from_json
  654. load_presets_from_json(str(presets_path))
  655. print("✅ Configured Agent Presets (Skills Boundaries)")
  656. # Browser initialization removed to save resources
  657. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  658. # Instantiate two distinct LLM orchestrators
  659. qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
  660. # 使用 OpenRouter 代理的 GPT-5.4(支持结构化输出 strict mode)
  661. claude_model = "openai/gpt-5.4"
  662. args.use_claude_sdk = False # 禁用纯 Native SDK 模式,走内部通用 AgentRunner (即可对接 OpenRouter)
  663. from agent.llm.openrouter import create_openrouter_llm_call
  664. claude_llm_call = create_openrouter_llm_call(model=claude_model)
  665. runner_qwen = AgentRunner(
  666. trace_store=store,
  667. llm_call=create_qwen_llm_call(model=qwen_model),
  668. skills_dir=SKILLS_DIR
  669. )
  670. runner_claude = AgentRunner(
  671. trace_store=store,
  672. llm_call=claude_llm_call,
  673. skills_dir=SKILLS_DIR
  674. )
  675. try:
  676. start_time = time.time()
  677. total_cost = 0.0
  678. costs_breakdown = {}
  679. global_errors = []
  680. strategy_file = None
  681. # ── --only-step 单步执行模式 ──────────────────────────────
  682. if args.only_step:
  683. step = args.only_step
  684. source_file = raw_cases_dir / "source.json"
  685. detailed_file = raw_cases_dir / "case_detailed.json"
  686. blueprint_temp_file = output_dir / "blueprint_temp.json"
  687. capabilities_temp_file = output_dir / "capabilities_temp.json"
  688. process_file = output_dir / "process.json"
  689. capabilities_file = output_dir / "capabilities.json"
  690. print(f"\n[Single Step Mode] Running only: {step}")
  691. if step == "research":
  692. # Phase 1: 只跑调研,强制按 --platforms 重跑,不受已有 case 文件影响
  693. single_platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  694. if not single_platforms:
  695. print(" ❌ No platforms specified. Use --platforms xhs,zhihu,gzh,youtube")
  696. sys.exit(1)
  697. print(f" 🔍 Research platforms: {single_platforms}")
  698. single_is_single = args.restart_mode.startswith("single_")
  699. phase1_tasks = []
  700. for p in single_platforms:
  701. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
  702. out_file = str(raw_cases_dir / f"case_{p}.json")
  703. kwargs = {
  704. "task": task_desc,
  705. "output_file": out_file
  706. }
  707. phase1_tasks.append(
  708. run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=single_is_single)
  709. )
  710. phase1_results = await asyncio.gather(*phase1_tasks)
  711. for (task_cost, task_errors, trace_id), p in zip(phase1_results, single_platforms):
  712. print(f" ✓ {p}: cost=${task_cost:.4f}, errors={len(task_errors)}")
  713. elif step == "source":
  714. # Phase 1.5: 提取原始 source.json
  715. from examples.process_pipeline.script.extract_sources import extract_sources_to_json
  716. result = extract_sources_to_json(raw_cases_dir)
  717. print(f" ✓ source.json: matched={result['total_matched']}")
  718. elif step == "generate-case":
  719. # Phase 1.5.5: 生成标准化 case.json
  720. from examples.process_pipeline.script.generate_case import generate_case_from_source
  721. result = await generate_case_from_source(raw_cases_dir)
  722. print(f" ✓ case.json: cases={result['total_cases']}")
  723. elif step == "workflow-extract":
  724. # Phase 1.6a: 提取 workflow 到 case.json
  725. case_file = output_dir / "case.json"
  726. if not case_file.exists():
  727. print(f" ❌ case.json not found. Run --only-step generate-case first.")
  728. sys.exit(1)
  729. # 如果指定了 --case-index,先过滤 case.json
  730. if args.case_index is not None:
  731. with open(case_file, "r", encoding="utf-8") as f:
  732. case_data = json.load(f)
  733. original_cases = case_data.get("cases", [])
  734. target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
  735. if not target_case:
  736. print(f" ❌ Case with index {args.case_index} not found in case.json")
  737. sys.exit(1)
  738. # 临时只保留目标 case
  739. case_data["cases"] = [target_case]
  740. temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
  741. with open(temp_case_file, "w", encoding="utf-8") as f:
  742. json.dump(case_data, f, ensure_ascii=False, indent=2)
  743. print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
  744. case_file_to_use = temp_case_file
  745. else:
  746. case_file_to_use = case_file
  747. from examples.process_pipeline.script.extract_workflow import extract_workflow
  748. result = await extract_workflow(
  749. case_file_to_use,
  750. claude_llm_call, model=claude_model
  751. )
  752. # 如果使用了临时文件,需要合并回原始 case.json
  753. if args.case_index is not None:
  754. with open(case_file_to_use, "r", encoding="utf-8") as f:
  755. updated_data = json.load(f)
  756. updated_case = updated_data["cases"][0]
  757. # 更新原始文件中的对应 case
  758. with open(case_file, "r", encoding="utf-8") as f:
  759. original_data = json.load(f)
  760. for i, c in enumerate(original_data["cases"]):
  761. if c.get("index") == args.case_index:
  762. original_data["cases"][i] = updated_case
  763. break
  764. with open(case_file, "w", encoding="utf-8") as f:
  765. json.dump(original_data, f, ensure_ascii=False, indent=2)
  766. temp_case_file.unlink() # 删除临时文件
  767. print(f" ✓ Merged case {args.case_index} back to case.json")
  768. print(f" ✓ case.json + workflow: success={result['success']}, failed={result['failed']}")
  769. elif step == "capability-extract":
  770. # Phase 1.6b: 提取 capabilities 到 case.json
  771. case_file = output_dir / "case.json"
  772. if not case_file.exists():
  773. print(f" ❌ case.json not found. Run --only-step generate-case first.")
  774. sys.exit(1)
  775. # 如果指定了 --case-index,先过滤 case.json
  776. if args.case_index is not None:
  777. with open(case_file, "r", encoding="utf-8") as f:
  778. case_data = json.load(f)
  779. original_cases = case_data.get("cases", [])
  780. target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
  781. if not target_case:
  782. print(f" ❌ Case with index {args.case_index} not found in case.json")
  783. sys.exit(1)
  784. case_data["cases"] = [target_case]
  785. temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
  786. with open(temp_case_file, "w", encoding="utf-8") as f:
  787. json.dump(case_data, f, ensure_ascii=False, indent=2)
  788. print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
  789. case_file_to_use = temp_case_file
  790. else:
  791. case_file_to_use = case_file
  792. from examples.process_pipeline.script.extract_capability import extract_capability
  793. result = await extract_capability(
  794. case_file_to_use,
  795. claude_llm_call, model=claude_model
  796. )
  797. # 如果使用了临时文件,需要合并回原始 case.json
  798. if args.case_index is not None:
  799. with open(case_file_to_use, "r", encoding="utf-8") as f:
  800. updated_data = json.load(f)
  801. updated_case = updated_data["cases"][0]
  802. with open(case_file, "r", encoding="utf-8") as f:
  803. original_data = json.load(f)
  804. for i, c in enumerate(original_data["cases"]):
  805. if c.get("index") == args.case_index:
  806. original_data["cases"][i] = updated_case
  807. break
  808. with open(case_file, "w", encoding="utf-8") as f:
  809. json.dump(original_data, f, ensure_ascii=False, indent=2)
  810. temp_case_file.unlink()
  811. print(f" ✓ Merged case {args.case_index} back to case.json")
  812. print(f" ✓ case.json + capabilities: success={result['success']}, failed={result['failed']}")
  813. elif step == "apply-grounding":
  814. # Phase 1.7: 将 apply_to_draft 映射为正式 apply_to
  815. case_file = output_dir / "case.json"
  816. if not case_file.exists():
  817. print(f" ❌ case.json not found. Run workflow-extract and capability-extract first.")
  818. sys.exit(1)
  819. # 如果指定了 --case-index,先过滤 case.json
  820. if args.case_index is not None:
  821. with open(case_file, "r", encoding="utf-8") as f:
  822. case_data = json.load(f)
  823. original_cases = case_data.get("cases", [])
  824. target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
  825. if not target_case:
  826. print(f" ❌ Case with index {args.case_index} not found in case.json")
  827. sys.exit(1)
  828. case_data["cases"] = [target_case]
  829. temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
  830. with open(temp_case_file, "w", encoding="utf-8") as f:
  831. json.dump(case_data, f, ensure_ascii=False, indent=2)
  832. print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
  833. case_file_to_use = temp_case_file
  834. else:
  835. case_file_to_use = case_file
  836. from examples.process_pipeline.script.apply_to_grounding import apply_grounding
  837. result = await apply_grounding(
  838. case_file_to_use,
  839. claude_llm_call, model=claude_model
  840. )
  841. # 如果使用了临时文件,需要合并回原始 case.json
  842. if args.case_index is not None:
  843. with open(case_file_to_use, "r", encoding="utf-8") as f:
  844. updated_data = json.load(f)
  845. updated_case = updated_data["cases"][0]
  846. with open(case_file, "r", encoding="utf-8") as f:
  847. original_data = json.load(f)
  848. for i, c in enumerate(original_data["cases"]):
  849. if c.get("index") == args.case_index:
  850. original_data["cases"][i] = updated_case
  851. break
  852. with open(case_file, "w", encoding="utf-8") as f:
  853. json.dump(original_data, f, ensure_ascii=False, indent=2)
  854. temp_case_file.unlink()
  855. print(f" ✓ Merged case {args.case_index} back to case.json")
  856. print(f" ✓ case.json + apply_to: grounded={result['grounded']}/{result['total']}, cost=${result['total_cost']:.4f}")
  857. elif step == "process-cluster":
  858. # Phase 2.1.1: 工序聚类
  859. if not detailed_file.exists():
  860. print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
  861. sys.exit(1)
  862. from examples.process_pipeline.script.cluster_processes import cluster_processes
  863. result = await cluster_processes(
  864. source_file=source_file, detailed_file=detailed_file,
  865. output_file=blueprint_temp_file, requirement=requirement,
  866. llm_call=claude_llm_call, model=claude_model,
  867. )
  868. print(f" ✓ blueprint_temp.json: clusters={result['clusters']}")
  869. elif step == "process-score":
  870. # Phase 2.1.2: 工序打分
  871. if not blueprint_temp_file.exists():
  872. print(f" ❌ blueprint_temp.json not found. Run --only-step process-cluster first.")
  873. sys.exit(1)
  874. from examples.process_pipeline.script.score_processes import score_blueprints
  875. result = await score_blueprints(
  876. blueprint_file=blueprint_temp_file, output_file=process_file,
  877. requirement=requirement, llm_call=claude_llm_call, model=claude_model,
  878. )
  879. print(f" ✓ process.json: scored={result['scored']}")
  880. elif step == "capability-extract":
  881. # Phase 2.2.1: 能力初步聚类
  882. if not detailed_file.exists():
  883. print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
  884. sys.exit(1)
  885. from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
  886. result = await extract_capabilities_workflow(
  887. detailed_file=detailed_file, source_file=source_file,
  888. output_file=capabilities_temp_file, requirement=requirement,
  889. llm_call=claude_llm_call, model=claude_model,
  890. )
  891. print(f" ✓ capabilities_temp.json: capabilities={result['capabilities']}")
  892. elif step == "capability-enrich":
  893. # Phase 2.2.2: 能力丰富化
  894. if not capabilities_temp_file.exists():
  895. print(f" ❌ capabilities_temp.json not found. Run --only-step capability-extract first.")
  896. sys.exit(1)
  897. if not source_file.exists():
  898. print(f" ❌ source.json not found. Run --only-step source first.")
  899. sys.exit(1)
  900. from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
  901. result = await enrich_all_capabilities(
  902. capabilities_temp_file=capabilities_temp_file, source_file=source_file,
  903. output_file=capabilities_file, llm_call=claude_llm_call, model=claude_model,
  904. )
  905. print(f" ✓ capabilities.json: enriched={result['enriched']}/{result['total_capabilities']}")
  906. elif step == "strategy":
  907. # Phase 3: 策略组装
  908. if not process_file.exists():
  909. print(f" ❌ process.json not found. Run --only-step process-score first.")
  910. sys.exit(1)
  911. if not capabilities_file.exists():
  912. print(f" ❌ capabilities.json not found. Run --only-step capability-enrich first.")
  913. sys.exit(1)
  914. strategy_file_path = output_dir / "strategy.json"
  915. from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
  916. result = await assemble_strategy(
  917. process_file=process_file, capabilities_file=capabilities_file,
  918. output_file=strategy_file_path, requirement=requirement,
  919. llm_call=claude_llm_call, model=claude_model,
  920. )
  921. print(f" ✓ strategy.json: workflow_steps={result['workflow_steps']}")
  922. elapsed = time.time() - start_time
  923. print(f"\n[Single Step Done] {step} completed in {elapsed:.1f}s")
  924. return
  925. # ── 正常 pipeline 流程 ──────────────────────────────
  926. existing_platforms = []
  927. if raw_cases_dir.exists():
  928. for f in raw_cases_dir.glob("case_*.json"):
  929. plat = f.stem.replace("case_", "")
  930. if plat in ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]:
  931. existing_platforms.append(plat)
  932. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  933. # Phase 0: Platform Selection (controlled by --platforms)
  934. if should_run("research"):
  935. new_platforms = [p for p in platforms if p not in existing_platforms]
  936. if not new_platforms:
  937. print(f"\n--- Phase 0: Skipping Research (All specified platforms already exist: {existing_platforms}) ---")
  938. platforms = []
  939. else:
  940. print(f"\n--- Phase 0: Using specified platforms ---")
  941. print(f"📡 Found existing cases: {existing_platforms}. Will research new platforms: {new_platforms}")
  942. platforms = new_platforms
  943. is_single_step = args.restart_mode.startswith("single_")
  944. # Phase 1: MAP (Parallel Search) uses Qwen
  945. if should_run("research"):
  946. print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
  947. phase1_tasks = []
  948. for p in platforms:
  949. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
  950. out_file = str(raw_cases_dir / f"case_{p}.json")
  951. kwargs = {
  952. "task": task_desc,
  953. "output_file": out_file
  954. }
  955. phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=is_single_step))
  956. phase1_results = await asyncio.gather(*phase1_tasks)
  957. phase1_trace_ids = {}
  958. for (task_cost, task_errors, trace_id), p in zip(phase1_results, platforms):
  959. total_cost += task_cost
  960. costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
  961. phase1_trace_ids[f"P1_Research_{p}"] = trace_id
  962. global_errors.extend(task_errors)
  963. # Check if cases actually got written
  964. expected_file = Path(raw_cases_dir / f"case_{p}.json")
  965. if not expected_file.exists():
  966. err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
  967. print(f"⚠️ [Warning] {err_msg}")
  968. global_errors.append(err_msg)
  969. else:
  970. print("\n⏭️ [Skip] Phase 1 Skipped. Using existing cases...")
  971. # Phase 1.5: Extract raw post data from cache → raw_cases/source.json
  972. if should_run("source"):
  973. try:
  974. from examples.process_pipeline.script.extract_sources import extract_sources_to_json
  975. trace_id_list = [tid for tid in phase1_trace_ids.values() if tid] if 'phase1_trace_ids' in dir() else None
  976. src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
  977. print(
  978. f"📎 [Source Extraction] "
  979. f"matched={src_stats['total_matched']} "
  980. f"→ {raw_cases_dir / 'source.json'}"
  981. )
  982. except Exception as e:
  983. err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
  984. print(f"⚠️ [Warning] {err_msg}")
  985. global_errors.append(err_msg)
  986. # Phase 1.3: Generate case.json from source.json
  987. if should_run("generate-case"):
  988. source_file = raw_cases_dir / "source.json"
  989. case_file = output_dir / "case.json"
  990. if source_file.exists():
  991. try:
  992. from examples.process_pipeline.script.generate_case import generate_case
  993. print(f"\n--- Phase 1.3: Generate case.json ---")
  994. case_stats = await generate_case(source_file, case_file)
  995. print(
  996. f"📦 [Case Generation] "
  997. f"generated {case_stats.get('total', 0)} cases "
  998. f"→ {case_file}"
  999. )
  1000. except Exception as e:
  1001. err_msg = f"Case generation failed: {type(e).__name__}: {e}"
  1002. print(f"⚠️ [Warning] {err_msg}")
  1003. global_errors.append(err_msg)
  1004. # Phase 1.6: Extract workflow and capabilities sequentially → case.json
  1005. if should_run("workflow-extract"):
  1006. case_file = output_dir / "case.json"
  1007. if case_file.exists():
  1008. try:
  1009. from examples.process_pipeline.script.extract_workflow import extract_workflow
  1010. print(f"\n--- Phase 1.6a: Workflow Extraction ({claude_model}) ---")
  1011. workflow_stats = await extract_workflow(
  1012. case_file,
  1013. claude_llm_call,
  1014. model=claude_model,
  1015. max_concurrent=3
  1016. )
  1017. total_cost += workflow_stats.get("total_cost", 0.0)
  1018. costs_breakdown["P1.6a_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
  1019. print(
  1020. f"🔍 [Workflow Extraction] "
  1021. f"success={workflow_stats['success']} "
  1022. f"failed={workflow_stats['failed']}"
  1023. )
  1024. except Exception as e:
  1025. err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
  1026. print(f"⚠️ [Warning] {err_msg}")
  1027. global_errors.append(err_msg)
  1028. if should_run("capability-extract"):
  1029. case_file = output_dir / "case.json"
  1030. if case_file.exists():
  1031. try:
  1032. from examples.process_pipeline.script.extract_capability import extract_capability
  1033. print(f"\n--- Phase 1.6b: Capability Extraction ({claude_model}) ---")
  1034. capability_stats = await extract_capability(
  1035. case_file,
  1036. claude_llm_call,
  1037. model=claude_model,
  1038. max_concurrent=3
  1039. )
  1040. total_cost += capability_stats.get("total_cost", 0.0)
  1041. costs_breakdown["P1.6b_CapabilityExtraction"] = round(capability_stats.get("total_cost", 0.0), 4)
  1042. print(
  1043. f"🧩 [Capability Extraction] "
  1044. f"success={capability_stats['success']} "
  1045. f"failed={capability_stats['failed']} "
  1046. f"→ {case_file}"
  1047. )
  1048. except Exception as e:
  1049. err_msg = f"Capability extraction failed: {type(e).__name__}: {e}"
  1050. print(f"⚠️ [Warning] {err_msg}")
  1051. global_errors.append(err_msg)
  1052. # Phase 1.7: Apply grounding (map apply_to_draft to apply_to)
  1053. if should_run("apply-grounding"):
  1054. case_file = output_dir / "case.json"
  1055. if case_file.exists():
  1056. try:
  1057. from examples.process_pipeline.script.apply_to_grounding import apply_grounding
  1058. print(f"\n--- Phase 1.7: Apply Grounding ({claude_model}) ---")
  1059. grounding_stats = await apply_grounding(
  1060. case_file,
  1061. claude_llm_call,
  1062. model=claude_model,
  1063. max_concurrent=3
  1064. )
  1065. total_cost += grounding_stats.get("total_cost", 0.0)
  1066. costs_breakdown["P1.7_ApplyGrounding"] = round(grounding_stats.get("total_cost", 0.0), 4)
  1067. print(
  1068. f"🗺️ [Apply Grounding] "
  1069. f"grounded={grounding_stats['grounded']}/{grounding_stats['total']} "
  1070. f"→ {case_file}"
  1071. )
  1072. except Exception as e:
  1073. err_msg = f"Apply grounding failed: {type(e).__name__}: {e}"
  1074. print(f"⚠️ [Warning] {err_msg}")
  1075. global_errors.append(err_msg)
  1076. # Phase 2: Parallel Workflow (Process + Capabilities) uses Claude
  1077. if any(should_run(s) for s in ["process-cluster", "process-score", "capability-enrich", "strategy"]):
  1078. print(f"\n--- Phase 2: Parallel Workflow ({claude_model}) ---")
  1079. # 输出文件
  1080. process_file = str(output_dir / "process.json")
  1081. capabilities_file = str(output_dir / "capabilities.json")
  1082. # 中间文件
  1083. blueprint_temp_file = str(output_dir / "blueprint_temp.json")
  1084. capabilities_temp_file = str(output_dir / "capabilities_temp.json")
  1085. # 优先使用结构化数据:source.json + case_detailed.json
  1086. detailed_file = raw_cases_dir / "case_detailed.json"
  1087. source_file = raw_cases_dir / "source.json"
  1088. if detailed_file.exists():
  1089. input_files_glob = str(raw_cases_dir / "{source,case_detailed}.json").replace("\\", "/")
  1090. print(f" Using structured data: source.json + case_detailed.json")
  1091. else:
  1092. input_files_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
  1093. print(f" Fallback to raw cases: case_*.json")
  1094. force_strategy_rerun = False
  1095. force_active = active_steps is not None
  1096. # ── Step 1: 并行执行 2.1.1 (cluster_processes) 和 2.2.1 (extract_capabilities) ──
  async def run_cluster_processes():
      """Step 2.1.1: cluster processes and write blueprint_temp.json.

      The step is skipped when blueprint_temp.json already exists and
      ``force_active`` is false.

      Returns:
          The "total_cost" entry of the clustering result (0.0 if absent),
          or 0.0 when the step is skipped or fails.

      Failures are not raised: the message is printed and appended to
      ``global_errors``.
      """
      blueprint_path = Path(blueprint_temp_file)
      if blueprint_path.exists() and not force_active:
          print(" [2.1.1] ⏭️ blueprint_temp.json exists, skipping")
          return 0.0

      print(" [2.1.1] Clustering processes...")
      try:
          from examples.process_pipeline.script.cluster_processes import cluster_processes

          call_args = {
              "source_file": source_file,
              "detailed_file": detailed_file,
              "output_file": Path(blueprint_temp_file),
              "requirement": requirement,
              "llm_call": claude_llm_call,
              "model": claude_model,
          }
          outcome = await cluster_processes(**call_args)
          distilled = outcome.get("distilled_cases", 0)
          blueprints = outcome.get("blueprints", 0)
          print(f" [2.1.1] ✓ Distilled {distilled} cases, generated {blueprints} blueprints")
          return outcome.get("total_cost", 0.0)
      except Exception as exc:
          # Best-effort step: record the failure and let the pipeline continue.
          failure = f"P2.1.1 ClusterProcesses failed: {type(exc).__name__}: {exc}"
          print(f" [2.1.1] ⚠️ {failure}")
          global_errors.append(failure)
          return 0.0
  async def run_extract_capabilities():
      """Step 2.2.1: extract preliminary capabilities into capabilities_temp.json.

      The step is skipped when capabilities_temp.json already exists and
      ``force_active`` is false.

      Returns:
          The "total_cost" entry of the extraction result (0.0 if absent),
          or 0.0 when the step is skipped or fails.

      Failures are not raised: the message is printed and appended to
      ``global_errors``.
      """
      temp_path = Path(capabilities_temp_file)
      if temp_path.exists() and not force_active:
          print(" [2.2.1] ⏭️ capabilities_temp.json exists, skipping")
          return 0.0

      print(" [2.2.1] Extracting capabilities...")
      try:
          from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow

          call_args = {
              "detailed_file": detailed_file,
              "source_file": source_file,
              "output_file": Path(capabilities_temp_file),
              "requirement": requirement,
              "llm_call": claude_llm_call,
              "model": claude_model,
          }
          outcome = await extract_capabilities_workflow(**call_args)
          extracted = outcome.get("capabilities", 0)
          print(f" [2.2.1] ✓ Extracted {extracted} capabilities")
          return outcome.get("total_cost", 0.0)
      except Exception as exc:
          # Best-effort step: record the failure and let the pipeline continue.
          failure = f"P2.2.1 ExtractCapabilities failed: {type(exc).__name__}: {exc}"
          print(f" [2.2.1] ⚠️ {failure}")
          global_errors.append(failure)
          return 0.0
  1144. if not Path(blueprint_temp_file).exists() or not Path(capabilities_temp_file).exists():
  1145. force_strategy_rerun = True
  1146. step1_costs = await asyncio.gather(run_cluster_processes(), run_extract_capabilities())
  1147. for cost, name in zip(step1_costs, ["P2.1.1_ClusterProcesses", "P2.2.1_ExtractCapabilities"]):
  1148. total_cost += cost
  1149. costs_breakdown[name] = round(cost, 4)
  1150. # ── Step 2: 并行执行 2.1.2 和 2.2.2 ──────────────────────────────
  async def run_score_processes():
      """Step 2.1.2: score the clustered blueprints and write process.json.

      The step is skipped (returning 0.0) when process.json already exists
      and ``force_active`` is false, or when blueprint_temp.json is missing.

      Returns:
          The "total_cost" entry of the scoring result (0.0 if absent),
          or 0.0 when the step is skipped or fails.

      Failures are not raised: the message is printed and appended to
      ``global_errors``.
      """
      if Path(process_file).exists() and not force_active:
          print(" [2.1.2] ⏭️ process.json exists, skipping")
          return 0.0
      if not Path(blueprint_temp_file).exists():
          print(" [2.1.2] ⚠️ blueprint_temp.json not found, skipping")
          return 0.0

      print(" [2.1.2] Scoring processes...")
      try:
          from examples.process_pipeline.script.score_processes import score_blueprints

          score_result = await score_blueprints(
              blueprint_file=Path(blueprint_temp_file),
              output_file=Path(process_file),
              requirement=requirement,
              llm_call=claude_llm_call,
              model=claude_model,
          )
          print(f" [2.1.2] ✓ Scored {score_result.get('scored', 0)} blueprints")
          return score_result.get("total_cost", 0.0)
      except Exception as e:
          # Include the exception type, as the sibling steps do: str(e) alone
          # can be empty or ambiguous (e.g. a bare KeyError shows only the key).
          err_msg = f"P2.1.2 ScoreProcesses failed: {type(e).__name__}: {e}"
          print(f" [2.1.2] ⚠️ {err_msg}")
          global_errors.append(err_msg)
          return 0.0
  async def run_enrich_capabilities():
      """Step 2.2.2: enrich the preliminary capabilities and write capabilities.json.

      The step is skipped (returning 0.0) when capabilities.json already
      exists and ``force_active`` is false, or when either
      capabilities_temp.json or source.json is missing.

      Returns:
          The "total_cost" entry of the enrichment result (0.0 if absent),
          or 0.0 when the step is skipped or fails.

      Failures are not raised: the message is printed and appended to
      ``global_errors``.
      """
      if Path(capabilities_file).exists() and not force_active:
          print(" [2.2.2] ⏭️ capabilities.json exists, skipping")
          return 0.0
      if not Path(capabilities_temp_file).exists():
          print(" [2.2.2] ⚠️ capabilities_temp.json not found, skipping")
          return 0.0
      if not source_file.exists():
          print(" [2.2.2] ⚠️ source.json not found, skipping")
          return 0.0

      print(" [2.2.2] Enriching capabilities...")
      try:
          from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities

          enrich_result = await enrich_all_capabilities(
              capabilities_temp_file=Path(capabilities_temp_file),
              source_file=source_file,
              output_file=Path(capabilities_file),
              llm_call=claude_llm_call,
              model=claude_model,
          )
          print(f" [2.2.2] ✓ Enriched {enrich_result.get('enriched', 0)} capabilities")
          return enrich_result.get("total_cost", 0.0)
      except Exception as e:
          # Include the exception type, as the sibling steps do: str(e) alone
          # can be empty or ambiguous (e.g. a bare KeyError shows only the key).
          err_msg = f"P2.2.2 EnrichCapabilities failed: {type(e).__name__}: {e}"
          print(f" [2.2.2] ⚠️ {err_msg}")
          global_errors.append(err_msg)
          return 0.0
  1204. # 并行执行 Step 2
  1205. step2_costs = await asyncio.gather(run_score_processes(), run_enrich_capabilities())
  1206. for cost, name in zip(step2_costs, ["P2.1.2_ScoreProcesses", "P2.2.2_EnrichCaps"]):
  1207. total_cost += cost
  1208. costs_breakdown[name] = round(cost, 4)
  1209. # Phase 3: REDUCE 2 (Final Assembly) uses Claude
  1210. print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
  1211. strategy_file_path = output_dir / "strategy.json"
  1212. if args.restart_mode == "single":
  1213. force_strategy_rerun = False
  1214. if strategy_file_path.exists() and not force_strategy_rerun and not force_active:
  1215. print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
  1216. else:
  1217. if strategy_file_path.exists():
  1218. print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
  1219. print(" > Using [Workflow Core]")
  1220. from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
  1221. try:
  1222. phase3_result = await assemble_strategy(
  1223. process_file=Path(process_file),
  1224. capabilities_file=Path(capabilities_file),
  1225. output_file=strategy_file_path,
  1226. requirement=requirement,
  1227. llm_call=claude_llm_call,
  1228. model=claude_model,
  1229. )
  1230. phase3_cost = phase3_result.get("total_cost", 0.0)
  1231. print(f" ✓ Generated workflow with {phase3_result.get('workflow_steps', 0)} steps")
  1232. except Exception as e:
  1233. err_msg = f"P3_AssembleStrategy failed: {type(e).__name__}: {e}"
  1234. print(f" ⚠️ {err_msg}")
  1235. global_errors.append(err_msg)
  1236. phase3_cost = 0.0
  1237. total_cost += phase3_cost
  1238. costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
  1239. else:
  1240. print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
  1241. end_time = time.time()
  1242. elapsed_sec = end_time - start_time
  1243. # Save Metrics
  1244. metrics_file = base_dir / "run_metrics.json"
  1245. metrics_data = []
  1246. if metrics_file.exists():
  1247. with open(metrics_file, "r", encoding="utf-8") as f:
  1248. try:
  1249. metrics_data = json.load(f)
  1250. except json.JSONDecodeError:
  1251. pass
  1252. # Collect trace_ids from all phases
  1253. trace_ids = {}
  1254. if 'phase1_trace_ids' in dir():
  1255. trace_ids.update(phase1_trace_ids)
  1256. metrics_data.append({
  1257. "index": args.index,
  1258. "requirement": requirement[:80] + "...",
  1259. "duration_seconds": round(elapsed_sec, 2),
  1260. "total_cost_usd": round(total_cost, 4),
  1261. "costs_breakdown": costs_breakdown,
  1262. "trace_ids": trace_ids,
  1263. "errors": global_errors,
  1264. "timestamp": datetime.now().isoformat()
  1265. })
  1266. with open(metrics_file, "w", encoding="utf-8") as f:
  1267. json.dump(metrics_data, f, indent=2, ensure_ascii=False)
  1268. print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
  1269. finally:
  1270. pass
  1271. print("✅ Pipeline run finished.")
  1272. if strategy_file:
  1273. print("✅ Strategy saved to:", strategy_file)
  # Script entry point: when executed directly (not imported), run the async
  # main() coroutine to completion on a fresh event loop.
  if __name__ == "__main__":
      asyncio.run(main())