""" 清洗脚本:移除 case.json 中的 capabilities 数组 背景:capability 概念已被 fragment 取代,case.json 里遗留的 capabilities 字段需要清除。 用法: # 清洗指定需求目录的 case.json python -m examples.process_pipeline.script.clean_capabilities --index 108 # 预览(不实际修改文件) python -m examples.process_pipeline.script.clean_capabilities --index 108 --dry-run # 不备份(默认会写 .bak) python -m examples.process_pipeline.script.clean_capabilities --index 108 --no-backup # 批量清洗所有 output 目录 python -m examples.process_pipeline.script.clean_capabilities --all """ import argparse import json import shutil import sys from pathlib import Path from typing import Any, Dict, List, Tuple # Windows 控制台 UTF-8 输出 if sys.platform == "win32": sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent.parent OUTPUT_DIR = Path(__file__).resolve().parent.parent / "output" def clean_case_file( case_file: Path, dry_run: bool = False, backup: bool = True, ) -> Dict[str, int]: """ 清洗单个 case.json 文件中的 capabilities 数组。 Returns: stats dict: {"total_cases": int, "with_capabilities": int, "without": int} """ if not case_file.exists(): raise FileNotFoundError(f"Case file not found: {case_file}") with open(case_file, "r", encoding="utf-8") as f: data = json.load(f) cases = data.get("cases", []) with_cap_count = 0 for case in cases: if "capabilities" in case: with_cap_count += 1 if not dry_run: del case["capabilities"] without_count = len(cases) - with_cap_count if dry_run: return { "total_cases": len(cases), "with_capabilities": with_cap_count, "without": without_count, "action": "dry_run", } if with_cap_count > 0: if backup: backup_file = case_file.with_suffix(".json.bak") shutil.copy2(case_file, backup_file) with open(case_file, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) return { "total_cases": len(cases), "with_capabilities": with_cap_count, "without": without_count, "action": "cleaned" if with_cap_count > 0 else "no_change", } def main(): parser = argparse.ArgumentParser(description="移除 case.json 中的 capabilities 数组") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("--index", type=str, help="需求目录索引(如 108 或 108,109,110)") group.add_argument("--all", action="store_true", help="清洗 output 下所有需求目录") parser.add_argument("--dry-run", action="store_true", help="只预览,不修改文件") parser.add_argument("--no-backup", action="store_true", help="不创建 .bak 备份文件") args = parser.parse_args() backup = not args.no_backup # 确定要处理的目录列表 target_dirs: List[Path] = [] if args.all: for d in sorted(OUTPUT_DIR.iterdir()): if d.is_dir() and d.name.isdigit(): case_file = d / "case.json" if case_file.exists(): target_dirs.append(d) else: for idx_str in args.index.split(","): idx_str = idx_str.strip() # 支持 "108" 或 "108/raw_cases" 格式,统一取目录名 target_dir = OUTPUT_DIR / idx_str.zfill(3) if len(idx_str) <= 3 else OUTPUT_DIR / idx_str if not target_dir.exists(): print(f"⚠️ 目录不存在: {target_dir}") continue target_dirs.append(target_dir) if not target_dirs: print("❌ 没有找到任何可处理的目录") sys.exit(1) print(f"{'[Dry Run] ' if args.dry_run else ''}处理 {len(target_dirs)} 个目录 (backup={'on' if backup else 'off'})") print("=" * 60) total_cleaned = 0 total_cases = 0 for d in target_dirs: case_file = d / "case.json" if not case_file.exists(): print(f" [{d.name}] ⏭️ case.json 不存在") continue try: stats = clean_case_file(case_file, dry_run=args.dry_run, backup=backup) except Exception as e: print(f" [{d.name}] ❌ 错误: {type(e).__name__}: {e}") continue total_cases += stats["total_cases"] total_cleaned += stats["with_capabilities"] icon = "🔍" if args.dry_run else ("✅" if stats["action"] == "cleaned" else "⏭️ ") print( f" [{d.name}] {icon} " f"{stats['action']}: {stats['with_capabilities']}/{stats['total_cases']} cases 含 capabilities" ) print("=" * 60) if args.dry_run: print(f"📊 [Dry Run Summary] 共 {total_cases} 个 case,{total_cleaned} 个含 capabilities(未修改)") else: print(f"📊 [Summary] 共处理 {total_cases} 个 case,清除了 {total_cleaned} 个 capabilities 数组") if backup and total_cleaned > 0: print(f" 💾 原始文件已备份为 case.json.bak") if __name__ == "__main__": main()