replay_clients.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. """Corpus-driven platform client for the M0 replay harness.
  2. Feeds a harvested case corpus's captured discovered-content items back through
  3. the live pipeline as if they were fresh platform search results. Decode and
  4. category-match are reused from tests.p4_helpers (seeded with the corpus seed
  5. terms) since pattern-recall is not the regression target here — rule judgment
  6. and walk are.
  7. """
  8. from __future__ import annotations
  9. import copy
  10. from collections import defaultdict
  11. from typing import Any
  12. # Runtime-record envelope fields that must be stripped to recover a
  13. # platform-search-result shape (content_discovery.run rebuilds the rest).
  14. _ENVELOPE_FIELDS = {
  15. "record_schema_version",
  16. "run_id",
  17. "policy_run_id",
  18. "raw_payload",
  19. "created_at",
  20. }
  21. def _as_platform_result(item: dict[str, Any]) -> dict[str, Any]:
  22. return {key: value for key, value in item.items() if key not in _ENVELOPE_FIELDS}
  23. class CorpusPlatformClient:
  24. """Returns captured discovered items grouped under their base search query.
  25. Page-N items (e.g. ``q_001_page_002``) are grouped under their base query
  26. (``q_001``) so every captured content item re-enters the pipeline via the
  27. two queries that ``search_intent`` regenerates, without depending on the
  28. original run's (buggy) pagination behaviour.
  29. """
  30. def __init__(self, discovered_items: list[dict[str, Any]]) -> None:
  31. self._by_base_query: dict[str, list[dict[str, Any]]] = defaultdict(list)
  32. for item in discovered_items:
  33. base = str(item.get("search_query_id", "")).split("_page_")[0]
  34. self._by_base_query[base].append(_as_platform_result(item))
  35. self.search_calls: list[dict[str, Any]] = []
  36. self.author_calls: list[dict[str, Any]] = []
  37. def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
  38. self.search_calls.append(dict(query))
  39. base = str(query.get("search_query_id", "")).split("_page_")[0]
  40. captured = self._by_base_query.pop(base, [])
  41. results = []
  42. for item in captured:
  43. result = copy.deepcopy(item)
  44. result["search_query_id"] = query["search_query_id"]
  45. results.append(result)
  46. return results
  47. def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
  48. self.author_calls.append(dict(query))
  49. return []