| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- from __future__ import annotations
- import json
- import shutil
- from pathlib import Path
- from typing import Any
# Names of the artifact files a run directory may contain, in a fixed order.
# ``LocalRuntimeFileStore.file_status`` reports, for each of these names,
# whether the file exists. By naming convention ``.json`` entries hold one
# document and ``.jsonl`` entries hold one JSON object per line.
RUNTIME_FILENAMES = [
    "source_context.json", "pattern_seed_pack.json",
    "search_queries.jsonl", "discovered_content_items.jsonl",
    "content_media_records.jsonl", "pattern_recall_evidence.jsonl",
    "rule_decisions.jsonl", "walk_actions.jsonl",
    "run_events.jsonl", "source_path_records.jsonl",
    "search_clues.jsonl", "final_output.json",
    "strategy_review.json",
]
class LocalRuntimeFileStore:
    """Filesystem-backed store for the artifacts of individual runs.

    Every run lives in its own directory, ``<base_dir>/<run_id>``. It holds
    whole JSON documents (``write_json`` / ``read_json``) and JSON Lines files
    (``append_jsonl`` / ``read_jsonl``).

    The methods defined after ``list_runs`` are intentional no-ops in this
    local backend. NOTE(review): they presumably exist so this class satisfies
    an interface shared with another store implementation; confirm against
    callers.

    Writes are not coordinated between processes. Whole-file rewrites go
    through a single ``<name>.tmp`` sibling, so two concurrent writers of the
    same file can interfere.
    """

    # JSONL files whose rows are de-duplicated by key on every append rather
    # than appended blindly. The key fields come from ``_jsonl_key_fields``.
    _KEYED_JSONL_FILES = frozenset(
        {"pattern_recall_evidence.jsonl", "search_queries.jsonl"}
    )

    def __init__(self, base_dir: Path | str = Path("runtime/v1")) -> None:
        """Create a store rooted at ``base_dir``.

        Args:
            base_dir: Directory containing one sub-directory per run. The
                default is a relative path, so it is resolved against the
                current working directory whenever the filesystem is touched.
        """
        self.base_dir = Path(base_dir)

    def prepare_run(self, run_id: str) -> Path:
        """Create the directory for a new run and return its path.

        Missing parent directories are created as well.

        Raises:
            FileExistsError: if anything already exists at the run's path.
        """
        path = self.run_dir(run_id)
        # Let mkdir perform the existence check itself. A separate exists()
        # test followed by mkdir(exist_ok=True) lets two callers both
        # "create" the same run.
        try:
            path.mkdir(parents=True, exist_ok=False)
        except FileExistsError as err:
            raise FileExistsError(f"run already exists: {run_id}") from err
        return path

    def run_dir(self, run_id: str) -> Path:
        """Return the directory path for ``run_id`` without creating it.

        NOTE(review): ``run_id`` is joined verbatim. An absolute id, or one
        containing ``..``, resolves outside ``base_dir``. Validate it upstream
        if run ids can come from untrusted input.
        """
        return self.base_dir / run_id

    def write_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Write ``data`` as indented UTF-8 JSON, replacing any existing file.

        The run directory is created if needed. The file is swapped in
        atomically, so a crash mid-write keeps the previous contents.

        Returns:
            The path of the written file.
        """
        path = self.run_dir(run_id) / filename
        path.parent.mkdir(parents=True, exist_ok=True)
        self._write_text_atomic(
            path, json.dumps(data, ensure_ascii=False, indent=2) + "\n"
        )
        return path

    def update_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Alias of ``write_json``.

        Despite the name, this overwrites the whole file with ``data``. It
        does not merge ``data`` into the existing document.
        """
        return self.write_json(run_id, filename, data)

    def append_jsonl(
        self, run_id: str, filename: str, rows: list[dict[str, Any]]
    ) -> Path:
        """Add ``rows`` to a JSON Lines file, one compact JSON object per line.

        For files in ``_KEYED_JSONL_FILES``, the whole file is rewritten
        atomically. Stored and new rows are merged with
        ``_replace_keyed_rows``, so a row whose key is already present replaces
        the stored row in place. Every other file is opened in append mode.
        All rows are serialised before the file is touched, so an
        unserialisable row raises without writing part of the batch.

        Returns:
            The path of the JSONL file.
        """
        path = self.run_dir(run_id) / filename
        path.parent.mkdir(parents=True, exist_ok=True)
        if filename in self._KEYED_JSONL_FILES:
            merged = _replace_keyed_rows(
                self.read_jsonl(run_id, filename),
                rows,
                _jsonl_key_fields(filename),
            )
            # Atomic rewrite: a crash here must not truncate the rows that
            # were already stored.
            self._write_text_atomic(
                path, "".join(self._to_jsonl_line(row) for row in merged)
            )
            return path
        payload = "".join(self._to_jsonl_line(row) for row in rows)
        with path.open("a", encoding="utf-8") as file:
            file.write(payload)
        return path

    def read_json(self, run_id: str, filename: str) -> dict[str, Any]:
        """Parse and return a JSON document from the run directory.

        Raises:
            FileNotFoundError: if the file does not exist.
        """
        path = self.run_dir(run_id) / filename
        return json.loads(path.read_text(encoding="utf-8"))

    def read_jsonl(self, run_id: str, filename: str) -> list[dict[str, Any]]:
        """Return every non-blank line of a JSONL file, parsed, in file order.

        A missing file yields an empty list.
        """
        path = self.run_dir(run_id) / filename
        if not path.exists():
            return []
        text = path.read_text(encoding="utf-8")
        # Split on "\n" only. str.splitlines() also breaks on U+2028, U+2029
        # and U+0085. json.dumps(ensure_ascii=False) leaves those characters
        # unescaped inside string values, so splitlines() would cut such
        # records in half.
        return [json.loads(line) for line in text.split("\n") if line.strip()]

    def file_status(self, run_id: str) -> dict[str, bool]:
        """Map each name in ``RUNTIME_FILENAMES`` to whether it exists for the run."""
        run_dir = self.run_dir(run_id)
        return {filename: (run_dir / filename).exists() for filename in RUNTIME_FILENAMES}

    def list_runs(self) -> list[str]:
        """Return the sorted names of all sub-directories of ``base_dir``.

        Returns an empty list when ``base_dir`` does not exist. Any
        directory counts as a run; names are not validated.
        """
        if not self.base_dir.exists():
            return []
        return sorted(path.name for path in self.base_dir.iterdir() if path.is_dir())

    def create_run_record(self, record: dict[str, Any]) -> None:
        """No-op in the local backend."""
        return None

    def update_run_record(self, run_id: str, updates: dict[str, Any]) -> None:
        """No-op in the local backend."""
        return None

    def record_policy_run(self, record: dict[str, Any]) -> None:
        """No-op in the local backend."""
        return None

    def append_run_event_records(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """No-op in the local backend."""
        return None

    def write_publish_jobs(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """No-op in the local backend."""
        return None

    def write_author_assets(self, rows: list[dict[str, Any]]) -> None:
        """No-op in the local backend."""
        return None

    def write_author_asset_roles(self, rows: list[dict[str, Any]]) -> None:
        """No-op in the local backend."""
        return None

    def write_search_clue_assets(self, rows: list[dict[str, Any]]) -> None:
        """No-op in the local backend."""
        return None

    def write_search_clue_asset_evidence(self, rows: list[dict[str, Any]]) -> None:
        """No-op in the local backend."""
        return None

    def read_performance_feedback(
        self,
        run_id: str,
        policy_run_id: str,
    ) -> list[dict[str, Any]]:
        """Always return an empty list; the local backend stores no feedback."""
        return []

    @staticmethod
    def _to_jsonl_line(row: dict[str, Any]) -> str:
        """Serialise one row as compact UTF-8-friendly JSON plus a newline."""
        return json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n"

    @staticmethod
    def _write_text_atomic(path: Path, text: str) -> None:
        """Replace ``path`` with ``text`` via a sibling temp file and a rename.

        Renaming within one directory is atomic on POSIX, so readers see
        either the old contents or the new ones, never a truncated file. On
        failure the temp file is removed and the error is re-raised.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            tmp_path.write_text(text, encoding="utf-8")
            tmp_path.replace(path)
        except BaseException:
            tmp_path.unlink(missing_ok=True)
            raise
- def _replace_keyed_rows(
- existing_rows: list[dict[str, Any]],
- new_rows: list[dict[str, Any]],
- key_fields: tuple[str, ...],
- ) -> list[dict[str, Any]]:
- keyed_rows: dict[tuple[Any, ...], dict[str, Any]] = {}
- order: list[tuple[Any, ...]] = []
- for row in [*existing_rows, *new_rows]:
- key = tuple(row.get(field) for field in key_fields)
- if key not in keyed_rows:
- order.append(key)
- keyed_rows[key] = row
- return [keyed_rows[key] for key in order]
- def _jsonl_key_fields(filename: str) -> tuple[str, ...]:
- if filename == "pattern_recall_evidence.jsonl":
- return ("run_id", "policy_run_id", "recall_evidence_id")
- if filename == "search_queries.jsonl":
- return ("run_id", "policy_run_id", "search_query_id")
- raise ValueError(f"unsupported keyed JSONL file: {filename}")
|