| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- """Offline replay harness for harvested case corpora (V2-M0B).
- Feeds a scrubbed corpus (tests/fixtures/cases/{case_id}/input/) back through the
- live RunService pipeline in platform_mode="mock", reusing the existing
- dependency-injection seam (constructor clients + the `_platform_client`
- override). Rule judgment and walk run for real; the harness only supplies the
- captured inputs. Returns the produced runtime artifacts for snapshotting.
- """
- from __future__ import annotations
- import json
- from dataclasses import dataclass, field
- from pathlib import Path
- from typing import Any
- from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
- from content_agent.interfaces import GeminiVideoClient
- from content_agent.run_service import RunService
- from content_agent.schemas import RunStartRequest
- from tests.gemini_helpers import FakeGeminiVideoClient
- from tests.p1_helpers import FakeQueryVariantClient
- from tests.replay_clients import CorpusPlatformClient
# Default root of the harvested case corpora (relative path, one sub-directory
# per case id, each holding an "input" directory).
CASES_DIR = Path("tests") / "fixtures" / "cases"

# Runtime artifacts that replay_case reads with read_json; every other runtime
# filename is read with read_jsonl.
_JSON_FILES = {
    "final_output.json",
    "pattern_seed_pack.json",
    "source_context.json",
    "strategy_review.json",
}
@dataclass
class RunArtifacts:
    """Runtime artifacts captured from one replayed run.

    ``files`` maps a runtime filename to its parsed content; the two
    properties are convenience views over well-known entries of that mapping.
    """

    # Identifier of the run that produced these artifacts.
    run_id: str
    # State object returned when the run was started.
    state: dict[str, Any]
    # Parsed runtime files keyed by filename; a fresh dict per instance.
    files: dict[str, Any] = field(default_factory=dict)

    @property
    def summary(self) -> dict[str, Any]:
        """Return the ``summary`` entry of ``final_output.json``, or ``{}`` if absent."""
        final_output = self.files.get("final_output.json")
        if not final_output:
            # File missing, None, or empty: nothing to summarise.
            return {}
        return final_output.get("summary", {})

    @property
    def decisions(self) -> list[dict[str, Any]]:
        """Return the rows of ``rule_decisions.jsonl``, or ``[]`` if absent or empty."""
        rows = self.files.get("rule_decisions.jsonl")
        return rows if rows else []
def load_corpus(case_id: str, cases_dir: Path | str = CASES_DIR) -> dict[str, Any]:
    """Read every file of a case's input directory into a dict keyed by filename.

    Each regular file directly under ``<cases_dir>/<case_id>/input`` is decoded
    as UTF-8 and parsed as one whole JSON document (dict or list), whatever its
    extension. Entries that are not regular files (for example subdirectories)
    are skipped instead of aborting the load when ``read_text`` fails on them.

    Args:
        case_id: Name of the case directory under ``cases_dir``.
        cases_dir: Root directory holding the case corpora.

    Returns:
        Mapping of file name to parsed JSON content, inserted in sorted path
        order. Empty when no regular files are found.

    Raises:
        json.JSONDecodeError: If a file's content is not valid JSON.
    """
    input_dir = Path(cases_dir) / case_id / "input"
    return {
        path.name: json.loads(path.read_text(encoding="utf-8"))
        for path in sorted(input_dir.glob("*"))
        # glob("*") also yields directories; only regular files can be read.
        if path.is_file()
    }
def _variants_from_queries(search_queries: list[dict[str, Any]]) -> dict[str, str]:
    """Map each LLM-variant query's first source term to its query text.

    Only rows whose ``search_query_generation_method`` is ``"llm_variant"`` and
    whose ``query_source_terms`` is non-empty contribute. A missing
    ``search_query`` becomes ``""``; when several rows share the same first
    term, the last row wins.
    """
    mapping: dict[str, str] = {}
    for entry in search_queries:
        if entry.get("search_query_generation_method") != "llm_variant":
            continue
        source_terms = entry.get("query_source_terms")
        if not source_terms:
            # Missing, None, or empty term list: nothing to key the variant on.
            continue
        mapping[source_terms[0]] = entry.get("search_query", "")
    return mapping
def replay_case(
    case_id: str,
    *,
    runtime_root: Path | str,
    cases_dir: Path | str = CASES_DIR,
    config_overrides: dict[str, Any] | None = None,
    gemini_video_client: GeminiVideoClient | None = None,
    run_id: str | None = None,
) -> RunArtifacts:
    """Replay one harvested case through ``RunService`` in mock platform mode.

    The case's ``source_context.json`` is written to
    ``<runtime_root>/<case_id>_source.json`` and used as the run source. The
    platform client is replaced by a ``CorpusPlatformClient`` serving the
    corpus' ``discovered_content_items.jsonl``, and query variants are derived
    from ``search_queries.jsonl``.

    Args:
        case_id: Case directory name under ``cases_dir``.
        runtime_root: Directory for runtime output; created if missing.
        cases_dir: Root directory holding the case corpora.
        config_overrides: Optional overrides; only a non-None ``policy_store``
            entry is applied (assigned onto the service).
        gemini_video_client: Client to inject; a ``FakeGeminiVideoClient`` is
            used when this is not provided.
        run_id: Run id forwarded to ``RunStartRequest``.

    Returns:
        ``RunArtifacts`` with the started run's id, its state, and every
        runtime file that could be read back.

    Raises:
        KeyError: If the corpus has no ``source_context.json``, or that file
            lacks ``ext_data`` / ``evidence_pack``.
    """
    corpus = load_corpus(case_id, cases_dir)
    source_context = corpus["source_context.json"]
    # NOTE(review): seed_terms is never read again in this function; the lookup
    # still fails fast on a source_context without ext_data/evidence_pack.
    # Confirm whether the value was meant to be passed on, or drop it.
    seed_terms = source_context["ext_data"]["evidence_pack"].get("seed_terms") or ["种子词"]

    # Materialise the captured source context where the run can load it.
    root = Path(runtime_root)
    root.mkdir(parents=True, exist_ok=True)
    source_path = root / f"{case_id}_source.json"
    serialized_source = json.dumps(source_context, ensure_ascii=False, indent=2)
    source_path.write_text(serialized_source, encoding="utf-8")

    discovered = corpus.get("discovered_content_items.jsonl", [])
    variants = _variants_from_queries(corpus.get("search_queries.jsonl", []))

    variant_client = FakeQueryVariantClient(variants=variants)
    video_client = gemini_video_client or FakeGeminiVideoClient()
    service = RunService(
        runtime_root=root,
        query_variant_client=variant_client,
        gemini_video_client=video_client,
    )

    policy_store = (config_overrides or {}).get("policy_store")
    if policy_store is not None:
        service.policy_store = policy_store

    def _corpus_platform_client(platform, platform_mode):
        # Whatever platform/mode is requested, serve the captured items.
        return CorpusPlatformClient(discovered)

    service._platform_client = _corpus_platform_client

    request = RunStartRequest(platform_mode="mock", source=str(source_path), run_id=run_id)
    state = service.start_run(request)
    produced_run_id = state["run_id"]

    collected: dict[str, Any] = {}
    for filename in RUNTIME_FILENAMES:
        try:
            reader = (
                service.runtime.read_json
                if filename in _JSON_FILES
                else service.runtime.read_jsonl
            )
            payload = reader(produced_run_id, filename)
        except FileNotFoundError:
            # This run did not produce the artifact; leave it out.
            continue
        collected[filename] = payload

    return RunArtifacts(run_id=produced_run_id, state=state, files=collected)
|