| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- """
- case.json 历史快照恢复工具
- 每次 pipeline 运行会在 {output_dir}/history/<run_id>/ 留下快照盒:
- - case.json 运行开始前的 case.json 完整快照
- - run.log 该次运行的全文日志(stdout + stderr)
- 本脚本用于查看运行清单、恢复整个文件或恢复单个 case。
- 用法:
- # 列出所有 run(含 case 数、有 workflow 数、log 大小)
- python script/recover.py list output/112
- # 全文件回滚到某次 run 开始前的状态
- python script/recover.py restore output/112 20260509_141802
- # 仅恢复指定 case(其他 case 不动)
- python script/recover.py restore output/112 20260509_141802 --case-index 12
- # 批量恢复多个 case
- python script/recover.py restore output/112 20260509_141802 --case-index 1,5,12
- """
- import argparse
- import json
- import shutil
- import sys
- from datetime import datetime
- from pathlib import Path
- from typing import List, Optional
- def _summarize_case_file(p: Path) -> dict:
- """读取 case.json 并返回简要统计"""
- try:
- with open(p, "r", encoding="utf-8") as f:
- d = json.load(f)
- except Exception as e:
- return {"error": f"{type(e).__name__}: {e}"}
- cases = d.get("cases", []) if isinstance(d, dict) else []
- with_wf = sum(
- 1 for c in cases
- if (c.get("workflow_groups")
- or (c.get("workflow") and c.get("workflow", {}).get("steps")))
- )
- return {
- "size_bytes": p.stat().st_size,
- "total_cases": len(cases),
- "with_workflow": with_wf,
- }
def cmd_list(output_dir: Path) -> int:
    """List run snapshots under {output_dir}/history/, newest first.

    Prints a summary line for the current case.json (if present), then one
    table row per run directory: case count, workflow count, snapshot size,
    and log size. A missing or empty history directory is reported but is
    not an error.

    Returns a process exit code (always 0).
    """
    history_dir = output_dir / "history"
    if not history_dir.exists():
        print(f"No history directory at: {history_dir}")
        return 0
    # Each run lives in history/<run_id>/ and holds case.json + run.log
    run_dirs = sorted(
        [d for d in history_dir.iterdir() if d.is_dir()],
        key=lambda d: d.name,
        reverse=True,  # run ids are timestamps, so name order == time order
    )
    if not run_dirs:
        print(f"History directory empty: {history_dir}")
        return 0
    current = output_dir / "case.json"
    if current.exists():
        info = _summarize_case_file(current)
        # plain strings here: no placeholders, so no f-prefix needed (F541)
        print("\n=== Current case.json ===")
        print(f" total={info.get('total_cases', 0)} with_workflow={info.get('with_workflow', 0)} size={info.get('size_bytes', 0):,}B")
    print(f"\n=== Run history in {history_dir} (newest first, {len(run_dirs)} runs) ===")
    print(f" {'run_id':<20} {'cases':>5} {'with_wf':>7} {'case.json':>10} {'log':>9}")
    for d in run_dirs:
        snap = d / "case.json"
        log = d / "run.log"
        if snap.exists():
            info = _summarize_case_file(snap)
            cases_str = str(info.get("total_cases", 0))
            wf_str = str(info.get("with_workflow", 0))
            snap_size = f"{info.get('size_bytes', 0):,}B"
        else:
            # snapshot was deleted or never written for this run
            cases_str, wf_str, snap_size = "-", "-", "(missing)"
        log_size = f"{log.stat().st_size:,}B" if log.exists() else "(none)"
        print(f" {d.name:<20} {cases_str:>5} {wf_str:>7} {snap_size:>10} {log_size:>9}")
    return 0
