# runtime_files.py (5.1 KB)
  1. from __future__ import annotations
  2. import json
  3. import shutil
  4. from pathlib import Path
  5. from typing import Any
  6. RUNTIME_FILENAMES = [
  7. "source_context.json",
  8. "pattern_seed_pack.json",
  9. "search_queries.jsonl",
  10. "discovered_content_items.jsonl",
  11. "content_media_records.jsonl",
  12. "pattern_recall_evidence.jsonl",
  13. "rule_decisions.jsonl",
  14. "walk_actions.jsonl",
  15. "run_events.jsonl",
  16. "source_path_records.jsonl",
  17. "search_clues.jsonl",
  18. "final_output.json",
  19. "strategy_review.json",
  20. ]
  21. class LocalRuntimeFileStore:
  22. def __init__(self, base_dir: Path | str = Path("runtime/v1")) -> None:
  23. self.base_dir = Path(base_dir)
  24. def prepare_run(self, run_id: str) -> Path:
  25. path = self.run_dir(run_id)
  26. if path.exists():
  27. raise FileExistsError(f"run already exists: {run_id}")
  28. path.mkdir(parents=True, exist_ok=True)
  29. return path
  30. def run_dir(self, run_id: str) -> Path:
  31. return self.base_dir / run_id
  32. def write_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
  33. path = self.run_dir(run_id) / filename
  34. path.parent.mkdir(parents=True, exist_ok=True)
  35. path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
  36. return path
  37. def update_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
  38. return self.write_json(run_id, filename, data)
  39. def append_jsonl(self, run_id: str, filename: str, rows: list[dict[str, Any]]) -> Path:
  40. path = self.run_dir(run_id) / filename
  41. path.parent.mkdir(parents=True, exist_ok=True)
  42. if filename in {"pattern_recall_evidence.jsonl", "search_queries.jsonl"}:
  43. rows = _replace_keyed_rows(
  44. self.read_jsonl(run_id, filename),
  45. rows,
  46. _jsonl_key_fields(filename),
  47. )
  48. path.write_text(
  49. "".join(
  50. json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n"
  51. for row in rows
  52. ),
  53. encoding="utf-8",
  54. )
  55. return path
  56. with path.open("a", encoding="utf-8") as file:
  57. for row in rows:
  58. file.write(json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n")
  59. return path
  60. def read_json(self, run_id: str, filename: str) -> dict[str, Any]:
  61. path = self.run_dir(run_id) / filename
  62. return json.loads(path.read_text(encoding="utf-8"))
  63. def read_jsonl(self, run_id: str, filename: str) -> list[dict[str, Any]]:
  64. path = self.run_dir(run_id) / filename
  65. if not path.exists():
  66. return []
  67. return [
  68. json.loads(line)
  69. for line in path.read_text(encoding="utf-8").splitlines()
  70. if line.strip()
  71. ]
  72. def file_status(self, run_id: str) -> dict[str, bool]:
  73. run_dir = self.run_dir(run_id)
  74. return {filename: (run_dir / filename).exists() for filename in RUNTIME_FILENAMES}
  75. def list_runs(self) -> list[str]:
  76. if not self.base_dir.exists():
  77. return []
  78. return sorted(path.name for path in self.base_dir.iterdir() if path.is_dir())
  79. def create_run_record(self, record: dict[str, Any]) -> None:
  80. return None
  81. def update_run_record(self, run_id: str, updates: dict[str, Any]) -> None:
  82. return None
  83. def record_policy_run(self, record: dict[str, Any]) -> None:
  84. return None
  85. def append_run_event_records(
  86. self,
  87. run_id: str,
  88. policy_run_id: str,
  89. rows: list[dict[str, Any]],
  90. ) -> None:
  91. return None
  92. def write_publish_jobs(
  93. self,
  94. run_id: str,
  95. policy_run_id: str,
  96. rows: list[dict[str, Any]],
  97. ) -> None:
  98. return None
  99. def write_author_assets(self, rows: list[dict[str, Any]]) -> None:
  100. return None
  101. def write_author_asset_roles(self, rows: list[dict[str, Any]]) -> None:
  102. return None
  103. def write_search_clue_assets(self, rows: list[dict[str, Any]]) -> None:
  104. return None
  105. def write_search_clue_asset_evidence(self, rows: list[dict[str, Any]]) -> None:
  106. return None
  107. def read_performance_feedback(
  108. self,
  109. run_id: str,
  110. policy_run_id: str,
  111. ) -> list[dict[str, Any]]:
  112. return []
  113. def _replace_keyed_rows(
  114. existing_rows: list[dict[str, Any]],
  115. new_rows: list[dict[str, Any]],
  116. key_fields: tuple[str, ...],
  117. ) -> list[dict[str, Any]]:
  118. keyed_rows: dict[tuple[Any, ...], dict[str, Any]] = {}
  119. order: list[tuple[Any, ...]] = []
  120. for row in [*existing_rows, *new_rows]:
  121. key = tuple(row.get(field) for field in key_fields)
  122. if key not in keyed_rows:
  123. order.append(key)
  124. keyed_rows[key] = row
  125. return [keyed_rows[key] for key in order]
  126. def _jsonl_key_fields(filename: str) -> tuple[str, ...]:
  127. if filename == "pattern_recall_evidence.jsonl":
  128. return ("run_id", "policy_run_id", "recall_evidence_id")
  129. if filename == "search_queries.jsonl":
  130. return ("run_id", "policy_run_id", "search_query_id")
  131. raise ValueError(f"unsupported keyed JSONL file: {filename}")