# test_walk_engine_author.py (7.5 KB)
from content_agent.business_modules.walk_engine import run_bounded_walk
from tests.p6_walk_helpers import (
    FakeWalkPlatformClient,
    _platform_result,
    build_initial_walk_context,
)
  3. def test_walk_engine_author_works_reenter_content_decision_flow(tmp_path):
  4. context = build_initial_walk_context(tmp_path)
  5. client = FakeWalkPlatformClient()
  6. result = run_bounded_walk(platform_client=client, **context)
  7. assert client.author_calls
  8. assert any(item["previous_discovery_step"] == "author_works" for item in result["discovered_content_items"])
  9. assert all(decision["decision_target_type"] == "content" for decision in result["rule_decisions"])
  10. assert any(row["edge_id"] == "author_to_works" for row in result["walk_actions"])
  11. def test_walk_engine_author_edge_skips_missing_author_id(tmp_path):
  12. context = build_initial_walk_context(tmp_path)
  13. context["discovered_content_items"][0]["platform_author_id"] = ""
  14. context["discovered_content_items"][0]["has_more"] = False
  15. context["discovered_content_items"][0]["tags"] = []
  16. client = FakeWalkPlatformClient()
  17. run_bounded_walk(platform_client=client, **context)
  18. assert client.author_calls == []
  19. def _override_decisions(context, action, effect_status):
  20. for decision in context["rule_decisions"]:
  21. decision["decision_action"] = action
  22. decision["search_query_effect_status"] = effect_status
  23. def test_author_edge_skips_rejected_content(tmp_path):
  24. context = build_initial_walk_context(tmp_path)
  25. _override_decisions(context, "REJECT_CONTENT", "rule_blocked")
  26. client = FakeWalkPlatformClient()
  27. result = run_bounded_walk(platform_client=client, **context)
  28. assert client.author_calls == []
  29. skipped = [
  30. row for row in result["walk_actions"]
  31. if row["edge_id"] == "author_to_works" and row["walk_status"] == "skipped"
  32. ]
  33. assert len(skipped) == 1
  34. assert skipped[0]["reason_code"] == "blocked_by_rule_decision"
  35. assert skipped[0]["budget_tier"] == "blocked"
  36. assert skipped[0]["raw_payload"]["decision_action"] == "REJECT_CONTENT"
  37. def test_author_edge_allows_add_content_pool(tmp_path):
  38. context = build_initial_walk_context(tmp_path)
  39. # M3: mock judgment scores 60 (review/low_budget); promote the seed decision to
  40. # pool so the author edge runs at normal budget, which is what this test asserts.
  41. _override_decisions(context, "ADD_TO_CONTENT_POOL", "success")
  42. client = FakeWalkPlatformClient()
  43. result = run_bounded_walk(platform_client=client, **context)
  44. assert client.author_calls
  45. author_actions = [
  46. row for row in result["walk_actions"]
  47. if row["edge_id"] == "author_to_works" and row["walk_status"] == "success"
  48. ]
  49. assert author_actions
  50. assert author_actions[0]["budget_tier"] == "normal"
  51. # M4 砍包受控变化:future 包 binding 已删,戳回退内容包(=executed_rule_pack_id)。
  52. assert author_actions[0]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  53. def test_author_edge_keeps_review_low_budget(tmp_path):
  54. context = build_initial_walk_context(tmp_path)
  55. _override_decisions(context, "KEEP_CONTENT_FOR_REVIEW", "pending")
  56. client = FakeWalkPlatformClient()
  57. result = run_bounded_walk(platform_client=client, **context)
  58. assert client.author_calls
  59. author_actions = [
  60. row for row in result["walk_actions"]
  61. if row["edge_id"] == "author_to_works" and row["walk_status"] == "success"
  62. ]
  63. assert author_actions
  64. assert author_actions[0]["budget_tier"] == "low_budget"
  65. assert author_actions[0]["raw_payload"]["rule_pack_execution"]["executed"] is True
  66. def test_author_works_skip_already_discovered_content(tmp_path):
  67. # 真实 E2E(v1_run_e6ba21f7543b)实证:作者近期作品包含首轮已发现的同一条视频,
  68. # 不去重会撞 DB 唯一索引 uk_ca_items_run_policy_content。
  69. context = build_initial_walk_context(tmp_path)
  70. context["discovered_content_items"][0]["has_more"] = False
  71. context["discovered_content_items"][0]["tags"] = []
  72. first_round_id = context["discovered_content_items"][0]["platform_content_id"]
  73. class OverlappingWorksClient(FakeWalkPlatformClient):
  74. def fetch_author_works(self, query):
  75. self.author_calls.append(dict(query))
  76. duplicate = _dup_platform_result(query, first_round_id)
  77. fresh = _dup_platform_result(query, "7390000000000000399")
  78. return [duplicate, fresh]
  79. client = OverlappingWorksClient()
  80. result = run_bounded_walk(platform_client=client, **context)
  81. assert client.author_calls
  82. content_ids = [item["platform_content_id"] for item in result["discovered_content_items"]]
  83. # 重复视频不得二次进入 discovered;新作品正常进入。
  84. assert content_ids.count(first_round_id) == 1
  85. assert "7390000000000000399" in content_ids
  86. def _dup_platform_result(query, platform_content_id):
  87. from tests.p6_walk_helpers import _platform_result
  88. return _platform_result(query, platform_content_id, "作者作品", [])
  89. def test_author_works_have_search_query_lineage(tmp_path):
  90. # 真实 E2E(v1_run_3a3bc9f0d72d)实证:作者作品内容引用合成 query id 但
  91. # search_queries 无此行,validate_run 报 missing_search_query_ref 等 8 条 fail。
  92. # 血缘补全后:query 行落盘 → pattern_to_search_query 路径与 search_clue 经既有机制生成。
  93. from content_agent.business_modules import run_record
  94. from content_agent.business_modules.run_record.validation import validate_run
  95. context = build_initial_walk_context(tmp_path)
  96. context["discovered_content_items"][0]["has_more"] = False
  97. context["discovered_content_items"][0]["tags"] = []
  98. client = FakeWalkPlatformClient()
  99. result = run_bounded_walk(platform_client=client, **context)
  100. author_queries = [
  101. q for q in result["search_queries"]
  102. if q["search_query_generation_method"] == "author_works"
  103. ]
  104. assert len(author_queries) == 1
  105. author_query = author_queries[0]
  106. assert author_query["search_query_id"].startswith("author_")
  107. assert author_query["previous_discovery_step"] == "author_to_works"
  108. # 作者作品内容引用的 query id 必须真实存在。
  109. author_items = [
  110. i for i in result["discovered_content_items"]
  111. if i.get("previous_discovery_step") == "author_works"
  112. ]
  113. assert author_items
  114. assert all(i["search_query_id"] == author_query["search_query_id"] for i in author_items)
  115. # 端到端血缘:M4 后终端边+血缘由 run_bounded_walk 一体产出,record_run 后 validate_run 必须 pass。
  116. record = run_record.run(
  117. context["run_id"],
  118. context["policy_run_id"],
  119. result["search_queries"],
  120. result["discovered_content_items"],
  121. result["rule_decisions"],
  122. result["source_path_record_basis"],
  123. context["policy_bundle"],
  124. context["runtime"],
  125. walk_actions=result["walk_actions"],
  126. )
  127. from content_agent.business_modules import learning_review, result_source_lookup
  128. result_source_lookup.run(
  129. context["run_id"],
  130. context["policy_run_id"],
  131. context["policy_bundle"],
  132. result["discovered_content_items"],
  133. result["content_media_records"],
  134. result["rule_decisions"],
  135. record["source_path_records"],
  136. record["search_clues"],
  137. context["runtime"],
  138. )
  139. learning_review.run(context["run_id"], context["policy_run_id"], context["runtime"])
  140. validation = validate_run(context["run_id"], context["runtime"])
  141. fails = [f for f in validation["findings"] if f["level"] == "fail"]
  142. assert validation["status"] == "pass", fails