run_pipeline.py 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508
  1. """
  2. V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
  3. """
  4. import argparse
  5. import asyncio
  6. import json
  7. import os
  8. import sys
  9. import time
  10. from datetime import datetime
  11. from pathlib import Path
  12. PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
  13. # Force runtime working directory to project root so relative trace/cache paths
  14. # always land in the repo root no matter where this script is launched from.
  15. os.chdir(PROJECT_ROOT)
  16. # Add project root to path
  17. sys.path.insert(0, str(PROJECT_ROOT))
  18. from dotenv import load_dotenv
  19. load_dotenv()
  20. from agent.llm.prompts import SimplePrompt
  21. from agent.core.runner import AgentRunner, RunConfig
  22. from agent.tools.builtin.knowledge import KnowledgeConfig
  23. from agent.trace import FileSystemTraceStore, Trace, Message
  24. from agent.llm import create_qwen_llm_call
  25. from agent.llm.openrouter import create_openrouter_llm_call
  26. from agent.llm.claude import create_claude_llm_call
  27. # config from existing setup
  28. from examples.process_research.config import (
  29. OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
  30. BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
  31. )
  32. from agent.utils import setup_logging
  33. async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False, start_trace_id: str = None, additional_messages: list = None):
  34. from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
  35. base_dir = Path(__file__).parent
  36. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  37. prompt = SimplePrompt(prompt_path)
  38. messages = prompt.build_messages(**kwargs)
  39. if additional_messages:
  40. messages.extend(additional_messages)
  41. target_tools = []
  42. if prompt_name == "extract_capabilities":
  43. target_tools = ["capability_search", "capability_list", "tool_search"]
  44. # 按 agent 类型配置工具组权限
  45. tool_groups_map = {
  46. "researcher": ["core", "content"], # 搜索+文件,无浏览器
  47. "filter_and_blueprint": ["core"], # 只需文件读写
  48. "extract_capabilities": ["core"], # 只需文件读写(额外工具由 target_tools 补充)
  49. "assemble_strategy": ["core"], # 只需文件读写
  50. }
  51. total_task_cost = 0.0
  52. task_errors = []
  53. out_file = kwargs.get("output_file")
  54. max_retries = 3
  55. last_trace_id = start_trace_id
  56. last_validation_error = None
  57. final_trace_id = None # 用于返回最终成功的 trace_id
  58. def _instant_validate():
  59. """文件写出后立即校验并尝试修复,返回 error string 或 None"""
  60. nonlocal last_validation_error
  61. if not out_file or not Path(out_file).exists():
  62. return None
  63. try:
  64. with open(out_file, "r", encoding="utf-8") as f:
  65. raw = f.read()
  66. try:
  67. data = json.loads(raw)
  68. except json.JSONDecodeError:
  69. from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
  70. ok, data, desc = try_fix_and_parse(raw)
  71. if ok:
  72. with open(out_file, "w", encoding="utf-8") as f:
  73. json.dump(data, f, ensure_ascii=False, indent=2)
  74. print(f" 🔧 [Instant Fix] {desc}")
  75. else:
  76. last_validation_error = "JSON parse failed, auto-fix unsuccessful"
  77. return last_validation_error
  78. filename = Path(out_file).name
  79. err = None
  80. if filename.startswith("case_"):
  81. err = validate_case(data)
  82. elif filename == "blueprint.json":
  83. err = validate_blueprint(data)
  84. elif filename == "capabilities_extracted.json":
  85. err = validate_capabilities(data)
  86. elif filename == "strategy.json":
  87. err = validate_strategy(data)
  88. if err:
  89. last_validation_error = err
  90. print(f" ⚠️ [Instant Validation] {err}")
  91. return err
  92. else:
  93. print(f" ✅ [Instant Validation] {filename} OK")
  94. return None
  95. except Exception as e:
  96. last_validation_error = str(e)
  97. return str(e)
  98. for attempt in range(max_retries):
  99. if attempt > 0 and last_trace_id and last_validation_error:
  100. # 续跑模式:把错误信息告诉之前的 agent,让它修复
  101. print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
  102. fix_messages = [{
  103. "role": "user",
  104. "content": (
  105. f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
  106. f"错误详情:{last_validation_error}\n\n"
  107. f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
  108. f"只修复有问题的部分,不要丢弃已有的正确内容。"
  109. )
  110. }]
  111. fix_config = RunConfig(
  112. model=prompt.config.get("model") or model_name,
  113. temperature=0.1,
  114. name=f"{task_name}_Fix{attempt}",
  115. agent_type=prompt_name,
  116. tools=target_tools,
  117. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  118. trace_id=last_trace_id,
  119. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  120. )
  121. try:
  122. async for item in runner.run(messages=fix_messages, config=fix_config):
  123. if isinstance(item, Trace):
  124. last_trace_id = item.trace_id
  125. if item.status == "completed":
  126. total_task_cost += item.total_cost
  127. elif item.status == "failed":
  128. task_errors.append(f"{task_name} Fix Failed: {item.error_message}")
  129. if isinstance(item, Message) and item.role == "tool":
  130. content = item.content if isinstance(item.content, dict) else {}
  131. if content.get("tool_name") in ("write_file", "write_json"):
  132. print(f" 💾 [Fix File Written by {task_name}]")
  133. _instant_validate()
  134. except Exception as e:
  135. err_msg = f"{type(e).__name__}: {e}"
  136. print(f"❌ [Exception Fix] {task_name} crashed: {err_msg}")
  137. task_errors.append(f"{task_name} fix crashed: {err_msg}")
  138. elif attempt > 0:
  139. # 没有 trace_id 或没有 validation error,只能完全重跑
  140. print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
  141. if out_file and Path(out_file).exists():
  142. Path(out_file).unlink()
  143. run_config = RunConfig(
  144. model=prompt.config.get("model") or model_name,
  145. temperature=prompt.config.get("temperature") or 0.3,
  146. name=f"{task_name}_A{attempt}",
  147. agent_type=prompt_name,
  148. tools=target_tools,
  149. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  150. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  151. )
  152. print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
  153. try:
  154. async for item in runner.run(messages=messages, config=run_config):
  155. if isinstance(item, Trace):
  156. last_trace_id = item.trace_id
  157. if item.status == "completed":
  158. total_task_cost += item.total_cost
  159. elif item.status == "failed":
  160. task_errors.append(f"{task_name} Failed: {item.error_message}")
  161. if isinstance(item, Message):
  162. if item.role == "tool":
  163. content = item.content if isinstance(item.content, dict) else {}
  164. t_name = content.get("tool_name", "unknown")
  165. if t_name in ("write_file", "write_json"):
  166. print(f" 💾 [File Written by {task_name}]")
  167. _instant_validate()
  168. except Exception as e:
  169. err_msg = f"{type(e).__name__}: {e}"
  170. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  171. task_errors.append(f"{task_name} crashed: {err_msg}")
  172. else:
  173. # 首次执行
  174. run_config = RunConfig(
  175. model=prompt.config.get("model") or model_name,
  176. temperature=prompt.config.get("temperature") or 0.3,
  177. name=f"{task_name}_A{attempt}",
  178. agent_type=prompt_name,
  179. tools=target_tools,
  180. tool_groups=tool_groups_map.get(prompt_name, ["core"]),
  181. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False),
  182. trace_id=last_trace_id
  183. )
  184. print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
  185. try:
  186. async for item in runner.run(messages=messages, config=run_config):
  187. if isinstance(item, Trace):
  188. last_trace_id = item.trace_id
  189. if item.status == "completed":
  190. total_task_cost += item.total_cost
  191. elif item.status == "failed":
  192. task_errors.append(f"{task_name} Failed: {item.error_message}")
  193. if isinstance(item, Message):
  194. if item.role == "tool":
  195. content = item.content if isinstance(item.content, dict) else {}
  196. t_name = content.get("tool_name", "unknown")
  197. if t_name in ("write_file", "write_json"):
  198. print(f" 💾 [File Written by {task_name}]")
  199. _instant_validate()
  200. except Exception as e:
  201. err_msg = f"{type(e).__name__}: {e}"
  202. print(f"❌ [Exception] {task_name} crashed: {err_msg}")
  203. task_errors.append(f"{task_name} crashed: {err_msg}")
  204. # Verification & Recovery block
  205. if out_file and not Path(out_file).exists() and last_trace_id:
  206. print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
  207. recovery_messages = [{
  208. "role": "user",
  209. "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
  210. }]
  211. rec_config = RunConfig(
  212. model=model_name,
  213. temperature=0.1,
  214. name=task_name + "_Rec",
  215. agent_type=prompt_name,
  216. trace_id=last_trace_id,
  217. knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
  218. )
  219. try:
  220. async for r_item in runner.run(messages=recovery_messages, config=rec_config):
  221. if isinstance(r_item, Trace):
  222. if r_item.status == "completed":
  223. total_task_cost += r_item.total_cost
  224. elif r_item.status == "failed":
  225. task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
  226. if isinstance(r_item, Message) and r_item.role == "tool":
  227. content = r_item.content if isinstance(r_item.content, dict) else {}
  228. if content.get("tool_name") in ("write_file", "write_json"):
  229. print(f" 💾 [Recovery File Written by {task_name}]")
  230. _instant_validate()
  231. except Exception as e:
  232. err_msg = f"{type(e).__name__}: {e}"
  233. print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
  234. task_errors.append(f"{task_name} recovery crashed: {err_msg}")
  235. # Schema Validation (with auto-fix layer)
  236. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  237. if skip_validation:
  238. print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
  239. return total_task_cost, task_errors, last_trace_id
  240. try:
  241. with open(out_file, "r", encoding="utf-8") as f:
  242. raw_content = f.read()
  243. # Layer 1: 尝试直接解析
  244. try:
  245. data = json.loads(raw_content)
  246. except json.JSONDecodeError as parse_err:
  247. # Layer 2: 自动修复 JSON 语法错误
  248. print(f" 🔧 [Auto-Fix] {Path(out_file).name} has JSON syntax error, attempting fix...")
  249. try:
  250. from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
  251. success, data, fix_desc = try_fix_and_parse(raw_content)
  252. if success:
  253. # 修复成功,写回文件
  254. with open(out_file, "w", encoding="utf-8") as f:
  255. json.dump(data, f, ensure_ascii=False, indent=2)
  256. print(f" 🔧 [Auto-Fix] Fixed: {fix_desc}")
  257. else:
  258. raise parse_err # 修复失败,抛出原始错误
  259. except ImportError:
  260. raise parse_err # fix_json_quotes 不可用,抛出原始错误
  261. filename = Path(out_file).name
  262. err = None
  263. if filename.startswith("case_"):
  264. err = validate_case(data)
  265. elif filename == "blueprint.json":
  266. err = validate_blueprint(data)
  267. elif filename == "capabilities_extracted.json":
  268. err = validate_capabilities(data)
  269. elif filename == "strategy.json":
  270. err = validate_strategy(data)
  271. if err:
  272. raise ValueError(f"Schema Validation Failed: {err}")
  273. print(f" ✅ [Schema Validated] {Path(out_file).name}")
  274. final_trace_id = last_trace_id
  275. return total_task_cost, task_errors, final_trace_id # Success! Exit retry loop.
  276. except Exception as e:
  277. err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
  278. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  279. task_errors.append(f"{task_name} Error: {e}")
  280. last_validation_error = str(e)
  281. if attempt == max_retries - 1:
  282. print(f"❌ [Retry Limit] {task_name} exhausted retries.")
  283. return total_task_cost, task_errors, last_trace_id
  284. else:
  285. print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
  286. last_validation_error = None
  287. if attempt == max_retries - 1:
  288. return total_task_cost, task_errors, last_trace_id
  289. return total_task_cost, task_errors, last_trace_id
  290. async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
  291. """
  292. 备用:使用纯净的官方 Anthropic SDK 驱动。
  293. 跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。
  294. """
  295. from anthropic import AsyncAnthropic
  296. from agent.tools.registry import get_tool_registry
  297. base_dir = Path(__file__).parent
  298. prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
  299. prompt = SimplePrompt(prompt_path)
  300. # 1. 组装输入 Message
  301. raw_messages = prompt.build_messages(**kwargs)
  302. system_prompt = ""
  303. messages = []
  304. for msg in raw_messages:
  305. if msg["role"] == "system":
  306. system_prompt += msg["content"] + "\n\n"
  307. else:
  308. messages.append({"role": msg["role"], "content": msg["content"]})
  309. # 2. 映射目标工具
  310. target_tools = ["write_file", "write_json", "read_file", "glob_files"]
  311. if prompt_name == "extract_capabilities":
  312. target_tools.extend(["capability_search", "capability_list", "tool_search"])
  313. registry = get_tool_registry()
  314. schemas = registry.get_schemas(target_tools)
  315. anthropic_tools = []
  316. for s in schemas:
  317. anthropic_tools.append({
  318. "name": s["function"]["name"],
  319. "description": s["function"].get("description", ""),
  320. "input_schema": s["function"]["parameters"]
  321. })
  322. # 3. 初始化并开启 Loop
  323. # 提示:你需要在你的终端中配置好 ANTHROPIC_API_KEY 环境变量
  324. client = AsyncAnthropic()
  325. total_task_cost = 0.0
  326. task_errors = []
  327. sdk_trace_id = None # 用于记录 Anthropic SDK 的 response ID
  328. from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
  329. out_file = kwargs.get("output_file")
  330. max_retries = 3
  331. for attempt in range(max_retries):
  332. if attempt > 0:
  333. print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
  334. if out_file and Path(out_file).exists():
  335. Path(out_file).unlink()
  336. print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")
  337. # Reset messages for retry
  338. messages_copy = list(messages)
  339. max_loops = 50
  340. for loop_idx in range(max_loops):
  341. try:
  342. # 去除前缀(兼容比如 openrouter 传入的名字)
  343. clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
  344. # 这里专门将实际请求映射到其可用的特殊别名 claude-sonnet-4-6
  345. target_model = "claude-sonnet-4-6" if "claude" in clean_model else clean_model
  346. response = await client.messages.create(
  347. model=target_model,
  348. max_tokens=4096,
  349. temperature=0.2,
  350. system=system_prompt,
  351. messages=messages_copy,
  352. tools=anthropic_tools
  353. )
  354. # (简略预估,不代表真实官方开销)
  355. if hasattr(response, 'usage'):
  356. step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
  357. total_task_cost += step_cost
  358. # 加入助手回复
  359. assistant_content = []
  360. tool_uses = []
  361. for content_block in response.content:
  362. if content_block.type == "text":
  363. text_val = content_block.text
  364. if text_val:
  365. assistant_content.append({"type": "text", "text": text_val})
  366. print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
  367. elif content_block.type == "tool_use":
  368. assistant_content.append({
  369. "type": "tool_use",
  370. "id": content_block.id,
  371. "name": content_block.name,
  372. "input": content_block.input
  373. })
  374. tool_uses.append(content_block)
  375. if not assistant_content:
  376. assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
  377. messages_copy.append({"role": "assistant", "content": assistant_content})
  378. # 出口:没有调用工具说明任务结束
  379. if not tool_uses:
  380. print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
  381. break
  382. # 工具执行与回传
  383. tool_results = []
  384. for tu in tool_uses:
  385. if tu.name in ("write_file", "write_json"):
  386. print(f" 💾 [File Written by SDK] {task_name}")
  387. print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
  388. # 执行本地环境的函数
  389. result_str = await registry.execute(tu.name, tu.input)
  390. tool_results.append({
  391. "type": "tool_result",
  392. "tool_use_id": tu.id,
  393. "content": result_str
  394. })
  395. messages_copy.append({"role": "user", "content": tool_results})
  396. except Exception as e:
  397. err_msg = str(e)
  398. print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
  399. task_errors.append(err_msg)
  400. break
  401. # Verification & Recovery block for SDK (porting from AgentRunner)
  402. if out_file and not Path(out_file).exists():
  403. print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
  404. messages_copy.append({
  405. "role": "user",
  406. "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
  407. })
  408. try:
  409. target_model = "claude-sonnet-4-6" if "claude" in clean_model else clean_model
  410. rec_response = await client.messages.create(
  411. model=target_model,
  412. max_tokens=4096,
  413. temperature=0.1,
  414. system=system_prompt,
  415. messages=messages_copy,
  416. tools=anthropic_tools,
  417. tool_choice={"type": "any"}
  418. )
  419. for content_block in rec_response.content:
  420. if content_block.type == "tool_use":
  421. if content_block.name in ("write_file", "write_json"):
  422. print(f" 💾 [Recovery File Written by SDK] {task_name}")
  423. await registry.execute(content_block.name, content_block.input)
  424. except Exception as e:
  425. print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
  426. task_errors.append(str(e))
  427. # Schema Validation
  428. if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
  429. if skip_validation:
  430. print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
  431. return total_task_cost, task_errors, None
  432. try:
  433. with open(out_file, "r", encoding="utf-8") as f:
  434. data = json.loads(f.read())
  435. filename = Path(out_file).name
  436. err = None
  437. if filename.startswith("case_"):
  438. err = validate_case(data)
  439. elif filename == "blueprint.json":
  440. err = validate_blueprint(data)
  441. elif filename == "capabilities_extracted.json":
  442. err = validate_capabilities(data)
  443. elif filename == "strategy.json":
  444. err = validate_strategy(data)
  445. if err:
  446. raise ValueError(f"Schema Validation Failed: {err}")
  447. print(f" ✅ [Schema Validated] {Path(out_file).name}")
  448. return total_task_cost, task_errors # Success! Exit retry loop.
  449. except Exception as e:
  450. err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
  451. print(f"❌ [Validation Error] {task_name}: {err_msg}")
  452. task_errors.append(f"{task_name} Error: {e}")
  453. if attempt == max_retries - 1:
  454. print(f"❌ [Retry Limit] {task_name} exhausted retries.")
  455. return total_task_cost, task_errors
  456. else:
  457. print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
  458. if attempt == max_retries - 1:
  459. return total_task_cost, task_errors
  460. return total_task_cost, task_errors
  461. async def main():
  462. parser = argparse.ArgumentParser()
  463. parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
  464. STEP_NAMES = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding", "process-cluster", "process-score", "capability-enrich", "strategy"]
  465. parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube", help="Comma-separated list of platforms to search")
  466. parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
  467. parser.add_argument("--only-step", type=str, choices=STEP_NAMES, help="Run only a single step")
  468. parser.add_argument("--phase", type=int, choices=[1, 2, 3], help="Run all steps in a phase (1=research~case-detailed, 2=process~capability, 3=strategy)")
  469. parser.add_argument("--start-from", type=str, choices=STEP_NAMES, help="Start from this step (inclusive)")
  470. parser.add_argument("--end-at", type=str, choices=STEP_NAMES, help="End at this step (inclusive)")
  471. parser.add_argument("--case-index", type=int, help="Rerun extraction for a specific case index (only for workflow-extract, capability-extract, apply-grounding)")
  472. parser.add_argument("--use-claude-sdk", action="store_true", help="Use Claude SDK (CLAUDE_CODE_KEY/URL) instead of OpenRouter")
  473. args = parser.parse_args()
  474. # ── 参数验证 ──
  475. # --case-index 只能与提取步骤一起使用
  476. if args.case_index is not None:
  477. extraction_steps = {"workflow-extract", "capability-extract", "apply-grounding"}
  478. if args.only_step and args.only_step not in extraction_steps:
  479. print(f"❌ Error: --case-index can only be used with extraction steps: {', '.join(extraction_steps)}")
  480. sys.exit(1)
  481. if not args.only_step:
  482. print("❌ Error: --case-index requires --only-step to specify which extraction step to run")
  483. sys.exit(1)
  484. # ── 三种模式互斥检查 ──
  485. mode_count = sum([
  486. args.only_step is not None,
  487. args.phase is not None,
  488. (args.start_from is not None or args.end_at is not None),
  489. ])
  490. if mode_count > 1:
  491. print("❌ Error: --only-step, --phase, and --start-from/--end-at are mutually exclusive.")
  492. sys.exit(1)
  493. # 定义步骤拓扑(非线性,Phase 2 有两条并行分支)
  494. STEP_ORDER = STEP_NAMES # 用于 phase 分组和前置检查
  495. def _step_in_range(step, start, end):
  496. """判断线性前缀中的 step 是否在 [start, end] 范围内"""
  497. all_steps = LINEAR_PREFIX + BRANCH_21 + BRANCH_22 + ["strategy"]
  498. if step not in all_steps or start not in all_steps:
  499. return False
  500. # 对于线性前缀,用索引比较
  501. if step in LINEAR_PREFIX:
  502. s_idx = LINEAR_PREFIX.index(step)
  503. start_idx = LINEAR_PREFIX.index(start) if start in LINEAR_PREFIX else -1
  504. # 如果 start 在 Phase 2 或 strategy 中,线性前缀不需要跑
  505. if start_idx < 0:
  506. return False
  507. return s_idx >= start_idx
  508. return False
  509. PHASE_MAP = {
  510. 1: {"research", "source", "case-detailed"},
  511. 2: {"process-cluster", "process-score", "capability-extract", "capability-enrich"},
  512. 3: {"strategy"},
  513. }
  514. BRANCH_21 = ["process-cluster", "process-score"]
  515. BRANCH_22 = ["capability-enrich"]
  516. # 线性前缀(Phase 2 之前)
  517. LINEAR_PREFIX = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding"]
  518. # 计算需要执行的步骤集合
  519. active_steps = None # None = 全部执行
  520. if args.only_step:
  521. active_steps = {args.only_step}
  522. elif args.phase:
  523. active_steps = PHASE_MAP[args.phase]
  524. elif args.start_from or args.end_at:
  525. start = args.start_from or "research"
  526. end = args.end_at or "strategy"
  527. # 构建 active_steps,考虑并行分支
  528. active = set()
  529. # 1. 线性前缀部分
  530. for s in LINEAR_PREFIX:
  531. if _step_in_range(s, start, end):
  532. active.add(s)
  533. # 2. Phase 2 分支部分
  534. start_in_21 = start in BRANCH_21
  535. start_in_22 = start in BRANCH_22
  536. end_in_21 = end in BRANCH_21
  537. end_in_22 = end in BRANCH_22
  538. end_is_strategy = end == "strategy"
  539. # 如果 end 是 strategy 或覆盖了整个 Phase 2,两条分支都跑
  540. if end_is_strategy or (not end_in_21 and not end_in_22 and end not in LINEAR_PREFIX):
  541. # 两条分支都包含(如果 start 允许的话)
  542. if not start_in_21 and not start_in_22:
  543. # start 在 Phase 2 之前或就是 Phase 2 的开头
  544. active.update(BRANCH_21)
  545. active.update(BRANCH_22)
  546. elif start_in_21:
  547. # start 在 2.1 分支内,但 end 到了 strategy,所以 2.2 也要全跑
  548. idx = BRANCH_21.index(start)
  549. active.update(BRANCH_21[idx:])
  550. active.update(BRANCH_22)
  551. elif start_in_22:
  552. # start 在 2.2 分支内,但 end 到了 strategy,所以 2.1 也要全跑
  553. idx = BRANCH_22.index(start)
  554. active.update(BRANCH_22[idx:])
  555. active.update(BRANCH_21)
  556. elif end_in_21:
  557. # end 在 2.1 分支内,只跑 2.1
  558. end_idx = BRANCH_21.index(end)
  559. start_idx = BRANCH_21.index(start) if start_in_21 else 0
  560. active.update(BRANCH_21[start_idx:end_idx + 1])
  561. elif end_in_22:
  562. # end 在 2.2 分支内,只跑 2.2
  563. end_idx = BRANCH_22.index(end)
  564. start_idx = BRANCH_22.index(start) if start_in_22 else 0
  565. active.update(BRANCH_22[start_idx:end_idx + 1])
  566. # 3. strategy
  567. if end_is_strategy:
  568. active.add("strategy")
  569. active_steps = active
  570. def should_run(step_name: str) -> bool:
  571. if active_steps is None:
  572. return True
  573. return step_name in active_steps
  574. if active_steps is not None:
  575. active_list = [s for s in STEP_ORDER if s in active_steps]
  576. print(f"\n[Selective Mode] Steps: {' -> '.join(active_list)}")
  577. base_dir = Path(__file__).parent
  578. # Load requirements locally
  579. req_path = base_dir / "db_requirements.json"
  580. with open(req_path, encoding='utf-8') as f:
  581. reqs = json.load(f)
  582. if args.index < 0 or args.index >= len(reqs):
  583. print("Index out of bounds")
  584. sys.exit(1)
  585. requirement = reqs[args.index]
  586. # 0. Setup directories
  587. output_dir = base_dir / "output" / f"{(args.index+1):03d}"
  588. output_dir.mkdir(parents=True, exist_ok=True)
  589. raw_cases_dir = output_dir / "raw_cases"
  590. raw_cases_dir.mkdir(parents=True, exist_ok=True)
  591. setup_logging(level=LOG_LEVEL, file=LOG_FILE)
  592. print("=" * 60)
  593. print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
  594. print("=" * 60)
  595. # ── 前置文件检查 ──
  596. # 每个 step 需要的前置文件(如果该 step 不在 active_steps 中,则需要预先存在)
  597. STEP_DEPS = {
  598. "research": [],
  599. "source": [("case_*.json", lambda: bool(list(raw_cases_dir.glob("case_*.json"))))],
  600. "generate-case": [("source.json", lambda: (raw_cases_dir / "source.json").exists())],
  601. "workflow-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
  602. "capability-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
  603. "apply-grounding": [("case.json", lambda: (output_dir / "case.json").exists())],
  604. "process-cluster": [
  605. ("case.json", lambda: (output_dir / "case.json").exists()),
  606. ],
  607. "process-score": [("blueprint_temp.json", lambda: (output_dir / "blueprint_temp.json").exists())],
  608. "capability-enrich": [
  609. ("capabilities_temp.json", lambda: (output_dir / "capabilities_temp.json").exists()),
  610. ("case.json", lambda: (output_dir / "case.json").exists()),
  611. ],
  612. "strategy": [
  613. ("process.json", lambda: (output_dir / "process.json").exists()),
  614. ("capabilities.json", lambda: (output_dir / "capabilities.json").exists()),
  615. ],
  616. }
  617. # 每个 step 会生成的文件(用于判断上游 step 是否会在本次运行中生成依赖)
  618. STEP_PRODUCES = {
  619. "research": {"case_*.json"},
  620. "source": {"source.json"},
  621. "generate-case": {"case.json"},
  622. "workflow-extract": {"case.json"}, # 原地更新
  623. "capability-extract": {"case.json"}, # 原地更新
  624. "apply-grounding": {"case.json"}, # 原地更新
  625. "process-cluster": {"blueprint_temp.json"},
  626. "process-score": {"process.json"},
  627. "capability-enrich": {"capabilities.json"},
  628. "strategy": {"strategy.json"},
  629. }
  630. if active_steps is not None:
  631. # 计算本次运行会生成的文件集合
  632. will_produce = set()
  633. for s in STEP_ORDER:
  634. if s in active_steps:
  635. will_produce.update(STEP_PRODUCES.get(s, set()))
  636. # 检查每个 active step 的前置文件
  637. missing = []
  638. for s in STEP_ORDER:
  639. if s not in active_steps:
  640. continue
  641. for dep_name, dep_check in STEP_DEPS.get(s, []):
  642. if dep_name in will_produce:
  643. continue # 上游 step 会在本次运行中生成
  644. if not dep_check():
  645. missing.append((s, dep_name))
  646. if missing:
  647. print(f"\n❌ [Pre-flight Check] Missing prerequisite files:")
  648. for step, dep in missing:
  649. print(f" - Step '{step}' requires '{dep}'")
  650. print(f"\nRun upstream steps first, or use --start-from to include them.")
  651. sys.exit(1)
  652. else:
  653. print(f"✅ [Pre-flight Check] All prerequisites satisfied")
  654. # Load presets
  655. presets_path = base_dir / "presets.json"
  656. if presets_path.exists():
  657. from agent.core.presets import load_presets_from_json
  658. load_presets_from_json(str(presets_path))
  659. print("✅ Configured Agent Presets (Skills Boundaries)")
  660. # Browser initialization removed to save resources
  661. store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
  662. # Instantiate two distinct LLM orchestrators
  663. qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
  664. # 根据 --use-claude-sdk 参数选择 LLM 提供商
  665. if args.use_claude_sdk:
  666. # 使用 Claude SDK (CLAUDE_CODE_KEY/URL 或 ANTHROPIC_API_KEY/BASE_URL)
  667. claude_model = "claude-sonnet-4-6"
  668. print(f"✅ Using Claude SDK with model: {claude_model}")
  669. # 所有环节走 Claude Agent SDK(OAuth / Max 订阅)
  670. from agent.llm.claude_code_oauth import create_claude_code_oauth_llm_call
  671. claude_llm_call = create_claude_code_oauth_llm_call(model=claude_model)
  672. workflow_llm_call = claude_llm_call
  673. print(f"✅ All Claude operations will use Claude Agent SDK (OAuth/Max subscription)")
  674. else:
  675. # 使用 OpenRouter 代理的 GPT-5.4(支持结构化输出 strict mode)
  676. claude_model = "openai/gpt-5.4"
  677. print(f"✅ Using OpenRouter with model: {claude_model}")
  678. from agent.llm.openrouter import create_openrouter_llm_call
  679. claude_llm_call = create_openrouter_llm_call(model=claude_model)
  680. workflow_llm_call = claude_llm_call # 没开 SDK 时与默认一致
  681. runner_qwen = AgentRunner(
  682. trace_store=store,
  683. llm_call=create_qwen_llm_call(model=qwen_model),
  684. skills_dir=SKILLS_DIR
  685. )
  686. runner_claude = AgentRunner(
  687. trace_store=store,
  688. llm_call=claude_llm_call,
  689. skills_dir=SKILLS_DIR
  690. )
  691. try:
  692. start_time = time.time()
  693. total_cost = 0.0
  694. costs_breakdown = {}
  695. global_errors = []
  696. strategy_file = None
  697. TARGET_QUALIFIED_CASES = 15
  698. # ── --only-step 单步执行模式 ──────────────────────────────
  699. if args.only_step:
  700. step = args.only_step
  701. source_file = raw_cases_dir / "source.json"
  702. detailed_file = raw_cases_dir / "case_detailed.json"
  703. blueprint_temp_file = output_dir / "blueprint_temp.json"
  704. capabilities_temp_file = output_dir / "capabilities_temp.json"
  705. process_file = output_dir / "process.json"
  706. capabilities_file = output_dir / "capabilities.json"
  707. print(f"\n[Single Step Mode] Running only: {step}")
  708. if step == "research":
  709. # Phase 1: 只跑调研,强制按 --platforms 重跑,不受已有 case 文件影响
  710. single_platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  711. if not single_platforms:
  712. print(" ❌ No platforms specified. Use --platforms xhs,zhihu,gzh,youtube")
  713. sys.exit(1)
  714. print(f" 🔍 Research platforms: {single_platforms}")
  715. single_is_single = args.restart_mode.startswith("single_")
  716. phase1_tasks = []
  717. for p in single_platforms:
  718. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}。目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=80、正文充实)。"
  719. out_file = str(raw_cases_dir / f"case_{p}.json")
  720. kwargs = {
  721. "task": task_desc,
  722. "output_file": out_file
  723. }
  724. phase1_tasks.append(
  725. run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=single_is_single)
  726. )
  727. phase1_results = await asyncio.gather(*phase1_tasks)
  728. for (task_cost, task_errors, trace_id), p in zip(phase1_results, single_platforms):
  729. print(f" ✓ {p}: cost=${task_cost:.4f}, errors={len(task_errors)}")
  730. elif step == "source":
  731. # Phase 1.5: 提取原始 source.json
  732. from examples.process_pipeline.script.extract_sources import extract_sources_to_json
  733. result = extract_sources_to_json(raw_cases_dir)
  734. print(f" ✓ source.json: matched={result['total_matched']}, filtered={result['filtered_total']}")
  735. if result.get("filtered_reasons"):
  736. for reason, cnt in result["filtered_reasons"].items():
  737. print(f" - {reason}: {cnt}")
  738. elif step == "generate-case":
  739. # Phase 1.5.5: 生成标准化 case.json
  740. from examples.process_pipeline.script.generate_case import generate_case_from_source
  741. result = await generate_case_from_source(raw_cases_dir)
  742. print(f" ✓ case.json: cases={result['total_cases']}")
  743. elif step == "workflow-extract":
  744. # Phase 1.6a: 提取 workflow 到 case.json
  745. case_file = output_dir / "case.json"
  746. if not case_file.exists():
  747. print(f" ❌ case.json not found. Run --only-step generate-case first.")
  748. sys.exit(1)
  749. # 如果指定了 --case-index,先过滤 case.json
  750. if args.case_index is not None:
  751. with open(case_file, "r", encoding="utf-8") as f:
  752. case_data = json.load(f)
  753. original_cases = case_data.get("cases", [])
  754. target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
  755. if not target_case:
  756. print(f" ❌ Case with index {args.case_index} not found in case.json")
  757. sys.exit(1)
  758. # 临时只保留目标 case
  759. case_data["cases"] = [target_case]
  760. temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
  761. with open(temp_case_file, "w", encoding="utf-8") as f:
  762. json.dump(case_data, f, ensure_ascii=False, indent=2)
  763. print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
  764. case_file_to_use = temp_case_file
  765. else:
  766. case_file_to_use = case_file
  767. from examples.process_pipeline.script.extract_workflow import extract_workflow
  768. result = await extract_workflow(
  769. case_file_to_use,
  770. workflow_llm_call, model=claude_model
  771. )
  772. # 如果使用了临时文件,需要合并回原始 case.json
  773. if args.case_index is not None:
  774. with open(case_file_to_use, "r", encoding="utf-8") as f:
  775. updated_data = json.load(f)
  776. updated_case = updated_data["cases"][0]
  777. # 更新原始文件中的对应 case
  778. with open(case_file, "r", encoding="utf-8") as f:
  779. original_data = json.load(f)
  780. for i, c in enumerate(original_data["cases"]):
  781. if c.get("index") == args.case_index:
  782. original_data["cases"][i] = updated_case
  783. break
  784. with open(case_file, "w", encoding="utf-8") as f:
  785. json.dump(original_data, f, ensure_ascii=False, indent=2)
  786. temp_case_file.unlink() # 删除临时文件
  787. print(f" ✓ Merged case {args.case_index} back to case.json")
  788. print(f" ✓ case.json + workflow: success={result['success']}, failed={result['failed']}")
  789. elif step == "capability-extract":
  790. # Phase 1.6b 已废弃:capability 现在由 workflow-extract 一并写入 case.json
  791. case_file = output_dir / "case.json"
  792. if not case_file.exists():
  793. print(f" ❌ case.json not found. Run --only-step generate-case first.")
  794. sys.exit(1)
  795. print(" ⏭️ capability-extract is integrated into workflow-extract; skipping old extractor.")
  796. elif step == "apply-grounding":
  797. # Phase 1.7: 将 apply_to_draft 映射为正式 apply_to
  798. case_file = output_dir / "case.json"
  799. if not case_file.exists():
  800. print(f" ❌ case.json not found. Run workflow-extract first.")
  801. sys.exit(1)
  802. # 如果指定了 --case-index,先过滤 case.json
  803. if args.case_index is not None:
  804. with open(case_file, "r", encoding="utf-8") as f:
  805. case_data = json.load(f)
  806. original_cases = case_data.get("cases", [])
  807. target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
  808. if not target_case:
  809. print(f" ❌ Case with index {args.case_index} not found in case.json")
  810. sys.exit(1)
  811. case_data["cases"] = [target_case]
  812. temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
  813. with open(temp_case_file, "w", encoding="utf-8") as f:
  814. json.dump(case_data, f, ensure_ascii=False, indent=2)
  815. print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
  816. case_file_to_use = temp_case_file
  817. else:
  818. case_file_to_use = case_file
  819. from examples.process_pipeline.script.apply_to_grounding import apply_grounding
  820. result = await apply_grounding(
  821. case_file_to_use,
  822. claude_llm_call, model=claude_model
  823. )
  824. # 如果使用了临时文件,需要合并回原始 case.json
  825. if args.case_index is not None:
  826. with open(case_file_to_use, "r", encoding="utf-8") as f:
  827. updated_data = json.load(f)
  828. updated_case = updated_data["cases"][0]
  829. with open(case_file, "r", encoding="utf-8") as f:
  830. original_data = json.load(f)
  831. for i, c in enumerate(original_data["cases"]):
  832. if c.get("index") == args.case_index:
  833. original_data["cases"][i] = updated_case
  834. break
  835. with open(case_file, "w", encoding="utf-8") as f:
  836. json.dump(original_data, f, ensure_ascii=False, indent=2)
  837. temp_case_file.unlink()
  838. print(f" ✓ Merged case {args.case_index} back to case.json")
  839. print(f" ✓ case.json + apply_to: grounded={result['grounded']}/{result['total']}, cost=${result['total_cost']:.4f}")
  840. elif step == "process-cluster":
  841. # Phase 2.1.1: 工序聚类
  842. if not detailed_file.exists():
  843. print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
  844. sys.exit(1)
  845. from examples.process_pipeline.script.cluster_processes import cluster_processes
  846. result = await cluster_processes(
  847. source_file=source_file, detailed_file=detailed_file,
  848. output_file=blueprint_temp_file, requirement=requirement,
  849. llm_call=claude_llm_call, model=claude_model,
  850. )
  851. print(f" ✓ blueprint_temp.json: clusters={result['clusters']}")
  852. elif step == "process-score":
  853. # Phase 2.1.2: 工序打分
  854. if not blueprint_temp_file.exists():
  855. print(f" ❌ blueprint_temp.json not found. Run --only-step process-cluster first.")
  856. sys.exit(1)
  857. from examples.process_pipeline.script.score_processes import score_blueprints
  858. result = await score_blueprints(
  859. blueprint_file=blueprint_temp_file, output_file=process_file,
  860. requirement=requirement, llm_call=claude_llm_call, model=claude_model,
  861. )
  862. print(f" ✓ process.json: scored={result['scored']}")
  863. elif step == "capability-extract":
  864. # Phase 2.2.1: 能力初步聚类
  865. if not detailed_file.exists():
  866. print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
  867. sys.exit(1)
  868. from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
  869. result = await extract_capabilities_workflow(
  870. detailed_file=detailed_file, source_file=source_file,
  871. output_file=capabilities_temp_file, requirement=requirement,
  872. llm_call=claude_llm_call, model=claude_model,
  873. )
  874. print(f" ✓ capabilities_temp.json: capabilities={result['capabilities']}")
  875. elif step == "capability-enrich":
  876. # Phase 2.2.2: 能力丰富化
  877. if not capabilities_temp_file.exists():
  878. print(f" ❌ capabilities_temp.json not found. Run --only-step capability-extract first.")
  879. sys.exit(1)
  880. if not source_file.exists():
  881. print(f" ❌ source.json not found. Run --only-step source first.")
  882. sys.exit(1)
  883. from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
  884. result = await enrich_all_capabilities(
  885. capabilities_temp_file=capabilities_temp_file, source_file=source_file,
  886. output_file=capabilities_file, llm_call=claude_llm_call, model=claude_model,
  887. )
  888. print(f" ✓ capabilities.json: enriched={result['enriched']}/{result['total_capabilities']}")
  889. elif step == "strategy":
  890. # Phase 3: 策略组装
  891. if not process_file.exists():
  892. print(f" ❌ process.json not found. Run --only-step process-score first.")
  893. sys.exit(1)
  894. if not capabilities_file.exists():
  895. print(f" ❌ capabilities.json not found. Run --only-step capability-enrich first.")
  896. sys.exit(1)
  897. strategy_file_path = output_dir / "strategy.json"
  898. from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
  899. result = await assemble_strategy(
  900. process_file=process_file, capabilities_file=capabilities_file,
  901. output_file=strategy_file_path, requirement=requirement,
  902. llm_call=claude_llm_call, model=claude_model,
  903. )
  904. print(f" ✓ strategy.json: workflow_steps={result['workflow_steps']}")
  905. elapsed = time.time() - start_time
  906. print(f"\n[Single Step Done] {step} completed in {elapsed:.1f}s")
  907. return
  908. # ── 正常 pipeline 流程 ──────────────────────────────
  909. platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
  910. # Phase 0: Platform Selection (controlled by --platforms)
  911. if should_run("research"):
  912. print(f"\n--- Phase 0: Using specified platforms ---")
  913. print(f"📡 Will research platforms: {platforms}")
  914. # 注:不再跳过已有平台,因为 agent 是增量追加模式
  915. is_single_step = args.restart_mode.startswith("single_")
  916. # Phase 1 & 1.5: MAP (Parallel Search) and Source Extraction Loop
  917. if should_run("research") or should_run("source"):
  918. print(f"\n--- Phase 1: Distributed Research Map with Source Filter Loop ({qwen_model}) ---")
  919. from examples.process_pipeline.script.extract_sources import extract_sources_to_json
  920. MAX_ROUNDS = 50
  921. platform_traces = {p: None for p in platforms}
  922. active_platforms = platforms.copy()
  923. phase1_trace_ids = {}
  924. if not should_run("research") and should_run("source"):
  925. try:
  926. src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=None)
  927. print(f"📎 [Source Extraction] matched={src_stats['total_matched']}, filtered={src_stats['filtered_total']} → {raw_cases_dir / 'source.json'}")
  928. if src_stats.get("filtered_reasons"):
  929. for reason, cnt in src_stats["filtered_reasons"].items():
  930. print(f" - {reason}: {cnt}")
  931. except Exception as e:
  932. err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
  933. print(f"⚠️ [Warning] {err_msg}")
  934. global_errors.append(err_msg)
  935. elif should_run("research"):
  936. round_idx = 0
  937. last_src_stats = None # 保存上一轮的过滤统计
  938. last_platform_count = {} # 保存上一轮每个平台的合格数
  939. while active_platforms and round_idx < MAX_ROUNDS:
  940. print(f"\n >>> [Research Loop] Round {round_idx+1} - Active platforms: {active_platforms}")
  941. if active_platforms:
  942. phase1_tasks = []
  943. for p in active_platforms:
  944. task_desc = f"渠道:{p.upper()}。核心需求:{requirement}。目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=70、正文充实)。"
  945. out_file = str(raw_cases_dir / f"case_{p}.json")
  946. additional_msgs = None
  947. if round_idx > 0 and last_src_stats:
  948. # 构建带过滤详情的 feedback message
  949. p_count = last_platform_count.get(p, 0)
  950. # 筛选出该平台被过滤的条目
  951. p_filtered = [d for d in last_src_stats.get("filtered_details", []) if d.get("platform") == p]
  952. reason_summary = last_src_stats.get("filtered_reasons", {})
  953. feedback_lines = [
  954. f"【系统反馈】你在上一轮提取的有效案例数量未达标。",
  955. f"当前 {p.upper()} 合格案例:{p_count}/{TARGET_QUALIFIED_CASES}",
  956. f"过滤统计:{dict(reason_summary)}" if reason_summary else "",
  957. ]
  958. if p_filtered:
  959. feedback_lines.append(f"\n以下是你提交的被过滤掉的帖子(共{len(p_filtered)}条):")
  960. for item in p_filtered[:10]:
  961. feedback_lines.append(f" - [{item['case_id']}] {item['title']} → 原因: {item['filter_reason']}")
  962. if len(p_filtered) > 10:
  963. feedback_lines.append(f" ... 还有 {len(p_filtered) - 10} 条未列出")
  964. feedback_lines.append(
  965. f"\n请继续搜索并提取更多**全新的、不同的**高质量案例,**追加**写入到你一直在维护的文件中。"
  966. f"注意:不要重复之前已找过的案例!针对上述被过滤的原因,请确保新案例有详实的正文内容且评分准确。"
  967. )
  968. additional_msgs = [{
  969. "role": "user",
  970. "content": "\n".join(line for line in feedback_lines if line)
  971. }]
  972. kwargs = {
  973. "task": task_desc,
  974. "output_file": out_file
  975. }
  976. phase1_tasks.append(
  977. run_agent_task(
  978. runner_qwen, "researcher", kwargs,
  979. f"P1_Research_{p}_R{round_idx+1}", qwen_model,
  980. skip_validation=is_single_step,
  981. start_trace_id=platform_traces[p],
  982. additional_messages=additional_msgs
  983. )
  984. )
  985. phase1_results = await asyncio.gather(*phase1_tasks)
  986. for (task_cost, task_errors, trace_id), p in zip(phase1_results, active_platforms):
  987. total_cost += task_cost
  988. cost_key = f"P1_Research_{p}"
  989. costs_breakdown[cost_key] = costs_breakdown.get(cost_key, 0.0) + round(task_cost, 4)
  990. platform_traces[p] = trace_id
  991. phase1_trace_ids[f"P1_Research_{p}"] = trace_id
  992. global_errors.extend(task_errors)
  993. expected_file = Path(raw_cases_dir / f"case_{p}.json")
  994. if not expected_file.exists():
  995. err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
  996. print(f"⚠️ [Warning] {err_msg}")
  997. global_errors.append(err_msg)
  998. if should_run("source"):
  999. try:
  1000. trace_id_list = [tid for tid in phase1_trace_ids.values() if tid]
  1001. src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
  1002. last_src_stats = src_stats
  1003. print(f" 📎 [Source Extraction] matched={src_stats['total_matched']}, filtered={src_stats['filtered_total']} → {raw_cases_dir / 'source.json'}")
  1004. if src_stats.get("filtered_reasons"):
  1005. for reason, cnt in src_stats["filtered_reasons"].items():
  1006. print(f" - {reason}: {cnt}")
  1007. except Exception as e:
  1008. err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
  1009. print(f" ⚠️ [Warning] {err_msg}")
  1010. global_errors.append(err_msg)
  1011. # 判断是否达标:直接看 source.json 里每个平台的条目数
  1012. source_file = raw_cases_dir / "source.json"
  1013. if source_file.exists():
  1014. with open(source_file, "r", encoding="utf-8") as f:
  1015. source_data = json.load(f)
  1016. platform_count = {}
  1017. for s in source_data.get("sources", []):
  1018. p = s.get("platform")
  1019. if p:
  1020. platform_count[p] = platform_count.get(p, 0) + 1
  1021. last_platform_count = platform_count
  1022. print(f" 📊 [Source Count] Target: >={TARGET_QUALIFIED_CASES} qualified items per platform")
  1023. platforms_to_keep = []
  1024. for p in platforms:
  1025. count = platform_count.get(p, 0)
  1026. if p in active_platforms:
  1027. print(f" - {p}: {count}/{TARGET_QUALIFIED_CASES}")
  1028. if count < TARGET_QUALIFIED_CASES:
  1029. platforms_to_keep.append(p)
  1030. active_platforms = platforms_to_keep
  1031. if not active_platforms:
  1032. print(f" ✅ All platforms reached the target {TARGET_QUALIFIED_CASES} qualified items!")
  1033. break
  1034. else:
  1035. print(f" ⚠️ source.json not found, continuing loop to retry...")
  1036. round_idx += 1
  1037. if round_idx >= MAX_ROUNDS and active_platforms:
  1038. print(f" ⚠️ [Max Rounds] Reached {MAX_ROUNDS} rounds. Remaining platforms: {active_platforms}")
  1039. else:
  1040. print("\n⏭️ [Skip] Phase 1 & 1.5 Skipped. Using existing cases...")
  1041. # Phase 1.3: Generate case.json from source.json
  1042. if should_run("generate-case"):
  1043. source_file = raw_cases_dir / "source.json"
  1044. case_file = output_dir / "case.json"
  1045. if source_file.exists():
  1046. try:
  1047. from examples.process_pipeline.script.generate_case import generate_case
  1048. print(f"\n--- Phase 1.3: Generate case.json ---")
  1049. case_stats = await generate_case(source_file, case_file)
  1050. print(
  1051. f"📦 [Case Generation] "
  1052. f"generated {case_stats.get('total', 0)} cases "
  1053. f"→ {case_file}"
  1054. )
  1055. except Exception as e:
  1056. err_msg = f"Case generation failed: {type(e).__name__}: {e}"
  1057. print(f"⚠️ [Warning] {err_msg}")
  1058. global_errors.append(err_msg)
  1059. # Phase 1.6: Extract workflow and capability together → case.json
  1060. if should_run("workflow-extract"):
  1061. case_file = output_dir / "case.json"
  1062. if case_file.exists():
  1063. try:
  1064. from examples.process_pipeline.script.extract_workflow import extract_workflow
  1065. print(f"\n--- Phase 1.6a: Workflow Extraction ({claude_model}) ---")
  1066. workflow_stats = await extract_workflow(
  1067. case_file,
  1068. workflow_llm_call,
  1069. model=claude_model,
  1070. max_concurrent=3
  1071. )
  1072. total_cost += workflow_stats.get("total_cost", 0.0)
  1073. costs_breakdown["P1.6a_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
  1074. print(
  1075. f"🔍 [Workflow Extraction] "
  1076. f"success={workflow_stats['success']} "
  1077. f"failed={workflow_stats['failed']}"
  1078. )
  1079. except Exception as e:
  1080. err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
  1081. print(f"⚠️ [Warning] {err_msg}")
  1082. global_errors.append(err_msg)
  1083. if should_run("capability-extract"):
  1084. case_file = output_dir / "case.json"
  1085. if case_file.exists():
  1086. print("\n--- Phase 1.6b: Capability Extraction ---")
  1087. print("🧩 [Capability Extraction] integrated into workflow-extract; skipping old extractor.")
  1088. # Phase 1.7: Apply grounding (map apply_to_draft to apply_to)
  1089. if should_run("apply-grounding"):
  1090. case_file = output_dir / "case.json"
  1091. if case_file.exists():
  1092. try:
  1093. from examples.process_pipeline.script.apply_to_grounding import apply_grounding
  1094. print(f"\n--- Phase 1.7: Apply Grounding ({claude_model}) ---")
  1095. grounding_stats = await apply_grounding(
  1096. case_file,
  1097. claude_llm_call,
  1098. model=claude_model,
  1099. max_concurrent=3
  1100. )
  1101. total_cost += grounding_stats.get("total_cost", 0.0)
  1102. costs_breakdown["P1.7_ApplyGrounding"] = round(grounding_stats.get("total_cost", 0.0), 4)
  1103. print(
  1104. f"🗺️ [Apply Grounding] "
  1105. f"grounded={grounding_stats['grounded']}/{grounding_stats['total']} "
  1106. f"→ {case_file}"
  1107. )
  1108. except Exception as e:
  1109. err_msg = f"Apply grounding failed: {type(e).__name__}: {e}"
  1110. print(f"⚠️ [Warning] {err_msg}")
  1111. global_errors.append(err_msg)
  1112. # Phase 2: Parallel Workflow (Process + Capabilities) uses Claude
  1113. if any(should_run(s) for s in ["process-cluster", "process-score", "capability-enrich", "strategy"]):
  1114. print(f"\n--- Phase 2: Parallel Workflow ({claude_model}) ---")
  1115. # 输出文件
  1116. process_file = str(output_dir / "process.json")
  1117. capabilities_file = str(output_dir / "capabilities.json")
  1118. # 中间文件
  1119. blueprint_temp_file = str(output_dir / "blueprint_temp.json")
  1120. capabilities_temp_file = str(output_dir / "capabilities_temp.json")
  1121. # 优先使用结构化数据:source.json + case_detailed.json
  1122. detailed_file = raw_cases_dir / "case_detailed.json"
  1123. source_file = raw_cases_dir / "source.json"
  1124. if detailed_file.exists():
  1125. input_files_glob = str(raw_cases_dir / "{source,case_detailed}.json").replace("\\", "/")
  1126. print(f" Using structured data: source.json + case_detailed.json")
  1127. else:
  1128. input_files_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
  1129. print(f" Fallback to raw cases: case_*.json")
  1130. force_strategy_rerun = False
  1131. force_active = active_steps is not None
  1132. # ── Step 1: 并行执行 2.1.1 (cluster_processes) 和 2.2.1 (extract_capabilities) ──
  async def run_cluster_processes():
      """Phase 2.1.1: cluster processes into blueprint_temp.json.

      The step is skipped when "process-cluster" is not selected (selective
      mode), or when blueprint_temp.json already exists and re-runs are not
      forced.

      Returns:
          The ``total_cost`` reported by ``cluster_processes`` (0.0 if absent),
          or 0.0 when skipped or on failure. Failures are appended to
          ``global_errors`` instead of being raised.
      """
      # force_active is True for any selective run, so without this guard the
      # step would be regenerated even when the user did not select it. When a
      # selected downstream step needs blueprint_temp.json and this step is
      # not selected, the pre-flight check already requires the file to exist.
      if not should_run("process-cluster"):
          print(" [2.1.1] ⏭️ process-cluster not selected, skipping")
          return 0.0
      if Path(blueprint_temp_file).exists() and not force_active:
          print(f" [2.1.1] ⏭️ blueprint_temp.json exists, skipping")
          return 0.0
      print(f" [2.1.1] Clustering processes...")
      try:
          from examples.process_pipeline.script.cluster_processes import cluster_processes
          result = await cluster_processes(
              source_file=source_file,
              detailed_file=detailed_file,
              output_file=Path(blueprint_temp_file),
              requirement=requirement,
              llm_call=claude_llm_call,
              model=claude_model,
          )
          print(f" [2.1.1] ✓ Distilled {result.get('distilled_cases', 0)} cases, "
                f"generated {result.get('blueprints', 0)} blueprints")
          return result.get("total_cost", 0.0)
      except Exception as e:
          # Best-effort step: record the failure and let the pipeline continue.
          err_msg = f"P2.1.1 ClusterProcesses failed: {type(e).__name__}: {e}"
          print(f" [2.1.1] ⚠️ {err_msg}")
          global_errors.append(err_msg)
          return 0.0
  async def run_extract_capabilities():
      """Phase 2.2.1: first-pass capability clustering into capabilities_temp.json.

      Returns:
          The ``total_cost`` value reported by ``extract_capabilities_workflow``
          (0.0 when absent), or 0.0 when the step is skipped or raises. Errors
          are appended to ``global_errors`` rather than propagated.
      """
      # NOTE(review): in selective mode force_active re-runs this step even if
      # it was not selected. The step name "capability-extract" also names the
      # retired Phase 1.6b, so it is unclear which selection should gate it.
      # Confirm before adding a should_run() check.
      temp_output = Path(capabilities_temp_file)
      # Reuse an earlier result unless selective mode forces re-runs.
      if temp_output.exists() and not force_active:
          print(f" [2.2.1] ⏭️ capabilities_temp.json exists, skipping")
          return 0.0
      print(f" [2.2.1] Extracting capabilities...")
      try:
          # The import sits inside the try, so an ImportError is recorded like
          # any other failure of this step.
          from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
          stats = await extract_capabilities_workflow(
              detailed_file=detailed_file,
              source_file=source_file,
              output_file=temp_output,
              requirement=requirement,
              llm_call=claude_llm_call,
              model=claude_model,
          )
          extracted = stats.get('capabilities', 0)
          print(f" [2.2.1] ✓ Extracted {extracted} capabilities")
          return stats.get("total_cost", 0.0)
      except Exception as exc:
          failure = f"P2.2.1 ExtractCapabilities failed: {type(exc).__name__}: {exc}"
          print(f" [2.2.1] ⚠️ {failure}")
          global_errors.append(failure)
          return 0.0
  1180. if not Path(blueprint_temp_file).exists() or not Path(capabilities_temp_file).exists():
  1181. force_strategy_rerun = True
  1182. step1_costs = await asyncio.gather(run_cluster_processes(), run_extract_capabilities())
  1183. for cost, name in zip(step1_costs, ["P2.1.1_ClusterProcesses", "P2.2.1_ExtractCapabilities"]):
  1184. total_cost += cost
  1185. costs_breakdown[name] = round(cost, 4)
  1186. # ── Step 2: 并行执行 2.1.2 和 2.2.2 ──────────────────────────────
  async def run_score_processes():
      """Phase 2.1.2: score process blueprints and write process.json.

      The step is skipped when "process-score" is not selected (selective
      mode), when process.json already exists and re-runs are not forced, or
      when blueprint_temp.json is missing.

      Returns:
          The ``total_cost`` reported by ``score_blueprints`` (0.0 if absent),
          or 0.0 when skipped or on failure. Failures are appended to
          ``global_errors`` instead of being raised.
      """
      # force_active is True for any selective run; gate on the selection so
      # that steps the user did not pick are not regenerated.
      if not should_run("process-score"):
          print(" [2.1.2] ⏭️ process-score not selected, skipping")
          return 0.0
      if Path(process_file).exists() and not force_active:
          print(f" [2.1.2] ⏭️ process.json exists, skipping")
          return 0.0
      if not Path(blueprint_temp_file).exists():
          print(f" [2.1.2] ⚠️ blueprint_temp.json not found, skipping")
          return 0.0
      print(f" [2.1.2] Scoring processes...")
      try:
          from examples.process_pipeline.script.score_processes import score_blueprints
          score_result = await score_blueprints(
              blueprint_file=Path(blueprint_temp_file),
              output_file=Path(process_file),
              requirement=requirement,
              llm_call=claude_llm_call,
              model=claude_model,
          )
          print(f" [2.1.2] ✓ Scored {score_result.get('scored', 0)} blueprints")
          return score_result.get("total_cost", 0.0)
      except Exception as e:
          # Include the exception type, matching the other phase error messages.
          err_msg = f"P2.1.2 ScoreProcesses failed: {type(e).__name__}: {e}"
          print(f" [2.1.2] ⚠️ {err_msg}")
          global_errors.append(err_msg)
          return 0.0
  async def run_enrich_capabilities():
      """Phase 2.2.2: enrich capabilities and write capabilities.json.

      The step is skipped when "capability-enrich" is not selected (selective
      mode), when capabilities.json already exists and re-runs are not forced,
      or when capabilities_temp.json / source.json is missing.

      Returns:
          The ``total_cost`` reported by ``enrich_all_capabilities`` (0.0 if
          absent), or 0.0 when skipped or on failure. Failures are appended to
          ``global_errors`` instead of being raised.
      """
      # force_active is True for any selective run; gate on the selection so
      # that steps the user did not pick are not regenerated.
      if not should_run("capability-enrich"):
          print(" [2.2.2] ⏭️ capability-enrich not selected, skipping")
          return 0.0
      if Path(capabilities_file).exists() and not force_active:
          print(f" [2.2.2] ⏭️ capabilities.json exists, skipping")
          return 0.0
      if not Path(capabilities_temp_file).exists():
          print(f" [2.2.2] ⚠️ capabilities_temp.json not found, skipping")
          return 0.0
      if not source_file.exists():
          print(f" [2.2.2] ⚠️ source.json not found, skipping")
          return 0.0
      print(f" [2.2.2] Enriching capabilities...")
      try:
          from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
          enrich_result = await enrich_all_capabilities(
              capabilities_temp_file=Path(capabilities_temp_file),
              source_file=source_file,
              output_file=Path(capabilities_file),
              llm_call=claude_llm_call,
              model=claude_model,
          )
          print(f" [2.2.2] ✓ Enriched {enrich_result.get('enriched', 0)} capabilities")
          return enrich_result.get("total_cost", 0.0)
      except Exception as e:
          # Include the exception type, matching the other phase error messages.
          err_msg = f"P2.2.2 EnrichCapabilities failed: {type(e).__name__}: {e}"
          print(f" [2.2.2] ⚠️ {err_msg}")
          global_errors.append(err_msg)
          return 0.0
  1240. # 并行执行 Step 2
  1241. step2_costs = await asyncio.gather(run_score_processes(), run_enrich_capabilities())
  1242. for cost, name in zip(step2_costs, ["P2.1.2_ScoreProcesses", "P2.2.2_EnrichCaps"]):
  1243. total_cost += cost
  1244. costs_breakdown[name] = round(cost, 4)
  1245. # Phase 3: REDUCE 2 (Final Assembly) uses Claude
  1246. print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
  1247. strategy_file_path = output_dir / "strategy.json"
  1248. if args.restart_mode == "single":
  1249. force_strategy_rerun = False
  1250. if strategy_file_path.exists() and not force_strategy_rerun and not force_active:
  1251. print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
  1252. else:
  1253. if strategy_file_path.exists():
  1254. print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
  1255. print(" > Using [Workflow Core]")
  1256. from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
  1257. try:
  1258. phase3_result = await assemble_strategy(
  1259. process_file=Path(process_file),
  1260. capabilities_file=Path(capabilities_file),
  1261. output_file=strategy_file_path,
  1262. requirement=requirement,
  1263. llm_call=claude_llm_call,
  1264. model=claude_model,
  1265. )
  1266. phase3_cost = phase3_result.get("total_cost", 0.0)
  1267. print(f" ✓ Generated workflow with {phase3_result.get('workflow_steps', 0)} steps")
  1268. except Exception as e:
  1269. err_msg = f"P3_AssembleStrategy failed: {type(e).__name__}: {e}"
  1270. print(f" ⚠️ {err_msg}")
  1271. global_errors.append(err_msg)
  1272. phase3_cost = 0.0
  1273. total_cost += phase3_cost
  1274. costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
  1275. else:
  1276. print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
  1277. end_time = time.time()
  1278. elapsed_sec = end_time - start_time
  1279. # Save Metrics
  1280. metrics_file = base_dir / "run_metrics.json"
  1281. metrics_data = []
  1282. if metrics_file.exists():
  1283. with open(metrics_file, "r", encoding="utf-8") as f:
  1284. try:
  1285. metrics_data = json.load(f)
  1286. except json.JSONDecodeError:
  1287. pass
  1288. # Collect trace_ids from all phases
  1289. trace_ids = {}
  1290. if 'phase1_trace_ids' in dir():
  1291. trace_ids.update(phase1_trace_ids)
  1292. metrics_data.append({
  1293. "index": args.index,
  1294. "requirement": requirement[:80] + "...",
  1295. "duration_seconds": round(elapsed_sec, 2),
  1296. "total_cost_usd": round(total_cost, 4),
  1297. "costs_breakdown": costs_breakdown,
  1298. "trace_ids": trace_ids,
  1299. "errors": global_errors,
  1300. "timestamp": datetime.now().isoformat()
  1301. })
  1302. with open(metrics_file, "w", encoding="utf-8") as f:
  1303. json.dump(metrics_data, f, indent=2, ensure_ascii=False)
  1304. print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
  1305. finally:
  1306. pass
  1307. print("✅ Pipeline run finished.")
  1308. if strategy_file:
  1309. print("✅ Strategy saved to:", strategy_file)
  if __name__ == "__main__":
      # Script entry point: run the async pipeline coroutine to completion.
      asyncio.run(main())