| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- """Corpus-driven platform client for the M0 replay harness.
- Feeds a harvested case corpus's captured discovered-content items back through
- the live pipeline as if they were fresh platform search results. Decode and
- category-match are reused from tests.p4_helpers (seeded with the corpus seed
- terms) since pattern-recall is not the regression target here — rule judgment
- and walk are.
- """
- from __future__ import annotations
- import copy
- from collections import defaultdict
- from typing import Any
# Runtime-record envelope fields that must be stripped to recover a
# platform-search-result shape (content_discovery.run rebuilds the rest).
# Kept as a frozenset so this shared module-level constant cannot be mutated
# by accident; membership tests behave exactly as with a plain set.
_ENVELOPE_FIELDS = frozenset(
    {
        "record_schema_version",
        "run_id",
        "policy_run_id",
        "raw_payload",
        "created_at",
    }
)
def _as_platform_result(item: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of ``item`` without the envelope keys.

    Every key listed in ``_ENVELOPE_FIELDS`` is dropped; all remaining
    key/value pairs are carried over in their original order. Values are not
    copied, so nested objects stay shared with ``item``.
    """
    platform_result: dict[str, Any] = {}
    for field_name, field_value in item.items():
        if field_name in _ENVELOPE_FIELDS:
            continue
        platform_result[field_name] = field_value
    return platform_result
class CorpusPlatformClient:
    """Returns captured discovered items grouped under their base search query.

    Page-N items (e.g. ``q_001_page_002``) are grouped under their base query
    (``q_001``) so every captured content item re-enters the pipeline via the
    queries that ``search_intent`` regenerates, without depending on the
    original run's (buggy) pagination behaviour.

    Each group is handed out exactly once: the first ``search`` call for a
    base query returns all of its items and removes the group, so any later
    call for the same base query (for example a further page) returns an
    empty list.

    ``search_calls`` and ``author_calls`` record a shallow copy of every query
    passed to ``search`` and ``fetch_author_works`` respectively, in call
    order.
    """

    def __init__(self, discovered_items: list[dict[str, Any]]) -> None:
        """Index ``discovered_items`` by base query id.

        Each item is stripped of its envelope fields via
        ``_as_platform_result`` before being stored. Items without a
        ``search_query_id`` are filed under the empty string.
        """
        self._by_base_query: dict[str, list[dict[str, Any]]] = defaultdict(list)
        for item in discovered_items:
            base = self._base_query_id(item)
            self._by_base_query[base].append(_as_platform_result(item))
        self.search_calls: list[dict[str, Any]] = []
        self.author_calls: list[dict[str, Any]] = []

    @staticmethod
    def _base_query_id(record: dict[str, Any]) -> str:
        """Return ``record``'s ``search_query_id`` with any page suffix removed.

        A missing id yields the empty string; everything from the first
        ``_page_`` marker onwards is discarded.
        """
        return str(record.get("search_query_id", "")).split("_page_")[0]

    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        """Return the captured items for ``query``'s base query, once.

        The query is recorded in ``search_calls``. The matching group is
        removed from the index, so repeated calls for the same base query
        return ``[]``. Each returned item is an independent deep copy whose
        ``search_query_id`` is overwritten with the id of ``query``.

        If ``query`` carries no ``search_query_id`` the items are returned
        without that stamp instead of raising ``KeyError``, matching the
        tolerant ``.get`` lookup used to derive the base id.
        """
        self.search_calls.append(dict(query))
        captured = self._by_base_query.pop(self._base_query_id(query), [])
        # Copy item by item so callers can mutate results freely.
        results = [copy.deepcopy(item) for item in captured]
        if "search_query_id" in query:
            query_id = query["search_query_id"]
            for result in results:
                result["search_query_id"] = query_id
        return results

    def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        """Record ``query`` in ``author_calls`` and return no works."""
        self.author_calls.append(dict(query))
        return []
|