| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- """
- 批量修复有问题的 output 文件
- 根据 schema_errors_report.txt 中的错误,删除相应文件并重跑 pipeline
- """
- import re
- import subprocess
- from pathlib import Path
- PIPELINE_DIR = Path(__file__).parent.parent
- OUTPUT_DIR = PIPELINE_DIR / "output"
- REPORT_PATH = PIPELINE_DIR / "script" / "schema_errors_report.txt"
def parse_error_report(report_path):
    """Parse the schema error report and group failures by requirement id.

    Args:
        report_path: path to schema_errors_report.txt.

    Returns:
        dict mapping req_id -> list of {"file": name, "error": detail} dicts.
    """
    # Expected line shape: output\XXX\file.json: Schema mismatch: error_detail
    # (either slash direction is accepted).
    line_pattern = re.compile(r'output[/\\](\d+)[/\\]([^:]+):\s*Schema mismatch:\s*(.+)')
    grouped = {}
    with open(report_path, "r", encoding="utf-8") as report:
        for raw_line in report:
            found = line_pattern.search(raw_line)
            if not found:
                continue
            req_id, file_name, detail = found.groups()
            grouped.setdefault(req_id, []).append({
                "file": file_name,
                "error": detail.strip(),
            })
    return grouped
def determine_cleanup_strategy(req_id, error_items):
    """Decide the cleanup strategy for one requirement from its error list.

    Args:
        req_id: requirement id (kept for interface parity; unused here).
        error_items: list of {"file": ..., "error": ...} dicts.

    Returns:
        (strategy, files_to_delete, problem_platforms) where strategy is one of:
        - "single_platform": one platform's raw case is broken -> rerun that
          platform and cascade-delete all downstream files
        - "full": multiple platforms (or raw_cases as a whole) broken -> full rerun
        - "from_blueprint": blueprint broken -> delete blueprint + capabilities + strategy
        - "from_capabilities": capabilities broken -> delete capabilities + strategy
        - "from_strategy": only strategy broken -> delete strategy only
        - "unknown": nothing actionable
    """
    # Bug fix: "strategies[1].reasoning is empty" is a benign error (the
    # non-selected strategy legitimately has no reasoning), but it was only
    # skipped while collecting platforms — it still counted when deciding
    # whether strategy.json needed a rerun.  Filter it out up front so a
    # requirement with only that error is left alone.
    relevant_items = [
        item for item in error_items
        if "strategies[1].reasoning is empty" not in item["error"]
    ]

    # Collect platforms whose per-platform raw case file is broken,
    # e.g. "raw_cases/case_bili.json" -> "bili".
    problem_platforms = []
    for item in relevant_items:
        if "raw_cases" in item["file"]:
            match = re.search(r'case_([a-z]+)\.json', item["file"])
            if match:
                problem_platforms.append(match.group(1))

    has_blueprint_error = any(item["file"] == "blueprint.json" for item in relevant_items)
    has_capabilities_error = any(item["file"] == "capabilities_extracted.json" for item in relevant_items)
    has_strategy_error = any(item["file"] == "strategy.json" for item in relevant_items)

    if problem_platforms:
        if len(problem_platforms) == 1:
            # Single broken platform: rerun just that platform, but
            # cascade-delete every artifact derived from it.
            platform = problem_platforms[0]
            return "single_platform", [f"raw_cases/case_{platform}.json", "blueprint.json", "capabilities_extracted.json", "strategy.json"], [platform]
        # Multiple platforms broken: do a complete rerun.
        return "full", ["raw_cases", "blueprint.json", "capabilities_extracted.json", "strategy.json"], problem_platforms
    if has_blueprint_error:
        # Blueprint broken: delete it plus everything downstream.
        return "from_blueprint", ["blueprint.json", "capabilities_extracted.json", "strategy.json"], []
    if has_capabilities_error:
        # Capabilities broken: delete capabilities and strategy.
        return "from_capabilities", ["capabilities_extracted.json", "strategy.json"], []
    if has_strategy_error:
        # Only strategy broken: delete strategy alone.
        return "from_strategy", ["strategy.json"], []
    return "unknown", [], []
