| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- from __future__ import annotations
- from content_agent.business_modules.run_record import recorder
- from content_agent.integrations.policy_json import JsonPolicyBundleStore
- from content_agent.run_service import RunService
- from content_agent.schemas import RunStartRequest
- from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
- def test_search_clues_aggregate_query_effect_status_from_decisions(tmp_path):
- service = RunService(
- runtime_root=tmp_path / "runtime" / "v1",
- query_variant_client=FakeQueryVariantClient(),
- )
- state = service.start_run(
- RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
- )
- clues = {
- clue["search_query_id"]: clue
- for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
- }
- # M3 受控变化: mock content scores relevance 60 + zero platform_heat = 60, so both
- # of q_001's contents land in the review band → query aggregates to pending.
- assert clues["q_001"]["search_query_effect_status"] == "pending"
- assert clues["q_001"]["effect_status_counts"] == {"pending": 2}
- assert clues["q_001"]["query_aggregation_id"] == "agg_query_pending"
- assert clues["q_001"]["raw_payload"]["query_aggregation_id"] == "agg_query_pending"
- assert clues["q_001"]["walk_next_step"] == "review_later_or_small_budget"
- assert clues["q_002"]["search_query_effect_status"] == "pending"
- assert clues["q_002"]["effect_status_counts"] == {"pending": 1}
- assert clues["q_002"]["query_aggregation_id"] == "agg_query_pending"
- assert clues["q_002"]["walk_next_step"] == "review_later_or_small_budget"
- def test_rule_blocked_only_query_aggregates_to_rule_blocked():
- runtime = _RecordingRuntime()
- policy_bundle = JsonPolicyBundleStore().load_policy_bundle("V1")
- recorder.run(
- run_id="run_001",
- policy_run_id="policy_run_001",
- search_queries=[
- {
- "search_query_id": "q_001",
- "search_query": "爱国情感",
- "discovery_start_source": "pattern_seed",
- "previous_discovery_step": "search_query_generated",
- }
- ],
- discovered_content_items=[
- {
- "platform_content_id": "content_rule_blocked",
- "search_query_id": "q_001",
- }
- ],
- decisions=[
- {
- "decision_target_id": "content_rule_blocked",
- "decision_action": "REJECT_CONTENT",
- "search_query_effect_status": "rule_blocked",
- }
- ],
- source_path_record_basis=[],
- policy_bundle=policy_bundle,
- runtime=runtime,
- )
- clue = runtime.rows["search_clues.jsonl"][0]
- assert clue["search_query_effect_status"] == "rule_blocked"
- assert clue["query_aggregation_id"] == "agg_query_rule_blocked"
- assert clue["raw_payload"]["query_aggregation_id"] == "agg_query_rule_blocked"
- assert clue["walk_next_step"] == "stop_search_query"
- def test_platform_query_failure_stays_failed(tmp_path):
- service = RunService(
- runtime_root=tmp_path / "runtime" / "v1",
- query_variant_client=FakeQueryVariantClient(),
- )
- service._platform_client = lambda platform, platform_mode: _PartialFailurePlatformClient()
- state = service.start_run(
- RunStartRequest(platform_mode="real", source=str(REAL_SOURCE_FIXTURE))
- )
- failed_clue = next(
- clue for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
- if clue["search_query_id"] == "q_002"
- )
- assert failed_clue["search_query_effect_status"] == "failed"
- assert failed_clue["query_aggregation_id"] == "platform_query_failure"
- assert failed_clue["walk_next_step"] == "stop_search_query"
- class _PartialFailurePlatformClient:
- def search(self, query):
- if query["search_query_id"] == "q_002":
- raise RuntimeError("platform unavailable")
- return []
- class _RecordingRuntime:
- def __init__(self):
- self.rows = {}
- def append_jsonl(self, _run_id, filename, rows):
- self.rows[filename] = rows
|