"""Local filesystem store for per-run runtime JSON and JSONL files."""

from __future__ import annotations

import json
import shutil
from pathlib import Path
from typing import Any

# Filenames whose presence is reported by ``LocalRuntimeFileStore.file_status``.
RUNTIME_FILENAMES = [
    "source_context.json",
    "pattern_seed_pack.json",
    "search_queries.jsonl",
    "discovered_content_items.jsonl",
    "content_media_records.jsonl",
    "pattern_recall_evidence.jsonl",
    "rule_decisions.jsonl",
    "walk_actions.jsonl",
    "run_events.jsonl",
    "source_path_records.jsonl",
    "search_clues.jsonl",
    "final_output.json",
    "strategy_review.json",
]

# JSONL files that are upserted by key instead of blindly appended to, mapped
# to the row fields that together form the key.
_JSONL_KEY_FIELDS: dict[str, tuple[str, ...]] = {
    "pattern_recall_evidence.jsonl": ("run_id", "policy_run_id", "recall_evidence_id"),
    "search_queries.jsonl": ("run_id", "policy_run_id", "search_query_id"),
}


class LocalRuntimeFileStore:
    """Read and write run artifacts under ``<base_dir>/<run_id>/``.

    Only the JSON/JSONL file operations touch the filesystem. The record-style
    methods (``create_run_record``, ``write_publish_jobs``, ...) are no-ops
    here and ``read_performance_feedback`` always returns an empty list.
    NOTE(review): presumably those exist so this class can stand in for a
    store with a richer backend -- confirm against the callers.
    """

    def __init__(self, base_dir: Path | str = Path("runtime/v1")) -> None:
        """Remember *base_dir* as the root directory; nothing is created yet."""
        self.base_dir = Path(base_dir)

    def prepare_run(self, run_id: str) -> Path:
        """Create the directory for a new run and return its path.

        Raises:
            FileExistsError: if the run directory already exists.
        """
        path = self.run_dir(run_id)
        if path.exists():
            raise FileExistsError(f"run already exists: {run_id}")
        path.mkdir(parents=True, exist_ok=True)
        return path

    def run_dir(self, run_id: str) -> Path:
        """Return the directory path for *run_id* (it may not exist)."""
        return self.base_dir / run_id

    def write_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Write *data* as indented UTF-8 JSON, replacing any existing file.

        Parent directories are created as needed. Returns the written path.
        """
        path = self.run_dir(run_id) / filename
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(
            json.dumps(data, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )
        return path

    def update_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Overwrite the file with *data*; delegates to ``write_json`` (no merge)."""
        return self.write_json(run_id, filename, data)

    def append_jsonl(self, run_id: str, filename: str, rows: list[dict[str, Any]]) -> Path:
        """Add *rows* to a JSONL file, one compact JSON object per line.

        For the keyed files listed in ``_JSONL_KEY_FIELDS`` the existing rows
        are read back, rows sharing a key with a new row are replaced, and the
        whole file is rewritten. Every other file is opened in append mode.
        Parent directories are created as needed. Returns the file path.
        """
        path = self.run_dir(run_id) / filename
        path.parent.mkdir(parents=True, exist_ok=True)

        if filename in _JSONL_KEY_FIELDS:
            merged = _replace_keyed_rows(
                self.read_jsonl(run_id, filename),
                rows,
                _jsonl_key_fields(filename),
            )
            path.write_text(
                "".join(_dump_jsonl_line(row) for row in merged),
                encoding="utf-8",
            )
            return path

        with path.open("a", encoding="utf-8") as file:
            file.writelines(_dump_jsonl_line(row) for row in rows)
        return path

    def read_json(self, run_id: str, filename: str) -> dict[str, Any]:
        """Load and return the parsed JSON content of a run file."""
        path = self.run_dir(run_id) / filename
        return json.loads(path.read_text(encoding="utf-8"))

    def read_jsonl(self, run_id: str, filename: str) -> list[dict[str, Any]]:
        """Return the parsed rows of a JSONL file, or ``[]`` if it is missing.

        Blank lines are skipped.
        """
        path = self.run_dir(run_id) / filename
        if not path.exists():
            return []
        text = path.read_text(encoding="utf-8")
        # Split on "\n" only. str.splitlines() also breaks on U+0085, U+2028
        # and U+2029, which json.dumps(ensure_ascii=False) writes unescaped
        # inside string values; that would cut a record in half.
        return [json.loads(line) for line in text.split("\n") if line.strip()]

    def file_status(self, run_id: str) -> dict[str, bool]:
        """Map every name in ``RUNTIME_FILENAMES`` to whether it exists for the run."""
        run_dir = self.run_dir(run_id)
        return {filename: (run_dir / filename).exists() for filename in RUNTIME_FILENAMES}

    def list_runs(self) -> list[str]:
        """Return the sorted names of the sub-directories of ``base_dir``.

        Returns ``[]`` when ``base_dir`` does not exist.
        """
        if not self.base_dir.exists():
            return []
        return sorted(path.name for path in self.base_dir.iterdir() if path.is_dir())

    def create_run_record(self, record: dict[str, Any]) -> None:
        """No-op in the local file store."""
        return None

    def update_run_record(self, run_id: str, updates: dict[str, Any]) -> None:
        """No-op in the local file store."""
        return None

    def record_policy_run(self, record: dict[str, Any]) -> None:
        """No-op in the local file store."""
        return None

    def append_run_event_records(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """No-op in the local file store."""
        return None

    def write_publish_jobs(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """No-op in the local file store."""
        return None

    def write_author_assets(self, rows: list[dict[str, Any]]) -> None:
        """No-op in the local file store."""
        return None

    def write_author_asset_roles(self, rows: list[dict[str, Any]]) -> None:
        """No-op in the local file store."""
        return None

    def write_search_clue_assets(self, rows: list[dict[str, Any]]) -> None:
        """No-op in the local file store."""
        return None

    def write_search_clue_asset_evidence(self, rows: list[dict[str, Any]]) -> None:
        """No-op in the local file store."""
        return None

    def read_performance_feedback(
        self,
        run_id: str,
        policy_run_id: str,
    ) -> list[dict[str, Any]]:
        """Always return an empty list in the local file store."""
        return []


def _dump_jsonl_line(row: dict[str, Any]) -> str:
    """Serialize *row* as one compact, newline-terminated JSON line."""
    return json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n"


def _replace_keyed_rows(
    existing_rows: list[dict[str, Any]],
    new_rows: list[dict[str, Any]],
    key_fields: tuple[str, ...],
) -> list[dict[str, Any]]:
    """Merge *new_rows* into *existing_rows*, keeping one row per key.

    The key is the tuple of each row's values for *key_fields*; a missing
    field contributes ``None``, so rows lacking the same fields share a key.
    The last row seen for a key wins, but it keeps the position at which that
    key first appeared.
    """
    keyed_rows: dict[tuple[Any, ...], dict[str, Any]] = {}
    order: list[tuple[Any, ...]] = []
    for row in [*existing_rows, *new_rows]:
        key = tuple(row.get(field) for field in key_fields)
        if key not in keyed_rows:
            order.append(key)
        keyed_rows[key] = row
    return [keyed_rows[key] for key in order]


def _jsonl_key_fields(filename: str) -> tuple[str, ...]:
    """Return the key fields for a keyed JSONL file.

    Raises:
        ValueError: if *filename* is not one of the keyed JSONL files.
    """
    try:
        return _JSONL_KEY_FIELDS[filename]
    except KeyError:
        raise ValueError(f"unsupported keyed JSONL file: {filename}") from None