def delete_files(req_id, files_to_delete):
    """Delete the given artifacts under output/<req_id>.

    Args:
        req_id: requirement id (name of the per-requirement output directory).
        files_to_delete: paths relative to that directory; the special value
            "raw_cases" means the whole raw_cases directory tree.

    Returns:
        List of "<req_id>/<path>" strings for everything actually removed
        (entries that do not exist are silently skipped).
    """
    # Hoisted out of the loop (was re-imported on every directory deletion);
    # kept function-local to preserve the original's import style.
    import shutil

    output_dir = OUTPUT_DIR / req_id
    deleted = []
    for file_name in files_to_delete:
        if file_name == "raw_cases":
            # Remove the entire raw_cases directory tree.
            raw_cases_dir = output_dir / "raw_cases"
            if raw_cases_dir.exists():
                shutil.rmtree(raw_cases_dir)
                deleted.append(f"{req_id}/raw_cases/")
        else:
            # Single file — including "raw_cases/case_<platform>.json" entries;
            # the original had a duplicate elif branch with identical code.
            file_path = output_dir / file_name
            if file_path.exists():
                file_path.unlink()
                deleted.append(f"{req_id}/{file_name}")
    return deleted
def generate_rerun_commands(errors_dict):
    """Build the per-requirement cleanup-and-rerun plan.

    Args:
        errors_dict: {req_id: [{"file": ..., "error": ...}, ...]}.

    Returns:
        List of dicts with keys req_id, index (0-based), strategy,
        files_to_delete and the run_pipeline.py command line. Requirements
        whose strategy is "unknown" are omitted, as before.
    """
    commands = []
    for req_id, error_files in sorted(errors_dict.items()):
        strategy, files_to_delete, problem_platforms = determine_cleanup_strategy(req_id, error_files)
        req_index = int(req_id) - 1  # run_pipeline.py uses a 0-based index

        # The original five branches differed only in the extra CLI flags
        # (three were byte-identical); compute the suffix once instead.
        if strategy == "single_platform":
            # Rerun one platform only; downstream files were cascade-deleted.
            extra_args = f" --platforms {','.join(problem_platforms)}"
        elif strategy == "full":
            # Complete rerun, research included.
            extra_args = ""
        elif strategy in ("from_blueprint", "from_capabilities", "from_strategy"):
            # Skip research; surviving intermediate files are skipped by the
            # pipeline, so only the deleted stages get regenerated.
            extra_args = " --skip-research"
        else:
            # "unknown": nothing to rerun for this requirement.
            continue

        commands.append({
            "req_id": req_id,
            "index": req_index,
            "strategy": strategy,
            "files_to_delete": files_to_delete,
            "command": f"python run_pipeline.py --index {req_index}{extra_args}",
        })
    return commands
