# fix_broken_outputs.py
  1. """
  2. 批量修复有问题的 output 文件
  3. 根据 schema_errors_report.txt 中的错误,删除相应文件并重跑 pipeline
  4. """
  5. import re
  6. import subprocess
  7. from pathlib import Path
  8. PIPELINE_DIR = Path(__file__).parent.parent
  9. OUTPUT_DIR = PIPELINE_DIR / "output"
  10. REPORT_PATH = PIPELINE_DIR / "script" / "schema_errors_report.txt"
  11. def parse_error_report(report_path):
  12. """解析错误报告,返回需要修复的需求列表"""
  13. errors = {}
  14. with open(report_path, "r", encoding="utf-8") as f:
  15. for line in f:
  16. # 匹配格式: output\XXX\file.json: Schema mismatch: error_detail
  17. match = re.search(r'output[/\\](\d+)[/\\]([^:]+):\s*Schema mismatch:\s*(.+)', line)
  18. if match:
  19. req_id = match.group(1)
  20. file_name = match.group(2)
  21. error_detail = match.group(3).strip()
  22. if req_id not in errors:
  23. errors[req_id] = []
  24. errors[req_id].append({
  25. "file": file_name,
  26. "error": error_detail
  27. })
  28. return errors
  29. def determine_cleanup_strategy(req_id, error_items):
  30. """
  31. 根据错误文件类型决定清理策略
  32. 返回: (策略类型, 需要删除的文件列表, 问题平台列表)
  33. 策略类型:
  34. - single_platform: 单个平台的 raw_cases 有问题,只重跑该平台 + 级联删除后续文件
  35. - full: 多个平台或整体 raw_cases 有问题,完整重跑
  36. - from_blueprint: blueprint 有问题,删除 blueprint + capabilities + strategy
  37. - from_capabilities: capabilities 有问题,删除 capabilities + strategy
  38. - from_strategy: 只有 strategy 有问题,只删除 strategy
  39. """
  40. # 提取有问题的平台和错误类型
  41. problem_platforms = []
  42. has_empty_array = False
  43. for item in error_items:
  44. file_name = item["file"]
  45. error_detail = item["error"]
  46. # 检查是否是空数组问题
  47. if "array is empty" in error_detail:
  48. has_empty_array = True
  49. # 跳过 strategies[1].reasoning is empty(非选中策略的 reasoning 为空是正常的)
  50. if "strategies[1].reasoning is empty" in error_detail:
  51. continue
  52. if "raw_cases" in file_name:
  53. # 提取平台名,如 "raw_cases/case_bili.json" -> "bili"
  54. match = re.search(r'case_([a-z]+)\.json', file_name)
  55. if match:
  56. problem_platforms.append(match.group(1))
  57. has_raw_cases_error = len(problem_platforms) > 0
  58. has_blueprint_error = any(item["file"] == "blueprint.json" for item in error_items)
  59. has_capabilities_error = any(item["file"] == "capabilities_extracted.json" for item in error_items)
  60. has_strategy_error = any(item["file"] == "strategy.json" for item in error_items)
  61. if has_raw_cases_error:
  62. if len(problem_platforms) == 1:
  63. # 单个平台有问题,只重跑该平台,但要级联删除后续所有文件
  64. platform = problem_platforms[0]
  65. return "single_platform", [f"raw_cases/case_{platform}.json", "blueprint.json", "capabilities_extracted.json", "strategy.json"], [platform]
  66. else:
  67. # 多个平台有问题,或者 raw_cases 整体有问题,完整重跑
  68. return "full", ["raw_cases", "blueprint.json", "capabilities_extracted.json", "strategy.json"], problem_platforms
  69. elif has_blueprint_error:
  70. # blueprint 有问题,删除 blueprint 及后续文件
  71. return "from_blueprint", ["blueprint.json", "capabilities_extracted.json", "strategy.json"], []
  72. elif has_capabilities_error:
  73. # capabilities 有问题,删除 capabilities 和 strategy
  74. return "from_capabilities", ["capabilities_extracted.json", "strategy.json"], []
  75. elif has_strategy_error:
  76. # 只有 strategy 有问题,只删除 strategy
  77. return "from_strategy", ["strategy.json"], []
  78. return "unknown", [], []
  79. def delete_files(req_id, files_to_delete):
  80. """删除指定的文件"""
  81. output_dir = OUTPUT_DIR / req_id
  82. deleted = []
  83. for file_name in files_to_delete:
  84. if file_name == "raw_cases":
  85. # 删除整个 raw_cases 目录
  86. raw_cases_dir = output_dir / "raw_cases"
  87. if raw_cases_dir.exists():
  88. import shutil
  89. shutil.rmtree(raw_cases_dir)
  90. deleted.append(f"{req_id}/raw_cases/")
  91. elif file_name.startswith("raw_cases/"):
  92. # 删除单个 case 文件
  93. file_path = output_dir / file_name
  94. if file_path.exists():
  95. file_path.unlink()
  96. deleted.append(f"{req_id}/{file_name}")
  97. else:
  98. file_path = output_dir / file_name
  99. if file_path.exists():
  100. file_path.unlink()
  101. deleted.append(f"{req_id}/{file_name}")
  102. return deleted
  103. def generate_rerun_commands(errors_dict):
  104. """生成重跑命令"""
  105. commands = []
  106. for req_id, error_files in sorted(errors_dict.items()):
  107. strategy, files_to_delete, problem_platforms = determine_cleanup_strategy(req_id, error_files)
  108. req_index = int(req_id) - 1 # run_pipeline.py 使用 0-based index
  109. if strategy == "single_platform":
  110. # 只重跑单个平台,blueprint/capabilities/strategy 已被级联删除
  111. platforms_str = ",".join(problem_platforms)
  112. commands.append({
  113. "req_id": req_id,
  114. "index": req_index,
  115. "strategy": strategy,
  116. "files_to_delete": files_to_delete,
  117. "command": f"python run_pipeline.py --index {req_index} --platforms {platforms_str}"
  118. })
  119. elif strategy == "full":
  120. # 完整重跑(包括所有 research)
  121. commands.append({
  122. "req_id": req_id,
  123. "index": req_index,
  124. "strategy": strategy,
  125. "files_to_delete": files_to_delete,
  126. "command": f"python run_pipeline.py --index {req_index}"
  127. })
  128. elif strategy == "from_blueprint":
  129. # 跳过 research,从 blueprint 开始
  130. commands.append({
  131. "req_id": req_id,
  132. "index": req_index,
  133. "strategy": strategy,
  134. "files_to_delete": files_to_delete,
  135. "command": f"python run_pipeline.py --index {req_index} --skip-research"
  136. })
  137. elif strategy == "from_capabilities":
  138. # 跳过 research;blueprint 仍在会被跳过,capabilities + strategy 会重跑
  139. commands.append({
  140. "req_id": req_id,
  141. "index": req_index,
  142. "strategy": strategy,
  143. "files_to_delete": files_to_delete,
  144. "command": f"python run_pipeline.py --index {req_index} --skip-research"
  145. })
  146. elif strategy == "from_strategy":
  147. # 跳过 research;blueprint/capabilities 仍在会被跳过,只重跑 strategy
  148. commands.append({
  149. "req_id": req_id,
  150. "index": req_index,
  151. "strategy": strategy,
  152. "files_to_delete": files_to_delete,
  153. "command": f"python run_pipeline.py --index {req_index} --skip-research"
  154. })
  155. return commands
