test_rule_decision_effect_status.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from __future__ import annotations
  2. from copy import deepcopy
  3. from content_agent.business_modules.rule_judgment.evaluator import decide
  4. from content_agent.run_service import RunService
  5. from content_agent.schemas import RunStartRequest
  6. from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
  7. def test_effect_status_mapping_for_success_pending_failed_and_rule_blocked(tmp_path):
  8. state = _state(tmp_path)
  9. # M3: success = relevance 60 + platform_heat 40 (heat >= 0.8) = 100 (>= 70 pool).
  10. success_bundle = deepcopy(state["evidence_bundles"][0])
  11. success_bundle["content_engagement_metrics"]["platform_heat"] = 0.9
  12. success = decide(
  13. state["run_id"],
  14. state["policy_run_id"],
  15. 1,
  16. success_bundle,
  17. state["policy_bundle"],
  18. )
  19. assert success["decision_action"] == "ADD_TO_CONTENT_POOL"
  20. assert success["search_query_effect_status"] == "success"
  21. assert success["decision_replay_data"]["effect_mapping_id"] == "map_add_to_pool_success"
  22. # M3: pending = relevance 60 + zero platform_heat = 60 (60-69 review band).
  23. pending_bundle = deepcopy(state["evidence_bundles"][0])
  24. pending_bundle["content_engagement_metrics"]["platform_heat"] = 0.0
  25. pending = decide(
  26. state["run_id"],
  27. state["policy_run_id"],
  28. 2,
  29. pending_bundle,
  30. state["policy_bundle"],
  31. )
  32. assert pending["decision_action"] == "KEEP_CONTENT_FOR_REVIEW"
  33. assert pending["search_query_effect_status"] == "pending"
  34. assert pending["decision_replay_data"]["effect_mapping_id"] == "map_keep_for_review_pending"
  35. # M3: failed = no scoring rule matches either active dimension (relevance and
  36. # platform_heat both below their lowest gte band) → missing_score.
  37. failed_bundle = deepcopy(state["evidence_bundles"][0])
  38. failed_bundle["pattern_match_result"]["relevance_score"] = 0.0
  39. failed_bundle["content_engagement_metrics"]["platform_heat"] = 0.0
  40. failed = decide(
  41. state["run_id"],
  42. state["policy_run_id"],
  43. 3,
  44. failed_bundle,
  45. state["policy_bundle"],
  46. )
  47. assert failed["decision_reason_code"] == "missing_score"
  48. assert failed["search_query_effect_status"] == "failed"
  49. assert failed["decision_replay_data"]["effect_mapping_id"] == "map_reject_failed"
  50. blocked_bundle = deepcopy(state["evidence_bundles"][0])
  51. blocked_bundle["source_evidence"] = {}
  52. blocked = decide(
  53. state["run_id"],
  54. state["policy_run_id"],
  55. 4,
  56. blocked_bundle,
  57. state["policy_bundle"],
  58. )
  59. assert blocked["decision_reason_code"] == "missing_source_evidence"
  60. assert blocked["search_query_effect_status"] == "rule_blocked"
  61. assert blocked["decision_replay_data"]["effect_mapping_id"] == "map_reject_rule_blocked"
  62. def _state(tmp_path):
  63. service = RunService(
  64. runtime_root=tmp_path / "runtime" / "v1",
  65. query_variant_client=FakeQueryVariantClient(),
  66. )
  67. return service.start_run(
  68. RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
  69. )