# test_walk_engine_budget.py (7.3 KB)
  1. from content_agent.business_modules.walk_engine import _terminal_stage
  2. from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
  3. from content_agent.integrations.walk_strategy_json import WalkStrategyStore
  4. def test_terminal_stage_maps_p5_decisions_to_p6_budget_and_stop_actions():
  5. result = _terminal_stage(
  6. {"pattern_execution_id": 581},
  7. [],
  8. [
  9. {
  10. "platform_content_id": "content_success",
  11. "search_query_id": "q_001",
  12. "discovery_start_source": "pattern_itemset",
  13. "previous_discovery_step": "search_query_direct",
  14. },
  15. {
  16. "platform_content_id": "content_pending",
  17. "search_query_id": "q_001",
  18. "discovery_start_source": "pattern_itemset",
  19. "previous_discovery_step": "search_query_direct",
  20. },
  21. {
  22. "platform_content_id": "content_blocked",
  23. "search_query_id": "q_001",
  24. "discovery_start_source": "pattern_itemset",
  25. "previous_discovery_step": "search_query_direct",
  26. },
  27. ],
  28. [
  29. _decision("d_001", "content_success", "ADD_TO_CONTENT_POOL", "success"),
  30. _decision("d_002", "content_pending", "KEEP_CONTENT_FOR_REVIEW", "pending"),
  31. _decision("d_003", "content_blocked", "REJECT_CONTENT", "rule_blocked"),
  32. ],
  33. WalkStrategyStore().load_walk_strategy(),
  34. "2026-06-11T00:00:00+00:00",
  35. )
  36. by_target = {row["decision_target_id"]: row for row in result["walk_actions"]}
  37. assert by_target["content_success"]["walk_action"] == "commit_asset"
  38. assert by_target["content_pending"]["walk_action"] == "downgrade_budget"
  39. assert by_target["content_pending"]["budget_tier"] == "low_budget"
  40. assert by_target["content_blocked"]["walk_action"] == "stop_path"
  41. assert by_target["content_blocked"]["walk_status"] == "skipped"
  42. def _decision(decision_id, target_id, action, effect_status):
  43. return {
  44. "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
  45. "run_id": "run_001",
  46. "policy_run_id": "policy_run_001",
  47. "decision_id": decision_id,
  48. "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  49. "rule_pack_version": "1.0.0",
  50. "decision_target_type": "content",
  51. "decision_target_id": target_id,
  52. "decision_action": action,
  53. "decision_reason_code": effect_status,
  54. "search_query_effect_status": effect_status,
  55. "decision_input_snapshot_id": f"evidence:{target_id}",
  56. }
  57. def test_keep_review_author_edge_uses_low_budget(tmp_path):
  58. from content_agent.business_modules.walk_engine import run_bounded_walk
  59. from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context
  60. context = build_initial_walk_context(tmp_path)
  61. for decision in context["rule_decisions"]:
  62. decision["decision_action"] = "KEEP_CONTENT_FOR_REVIEW"
  63. decision["search_query_effect_status"] = "pending"
  64. client = FakeWalkPlatformClient()
  65. result = run_bounded_walk(platform_client=client, **context)
  66. author_actions = [
  67. row for row in result["walk_actions"] if row["edge_id"] == "author_to_works"
  68. ]
  69. assert author_actions
  70. assert all(row["budget_tier"] == "low_budget" for row in author_actions)
  71. def _walk_configs():
  72. from content_agent.integrations.walk_graph_json import WalkGraphStore
  73. store = WalkGraphStore()
  74. return store.load_policy(), store.load_profile("douyin"), WalkStrategyStore().load_walk_strategy()
  75. def _item(content_id, author_id, *, search_query_id="q_001", has_more=False, cursor=None):
  76. return {
  77. "platform_content_id": content_id,
  78. "platform_author_id": author_id,
  79. "search_query_id": search_query_id,
  80. "discovery_start_source": "pattern_itemset",
  81. "previous_discovery_step": "search_query_direct",
  82. "has_more": has_more,
  83. "next_cursor": cursor,
  84. "tags": [],
  85. }
  86. class _EmptyAuthorWorksClient:
  87. def fetch_author_works(self, query):
  88. return []
  89. def test_author_budget_overflow_emits_budget_exhausted_skip():
  90. # R8(2026-06-12 拍板): 预算外的合格作者不再静默消失,必须留 budget_exhausted skip。
  91. from content_agent.business_modules.walk_engine import _expand_authors
  92. policy, profile, walk_strategy = _walk_configs()
  93. budget = policy["edge_budgets_by_id"]["author_to_works"]["max_total_actions"]
  94. items = [_item(f"content_{i}", f"author_{i:03d}") for i in range(1, 5)]
  95. decisions = [
  96. _decision(f"d_{i:03d}", f"content_{i}", "ADD_TO_CONTENT_POOL", "success")
  97. for i in range(1, 5)
  98. ]
  99. result = _expand_authors(
  100. run_id="run_001",
  101. policy_run_id="policy_run_001",
  102. source_context={},
  103. discovered_content_items=items,
  104. rule_decisions=decisions,
  105. policy=policy,
  106. profile=profile,
  107. walk_strategy=walk_strategy,
  108. policy_bundle={},
  109. content_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"},
  110. platform_client=_EmptyAuthorWorksClient(),
  111. runtime=None,
  112. gemini_video_client=None,
  113. start_recall_index=0,
  114. start_decision_index=0,
  115. created_at="2026-06-12T00:00:00+00:00",
  116. )
  117. actions = result["walk_actions"]
  118. fetched = [row for row in actions if row["walk_status"] == "success"]
  119. skipped = [row for row in actions if row["reason_code"] == "budget_exhausted"]
  120. assert len(fetched) == budget
  121. assert len(skipped) == 4 - budget
  122. assert all(row["walk_status"] == "skipped" and row["budget_tier"] == "blocked" for row in skipped)
  123. assert len({row["walk_action_id"] for row in actions}) == len(actions)
  124. def test_page_budget_overflow_emits_budget_exhausted_skip():
  125. # R8: 翻页预算耗尽时,仍有下一页可翻的合格 query 必须留 budget_exhausted skip。
  126. from content_agent.business_modules.walk_engine import _expand_queries
  127. policy, profile, walk_strategy = _walk_configs()
  128. budget = policy["edge_budgets_by_id"]["query_next_page"]["max_total_actions"]
  129. search_queries = [
  130. {"search_query_id": f"q_{i:03d}", "search_query": f"词{i}", "discovery_start_source": "pattern_itemset"}
  131. for i in range(1, 6)
  132. ]
  133. items = [
  134. _item(f"content_{i}", f"author_{i:03d}", search_query_id=f"q_{i:03d}", has_more=True, cursor="10")
  135. for i in range(1, 6)
  136. ]
  137. decisions = [
  138. _decision(f"d_{i:03d}", f"content_{i}", "ADD_TO_CONTENT_POOL", "success")
  139. for i in range(1, 6)
  140. ]
  141. rows, skipped_actions = _expand_queries(
  142. "run_001",
  143. "policy_run_001",
  144. search_queries,
  145. items,
  146. decisions,
  147. "2026-06-12T00:00:00+00:00",
  148. policy=policy,
  149. profile=profile,
  150. walk_strategy=walk_strategy,
  151. content_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"},
  152. )
  153. page_rows = [row for row in rows if row["search_query_generation_method"] == "query_next_page"]
  154. page_skips = [
  155. row for row in skipped_actions
  156. if row["edge_id"] == "query_next_page" and row["reason_code"] == "budget_exhausted"
  157. ]
  158. assert len(page_rows) == budget
  159. assert len(page_skips) == 5 - budget
  160. assert all(row["walk_status"] == "skipped" and row["budget_tier"] == "blocked" for row in page_skips)