# test_query_effect_aggregation.py

  1. from __future__ import annotations
  2. from content_agent.business_modules.run_record import recorder
  3. from content_agent.integrations.policy_json import JsonPolicyBundleStore
  4. from content_agent.run_service import RunService
  5. from content_agent.schemas import RunStartRequest
  6. from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
  7. def test_search_clues_aggregate_query_effect_status_from_decisions(tmp_path):
  8. service = RunService(
  9. runtime_root=tmp_path / "runtime" / "v1",
  10. query_variant_client=FakeQueryVariantClient(),
  11. )
  12. state = service.start_run(
  13. RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
  14. )
  15. clues = {
  16. clue["search_query_id"]: clue
  17. for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
  18. }
  19. # M11 re-baseline:q_001 首条候选(总分 55.67)进复看(pending),该 query 由"全失败"升为"含复看"。
  20. assert clues["q_001"]["search_query_effect_status"] == "pending"
  21. assert clues["q_001"]["effect_status_counts"] == {"pending": 1, "failed": 1}
  22. assert clues["q_001"]["query_aggregation_id"] == "agg_query_pending"
  23. assert clues["q_001"]["raw_payload"]["query_aggregation_id"] == "agg_query_pending"
  24. assert clues["q_001"]["walk_next_step"] == "review_later_or_small_budget"
  25. assert clues["q_002"]["search_query_effect_status"] == "failed"
  26. assert clues["q_002"]["effect_status_counts"] == {"failed": 1}
  27. assert clues["q_002"]["query_aggregation_id"] == "agg_query_failed"
  28. assert clues["q_002"]["walk_next_step"] == "stop_search_query"
  29. def test_rule_blocked_only_query_aggregates_to_rule_blocked():
  30. runtime = _RecordingRuntime()
  31. policy_bundle = JsonPolicyBundleStore().load_policy_bundle("V4")
  32. recorder.run(
  33. run_id="run_001",
  34. policy_run_id="policy_run_001",
  35. search_queries=[
  36. {
  37. "search_query_id": "q_001",
  38. "search_query": "爱国情感",
  39. "discovery_start_source": "pattern_seed",
  40. "previous_discovery_step": "search_query_generated",
  41. }
  42. ],
  43. discovered_content_items=[
  44. {
  45. "platform_content_id": "content_rule_blocked",
  46. "search_query_id": "q_001",
  47. }
  48. ],
  49. decisions=[
  50. {
  51. "decision_target_id": "content_rule_blocked",
  52. "decision_action": "REJECT_CONTENT",
  53. "search_query_effect_status": "rule_blocked",
  54. }
  55. ],
  56. source_path_record_basis=[],
  57. policy_bundle=policy_bundle,
  58. runtime=runtime,
  59. )
  60. clue = runtime.rows["search_clues.jsonl"][0]
  61. assert clue["search_query_effect_status"] == "rule_blocked"
  62. assert clue["query_aggregation_id"] == "agg_query_rule_blocked"
  63. assert clue["raw_payload"]["query_aggregation_id"] == "agg_query_rule_blocked"
  64. assert clue["walk_next_step"] == "stop_search_query"
  65. def test_platform_query_failure_stays_failed(tmp_path):
  66. service = RunService(
  67. runtime_root=tmp_path / "runtime" / "v1",
  68. query_variant_client=FakeQueryVariantClient(),
  69. )
  70. service._platform_client = lambda platform, platform_mode: _PartialFailurePlatformClient()
  71. state = service.start_run(
  72. RunStartRequest(platform_mode="real", source=str(REAL_SOURCE_FIXTURE))
  73. )
  74. failed_clue = next(
  75. clue for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
  76. if clue["search_query_id"] == "q_002"
  77. )
  78. assert failed_clue["search_query_effect_status"] == "failed"
  79. assert failed_clue["query_aggregation_id"] == "platform_query_failure"
  80. assert failed_clue["walk_next_step"] == "stop_search_query"
  81. class _PartialFailurePlatformClient:
  82. def search(self, query):
  83. if query["search_query_id"] == "q_002":
  84. raise RuntimeError("platform unavailable")
  85. return []
  86. class _RecordingRuntime:
  87. def __init__(self):
  88. self.rows = {}
  89. def append_jsonl(self, _run_id, filename, rows):
  90. self.rows[filename] = rows