recover.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. """
  2. case.json 历史快照恢复工具
  3. 每次 pipeline 运行会在 {output_dir}/history/<run_id>/ 留下快照盒:
  4. - case.json 运行开始前的 case.json 完整快照
  5. - run.log 该次运行的全文日志(stdout + stderr)
  6. 本脚本用于查看运行清单、恢复整个文件或恢复单个 case。
  7. 用法:
  8. # 列出所有 run(含 case 数、有 workflow 数、log 大小)
  9. python script/recover.py list output/112
  10. # 全文件回滚到某次 run 开始前的状态
  11. python script/recover.py restore output/112 20260509_141802
  12. # 仅恢复指定 case(其他 case 不动)
  13. python script/recover.py restore output/112 20260509_141802 --case-index 12
  14. # 批量恢复多个 case
  15. python script/recover.py restore output/112 20260509_141802 --case-index 1,5,12
  16. """
  17. import argparse
  18. import json
  19. import shutil
  20. import sys
  21. from datetime import datetime
  22. from pathlib import Path
  23. from typing import List, Optional
  24. def _summarize_case_file(p: Path) -> dict:
  25. """读取 case.json 并返回简要统计"""
  26. try:
  27. with open(p, "r", encoding="utf-8") as f:
  28. d = json.load(f)
  29. except Exception as e:
  30. return {"error": f"{type(e).__name__}: {e}"}
  31. cases = d.get("cases", []) if isinstance(d, dict) else []
  32. with_wf = sum(
  33. 1 for c in cases
  34. if (c.get("workflow_groups")
  35. or (c.get("workflow") and c.get("workflow", {}).get("steps")))
  36. )
  37. return {
  38. "size_bytes": p.stat().st_size,
  39. "total_cases": len(cases),
  40. "with_workflow": with_wf,
  41. }
  42. def cmd_list(output_dir: Path) -> int:
  43. history_dir = output_dir / "history"
  44. if not history_dir.exists():
  45. print(f"No history directory at: {history_dir}")
  46. return 0
  47. # 每次 run 是 history/<run_id>/,每个里面有 case.json 和 run.log
  48. run_dirs = sorted(
  49. [d for d in history_dir.iterdir() if d.is_dir()],
  50. key=lambda d: d.name,
  51. reverse=True,
  52. )
  53. if not run_dirs:
  54. print(f"History directory empty: {history_dir}")
  55. return 0
  56. current = output_dir / "case.json"
  57. if current.exists():
  58. info = _summarize_case_file(current)
  59. print(f"\n=== Current case.json ===")
  60. print(f" total={info.get('total_cases', 0)} with_workflow={info.get('with_workflow', 0)} size={info.get('size_bytes', 0):,}B")
  61. print(f"\n=== Run history in {history_dir} (newest first, {len(run_dirs)} runs) ===")
  62. print(f" {'run_id':<20} {'cases':>5} {'with_wf':>7} {'case.json':>10} {'log':>9}")
  63. for d in run_dirs:
  64. snap = d / "case.json"
  65. log = d / "run.log"
  66. if snap.exists():
  67. info = _summarize_case_file(snap)
  68. cases_str = str(info.get("total_cases", 0))
  69. wf_str = str(info.get("with_workflow", 0))
  70. snap_size = f"{info.get('size_bytes', 0):,}B"
  71. else:
  72. cases_str, wf_str, snap_size = "-", "-", "(missing)"
  73. log_size = f"{log.stat().st_size:,}B" if log.exists() else "(none)"
  74. print(f" {d.name:<20} {cases_str:>5} {wf_str:>7} {snap_size:>10} {log_size:>9}")
  75. return 0
  76. def _parse_case_indices(s: str) -> List[int]:
  77. """把 '1,5,12' 拆成 [1, 5, 12],单数字也支持"""
  78. return [int(x.strip()) for x in s.split(",") if x.strip()]
  79. def cmd_restore(
  80. output_dir: Path,
  81. run_id: str,
  82. case_indices: Optional[List[int]],
  83. ) -> int:
  84. history_dir = output_dir / "history"
  85. snap_path = history_dir / run_id / "case.json"
  86. if not snap_path.exists():
  87. print(f"Snapshot not found: {snap_path}", file=sys.stderr)
  88. return 1
  89. target = output_dir / "case.json"
  90. if not target.exists():
  91. print(f"Target case.json not found: {target}", file=sys.stderr)
  92. return 1
  93. # 恢复前先快照当前 case.json — 让 restore 操作本身也可回滚
  94. # 用 set_run_id("restore_<ts>") 让快照落到独立 run 文件夹
  95. from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file
  96. set_run_id(f"restore_{datetime.now():%Y%m%d_%H%M%S}")
  97. pre_snap = snapshot_case_file(target, step="pre_restore")
  98. if pre_snap:
  99. print(f" [snapshot] saved current state to history/{pre_snap.parent.name}/{pre_snap.name}")
  100. # 加载快照
  101. with open(snap_path, "r", encoding="utf-8") as f:
  102. snap_data = json.load(f)
  103. if case_indices is None:
  104. # 整文件恢复
  105. shutil.copy2(snap_path, target)
  106. info = _summarize_case_file(target)
  107. print(f" ✓ restored full case.json from run {run_id}")
  108. print(f" now: total={info.get('total_cases', 0)} with_workflow={info.get('with_workflow', 0)}")
  109. return 0
  110. # 单/多 case 恢复
  111. snap_cases = {c.get("index"): c for c in snap_data.get("cases", [])}
  112. missing = [i for i in case_indices if i not in snap_cases]
  113. if missing:
  114. print(f" ✗ case index {missing} not found in snapshot", file=sys.stderr)
  115. return 1
  116. with open(target, "r", encoding="utf-8") as f:
  117. target_data = json.load(f)
  118. target_cases = target_data.get("cases", [])
  119. target_index_map = {c.get("index"): i for i, c in enumerate(target_cases)}
  120. restored: List[int] = []
  121. appended: List[int] = []
  122. for idx in case_indices:
  123. snap_case = snap_cases[idx]
  124. if idx in target_index_map:
  125. target_cases[target_index_map[idx]] = snap_case
  126. restored.append(idx)
  127. else:
  128. target_cases.append(snap_case)
  129. appended.append(idx)
  130. target_cases.sort(key=lambda c: c.get("index", 0))
  131. target_data["cases"] = target_cases
  132. with open(target, "w", encoding="utf-8") as f:
  133. json.dump(target_data, f, ensure_ascii=False, indent=2)
  134. print(f" ✓ restored {len(restored)} case(s) from run {run_id}: {restored}")
  135. if appended:
  136. print(f" + appended {len(appended)} new case(s) (not present in current): {appended}")
  137. return 0
  138. def main() -> int:
  139. parser = argparse.ArgumentParser(description="Restore case.json from history snapshots")
  140. sub = parser.add_subparsers(dest="cmd", required=True)
  141. p_list = sub.add_parser("list", help="List snapshots in {output_dir}/history/")
  142. p_list.add_argument("output_dir", type=Path, help="output dir, e.g. examples/process_pipeline/output/112")
  143. p_restore = sub.add_parser("restore", help="Restore from a run snapshot (auto-snapshots current state first)")
  144. p_restore.add_argument("output_dir", type=Path)
  145. p_restore.add_argument("run_id", help="run id (timestamp folder name), e.g. 20260509_141802")
  146. p_restore.add_argument(
  147. "--case-index",
  148. type=str,
  149. default=None,
  150. help="restore only specified case(s), comma-separated (e.g. '12' or '1,5,12'); omit to restore full file",
  151. )
  152. args = parser.parse_args()
  153. if args.cmd == "list":
  154. return cmd_list(args.output_dir)
  155. elif args.cmd == "restore":
  156. indices = _parse_case_indices(args.case_index) if args.case_index else None
  157. return cmd_restore(args.output_dir, args.run_id, indices)
  158. return 1
  159. if __name__ == "__main__":
  160. # 让脚本能在 repo 根目录直接 python script/recover.py 跑
  161. repo_root = Path(__file__).resolve().parent.parent.parent.parent
  162. if str(repo_root) not in sys.path:
  163. sys.path.insert(0, str(repo_root))
  164. sys.exit(main())