test_walk_actions_runtime.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. import json
  2. from content_agent.integrations.database_runtime import DatabaseRuntimeStore, RUNTIME_FILE_TABLES
  3. from content_agent.integrations.runtime_files import LocalRuntimeFileStore, RUNTIME_FILENAMES
  4. from tests.test_database_runtime import FakeConnection, _config, _insert_values
  5. def _walk_action_row():
  6. return {
  7. "record_schema_version": "runtime_record.v1",
  8. "run_id": "run_001",
  9. "policy_run_id": "policy_run_001",
  10. "walk_action_id": "wa_001",
  11. "edge_id": "query_next_page",
  12. "edge_type": "pagination",
  13. "from_node_type": "SearchQuery",
  14. "from_node_id": "q_001",
  15. "to_node_type": "SearchPage",
  16. "to_node_id": "page_002",
  17. "walk_action": "fetch_next_page",
  18. "walk_status": "success",
  19. "budget_tier": "normal",
  20. "depth": 1,
  21. "page_cursor": "10",
  22. "next_cursor": "20",
  23. "decision_id": "d_001",
  24. "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  25. "rule_pack_version": "1.0.0",
  26. "reason_code": "has_more",
  27. "raw_payload": {
  28. "record_schema_version": "runtime_record.v1",
  29. "run_id": "run_001",
  30. "policy_run_id": "policy_run_001",
  31. "walk_action_id": "wa_001",
  32. },
  33. }
  34. def test_walk_actions_file_is_formal_runtime_contract(tmp_path):
  35. runtime = LocalRuntimeFileStore(tmp_path / "runtime")
  36. runtime.prepare_run("run_001")
  37. runtime.append_jsonl("run_001", "walk_actions.jsonl", [_walk_action_row()])
  38. assert "walk_actions.jsonl" in RUNTIME_FILENAMES
  39. rows = runtime.read_jsonl("run_001", "walk_actions.jsonl")
  40. assert rows[0]["walk_action_id"] == "wa_001"
  41. assert rows[0]["record_schema_version"] == "runtime_record.v1"
  42. def test_database_runtime_writes_walk_actions_to_formal_table():
  43. connection = FakeConnection()
  44. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  45. store.append_jsonl("run_001", "walk_actions.jsonl", [_walk_action_row()])
  46. sql, params = connection.statements[-1]
  47. values = _insert_values(sql, params)
  48. assert RUNTIME_FILE_TABLES["walk_actions.jsonl"] == "content_agent_walk_actions"
  49. assert "INSERT INTO `content_agent_walk_actions`" in sql
  50. assert values["schema_version"] == "content_agent.v1"
  51. assert values["walk_action_id"] == "wa_001"
  52. assert values["edge_id"] == "query_next_page"
  53. assert values["walk_status"] == "success"
  54. assert json.loads(values["raw_payload"])["record_schema_version"] == "runtime_record.v1"
  55. def _m4_decision(decision_id, target_id, action, effect_status):
  56. return {
  57. "record_schema_version": "runtime_record.v1",
  58. "run_id": "run_001",
  59. "policy_run_id": "policy_run_001",
  60. "decision_id": decision_id,
  61. "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  62. "rule_pack_version": "1.0.0",
  63. "decision_target_type": "content",
  64. "decision_target_id": target_id,
  65. "decision_action": action,
  66. "decision_reason_code": effect_status,
  67. "search_query_effect_status": effect_status,
  68. "decision_input_snapshot_id": f"evidence:{target_id}",
  69. }
  70. def _m4_item(content_id):
  71. return {
  72. "platform_content_id": content_id,
  73. "search_query_id": "q_001",
  74. "discovery_start_source": "pattern_itemset",
  75. "previous_discovery_step": "search_query_direct",
  76. }
  77. def _run_terminal_stage(action, effect_status):
  78. from content_agent.business_modules.walk_engine import _terminal_stage
  79. from content_agent.integrations.walk_strategy_json import WalkStrategyStore
  80. return _terminal_stage(
  81. {"pattern_execution_id": 581},
  82. [],
  83. [_m4_item("content_001")],
  84. [_m4_decision("d_001", "content_001", action, effect_status)],
  85. WalkStrategyStore().load_walk_strategy(),
  86. "2026-06-11T00:00:00+00:00",
  87. )
  88. def test_walk_action_missing_binding_falls_back_to_content_pack():
  89. # M4 砍包受控变化:扩展边 binding 已删,_walk_action 经 fallback_rule_pack 回退内容包。
  90. from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action
  91. from content_agent.integrations.walk_strategy_json import WalkStrategyStore
  92. walk_strategy = WalkStrategyStore().load_walk_strategy()
  93. binding, missing_reason = _resolve_edge_binding("query_next_page", walk_strategy)
  94. assert binding == {}
  95. assert missing_reason == "edge_binding_missing"
  96. row = _walk_action(
  97. "run_001", "policy_run_001", "wa_test", "query_next_page", "pagination",
  98. "SearchQuery", "q_001", "SearchPage", "page_002", "fetch_next_page", "success",
  99. "2026-06-10T00:00:00+00:00",
  100. rule_pack_binding=binding,
  101. rule_pack_execution={"executed": True, "executed_rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  102. "reason": "content_decision_reused_for_walk_gate"},
  103. fallback_rule_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"},
  104. )
  105. assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  106. assert row["rule_pack_version"] == "1.0.0"
  107. assert row["raw_payload"]["rule_pack_binding"] == {}
  108. def test_missing_decision_records_not_executed():
  109. from content_agent.business_modules.walk_engine import _execution_record, _walk_action
  110. execution = _execution_record(None, content_pack_id="douyin_content_discovery_rule_pack_v1")
  111. row = _walk_action(
  112. "run_001", "policy_run_001", "wa_test", "author_to_works", "author",
  113. "Author", "a_001", "AuthorWorksPage", "a_001", "fetch_author_works", "skipped",
  114. "2026-06-10T00:00:00+00:00",
  115. rule_pack_binding={},
  116. rule_pack_execution=execution,
  117. fallback_rule_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"},
  118. )
  119. assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  120. assert row["raw_payload"]["rule_pack_execution"] == {
  121. "executed": False,
  122. "executed_rule_pack_id": None,
  123. "reason": "future_pack_not_enabled",
  124. }
  125. def test_content_decision_execution_stamp_is_consistent_after_pack_cut():
  126. # M4 砍包受控变化:path_stop 的 future 归属包已删,戳回退到执行包(内容包),两者一致。
  127. result = _run_terminal_stage("REJECT_CONTENT", "rule_blocked")
  128. action = result["walk_actions"][0]
  129. assert action["edge_id"] == "path_stop"
  130. assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  131. execution = action["raw_payload"]["rule_pack_execution"]
  132. assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  133. assert action["raw_payload"]["rule_pack_binding"] == {}
  134. def test_unknown_edge_binding_leaves_rule_pack_id_null_with_reason():
  135. from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action
  136. from content_agent.integrations.walk_strategy_json import WalkStrategyStore
  137. walk_strategy = WalkStrategyStore().load_walk_strategy()
  138. binding, missing_reason = _resolve_edge_binding("unknown_edge", walk_strategy)
  139. assert binding == {}
  140. assert missing_reason == "edge_binding_missing"
  141. row = _walk_action(
  142. "run_001", "policy_run_001", "wa_test", "unknown_edge", "unknown",
  143. "Node", "n_001", "Node", "n_002", "noop", "skipped",
  144. "2026-06-10T00:00:00+00:00",
  145. reason_code=missing_reason,
  146. rule_pack_binding=binding,
  147. )
  148. assert row["rule_pack_id"] is None
  149. assert row["reason_code"] == "edge_binding_missing"
  150. def test_path_stop_stamps_content_pack_after_cut():
  151. # M4 砍包受控变化:Path 包已删,path_stop 戳=decision 的内容包。
  152. result = _run_terminal_stage("REJECT_CONTENT", "rule_blocked")
  153. action = result["walk_actions"][0]
  154. assert action["edge_id"] == "path_stop"
  155. assert action["walk_action"] == "stop_path"
  156. assert action["walk_status"] == "skipped"
  157. assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  158. def test_budget_downgrade_stamps_content_pack_after_cut():
  159. # M4 砍包受控变化:Budget 包已删,budget_downgrade 戳=decision 的内容包。
  160. result = _run_terminal_stage("KEEP_CONTENT_FOR_REVIEW", "pending")
  161. action = result["walk_actions"][0]
  162. assert action["edge_id"] == "budget_downgrade"
  163. assert action["budget_tier"] == "low_budget"
  164. assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  165. def test_decision_to_asset_keeps_content_execution_and_asset_edge_owner():
  166. result = _run_terminal_stage("ADD_TO_CONTENT_POOL", "success")
  167. action = result["walk_actions"][0]
  168. assert action["edge_id"] == "decision_to_asset"
  169. assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  170. # 唯一保留的 binding(content)仍随终端边落 raw_payload。
  171. assert action["raw_payload"]["rule_pack_binding"]["target_entity"] == "Content"
  172. execution = action["raw_payload"]["rule_pack_execution"]
  173. assert execution["executed"] is True
  174. assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
  175. def test_walk_action_and_source_path_share_binding_contract():
  176. result = _run_terminal_stage("ADD_TO_CONTENT_POOL", "success")
  177. action = result["walk_actions"][0]
  178. linked_paths = [
  179. basis for basis in result["source_path_record_basis"]
  180. if basis.get("walk_action_id") == action["walk_action_id"]
  181. ]
  182. assert linked_paths
  183. for basis in linked_paths:
  184. assert basis["rule_pack_binding"] == action["raw_payload"]["rule_pack_binding"]
  185. assert basis["rule_pack_execution"] == action["raw_payload"]["rule_pack_execution"]