| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- """
- case.json 写前快照工具
- 每次 pipeline 运行(一次 run_pipeline.py 调用)会被分配一个 run_id(时间戳),
- 对应 {output_dir}/history/{run_id}/ 文件夹。该文件夹里:
- - case.json 本次运行开始前的 case.json 完整快照(用于回滚)
- - run.log 本次运行的 stdout/stderr 全文(由 run_pipeline.py 的 tee 写入)
- 设计原则:
- - 一次 run = 一个 history 子文件夹 = 一个回滚点
- - snapshot_case_file 是 first-call-wins:本次运行已快照过就不再覆盖(避免被中
- 途的中间状态污染回滚点)
- - 静默失败:快照失败不阻塞主流程
- """
- import shutil
- from datetime import datetime
- from pathlib import Path
- from typing import Optional, Union
- _RUN_ID: Optional[str] = None
- def set_run_id(rid: str) -> None:
- """由 run_pipeline.py 在启动时调用,固定本次 run 的 id(建议时间戳格式)"""
- global _RUN_ID
- _RUN_ID = rid
- def get_run_id() -> str:
- """取当前 run id;未 set 过则用 fallback 时间戳(一般不应走到这里)"""
- if _RUN_ID is None:
- return datetime.now().strftime("%Y%m%d_%H%M%S")
- return _RUN_ID
- def get_run_history_dir(case_file: Union[str, Path]) -> Path:
- """根据 case_file 位置算出本次 run 的 history 子目录路径"""
- p = Path(case_file)
- return p.parent / "history" / get_run_id()
- def snapshot_case_file(
- case_file: Union[str, Path],
- step: str = "unknown",
- ) -> Optional[Path]:
- """
- 在 case_file 被覆写之前调用。把当前内容复制到本次 run 对应的 history 子目录。
- First-call-wins:若该 run 的快照已存在,直接返回已有路径,不覆盖。
- 这样无论是 step 启动还是 merge-back 调,本次运行的回滚点都是"运行开始前的状态"。
- Args:
- case_file: 即将被覆写的 case.json 路径
- step: 仅用于日志/调试,不进入路径
- Returns:
- 快照文件路径;原文件不存在或异常时返回 None
- """
- p = Path(case_file)
- if not p.exists():
- return None
- try:
- run_dir = get_run_history_dir(p)
- run_dir.mkdir(parents=True, exist_ok=True)
- dst = run_dir / "case.json"
- if dst.exists():
- return dst # first-call-wins
- shutil.copy2(p, dst)
- return dst
- except Exception as e:
- print(f" ⚠ snapshot failed: {type(e).__name__}: {e}")
- return None
|