""" 批量修复有问题的 output 文件 根据 schema_errors_report.txt 中的错误,删除相应文件并重跑 pipeline """ import re import subprocess from pathlib import Path PIPELINE_DIR = Path(__file__).parent.parent OUTPUT_DIR = PIPELINE_DIR / "output" REPORT_PATH = PIPELINE_DIR / "script" / "schema_errors_report.txt" def parse_error_report(report_path): """解析错误报告,返回需要修复的需求列表""" errors = {} with open(report_path, "r", encoding="utf-8") as f: for line in f: # 匹配格式: output\XXX\file.json: error match = re.search(r'output[/\\](\d+)[/\\]([^:]+):', line) if match: req_id = match.group(1) file_name = match.group(2) if req_id not in errors: errors[req_id] = [] errors[req_id].append(file_name) return errors def determine_cleanup_strategy(req_id, error_files): """ 根据错误文件类型决定清理策略 返回: (策略类型, 需要删除的文件列表, 问题平台列表) 策略类型: - single_platform: 单个平台的 raw_cases 有问题,只重跑该平台 + 级联删除后续文件 - full: 多个平台或整体 raw_cases 有问题,完整重跑 - from_blueprint: blueprint 有问题,删除 blueprint + capabilities + strategy - from_capabilities: capabilities 有问题,删除 capabilities + strategy - from_strategy: 只有 strategy 有问题,只删除 strategy """ # 提取有问题的平台 problem_platforms = [] for f in error_files: if "raw_cases" in f: # 提取平台名,如 "raw_cases/case_bili.json" -> "bili" match = re.search(r'case_([a-z]+)\.json', f) if match: problem_platforms.append(match.group(1)) has_raw_cases_error = len(problem_platforms) > 0 has_blueprint_error = "blueprint.json" in error_files has_capabilities_error = "capabilities_extracted.json" in error_files has_strategy_error = "strategy.json" in error_files if has_raw_cases_error: if len(problem_platforms) == 1: # 单个平台有问题,只重跑该平台,但要级联删除后续所有文件 platform = problem_platforms[0] return "single_platform", [f"raw_cases/case_{platform}.json", "blueprint.json", "capabilities_extracted.json", "strategy.json"], [platform] else: # 多个平台有问题,或者 raw_cases 整体有问题,完整重跑 return "full", ["raw_cases", "blueprint.json", "capabilities_extracted.json", "strategy.json"], problem_platforms elif has_blueprint_error: # blueprint 有问题,删除 blueprint 及后续文件 return "from_blueprint", ["blueprint.json", "capabilities_extracted.json", "strategy.json"], [] elif has_capabilities_error: # capabilities 有问题,删除 capabilities 和 strategy return "from_capabilities", ["capabilities_extracted.json", "strategy.json"], [] elif has_strategy_error: # 只有 strategy 有问题,只删除 strategy return "from_strategy", ["strategy.json"], [] return "unknown", [], [] def delete_files(req_id, files_to_delete): """删除指定的文件""" output_dir = OUTPUT_DIR / req_id deleted = [] for file_name in files_to_delete: if file_name == "raw_cases": # 删除整个 raw_cases 目录 raw_cases_dir = output_dir / "raw_cases" if raw_cases_dir.exists(): import shutil shutil.rmtree(raw_cases_dir) deleted.append(f"{req_id}/raw_cases/") elif file_name.startswith("raw_cases/"): # 删除单个 case 文件 file_path = output_dir / file_name if file_path.exists(): file_path.unlink() deleted.append(f"{req_id}/{file_name}") else: file_path = output_dir / file_name if file_path.exists(): file_path.unlink() deleted.append(f"{req_id}/{file_name}") return deleted def generate_rerun_commands(errors_dict): """生成重跑命令""" commands = [] for req_id, error_files in sorted(errors_dict.items()): strategy, files_to_delete, problem_platforms = determine_cleanup_strategy(req_id, error_files) req_index = int(req_id) - 1 # run_pipeline.py 使用 0-based index if strategy == "single_platform": # 只重跑单个平台,blueprint/capabilities/strategy 已被级联删除 platforms_str = ",".join(problem_platforms) commands.append({ "req_id": req_id, "index": req_index, "strategy": strategy, "files_to_delete": files_to_delete, "command": f"python run_pipeline.py --index {req_index} --platforms {platforms_str}" }) elif strategy == "full": # 完整重跑(包括所有 research) commands.append({ "req_id": req_id, "index": req_index, "strategy": strategy, "files_to_delete": files_to_delete, "command": f"python run_pipeline.py --index {req_index}" }) elif strategy == "from_blueprint": # 跳过 research,从 blueprint 开始 commands.append({ "req_id": req_id, "index": req_index, "strategy": strategy, "files_to_delete": files_to_delete, "command": f"python run_pipeline.py --index {req_index} --skip-research" }) elif strategy == "from_capabilities": # 跳过 research;blueprint 仍在会被跳过,capabilities + strategy 会重跑 commands.append({ "req_id": req_id, "index": req_index, "strategy": strategy, "files_to_delete": files_to_delete, "command": f"python run_pipeline.py --index {req_index} --skip-research" }) elif strategy == "from_strategy": # 跳过 research;blueprint/capabilities 仍在会被跳过,只重跑 strategy commands.append({ "req_id": req_id, "index": req_index, "strategy": strategy, "files_to_delete": files_to_delete, "command": f"python run_pipeline.py --index {req_index} --skip-research" }) return commands def main(): import argparse parser = argparse.ArgumentParser(description="Fix broken output files based on schema validation") parser.add_argument("--dry-run", action="store_true", help="Show what would be done without actually doing it") parser.add_argument("--delete-only", action="store_true", help="Only delete broken files, don't rerun") parser.add_argument("--rerun-only", action="store_true", help="Only rerun (assume files already deleted)") parser.add_argument("--workers", type=int, default=4, help="Number of parallel CMD windows to spawn") parser.add_argument("--no-spawn", action="store_true", help="Only generate worker .bat files, don't auto-spawn windows") args = parser.parse_args() report_path = REPORT_PATH if not report_path.exists(): print(f"[ERROR] {report_path} not found. Run validate_schema.py first.") return print("[*] Parsing error report...") errors_dict = parse_error_report(report_path) if not errors_dict: print("[OK] No errors found in report!") return print(f"Found {len(errors_dict)} requirements with errors:") for req_id, files in sorted(errors_dict.items()): print(f" - REQ_{req_id}: {', '.join(files)}") print("\n" + "="*60) print("[*] Generating cleanup and rerun plan...") print("="*60) commands = generate_rerun_commands(errors_dict) for cmd_info in commands: print(f"\n[FIX] REQ_{cmd_info['req_id']} ({cmd_info['strategy']}):") print(f" Delete: {', '.join(cmd_info['files_to_delete'])}") print(f" Rerun: {cmd_info['command']}") if args.dry_run: print("\n[DRY-RUN] No files were deleted or commands executed.") return # 执行删除 if not args.rerun_only: print("\n" + "="*60) print("[*] Deleting broken files...") print("="*60) for cmd_info in commands: deleted = delete_files(cmd_info['req_id'], cmd_info['files_to_delete']) for f in deleted: print(f" ✓ Deleted: {f}") if args.delete_only: print("\n[DELETE-ONLY] Files deleted. Skipping rerun.") return # 生成多窗口并行批处理脚本 print("\n" + "="*60) print("[*] Generating multi-window parallel batch scripts...") print("="*60) total_tasks = len(commands) num_workers = min(args.workers, total_tasks) # 使用 round-robin 分配任务到各个 worker worker_tasks = {i: [] for i in range(num_workers)} for i, cmd_info in enumerate(commands): worker_id = i % num_workers worker_tasks[worker_id].append(cmd_info) print(f"Distributing {total_tasks} tasks across {num_workers} workers...") # bat 文件生成在 pipeline 根目录(run_pipeline.py 所在目录) bat_dir = PIPELINE_DIR # 为每个 worker 生成独立的批处理文件 for worker_id, tasks in worker_tasks.items(): if not tasks: continue bat_path = bat_dir / f"fix_worker_{worker_id}.bat" with open(bat_path, "w", encoding="utf-8") as f: f.write("@echo off\n") f.write("cd /d %~dp0\n") f.write("if exist ..\\..\\venv\\Scripts\\activate.bat call ..\\..\\venv\\Scripts\\activate.bat\n") f.write("if exist ..\\..\\.venv\\Scripts\\activate.bat call ..\\..\\.venv\\Scripts\\activate.bat\n") f.write(f"echo Worker {worker_id} starting with {len(tasks)} tasks...\n") f.write("echo ========================================\n\n") for cmd_info in tasks: f.write(f"echo.\n") f.write(f"echo ========================================\n") f.write(f"echo [Worker {worker_id}] Fixing REQ_{cmd_info['req_id']} ({cmd_info['strategy']})\n") f.write(f"echo ========================================\n") f.write(f"{cmd_info['command']}\n") f.write(f"timeout /t 2 > NUL\n\n") f.write(f"echo.\n") f.write(f"echo ========================================\n") f.write(f"echo Worker {worker_id} completed all tasks!\n") f.write(f"echo ========================================\n") f.write("pause\n") print(f" Worker {worker_id}: {len(tasks)} tasks -> {bat_path}") # 生成主启动脚本 launcher_path = bat_dir / "fix_broken_outputs_parallel.bat" with open(launcher_path, "w", encoding="utf-8") as f: f.write("@echo off\n") f.write("cd /d %~dp0\n") f.write(f"echo Spawning {num_workers} parallel workers to fix {total_tasks} broken outputs...\n") f.write("echo ========================================\n\n") for worker_id in range(num_workers): if worker_id in worker_tasks and worker_tasks[worker_id]: f.write(f'start "Fix_Worker_{worker_id}" cmd /c fix_worker_{worker_id}.bat\n') f.write(f"timeout /t 1 > NUL\n") f.write("\necho.\n") f.write(f"echo All {num_workers} workers have been launched in separate windows.\n") f.write("echo ========================================\n") f.write("pause\n") print(f"\n[OK] Parallel batch scripts generated!") print(f"\nTo execute all fixes in parallel, run:") print(f" {launcher_path}") print(f"\nOr manually launch individual workers:") for worker_id in range(num_workers): if worker_id in worker_tasks and worker_tasks[worker_id]: print(f" fix_worker_{worker_id}.bat") # 如果不是 no-spawn 模式,自动启动窗口 if not args.no_spawn: import subprocess print(f"\n[*] Auto-spawning {num_workers} worker windows...") for worker_id in range(num_workers): if worker_id in worker_tasks and worker_tasks[worker_id]: bat_file = bat_dir / f"fix_worker_{worker_id}.bat" subprocess.Popen( ["cmd.exe", "/c", "start", f"Fix_Worker_{worker_id}", "cmd.exe", "/c", str(bat_file.absolute())], cwd=str(base_dir) ) print(f" Launched Worker {worker_id}") print(f"\n[OK] All workers launched! Check the new CMD windows.") if __name__ == "__main__": main()