case_history.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. """
  2. case.json 写前快照工具
  3. 每次 pipeline 运行(一次 run_pipeline.py 调用)会被分配一个 run_id(时间戳),
  4. 对应 {output_dir}/history/{run_id}/ 文件夹。该文件夹里:
  5. - case.json 本次运行开始前的 case.json 完整快照(用于回滚)
  6. - run.log 本次运行的 stdout/stderr 全文(由 run_pipeline.py 的 tee 写入)
  7. 设计原则:
  8. - 一次 run = 一个 history 子文件夹 = 一个回滚点
  9. - snapshot_case_file 是 first-call-wins:本次运行已快照过就不再覆盖(避免被中
  10. 途的中间状态污染回滚点)
  11. - 静默失败:快照失败不阻塞主流程
  12. """
  13. import shutil
  14. from datetime import datetime
  15. from pathlib import Path
  16. from typing import Optional, Union
  17. _RUN_ID: Optional[str] = None
  18. def set_run_id(rid: str) -> None:
  19. """由 run_pipeline.py 在启动时调用,固定本次 run 的 id(建议时间戳格式)"""
  20. global _RUN_ID
  21. _RUN_ID = rid
  22. def get_run_id() -> str:
  23. """取当前 run id;未 set 过则用 fallback 时间戳(一般不应走到这里)"""
  24. if _RUN_ID is None:
  25. return datetime.now().strftime("%Y%m%d_%H%M%S")
  26. return _RUN_ID
  27. def get_run_history_dir(case_file: Union[str, Path]) -> Path:
  28. """根据 case_file 位置算出本次 run 的 history 子目录路径"""
  29. p = Path(case_file)
  30. return p.parent / "history" / get_run_id()
  31. def snapshot_case_file(
  32. case_file: Union[str, Path],
  33. step: str = "unknown",
  34. ) -> Optional[Path]:
  35. """
  36. 在 case_file 被覆写之前调用。把当前内容复制到本次 run 对应的 history 子目录。
  37. First-call-wins:若该 run 的快照已存在,直接返回已有路径,不覆盖。
  38. 这样无论是 step 启动还是 merge-back 调,本次运行的回滚点都是"运行开始前的状态"。
  39. Args:
  40. case_file: 即将被覆写的 case.json 路径
  41. step: 仅用于日志/调试,不进入路径
  42. Returns:
  43. 快照文件路径;原文件不存在或异常时返回 None
  44. """
  45. p = Path(case_file)
  46. if not p.exists():
  47. return None
  48. try:
  49. run_dir = get_run_history_dir(p)
  50. run_dir.mkdir(parents=True, exist_ok=True)
  51. dst = run_dir / "case.json"
  52. if dst.exists():
  53. return dst # first-call-wins
  54. shutil.copy2(p, dst)
  55. return dst
  56. except Exception as e:
  57. print(f" ⚠ snapshot failed: {type(e).__name__}: {e}")
  58. return None