test_query_effect_aggregation.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. from __future__ import annotations
  2. from content_agent.business_modules.run_record import recorder
  3. from content_agent.integrations.policy_json import JsonPolicyBundleStore
  4. from content_agent.run_service import RunService
  5. from content_agent.schemas import RunStartRequest
  6. from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
  7. def test_search_clues_aggregate_query_effect_status_from_decisions(tmp_path):
  8. service = RunService(
  9. runtime_root=tmp_path / "runtime" / "v1",
  10. query_variant_client=FakeQueryVariantClient(),
  11. )
  12. state = service.start_run(
  13. RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
  14. )
  15. clues = {
  16. clue["search_query_id"]: clue
  17. for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
  18. }
  19. # M3 受控变化: mock content scores relevance 60 + zero platform_heat = 60, so both
  20. # of q_001's contents land in the review band → query aggregates to pending.
  21. assert clues["q_001"]["search_query_effect_status"] == "pending"
  22. assert clues["q_001"]["effect_status_counts"] == {"pending": 2}
  23. assert clues["q_001"]["query_aggregation_id"] == "agg_query_pending"
  24. assert clues["q_001"]["raw_payload"]["query_aggregation_id"] == "agg_query_pending"
  25. assert clues["q_001"]["walk_next_step"] == "review_later_or_small_budget"
  26. assert clues["q_002"]["search_query_effect_status"] == "pending"
  27. assert clues["q_002"]["effect_status_counts"] == {"pending": 1}
  28. assert clues["q_002"]["query_aggregation_id"] == "agg_query_pending"
  29. assert clues["q_002"]["walk_next_step"] == "review_later_or_small_budget"
  30. def test_rule_blocked_only_query_aggregates_to_rule_blocked():
  31. runtime = _RecordingRuntime()
  32. policy_bundle = JsonPolicyBundleStore().load_policy_bundle("V1")
  33. recorder.run(
  34. run_id="run_001",
  35. policy_run_id="policy_run_001",
  36. search_queries=[
  37. {
  38. "search_query_id": "q_001",
  39. "search_query": "爱国情感",
  40. "discovery_start_source": "pattern_seed",
  41. "previous_discovery_step": "search_query_generated",
  42. }
  43. ],
  44. discovered_content_items=[
  45. {
  46. "platform_content_id": "content_rule_blocked",
  47. "search_query_id": "q_001",
  48. }
  49. ],
  50. decisions=[
  51. {
  52. "decision_target_id": "content_rule_blocked",
  53. "decision_action": "REJECT_CONTENT",
  54. "search_query_effect_status": "rule_blocked",
  55. }
  56. ],
  57. source_path_record_basis=[],
  58. policy_bundle=policy_bundle,
  59. runtime=runtime,
  60. )
  61. clue = runtime.rows["search_clues.jsonl"][0]
  62. assert clue["search_query_effect_status"] == "rule_blocked"
  63. assert clue["query_aggregation_id"] == "agg_query_rule_blocked"
  64. assert clue["raw_payload"]["query_aggregation_id"] == "agg_query_rule_blocked"
  65. assert clue["walk_next_step"] == "stop_search_query"
  66. def test_platform_query_failure_stays_failed(tmp_path):
  67. service = RunService(
  68. runtime_root=tmp_path / "runtime" / "v1",
  69. query_variant_client=FakeQueryVariantClient(),
  70. )
  71. service._platform_client = lambda platform, platform_mode: _PartialFailurePlatformClient()
  72. state = service.start_run(
  73. RunStartRequest(platform_mode="real", source=str(REAL_SOURCE_FIXTURE))
  74. )
  75. failed_clue = next(
  76. clue for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
  77. if clue["search_query_id"] == "q_002"
  78. )
  79. assert failed_clue["search_query_effect_status"] == "failed"
  80. assert failed_clue["query_aggregation_id"] == "platform_query_failure"
  81. assert failed_clue["walk_next_step"] == "stop_search_query"
  82. class _PartialFailurePlatformClient:
  83. def search(self, query):
  84. if query["search_query_id"] == "q_002":
  85. raise RuntimeError("platform unavailable")
  86. return []
  87. class _RecordingRuntime:
  88. def __init__(self):
  89. self.rows = {}
  90. def append_jsonl(self, _run_id, filename, rows):
  91. self.rows[filename] = rows