def main():
    """CLI entry point.

    Parses the schema error report, prints the cleanup/rerun plan, deletes
    the broken artifacts, then writes per-worker Windows .bat scripts that
    rerun the pipeline in parallel CMD windows (and spawns them unless
    --no-spawn is given).

    Flags:
        --dry-run      show the plan only; delete nothing, run nothing
        --delete-only  delete broken files, then stop
        --rerun-only   skip deletion (assume files were already removed)
        --workers N    number of parallel CMD windows (default 4)
        --no-spawn     write worker .bat files but do not open windows
    """
    import argparse
    parser = argparse.ArgumentParser(description="Fix broken output files based on schema validation")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done without actually doing it")
    parser.add_argument("--delete-only", action="store_true", help="Only delete broken files, don't rerun")
    parser.add_argument("--rerun-only", action="store_true", help="Only rerun (assume files already deleted)")
    parser.add_argument("--workers", type=int, default=4, help="Number of parallel CMD windows to spawn")
    parser.add_argument("--no-spawn", action="store_true", help="Only generate worker .bat files, don't auto-spawn windows")
    args = parser.parse_args()
    report_path = REPORT_PATH
    if not report_path.exists():
        print(f"[ERROR] {report_path} not found. Run validate_schema.py first.")
        return
    print("[*] Parsing error report...")
    errors_dict = parse_error_report(report_path)
    if not errors_dict:
        print("[OK] No errors found in report!")
        return
    print(f"Found {len(errors_dict)} requirements with errors:")
    for req_id, items in sorted(errors_dict.items()):
        files_str = ", ".join(item["file"] for item in items)
        print(f" - REQ_{req_id}: {files_str}")
    print("\n" + "="*60)
    print("[*] Generating cleanup and rerun plan...")
    print("="*60)
    commands = generate_rerun_commands(errors_dict)
    for cmd_info in commands:
        print(f"\n[FIX] REQ_{cmd_info['req_id']} ({cmd_info['strategy']}):")
        print(f" Delete: {', '.join(cmd_info['files_to_delete'])}")
        print(f" Rerun: {cmd_info['command']}")
    if args.dry_run:
        print("\n[DRY-RUN] No files were deleted or commands executed.")
        return
    # Perform the deletions (skipped with --rerun-only).
    if not args.rerun_only:
        print("\n" + "="*60)
        print("[*] Deleting broken files...")
        print("="*60)
        for cmd_info in commands:
            deleted = delete_files(cmd_info['req_id'], cmd_info['files_to_delete'])
            for f in deleted:
                print(f" ✓ Deleted: {f}")
    if args.delete_only:
        print("\n[DELETE-ONLY] Files deleted. Skipping rerun.")
        return
    # Generate the multi-window parallel batch scripts.
    print("\n" + "="*60)
    print("[*] Generating multi-window parallel batch scripts...")
    print("="*60)
    total_tasks = len(commands)
    num_workers = min(args.workers, total_tasks)
    # Round-robin the tasks across the workers.
    worker_tasks = {i: [] for i in range(num_workers)}
    for i, cmd_info in enumerate(commands):
        worker_id = i % num_workers
        worker_tasks[worker_id].append(cmd_info)
    print(f"Distributing {total_tasks} tasks across {num_workers} workers...")
    # .bat files go in the pipeline root (the directory holding run_pipeline.py).
    bat_dir = PIPELINE_DIR
    # One stand-alone batch file per worker.
    for worker_id, tasks in worker_tasks.items():
        if not tasks:
            continue
        bat_path = bat_dir / f"fix_worker_{worker_id}.bat"
        with open(bat_path, "w", encoding="utf-8") as f:
            f.write("@echo off\n")
            f.write("cd /d %~dp0\n")
            # Activate a venv if one exists two levels up (tries both layouts).
            f.write("if exist ..\\..\\venv\\Scripts\\activate.bat call ..\\..\\venv\\Scripts\\activate.bat\n")
            f.write("if exist ..\\..\\.venv\\Scripts\\activate.bat call ..\\..\\.venv\\Scripts\\activate.bat\n")
            f.write(f"echo Worker {worker_id} starting with {len(tasks)} tasks...\n")
            f.write("echo ========================================\n\n")
            for cmd_info in tasks:
                f.write(f"echo.\n")
                f.write(f"echo ========================================\n")
                f.write(f"echo [Worker {worker_id}] Fixing REQ_{cmd_info['req_id']} ({cmd_info['strategy']})\n")
                f.write(f"echo ========================================\n")
                f.write(f"{cmd_info['command']}\n")
                f.write(f"timeout /t 2 > NUL\n\n")
            f.write(f"echo.\n")
            f.write(f"echo ========================================\n")
            f.write(f"echo Worker {worker_id} completed all tasks!\n")
            f.write(f"echo ========================================\n")
            f.write("pause\n")
        print(f" Worker {worker_id}: {len(tasks)} tasks -> {bat_path}")
    # Master launcher script: opens one window per non-empty worker.
    launcher_path = bat_dir / "fix_broken_outputs_parallel.bat"
    with open(launcher_path, "w", encoding="utf-8") as f:
        f.write("@echo off\n")
        f.write("cd /d %~dp0\n")
        f.write(f"echo Spawning {num_workers} parallel workers to fix {total_tasks} broken outputs...\n")
        f.write("echo ========================================\n\n")
        for worker_id in range(num_workers):
            if worker_id in worker_tasks and worker_tasks[worker_id]:
                f.write(f'start "Fix_Worker_{worker_id}" cmd /c fix_worker_{worker_id}.bat\n')
                f.write(f"timeout /t 1 > NUL\n")
        f.write("\necho.\n")
        f.write(f"echo All {num_workers} workers have been launched in separate windows.\n")
        f.write("echo ========================================\n")
        f.write("pause\n")
    print(f"\n[OK] Parallel batch scripts generated!")
    print(f"\nTo execute all fixes in parallel, run:")
    print(f" {launcher_path}")
    print(f"\nOr manually launch individual workers:")
    for worker_id in range(num_workers):
        if worker_id in worker_tasks and worker_tasks[worker_id]:
            print(f" fix_worker_{worker_id}.bat")
    # Unless --no-spawn was given, open the worker windows now.
    if not args.no_spawn:
        import subprocess
        print(f"\n[*] Auto-spawning {num_workers} worker windows...")
        for worker_id in range(num_workers):
            if worker_id in worker_tasks and worker_tasks[worker_id]:
                bat_file = bat_dir / f"fix_worker_{worker_id}.bat"
                # "start" detaches each worker into its own CMD window.
                subprocess.Popen(
                    ["cmd.exe", "/c", "start", f"Fix_Worker_{worker_id}", "cmd.exe", "/c", str(bat_file.absolute())],
                    cwd=str(bat_dir)
                )
                print(f" Launched Worker {worker_id}")
        print(f"\n[OK] All workers launched! Check the new CMD windows.")
  275. if __name__ == "__main__":
  276. main()