from __future__ import annotations

from copy import deepcopy

from content_agent.business_modules.rule_judgment.evaluator import decide
from content_agent.run_service import RunService
from content_agent.schemas import RunStartRequest
from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE


def test_effect_status_mapping_for_success_pending_failed_and_rule_blocked(tmp_path):
    """Check the effect status and mapping id that ``decide`` reports per outcome.

    Four variants of the first evidence bundle are judged in turn (sequence
    numbers 1-4): success, pending, failed and rule_blocked. Each variant is
    an independent deep copy, so the edits never leak between scenarios.
    """
    state = _state(tmp_path)

    # M3: success = relevance 60 + platform_heat 40 (heat >= 0.8) = 100
    # (>= 70 pool).
    pool_candidate = _fresh_bundle(state)
    pool_candidate["content_engagement_metrics"]["platform_heat"] = 0.9
    success = _judge(state, 1, pool_candidate)
    assert success["decision_action"] == "ADD_TO_CONTENT_POOL"
    assert success["search_query_effect_status"] == "success"
    success_replay = success["decision_replay_data"]
    assert success_replay["effect_mapping_id"] == "map_add_to_pool_success"

    # M3: pending = relevance 60 + zero platform_heat = 60 (60-69 review band).
    review_candidate = _fresh_bundle(state)
    review_candidate["content_engagement_metrics"]["platform_heat"] = 0.0
    pending = _judge(state, 2, review_candidate)
    assert pending["decision_action"] == "KEEP_CONTENT_FOR_REVIEW"
    assert pending["search_query_effect_status"] == "pending"
    pending_replay = pending["decision_replay_data"]
    assert pending_replay["effect_mapping_id"] == "map_keep_for_review_pending"

    # M3: failed = no scoring rule matches either active dimension (relevance
    # and platform_heat both below their lowest gte band) -> missing_score.
    unscored_candidate = _fresh_bundle(state)
    unscored_candidate["pattern_match_result"]["relevance_score"] = 0.0
    unscored_candidate["content_engagement_metrics"]["platform_heat"] = 0.0
    failed = _judge(state, 3, unscored_candidate)
    assert failed["decision_reason_code"] == "missing_score"
    assert failed["search_query_effect_status"] == "failed"
    failed_replay = failed["decision_replay_data"]
    assert failed_replay["effect_mapping_id"] == "map_reject_failed"

    # rule_blocked: the source evidence is emptied, and the decision is
    # expected to carry the missing_source_evidence reason code.
    evidence_free_candidate = _fresh_bundle(state)
    evidence_free_candidate["source_evidence"] = {}
    blocked = _judge(state, 4, evidence_free_candidate)
    assert blocked["decision_reason_code"] == "missing_source_evidence"
    assert blocked["search_query_effect_status"] == "rule_blocked"
    blocked_replay = blocked["decision_replay_data"]
    assert blocked_replay["effect_mapping_id"] == "map_reject_rule_blocked"


def _fresh_bundle(state):
    """Return an independent deep copy of the run's first evidence bundle."""
    return deepcopy(state["evidence_bundles"][0])


def _judge(state, sequence_number, bundle):
    """Run ``decide`` on ``bundle`` with the run's own ids and policy bundle.

    ``sequence_number`` is forwarded as the third positional argument of
    ``decide``.
    """
    return decide(
        state["run_id"],
        state["policy_run_id"],
        sequence_number,
        bundle,
        state["policy_bundle"],
    )


def _state(tmp_path):
    """Start a mock-platform run under ``tmp_path`` and return its result.

    The run is sourced from ``REAL_SOURCE_FIXTURE`` and uses a fake query
    variant client; whatever ``RunService.start_run`` returns is handed back
    unchanged.
    """
    runtime_root = tmp_path / "runtime" / "v1"
    service = RunService(
        runtime_root=runtime_root,
        query_variant_client=FakeQueryVariantClient(),
    )
    request = RunStartRequest(
        platform_mode="mock",
        source=str(REAL_SOURCE_FIXTURE),
    )
    return service.start_run(request)