def main():
    """CLI entry point: parse the error report, delete broken outputs, and
    generate (and optionally spawn) parallel Windows .bat workers to rerun
    the pipeline for each affected requirement."""
    import argparse
    parser = argparse.ArgumentParser(description="Fix broken output files based on schema validation")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done without actually doing it")
    parser.add_argument("--delete-only", action="store_true", help="Only delete broken files, don't rerun")
    parser.add_argument("--rerun-only", action="store_true", help="Only rerun (assume files already deleted)")
    parser.add_argument("--workers", type=int, default=4, help="Number of parallel CMD windows to spawn")
    parser.add_argument("--no-spawn", action="store_true", help="Only generate worker .bat files, don't auto-spawn windows")
    args = parser.parse_args()
    report_path = REPORT_PATH
    if not report_path.exists():
        print(f"[ERROR] {report_path} not found. Run validate_schema.py first.")
        return
    print("[*] Parsing error report...")
    errors_dict = parse_error_report(report_path)
    if not errors_dict:
        print("[OK] No errors found in report!")
        return
    print(f"Found {len(errors_dict)} requirements with errors:")
    for req_id, items in sorted(errors_dict.items()):
        files_str = ", ".join(item["file"] for item in items)
        print(f" - REQ_{req_id}: {files_str}")
    print("\n" + "="*60)
    print("[*] Generating cleanup and rerun plan...")
    print("="*60)
    commands = generate_rerun_commands(errors_dict)
    for cmd_info in commands:
        print(f"\n[FIX] REQ_{cmd_info['req_id']} ({cmd_info['strategy']}):")
        print(f" Delete: {', '.join(cmd_info['files_to_delete'])}")
        print(f" Rerun: {cmd_info['command']}")
    if args.dry_run:
        print("\n[DRY-RUN] No files were deleted or commands executed.")
        return
    # Delete the broken files (skipped in --rerun-only mode).
    if not args.rerun_only:
        print("\n" + "="*60)
        print("[*] Deleting broken files...")
        print("="*60)
        for cmd_info in commands:
            deleted = delete_files(cmd_info['req_id'], cmd_info['files_to_delete'])
            for f in deleted:
                print(f" ✓ Deleted: {f}")
    if args.delete_only:
        print("\n[DELETE-ONLY] Files deleted. Skipping rerun.")
        return
    # Generate the multi-window parallel batch scripts.
    print("\n" + "="*60)
    print("[*] Generating multi-window parallel batch scripts...")
    print("="*60)
    total_tasks = len(commands)
    num_workers = min(args.workers, total_tasks)
    # Distribute tasks round-robin across the workers.
    worker_tasks = {i: [] for i in range(num_workers)}
    for i, cmd_info in enumerate(commands):
        worker_id = i % num_workers
        worker_tasks[worker_id].append(cmd_info)
    print(f"Distributing {total_tasks} tasks across {num_workers} workers...")
    # .bat files are generated in the pipeline root (where run_pipeline.py lives).
    bat_dir = PIPELINE_DIR
    # One standalone batch file per worker.
    for worker_id, tasks in worker_tasks.items():
        if not tasks:
            continue
        bat_path = bat_dir / f"fix_worker_{worker_id}.bat"
        with open(bat_path, "w", encoding="utf-8") as f:
            f.write("@echo off\n")
            f.write("cd /d %~dp0\n")
            # Activate whichever virtualenv exists two levels up, if any.
            f.write("if exist ..\\..\\venv\\Scripts\\activate.bat call ..\\..\\venv\\Scripts\\activate.bat\n")
            f.write("if exist ..\\..\\.venv\\Scripts\\activate.bat call ..\\..\\.venv\\Scripts\\activate.bat\n")
            f.write(f"echo Worker {worker_id} starting with {len(tasks)} tasks...\n")
            f.write("echo ========================================\n\n")
            for cmd_info in tasks:
                f.write(f"echo.\n")
                f.write(f"echo ========================================\n")
                f.write(f"echo [Worker {worker_id}] Fixing REQ_{cmd_info['req_id']} ({cmd_info['strategy']})\n")
                f.write(f"echo ========================================\n")
                f.write(f"{cmd_info['command']}\n")
                # Brief pause between tasks so console output stays readable.
                f.write(f"timeout /t 2 > NUL\n\n")
            f.write(f"echo.\n")
            f.write(f"echo ========================================\n")
            f.write(f"echo Worker {worker_id} completed all tasks!\n")
            f.write(f"echo ========================================\n")
            f.write("pause\n")
        print(f" Worker {worker_id}: {len(tasks)} tasks -> {bat_path}")
    # Master launcher that opens one CMD window per worker.
    launcher_path = bat_dir / "fix_broken_outputs_parallel.bat"
    with open(launcher_path, "w", encoding="utf-8") as f:
        f.write("@echo off\n")
        f.write("cd /d %~dp0\n")
        f.write(f"echo Spawning {num_workers} parallel workers to fix {total_tasks} broken outputs...\n")
        f.write("echo ========================================\n\n")
        for worker_id in range(num_workers):
            if worker_id in worker_tasks and worker_tasks[worker_id]:
                f.write(f'start "Fix_Worker_{worker_id}" cmd /c fix_worker_{worker_id}.bat\n')
                # Stagger window launches by one second.
                f.write(f"timeout /t 1 > NUL\n")
        f.write("\necho.\n")
        f.write(f"echo All {num_workers} workers have been launched in separate windows.\n")
        f.write("echo ========================================\n")
        f.write("pause\n")
    print(f"\n[OK] Parallel batch scripts generated!")
    print(f"\nTo execute all fixes in parallel, run:")
    print(f" {launcher_path}")
    print(f"\nOr manually launch individual workers:")
    for worker_id in range(num_workers):
        if worker_id in worker_tasks and worker_tasks[worker_id]:
            print(f" fix_worker_{worker_id}.bat")
    # Unless --no-spawn was given, launch the worker windows automatically.
    if not args.no_spawn:
        import subprocess  # NOTE(review): redundant — subprocess is already imported at module level
        print(f"\n[*] Auto-spawning {num_workers} worker windows...")
        for worker_id in range(num_workers):
            if worker_id in worker_tasks and worker_tasks[worker_id]:
                bat_file = bat_dir / f"fix_worker_{worker_id}.bat"
                # "start" detaches each worker into its own console window (Windows-only).
                subprocess.Popen(
                    ["cmd.exe", "/c", "start", f"Fix_Worker_{worker_id}", "cmd.exe", "/c", str(bat_file.absolute())],
                    cwd=str(bat_dir)
                )
                print(f" Launched Worker {worker_id}")
        print(f"\n[OK] All workers launched! Check the new CMD windows.")
if __name__ == "__main__":
    main()
|