"""Tests for how search-clue records aggregate per-query effect status."""

from __future__ import annotations

from content_agent.business_modules.run_record import recorder
from content_agent.integrations.policy_json import JsonPolicyBundleStore
from content_agent.run_service import RunService
from content_agent.schemas import RunStartRequest
from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE


def test_search_clues_aggregate_query_effect_status_from_decisions(tmp_path):
    """A mock-platform run records both q_001 and q_002 as ``pending``."""
    service = RunService(
        runtime_root=tmp_path / "runtime" / "v1",
        query_variant_client=FakeQueryVariantClient(),
    )

    state = service.start_run(
        RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
    )

    # Index the recorded clues by query id so each query can be checked directly.
    clues = {
        clue["search_query_id"]: clue
        for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
    }

    # M3 controlled change: mock content scores relevance 60 + zero
    # platform_heat = 60, so both of q_001's contents land in the review band
    # -> the query aggregates to pending.
    assert clues["q_001"]["search_query_effect_status"] == "pending"
    assert clues["q_001"]["effect_status_counts"] == {"pending": 2}
    assert clues["q_001"]["query_aggregation_id"] == "agg_query_pending"
    assert clues["q_001"]["raw_payload"]["query_aggregation_id"] == "agg_query_pending"
    assert clues["q_001"]["walk_next_step"] == "review_later_or_small_budget"

    assert clues["q_002"]["search_query_effect_status"] == "pending"
    assert clues["q_002"]["effect_status_counts"] == {"pending": 1}
    assert clues["q_002"]["query_aggregation_id"] == "agg_query_pending"
    assert clues["q_002"]["walk_next_step"] == "review_later_or_small_budget"


def test_rule_blocked_only_query_aggregates_to_rule_blocked():
    """A query whose only decision is ``rule_blocked`` aggregates to ``rule_blocked``."""
    runtime = _RecordingRuntime()
    policy_bundle = JsonPolicyBundleStore().load_policy_bundle("V1")

    recorder.run(
        run_id="run_001",
        policy_run_id="policy_run_001",
        search_queries=[
            {
                "search_query_id": "q_001",
                "search_query": "爱国情感",
                "discovery_start_source": "pattern_seed",
                "previous_discovery_step": "search_query_generated",
            }
        ],
        discovered_content_items=[
            {
                "platform_content_id": "content_rule_blocked",
                "search_query_id": "q_001",
            }
        ],
        decisions=[
            {
                "decision_target_id": "content_rule_blocked",
                "decision_action": "REJECT_CONTENT",
                "search_query_effect_status": "rule_blocked",
            }
        ],
        source_path_record_basis=[],
        policy_bundle=policy_bundle,
        runtime=runtime,
    )

    clue = runtime.rows["search_clues.jsonl"][0]
    assert clue["search_query_effect_status"] == "rule_blocked"
    assert clue["query_aggregation_id"] == "agg_query_rule_blocked"
    assert clue["raw_payload"]["query_aggregation_id"] == "agg_query_rule_blocked"
    assert clue["walk_next_step"] == "stop_search_query"


def test_platform_query_failure_stays_failed(tmp_path):
    """A query whose platform search raises is recorded as ``failed``."""
    service = RunService(
        runtime_root=tmp_path / "runtime" / "v1",
        query_variant_client=FakeQueryVariantClient(),
    )
    # Swap the client factory for one whose client raises on q_002 only.
    service._platform_client = lambda platform, platform_mode: _PartialFailurePlatformClient()

    state = service.start_run(
        RunStartRequest(platform_mode="real", source=str(REAL_SOURCE_FIXTURE))
    )

    # Use a default so a missing clue fails with a readable assertion message
    # instead of a bare StopIteration escaping from next().
    failed_clue = next(
        (
            clue
            for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
            if clue["search_query_id"] == "q_002"
        ),
        None,
    )
    assert failed_clue is not None, "no search clue was recorded for q_002"
    assert failed_clue["search_query_effect_status"] == "failed"
    assert failed_clue["query_aggregation_id"] == "platform_query_failure"
    assert failed_clue["walk_next_step"] == "stop_search_query"


class _PartialFailurePlatformClient:
    """Platform client stub whose search fails for query ``q_002`` only."""

    def search(self, query):
        """Raise ``RuntimeError`` for ``q_002``; return no results otherwise."""
        if query["search_query_id"] == "q_002":
            raise RuntimeError("platform unavailable")
        return []


class _RecordingRuntime:
    """In-memory runtime stand-in that keeps written rows keyed by filename."""

    def __init__(self) -> None:
        # filename -> every row written to that file so far, in write order.
        self.rows: dict[str, list] = {}

    def append_jsonl(self, _run_id, filename, rows) -> None:
        """Append ``rows`` to the rows already recorded for ``filename``.

        Rows accumulate across calls, so a writer that appends to the same
        file more than once does not lose its earlier rows. The run id is
        accepted for signature compatibility and ignored.
        """
        self.rows.setdefault(filename, []).extend(rows)