- def _parse_case_indices(s: str) -> List[int]:
- """把 '1,5,12' 拆成 [1, 5, 12],单数字也支持"""
- return [int(x.strip()) for x in s.split(",") if x.strip()]
def cmd_restore(
    output_dir: Path,
    run_id: str,
    case_indices: Optional[List[int]],
) -> int:
    """Restore case.json from history/<run_id>/, fully or case-by-case.

    The current case.json is snapshotted first (under a dedicated
    "restore_<timestamp>" run id) so the restore operation is itself
    reversible. With case_indices=None the whole file is rolled back;
    otherwise only the listed case indices are replaced (or appended when
    absent from the current file).

    Returns 0 on success, 1 when the snapshot/target is missing or an index
    is not present in the snapshot.
    """
    history_dir = output_dir / "history"
    snap_path = history_dir / run_id / "case.json"
    if not snap_path.exists():
        print(f"Snapshot not found: {snap_path}", file=sys.stderr)
        return 1
    target = output_dir / "case.json"
    if not target.exists():
        print(f"Target case.json not found: {target}", file=sys.stderr)
        return 1

    # Snapshot the current state before touching anything, routed to its own
    # run folder so this restore can be rolled back like any pipeline run.
    from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file
    set_run_id(f"restore_{datetime.now():%Y%m%d_%H%M%S}")
    pre_snap = snapshot_case_file(target, step="pre_restore")
    if pre_snap:
        print(f" [snapshot] saved current state to history/{pre_snap.parent.name}/{pre_snap.name}")

    # Load the snapshot (this also validates that it parses as JSON).
    with open(snap_path, "r", encoding="utf-8") as f:
        snap_data = json.load(f)

    if case_indices is None:
        # Whole-file rollback.
        shutil.copy2(snap_path, target)
        info = _summarize_case_file(target)
        print(f" ✓ restored full case.json from run {run_id}")
        print(f" now: total={info.get('total_cases', 0)} with_workflow={info.get('with_workflow', 0)}")
        return 0

    # Selective restore, keyed by each case's "index" field.
    snap_by_index = {c.get("index"): c for c in snap_data.get("cases", [])}
    missing = [i for i in case_indices if i not in snap_by_index]
    if missing:
        print(f" ✗ case index {missing} not found in snapshot", file=sys.stderr)
        return 1

    with open(target, "r", encoding="utf-8") as f:
        target_data = json.load(f)
    current_cases = target_data.get("cases", [])
    position_of = {c.get("index"): pos for pos, c in enumerate(current_cases)}

    restored: List[int] = []
    appended: List[int] = []
    for idx in case_indices:
        candidate = snap_by_index[idx]
        pos = position_of.get(idx)
        if pos is not None:
            # Case exists in the current file: replace it in place.
            current_cases[pos] = candidate
            restored.append(idx)
        else:
            # Case vanished from the current file: bring it back.
            current_cases.append(candidate)
            appended.append(idx)

    current_cases.sort(key=lambda c: c.get("index", 0))
    target_data["cases"] = current_cases
    with open(target, "w", encoding="utf-8") as f:
        json.dump(target_data, f, ensure_ascii=False, indent=2)

    print(f" ✓ restored {len(restored)} case(s) from run {run_id}: {restored}")
    if appended:
        print(f" + appended {len(appended)} new case(s) (not present in current): {appended}")
    return 0
def main() -> int:
    """CLI entry point: parse arguments and dispatch to list/restore."""
    parser = argparse.ArgumentParser(description="Restore case.json from history snapshots")
    sub = parser.add_subparsers(dest="cmd", required=True)

    # "list": inventory of available run snapshots
    p_list = sub.add_parser("list", help="List snapshots in {output_dir}/history/")
    p_list.add_argument("output_dir", type=Path, help="output dir, e.g. examples/process_pipeline/output/112")

    # "restore": full-file or per-case rollback
    p_restore = sub.add_parser("restore", help="Restore from a run snapshot (auto-snapshots current state first)")
    p_restore.add_argument("output_dir", type=Path)
    p_restore.add_argument("run_id", help="run id (timestamp folder name), e.g. 20260509_141802")
    p_restore.add_argument(
        "--case-index",
        type=str,
        default=None,
        help="restore only specified case(s), comma-separated (e.g. '12' or '1,5,12'); omit to restore full file",
    )

    args = parser.parse_args()
    if args.cmd == "restore":
        indices = _parse_case_indices(args.case_index) if args.case_index else None
        return cmd_restore(args.output_dir, args.run_id, indices)
    if args.cmd == "list":
        return cmd_list(args.output_dir)
    return 1  # unreachable: subparsers are required
- if __name__ == "__main__":
- # 让脚本能在 repo 根目录直接 python script/recover.py 跑
- repo_root = Path(__file__).resolve().parent.parent.parent.parent
- if str(repo_root) not in sys.path:
- sys.path.insert(0, str(repo_root))
- sys.exit(main())
|