# replay_harness.py
"""Offline replay harness for harvested case corpora (V2-M0B).

Feeds a scrubbed corpus (tests/fixtures/cases/{case_id}/input/) back through the
live RunService pipeline in platform_mode="mock", reusing the existing
dependency-injection seam (constructor clients + the `_platform_client`
override). Rule judgment and walk run for real; the harness only supplies the
captured inputs. Returns the produced runtime artifacts for snapshotting.
"""
  8. from __future__ import annotations
  9. import json
  10. from dataclasses import dataclass, field
  11. from pathlib import Path
  12. from typing import Any
  13. from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
  14. from content_agent.interfaces import GeminiVideoClient
  15. from content_agent.run_service import RunService
  16. from content_agent.schemas import RunStartRequest
  17. from tests.gemini_helpers import FakeGeminiVideoClient
  18. from tests.p1_helpers import FakeQueryVariantClient
  19. from tests.replay_clients import CorpusPlatformClient
  20. CASES_DIR = Path("tests/fixtures/cases")
  21. _JSON_FILES = {"source_context.json", "pattern_seed_pack.json", "final_output.json", "strategy_review.json"}
  22. @dataclass
  23. class RunArtifacts:
  24. run_id: str
  25. state: dict[str, Any]
  26. files: dict[str, Any] = field(default_factory=dict)
  27. @property
  28. def summary(self) -> dict[str, Any]:
  29. return (self.files.get("final_output.json") or {}).get("summary", {})
  30. @property
  31. def decisions(self) -> list[dict[str, Any]]:
  32. return self.files.get("rule_decisions.jsonl") or []
  33. def load_corpus(case_id: str, cases_dir: Path | str = CASES_DIR) -> dict[str, Any]:
  34. """Read a corpus input dir; every file is stored as whole JSON (dict or list)."""
  35. input_dir = Path(cases_dir) / case_id / "input"
  36. corpus: dict[str, Any] = {}
  37. for path in sorted(input_dir.glob("*")):
  38. corpus[path.name] = json.loads(path.read_text(encoding="utf-8"))
  39. return corpus
  40. def _variants_from_queries(search_queries: list[dict[str, Any]]) -> dict[str, str]:
  41. variants: dict[str, str] = {}
  42. for row in search_queries:
  43. if row.get("search_query_generation_method") == "llm_variant":
  44. terms = row.get("query_source_terms") or []
  45. if terms:
  46. variants[terms[0]] = row.get("search_query", "")
  47. return variants
  48. def replay_case(
  49. case_id: str,
  50. *,
  51. runtime_root: Path | str,
  52. cases_dir: Path | str = CASES_DIR,
  53. config_overrides: dict[str, Any] | None = None,
  54. gemini_video_client: GeminiVideoClient | None = None,
  55. run_id: str | None = None,
  56. ) -> RunArtifacts:
  57. corpus = load_corpus(case_id, cases_dir)
  58. source_context = corpus["source_context.json"]
  59. seed_terms = source_context["ext_data"]["evidence_pack"].get("seed_terms") or ["种子词"]
  60. runtime_root = Path(runtime_root)
  61. runtime_root.mkdir(parents=True, exist_ok=True)
  62. source_path = runtime_root / f"{case_id}_source.json"
  63. source_path.write_text(json.dumps(source_context, ensure_ascii=False, indent=2), encoding="utf-8")
  64. discovered = corpus.get("discovered_content_items.jsonl", [])
  65. variants = _variants_from_queries(corpus.get("search_queries.jsonl", []))
  66. service = RunService(
  67. runtime_root=runtime_root,
  68. query_variant_client=FakeQueryVariantClient(variants=variants),
  69. gemini_video_client=gemini_video_client or FakeGeminiVideoClient(),
  70. )
  71. if config_overrides and config_overrides.get("policy_store") is not None:
  72. service.policy_store = config_overrides["policy_store"]
  73. service._platform_client = lambda platform, platform_mode: CorpusPlatformClient(discovered)
  74. state = service.start_run(RunStartRequest(platform_mode="mock", source=str(source_path), run_id=run_id))
  75. files: dict[str, Any] = {}
  76. for filename in RUNTIME_FILENAMES:
  77. try:
  78. if filename in _JSON_FILES:
  79. files[filename] = service.runtime.read_json(state["run_id"], filename)
  80. else:
  81. files[filename] = service.runtime.read_jsonl(state["run_id"], filename)
  82. except FileNotFoundError:
  83. continue
  84. return RunArtifacts(run_id=state["run_id"], state=state, files=files)