"""
case.json history snapshot recovery tool.

Each pipeline run leaves a snapshot box under {output_dir}/history/{run_id}/:
  - case.json   full snapshot of case.json taken before the run started
  - run.log     full log of that run (stdout + stderr)

This script lists the recorded runs, restores the whole file, or restores
individual cases.

Usage:
    # List all runs (case count, count with workflow, log size)
    python script/recover.py list output/112

    # Roll the whole file back to the state before a given run
    python script/recover.py restore output/112 20260509_141802

    # Restore only the given case (other cases are left untouched)
    python script/recover.py restore output/112 20260509_141802 --case-index 12

    # Restore several cases at once
    python script/recover.py restore output/112 20260509_141802 --case-index 1,5,12
"""

import argparse
import json
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple


def _has_workflow(case: object) -> bool:
    """Return True if *case* is a dict carrying workflow groups or workflow steps."""
    if not isinstance(case, dict):
        return False
    if case.get("workflow_groups"):
        return True
    workflow = case.get("workflow")
    return bool(isinstance(workflow, dict) and workflow.get("steps"))


def _summarize_case_file(p: Path) -> dict:
    """Read a case.json file and return brief statistics.

    Args:
        p: Path of the JSON file to summarize.

    Returns:
        ``{"size_bytes", "total_cases", "with_workflow"}`` on success, or
        ``{"error": "<ExceptionType>: <message>"}`` when the file cannot be
        read or parsed.
    """
    try:
        with open(p, "r", encoding="utf-8") as f:
            d = json.load(f)
        size_bytes = p.stat().st_size
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {"error": f"{type(e).__name__}: {e}"}

    cases = d.get("cases", []) if isinstance(d, dict) else []
    if not isinstance(cases, list):
        cases = []
    return {
        "size_bytes": size_bytes,
        "total_cases": len(cases),
        "with_workflow": sum(1 for c in cases if _has_workflow(c)),
    }


def cmd_list(output_dir: Path) -> int:
    """Print the current case.json summary and one table row per recorded run.

    Args:
        output_dir: Pipeline output directory containing ``history/``.

    Returns:
        Process exit code; always 0 (a missing or empty history is reported,
        not treated as an error).
    """
    history_dir = output_dir / "history"
    if not history_dir.exists():
        print(f"No history directory at: {history_dir}")
        return 0

    # Each run is history/{run_id}/ holding a case.json and a run.log.
    run_dirs = sorted(
        (d for d in history_dir.iterdir() if d.is_dir()),
        key=lambda d: d.name,
        reverse=True,
    )
    if not run_dirs:
        print(f"History directory empty: {history_dir}")
        return 0

    current = output_dir / "case.json"
    if current.exists():
        info = _summarize_case_file(current)
        print("\n=== Current case.json ===")
        if "error" in info:
            # Surface the problem instead of printing misleading zeros.
            print(f"  unreadable: {info['error']}")
        else:
            print(
                f"  total={info.get('total_cases', 0)}  "
                f"with_workflow={info.get('with_workflow', 0)}  "
                f"size={info.get('size_bytes', 0):,}B"
            )

    print(f"\n=== Run history in {history_dir} (newest first, {len(run_dirs)} runs) ===")
    print(f"  {'run_id':<20} {'cases':>5} {'with_wf':>7} {'case.json':>10} {'log':>9}")
    for d in run_dirs:
        snap = d / "case.json"
        log = d / "run.log"
        if not snap.exists():
            cases_str, wf_str, snap_size = "-", "-", "(missing)"
        else:
            info = _summarize_case_file(snap)
            if "error" in info:
                cases_str, wf_str, snap_size = "?", "?", "(error)"
            else:
                cases_str = str(info.get("total_cases", 0))
                wf_str = str(info.get("with_workflow", 0))
                snap_size = f"{info.get('size_bytes', 0):,}B"
        log_size = f"{log.stat().st_size:,}B" if log.exists() else "(none)"
        print(f"  {d.name:<20} {cases_str:>5} {wf_str:>7} {snap_size:>10} {log_size:>9}")
    return 0


def _parse_case_indices(s: str) -> List[int]:
    """Split ``'1,5,12'`` into ``[1, 5, 12]``; a single number works too.

    Empty segments are skipped, so ``','`` yields ``[]``.

    Raises:
        ValueError: If a non-empty segment is not an integer.
    """
    return [int(x.strip()) for x in s.split(",") if x.strip()]


