fix_broken_outputs.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
"""
Batch-repair broken pipeline output files.

Reads the errors listed in schema_errors_report.txt, deletes the affected
output files, and re-runs the pipeline for those requirements (through
generated per-worker .bat scripts).
"""
  5. import re
  6. import subprocess
  7. from pathlib import Path
  8. PIPELINE_DIR = Path(__file__).parent.parent
  9. OUTPUT_DIR = PIPELINE_DIR / "output"
  10. REPORT_PATH = PIPELINE_DIR / "script" / "schema_errors_report.txt"
  11. def parse_error_report(report_path):
  12. """解析错误报告,返回需要修复的需求列表"""
  13. errors = {}
  14. with open(report_path, "r", encoding="utf-8") as f:
  15. for line in f:
  16. # 匹配格式: output\XXX\file.json: error
  17. match = re.search(r'output[/\\](\d+)[/\\]([^:]+):', line)
  18. if match:
  19. req_id = match.group(1)
  20. file_name = match.group(2)
  21. if req_id not in errors:
  22. errors[req_id] = []
  23. errors[req_id].append(file_name)
  24. return errors
  25. def determine_cleanup_strategy(req_id, error_files):
  26. """
  27. 根据错误文件类型决定清理策略
  28. 返回: (策略类型, 需要删除的文件列表, 问题平台列表)
  29. 策略类型:
  30. - single_platform: 单个平台的 raw_cases 有问题,只重跑该平台 + 级联删除后续文件
  31. - full: 多个平台或整体 raw_cases 有问题,完整重跑
  32. - from_blueprint: blueprint 有问题,删除 blueprint + capabilities + strategy
  33. - from_capabilities: capabilities 有问题,删除 capabilities + strategy
  34. - from_strategy: 只有 strategy 有问题,只删除 strategy
  35. """
  36. # 提取有问题的平台
  37. problem_platforms = []
  38. for f in error_files:
  39. if "raw_cases" in f:
  40. # 提取平台名,如 "raw_cases/case_bili.json" -> "bili"
  41. match = re.search(r'case_([a-z]+)\.json', f)
  42. if match:
  43. problem_platforms.append(match.group(1))
  44. has_raw_cases_error = len(problem_platforms) > 0
  45. has_blueprint_error = "blueprint.json" in error_files
  46. has_capabilities_error = "capabilities_extracted.json" in error_files
  47. has_strategy_error = "strategy.json" in error_files
  48. if has_raw_cases_error:
  49. if len(problem_platforms) == 1:
  50. # 单个平台有问题,只重跑该平台,但要级联删除后续所有文件
  51. platform = problem_platforms[0]
  52. return "single_platform", [f"raw_cases/case_{platform}.json", "blueprint.json", "capabilities_extracted.json", "strategy.json"], [platform]
  53. else:
  54. # 多个平台有问题,或者 raw_cases 整体有问题,完整重跑
  55. return "full", ["raw_cases", "blueprint.json", "capabilities_extracted.json", "strategy.json"], problem_platforms
  56. elif has_blueprint_error:
  57. # blueprint 有问题,删除 blueprint 及后续文件
  58. return "from_blueprint", ["blueprint.json", "capabilities_extracted.json", "strategy.json"], []
  59. elif has_capabilities_error:
  60. # capabilities 有问题,删除 capabilities 和 strategy
  61. return "from_capabilities", ["capabilities_extracted.json", "strategy.json"], []
  62. elif has_strategy_error:
  63. # 只有 strategy 有问题,只删除 strategy
  64. return "from_strategy", ["strategy.json"], []
  65. return "unknown", [], []
  66. def delete_files(req_id, files_to_delete):
  67. """删除指定的文件"""
  68. output_dir = OUTPUT_DIR / req_id
  69. deleted = []
  70. for file_name in files_to_delete:
  71. if file_name == "raw_cases":
  72. # 删除整个 raw_cases 目录
  73. raw_cases_dir = output_dir / "raw_cases"
  74. if raw_cases_dir.exists():
  75. import shutil
  76. shutil.rmtree(raw_cases_dir)
  77. deleted.append(f"{req_id}/raw_cases/")
  78. elif file_name.startswith("raw_cases/"):
  79. # 删除单个 case 文件
  80. file_path = output_dir / file_name
  81. if file_path.exists():
  82. file_path.unlink()
  83. deleted.append(f"{req_id}/{file_name}")
  84. else:
  85. file_path = output_dir / file_name
  86. if file_path.exists():
  87. file_path.unlink()
  88. deleted.append(f"{req_id}/{file_name}")
  89. return deleted
  90. def generate_rerun_commands(errors_dict):
  91. """生成重跑命令"""
  92. commands = []
  93. for req_id, error_files in sorted(errors_dict.items()):
  94. strategy, files_to_delete, problem_platforms = determine_cleanup_strategy(req_id, error_files)
  95. req_index = int(req_id) - 1 # run_pipeline.py 使用 0-based index
  96. if strategy == "single_platform":
  97. # 只重跑单个平台,blueprint/capabilities/strategy 已被级联删除
  98. platforms_str = ",".join(problem_platforms)
  99. commands.append({
  100. "req_id": req_id,
  101. "index": req_index,
  102. "strategy": strategy,
  103. "files_to_delete": files_to_delete,
  104. "command": f"python run_pipeline.py --index {req_index} --platforms {platforms_str}"
  105. })
  106. elif strategy == "full":
  107. # 完整重跑(包括所有 research)
  108. commands.append({
  109. "req_id": req_id,
  110. "index": req_index,
  111. "strategy": strategy,
  112. "files_to_delete": files_to_delete,
  113. "command": f"python run_pipeline.py --index {req_index}"
  114. })
  115. elif strategy == "from_blueprint":
  116. # 跳过 research,从 blueprint 开始
  117. commands.append({
  118. "req_id": req_id,
  119. "index": req_index,
  120. "strategy": strategy,
  121. "files_to_delete": files_to_delete,
  122. "command": f"python run_pipeline.py --index {req_index} --skip-research"
  123. })
  124. elif strategy == "from_capabilities":
  125. # 跳过 research;blueprint 仍在会被跳过,capabilities + strategy 会重跑
  126. commands.append({
  127. "req_id": req_id,
  128. "index": req_index,
  129. "strategy": strategy,
  130. "files_to_delete": files_to_delete,
  131. "command": f"python run_pipeline.py --index {req_index} --skip-research"
  132. })
  133. elif strategy == "from_strategy":
  134. # 跳过 research;blueprint/capabilities 仍在会被跳过,只重跑 strategy
  135. commands.append({
  136. "req_id": req_id,
  137. "index": req_index,
  138. "strategy": strategy,
  139. "files_to_delete": files_to_delete,
  140. "command": f"python run_pipeline.py --index {req_index} --skip-research"
  141. })
  142. return commands
  143. def main():
  144. import argparse
  145. parser = argparse.ArgumentParser(description="Fix broken output files based on schema validation")
  146. parser.add_argument("--dry-run", action="store_true", help="Show what would be done without actually doing it")
  147. parser.add_argument("--delete-only", action="store_true", help="Only delete broken files, don't rerun")
  148. parser.add_argument("--rerun-only", action="store_true", help="Only rerun (assume files already deleted)")
  149. parser.add_argument("--workers", type=int, default=4, help="Number of parallel CMD windows to spawn")
  150. parser.add_argument("--no-spawn", action="store_true", help="Only generate worker .bat files, don't auto-spawn windows")
  151. args = parser.parse_args()
  152. report_path = REPORT_PATH
  153. if not report_path.exists():
  154. print(f"[ERROR] {report_path} not found. Run validate_schema.py first.")
  155. return
  156. print("[*] Parsing error report...")
  157. errors_dict = parse_error_report(report_path)
  158. if not errors_dict:
  159. print("[OK] No errors found in report!")
  160. return
  161. print(f"Found {len(errors_dict)} requirements with errors:")
  162. for req_id, files in sorted(errors_dict.items()):
  163. print(f" - REQ_{req_id}: {', '.join(files)}")
  164. print("\n" + "="*60)
  165. print("[*] Generating cleanup and rerun plan...")
  166. print("="*60)
  167. commands = generate_rerun_commands(errors_dict)
  168. for cmd_info in commands:
  169. print(f"\n[FIX] REQ_{cmd_info['req_id']} ({cmd_info['strategy']}):")
  170. print(f" Delete: {', '.join(cmd_info['files_to_delete'])}")
  171. print(f" Rerun: {cmd_info['command']}")
  172. if args.dry_run:
  173. print("\n[DRY-RUN] No files were deleted or commands executed.")
  174. return
  175. # 执行删除
  176. if not args.rerun_only:
  177. print("\n" + "="*60)
  178. print("[*] Deleting broken files...")
  179. print("="*60)
  180. for cmd_info in commands:
  181. deleted = delete_files(cmd_info['req_id'], cmd_info['files_to_delete'])
  182. for f in deleted:
  183. print(f" ✓ Deleted: {f}")
  184. if args.delete_only:
  185. print("\n[DELETE-ONLY] Files deleted. Skipping rerun.")
  186. return
  187. # 生成多窗口并行批处理脚本
  188. print("\n" + "="*60)
  189. print("[*] Generating multi-window parallel batch scripts...")
  190. print("="*60)
  191. total_tasks = len(commands)
  192. num_workers = min(args.workers, total_tasks)
  193. # 使用 round-robin 分配任务到各个 worker
  194. worker_tasks = {i: [] for i in range(num_workers)}
  195. for i, cmd_info in enumerate(commands):
  196. worker_id = i % num_workers
  197. worker_tasks[worker_id].append(cmd_info)
  198. print(f"Distributing {total_tasks} tasks across {num_workers} workers...")
  199. # bat 文件生成在 pipeline 根目录(run_pipeline.py 所在目录)
  200. bat_dir = PIPELINE_DIR
  201. # 为每个 worker 生成独立的批处理文件
  202. for worker_id, tasks in worker_tasks.items():
  203. if not tasks:
  204. continue
  205. bat_path = bat_dir / f"fix_worker_{worker_id}.bat"
  206. with open(bat_path, "w", encoding="utf-8") as f:
  207. f.write("@echo off\n")
  208. f.write("cd /d %~dp0\n")
  209. f.write("if exist ..\\..\\venv\\Scripts\\activate.bat call ..\\..\\venv\\Scripts\\activate.bat\n")
  210. f.write("if exist ..\\..\\.venv\\Scripts\\activate.bat call ..\\..\\.venv\\Scripts\\activate.bat\n")
  211. f.write(f"echo Worker {worker_id} starting with {len(tasks)} tasks...\n")
  212. f.write("echo ========================================\n\n")
  213. for cmd_info in tasks:
  214. f.write(f"echo.\n")
  215. f.write(f"echo ========================================\n")
  216. f.write(f"echo [Worker {worker_id}] Fixing REQ_{cmd_info['req_id']} ({cmd_info['strategy']})\n")
  217. f.write(f"echo ========================================\n")
  218. f.write(f"{cmd_info['command']}\n")
  219. f.write(f"timeout /t 2 > NUL\n\n")
  220. f.write(f"echo.\n")
  221. f.write(f"echo ========================================\n")
  222. f.write(f"echo Worker {worker_id} completed all tasks!\n")
  223. f.write(f"echo ========================================\n")
  224. f.write("pause\n")
  225. print(f" Worker {worker_id}: {len(tasks)} tasks -> {bat_path}")
  226. # 生成主启动脚本
  227. launcher_path = bat_dir / "fix_broken_outputs_parallel.bat"
  228. with open(launcher_path, "w", encoding="utf-8") as f:
  229. f.write("@echo off\n")
  230. f.write("cd /d %~dp0\n")
  231. f.write(f"echo Spawning {num_workers} parallel workers to fix {total_tasks} broken outputs...\n")
  232. f.write("echo ========================================\n\n")
  233. for worker_id in range(num_workers):
  234. if worker_id in worker_tasks and worker_tasks[worker_id]:
  235. f.write(f'start "Fix_Worker_{worker_id}" cmd /c fix_worker_{worker_id}.bat\n')
  236. f.write(f"timeout /t 1 > NUL\n")
  237. f.write("\necho.\n")
  238. f.write(f"echo All {num_workers} workers have been launched in separate windows.\n")
  239. f.write("echo ========================================\n")
  240. f.write("pause\n")
  241. print(f"\n[OK] Parallel batch scripts generated!")
  242. print(f"\nTo execute all fixes in parallel, run:")
  243. print(f" {launcher_path}")
  244. print(f"\nOr manually launch individual workers:")
  245. for worker_id in range(num_workers):
  246. if worker_id in worker_tasks and worker_tasks[worker_id]:
  247. print(f" fix_worker_{worker_id}.bat")
  248. # 如果不是 no-spawn 模式,自动启动窗口
  249. if not args.no_spawn:
  250. import subprocess
  251. print(f"\n[*] Auto-spawning {num_workers} worker windows...")
  252. for worker_id in range(num_workers):
  253. if worker_id in worker_tasks and worker_tasks[worker_id]:
  254. bat_file = bat_dir / f"fix_worker_{worker_id}.bat"
  255. subprocess.Popen(
  256. ["cmd.exe", "/c", "start", f"Fix_Worker_{worker_id}", "cmd.exe", "/c", str(bat_file.absolute())],
  257. cwd=str(base_dir)
  258. )
  259. print(f" Launched Worker {worker_id}")
  260. print(f"\n[OK] All workers launched! Check the new CMD windows.")
  261. if __name__ == "__main__":
  262. main()