| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
"""
Cleanup script: remove the ``capabilities`` array from case.json.

Background: the "capability" concept has been superseded by "fragment", so the
leftover ``capabilities`` field in case.json needs to be removed.

Usage:
    # Clean the case.json of a specific requirement directory
    python -m examples.process_pipeline.script.clean_capabilities --index 108
    # Preview only (no files are modified)
    python -m examples.process_pipeline.script.clean_capabilities --index 108 --dry-run
    # Skip the backup (a .bak file is written by default)
    python -m examples.process_pipeline.script.clean_capabilities --index 108 --no-backup
    # Clean every requirement directory under output in one pass
    python -m examples.process_pipeline.script.clean_capabilities --all
"""
- import argparse
- import json
- import shutil
- import sys
- from pathlib import Path
- from typing import Any, Dict, List, Tuple
# On Windows, switch both console streams to UTF-8 so the non-ASCII status
# messages printed by this script can be encoded.
if sys.platform == "win32":
    for _stream in (sys.stdout, sys.stderr):
        _stream.reconfigure(encoding="utf-8")

# Directory two levels above this file; the output folder lives directly in it.
_PACKAGE_DIR = Path(__file__).resolve().parent.parent

# Four levels above this file.
PROJECT_ROOT = _PACKAGE_DIR.parent.parent
# Folder that is scanned for per-requirement sub-directories.
OUTPUT_DIR = _PACKAGE_DIR / "output"
def clean_case_file(
    case_file: Path,
    dry_run: bool = False,
    backup: bool = True,
) -> Dict[str, Any]:
    """
    Remove the ``capabilities`` key from every case in a single case.json file.

    The file is read as JSON and its top-level ``cases`` value is scanned.
    Entries that are not JSON objects are counted in the total but are never
    treated as carrying ``capabilities``.

    Args:
        case_file: Path of the case.json file to clean.
        dry_run: When True, only count the affected cases; nothing is written.
        backup: When True, copy the original file to ``<stem>.json.bak`` before
            rewriting it. No backup is made when nothing needs to change.

    Returns:
        A stats dict::

            {
                "total_cases": int,        # number of entries under "cases"
                "with_capabilities": int,  # entries that had "capabilities"
                "without": int,            # total_cases - with_capabilities
                "action": str,             # "dry_run" | "cleaned" | "no_change"
            }

    Raises:
        FileNotFoundError: If ``case_file`` does not exist.
    """
    if not case_file.exists():
        raise FileNotFoundError(f"Case file not found: {case_file}")

    with open(case_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    # A null "cases" value is handled the same way as a missing key.
    cases = data.get("cases") or []

    # Only JSON objects can own a "capabilities" key. Without the isinstance
    # guard, a string entry containing the word would pass the `in` test
    # (substring match) and the later `del` would raise TypeError.
    affected = [
        case for case in cases
        if isinstance(case, dict) and "capabilities" in case
    ]
    with_cap_count = len(affected)

    stats: Dict[str, Any] = {
        "total_cases": len(cases),
        "with_capabilities": with_cap_count,
        "without": len(cases) - with_cap_count,
    }

    if dry_run:
        stats["action"] = "dry_run"
        return stats

    if with_cap_count == 0:
        # Nothing to strip: leave the file (and any existing backup) untouched.
        stats["action"] = "no_change"
        return stats

    for case in affected:
        del case["capabilities"]

    if backup:
        # Snapshot the on-disk original before it is overwritten below.
        backup_file = case_file.with_suffix(".json.bak")
        shutil.copy2(case_file, backup_file)

    with open(case_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    stats["action"] = "cleaned"
    return stats
def _collect_target_dirs(index_arg, use_all: bool) -> List[Path]:
    """Resolve the CLI selection (--all or --index) into existing directories under OUTPUT_DIR."""
    target_dirs: List[Path] = []

    if use_all:
        # A missing output directory simply yields no targets instead of an
        # uncaught FileNotFoundError from iterdir().
        if not OUTPUT_DIR.is_dir():
            return target_dirs
        for d in sorted(OUTPUT_DIR.iterdir()):
            # Only all-digit directory names that contain a case.json qualify.
            if d.is_dir() and d.name.isdigit() and (d / "case.json").exists():
                target_dirs.append(d)
        return target_dirs

    for idx_str in index_arg.split(","):
        idx_str = idx_str.strip()
        if not idx_str:
            # Skip empty tokens (e.g. a trailing comma) rather than padding
            # them into the bogus directory name "000".
            continue
        # Indices of up to three characters are zero-padded to three
        # ("8" -> "008"); anything longer is used verbatim as a path relative
        # to OUTPUT_DIR.
        target_dir = OUTPUT_DIR / idx_str.zfill(3) if len(idx_str) <= 3 else OUTPUT_DIR / idx_str
        if not target_dir.exists():
            print(f"⚠️ 目录不存在: {target_dir}")
            continue
        target_dirs.append(target_dir)
    return target_dirs


def main():
    """
    Command-line entry point: strip ``capabilities`` from the selected case.json files.

    Exactly one of ``--index`` (comma-separated directory indices) or ``--all``
    must be given. ``--dry-run`` only reports what would change and
    ``--no-backup`` disables the ``.bak`` copy. Exits with status 1 when no
    target directory can be resolved. A failure on one directory is printed
    and does not stop the remaining directories from being processed.
    """
    parser = argparse.ArgumentParser(description="移除 case.json 中的 capabilities 数组")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--index", type=str, help="需求目录索引(如 108 或 108,109,110)")
    group.add_argument("--all", action="store_true", help="清洗 output 下所有需求目录")
    parser.add_argument("--dry-run", action="store_true", help="只预览,不修改文件")
    parser.add_argument("--no-backup", action="store_true", help="不创建 .bak 备份文件")
    args = parser.parse_args()

    backup = not args.no_backup

    # Work out which directories to process.
    target_dirs = _collect_target_dirs(args.index, args.all)
    if not target_dirs:
        print("❌ 没有找到任何可处理的目录")
        sys.exit(1)

    print(f"{'[Dry Run] ' if args.dry_run else ''}处理 {len(target_dirs)} 个目录 (backup={'on' if backup else 'off'})")
    print("=" * 60)

    total_cleaned = 0
    total_cases = 0
    for d in target_dirs:
        case_file = d / "case.json"
        # Directories chosen via --index are not pre-checked for case.json.
        if not case_file.exists():
            print(f" [{d.name}] ⏭️ case.json 不存在")
            continue
        try:
            stats = clean_case_file(case_file, dry_run=args.dry_run, backup=backup)
        except Exception as e:
            # Best effort: report the failure and move on to the next directory.
            print(f" [{d.name}] ❌ 错误: {type(e).__name__}: {e}")
            continue

        total_cases += stats["total_cases"]
        total_cleaned += stats["with_capabilities"]
        icon = "🔍" if args.dry_run else ("✅" if stats["action"] == "cleaned" else "⏭️ ")
        print(
            f" [{d.name}] {icon} "
            f"{stats['action']}: {stats['with_capabilities']}/{stats['total_cases']} cases 含 capabilities"
        )

    print("=" * 60)
    if args.dry_run:
        print(f"📊 [Dry Run Summary] 共 {total_cases} 个 case,{total_cleaned} 个含 capabilities(未修改)")
    else:
        print(f"📊 [Summary] 共处理 {total_cases} 个 case,清除了 {total_cleaned} 个 capabilities 数组")
        if backup and total_cleaned > 0:
            print(" 💾 原始文件已备份为 case.json.bak")


if __name__ == "__main__":
    main()
|