clean_capabilities.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
"""
Cleanup script: remove the ``capabilities`` array from case.json files.

Background: the capability concept has been superseded by fragments, so the
leftover ``capabilities`` field in case.json needs to be removed.

Usage:
    # Clean the case.json of the given requirement directory
    python -m examples.process_pipeline.script.clean_capabilities --index 108

    # Preview only (no files are modified)
    python -m examples.process_pipeline.script.clean_capabilities --index 108 --dry-run

    # Skip the backup (a .bak file is written by default)
    python -m examples.process_pipeline.script.clean_capabilities --index 108 --no-backup

    # Clean every requirement directory under output
    python -m examples.process_pipeline.script.clean_capabilities --all
"""
  14. import argparse
  15. import json
  16. import shutil
  17. import sys
  18. from pathlib import Path
  19. from typing import Any, Dict, List, Tuple
  20. # Windows 控制台 UTF-8 输出
  21. if sys.platform == "win32":
  22. sys.stdout.reconfigure(encoding="utf-8")
  23. sys.stderr.reconfigure(encoding="utf-8")
  24. PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent.parent
  25. OUTPUT_DIR = Path(__file__).resolve().parent.parent / "output"
  26. def clean_case_file(
  27. case_file: Path,
  28. dry_run: bool = False,
  29. backup: bool = True,
  30. ) -> Dict[str, int]:
  31. """
  32. 清洗单个 case.json 文件中的 capabilities 数组。
  33. Returns:
  34. stats dict: {"total_cases": int, "with_capabilities": int, "without": int}
  35. """
  36. if not case_file.exists():
  37. raise FileNotFoundError(f"Case file not found: {case_file}")
  38. with open(case_file, "r", encoding="utf-8") as f:
  39. data = json.load(f)
  40. cases = data.get("cases", [])
  41. with_cap_count = 0
  42. for case in cases:
  43. if "capabilities" in case:
  44. with_cap_count += 1
  45. if not dry_run:
  46. del case["capabilities"]
  47. without_count = len(cases) - with_cap_count
  48. if dry_run:
  49. return {
  50. "total_cases": len(cases),
  51. "with_capabilities": with_cap_count,
  52. "without": without_count,
  53. "action": "dry_run",
  54. }
  55. if with_cap_count > 0:
  56. if backup:
  57. backup_file = case_file.with_suffix(".json.bak")
  58. shutil.copy2(case_file, backup_file)
  59. with open(case_file, "w", encoding="utf-8") as f:
  60. json.dump(data, f, ensure_ascii=False, indent=2)
  61. return {
  62. "total_cases": len(cases),
  63. "with_capabilities": with_cap_count,
  64. "without": without_count,
  65. "action": "cleaned" if with_cap_count > 0 else "no_change",
  66. }
  67. def main():
  68. parser = argparse.ArgumentParser(description="移除 case.json 中的 capabilities 数组")
  69. group = parser.add_mutually_exclusive_group(required=True)
  70. group.add_argument("--index", type=str, help="需求目录索引(如 108 或 108,109,110)")
  71. group.add_argument("--all", action="store_true", help="清洗 output 下所有需求目录")
  72. parser.add_argument("--dry-run", action="store_true", help="只预览,不修改文件")
  73. parser.add_argument("--no-backup", action="store_true", help="不创建 .bak 备份文件")
  74. args = parser.parse_args()
  75. backup = not args.no_backup
  76. # 确定要处理的目录列表
  77. target_dirs: List[Path] = []
  78. if args.all:
  79. for d in sorted(OUTPUT_DIR.iterdir()):
  80. if d.is_dir() and d.name.isdigit():
  81. case_file = d / "case.json"
  82. if case_file.exists():
  83. target_dirs.append(d)
  84. else:
  85. for idx_str in args.index.split(","):
  86. idx_str = idx_str.strip()
  87. # 支持 "108" 或 "108/raw_cases" 格式,统一取目录名
  88. target_dir = OUTPUT_DIR / idx_str.zfill(3) if len(idx_str) <= 3 else OUTPUT_DIR / idx_str
  89. if not target_dir.exists():
  90. print(f"⚠️ 目录不存在: {target_dir}")
  91. continue
  92. target_dirs.append(target_dir)
  93. if not target_dirs:
  94. print("❌ 没有找到任何可处理的目录")
  95. sys.exit(1)
  96. print(f"{'[Dry Run] ' if args.dry_run else ''}处理 {len(target_dirs)} 个目录 (backup={'on' if backup else 'off'})")
  97. print("=" * 60)
  98. total_cleaned = 0
  99. total_cases = 0
  100. for d in target_dirs:
  101. case_file = d / "case.json"
  102. if not case_file.exists():
  103. print(f" [{d.name}] ⏭️ case.json 不存在")
  104. continue
  105. try:
  106. stats = clean_case_file(case_file, dry_run=args.dry_run, backup=backup)
  107. except Exception as e:
  108. print(f" [{d.name}] ❌ 错误: {type(e).__name__}: {e}")
  109. continue
  110. total_cases += stats["total_cases"]
  111. total_cleaned += stats["with_capabilities"]
  112. icon = "🔍" if args.dry_run else ("✅" if stats["action"] == "cleaned" else "⏭️ ")
  113. print(
  114. f" [{d.name}] {icon} "
  115. f"{stats['action']}: {stats['with_capabilities']}/{stats['total_cases']} cases 含 capabilities"
  116. )
  117. print("=" * 60)
  118. if args.dry_run:
  119. print(f"📊 [Dry Run Summary] 共 {total_cases} 个 case,{total_cleaned} 个含 capabilities(未修改)")
  120. else:
  121. print(f"📊 [Summary] 共处理 {total_cases} 个 case,清除了 {total_cleaned} 个 capabilities 数组")
  122. if backup and total_cleaned > 0:
  123. print(f" 💾 原始文件已备份为 case.json.bak")
  124. if __name__ == "__main__":
  125. main()