def _load_json(path: Path, label: str) -> Optional[object]:
    """Load JSON from *path*; on failure print an error naming *label* and return None."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        print(f"{label} unreadable: {path} ({type(e).__name__}: {e})", file=sys.stderr)
        return None


def _merge_cases(
    target_cases: List[dict],
    snap_cases: Dict[object, dict],
    case_indices: List[int],
) -> Tuple[List[int], List[int]]:
    """Copy the requested snapshot cases into *target_cases* in place.

    A case whose index already exists in the target is replaced; otherwise it
    is appended. The list is then sorted by ``index``. Repeated indices are
    processed once so a case cannot be appended twice.

    Returns:
        ``(restored, appended)`` lists of the indices that were replaced and
        newly added, respectively.
    """
    position = {
        c.get("index"): i for i, c in enumerate(target_cases) if isinstance(c, dict)
    }
    restored: List[int] = []
    appended: List[int] = []
    # dict.fromkeys de-duplicates while keeping the caller's order.
    for idx in dict.fromkeys(case_indices):
        snap_case = snap_cases[idx]
        if idx in position:
            target_cases[position[idx]] = snap_case
            restored.append(idx)
        else:
            target_cases.append(snap_case)
            appended.append(idx)
    target_cases.sort(key=lambda c: c.get("index", 0))
    return restored, appended


def _write_json_atomic(path: Path, data: object) -> None:
    """Write *data* as JSON to a sibling temp file, then replace *path* with it.

    The live file is only swapped once the dump has fully succeeded, so a
    failure mid-write cannot leave a truncated case.json behind.
    """
    tmp = path.with_name(path.name + ".tmp")
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        tmp.replace(path)
    finally:
        # Only still present if the dump or the replace failed.
        if tmp.exists():
            tmp.unlink()


def cmd_restore(
    output_dir: Path,
    run_id: str,
    case_indices: Optional[List[int]],
) -> int:
    """Restore case.json, fully or per case, from the snapshot of *run_id*.

    All read-only validation happens first; only then is the current
    case.json snapshotted (so the restore itself can be rolled back) and the
    target modified.

    Args:
        output_dir: Pipeline output directory holding ``case.json`` and ``history/``.
        run_id: Name of the run folder under ``history/`` to restore from.
        case_indices: Case ``index`` values to restore, or None to restore
            the whole file.

    Returns:
        0 on success, 1 if the snapshot/target is missing or unreadable, or a
        requested index is absent from the snapshot.
    """
    history_dir = output_dir / "history"
    snap_path = history_dir / run_id / "case.json"
    if not snap_path.exists():
        print(f"Snapshot not found: {snap_path}", file=sys.stderr)
        return 1

    target = output_dir / "case.json"
    if not target.exists():
        print(f"Target case.json not found: {target}", file=sys.stderr)
        return 1

    # Load the snapshot (also guards a full restore against a corrupt snapshot).
    snap_data = _load_json(snap_path, "Snapshot")
    if snap_data is None:
        return 1

    snap_cases: Dict[object, dict] = {}
    target_data = None
    if case_indices is not None:
        raw_cases = snap_data.get("cases", []) if isinstance(snap_data, dict) else []
        if not isinstance(raw_cases, list):
            raw_cases = []
        snap_cases = {c.get("index"): c for c in raw_cases if isinstance(c, dict)}
        missing = [i for i in case_indices if i not in snap_cases]
        if missing:
            print(f"  ✗ case index {missing} not found in snapshot", file=sys.stderr)
            return 1

        target_data = _load_json(target, "Target case.json")
        if target_data is None:
            return 1
        if not isinstance(target_data, dict) or not isinstance(
            target_data.get("cases", []), list
        ):
            print(f"Target case.json has unexpected structure: {target}", file=sys.stderr)
            return 1

    # Snapshot the current case.json before restoring, so the restore
    # operation itself can be rolled back. set_run_id("restore_{timestamp}")
    # sends that snapshot to its own run folder.
    # Imported here rather than at module top: the __main__ guard adds the
    # repo root to sys.path before main() runs.
    from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file

    set_run_id(f"restore_{datetime.now():%Y%m%d_%H%M%S}")
    pre_snap = snapshot_case_file(target, step="pre_restore")
    if pre_snap:
        print(f"  [snapshot] saved current state to history/{pre_snap.parent.name}/{pre_snap.name}")

    if case_indices is None:
        # Whole-file restore.
        shutil.copy2(snap_path, target)
        info = _summarize_case_file(target)
        print(f"  ✓ restored full case.json from run {run_id}")
        print(f"    now: total={info.get('total_cases', 0)}  with_workflow={info.get('with_workflow', 0)}")
        return 0

    # Single / multiple case restore.
    target_cases = target_data.get("cases", [])
    restored, appended = _merge_cases(target_cases, snap_cases, case_indices)
    target_data["cases"] = target_cases
    _write_json_atomic(target, target_data)

    print(f"  ✓ restored {len(restored)} case(s) from run {run_id}: {restored}")
    if appended:
        print(f"  + appended {len(appended)} new case(s) (not present in current): {appended}")
    return 0


def main(argv: Optional[List[str]] = None) -> int:
    """Parse command-line arguments and dispatch to ``list`` or ``restore``.

    Args:
        argv: Argument list to parse; None (the default) reads ``sys.argv[1:]``.

    Returns:
        Process exit code of the selected command.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including an
            empty or non-integer ``--case-index`` value.
    """
    parser = argparse.ArgumentParser(description="Restore case.json from history snapshots")
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_list = sub.add_parser("list", help="List snapshots in {output_dir}/history/")
    p_list.add_argument("output_dir", type=Path, help="output dir, e.g. examples/process_pipeline/output/112")

    p_restore = sub.add_parser("restore", help="Restore from a run snapshot (auto-snapshots current state first)")
    p_restore.add_argument("output_dir", type=Path)
    p_restore.add_argument("run_id", help="run id (timestamp folder name), e.g. 20260509_141802")
    p_restore.add_argument(
        "--case-index",
        type=str,
        default=None,
        help="restore only specified case(s), comma-separated (e.g. '12' or '1,5,12'); omit to restore full file",
    )

    args = parser.parse_args(argv)

    if args.cmd == "list":
        return cmd_list(args.output_dir)
    if args.cmd == "restore":
        indices: Optional[List[int]] = None
        # Test against None, not truthiness: an explicitly passed but empty
        # value (e.g. an unset shell variable) must never fall through to a
        # full-file rollback.
        if args.case_index is not None:
            try:
                indices = _parse_case_indices(args.case_index)
            except ValueError:
                parser.error(f"--case-index must be comma-separated integers, got {args.case_index!r}")
            if not indices:
                parser.error("--case-index was given but contains no case index")
        return cmd_restore(args.output_dir, args.run_id, indices)
    return 1


if __name__ == "__main__":
    # Let the script run directly as `python script/recover.py` from the repo root.
    repo_root = Path(__file__).resolve().parent.parent.parent.parent
    if str(repo_root) not in sys.path:
        sys.path.insert(0, str(repo_root))
    sys.exit(main())