| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- import json
- from content_agent.integrations.database_runtime import DatabaseRuntimeStore, RUNTIME_FILE_TABLES
- from content_agent.integrations.runtime_files import LocalRuntimeFileStore, RUNTIME_FILENAMES
- from tests.test_database_runtime import FakeConnection, _config, _insert_values
- def _walk_action_row():
- return {
- "record_schema_version": "runtime_record.v1",
- "run_id": "run_001",
- "policy_run_id": "policy_run_001",
- "walk_action_id": "wa_001",
- "edge_id": "query_next_page",
- "edge_type": "pagination",
- "from_node_type": "SearchQuery",
- "from_node_id": "q_001",
- "to_node_type": "SearchPage",
- "to_node_id": "page_002",
- "walk_action": "fetch_next_page",
- "walk_status": "success",
- "budget_tier": "normal",
- "depth": 1,
- "page_cursor": "10",
- "next_cursor": "20",
- "decision_id": "d_001",
- "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
- "rule_pack_version": "1.0.0",
- "reason_code": "has_more",
- "raw_payload": {
- "record_schema_version": "runtime_record.v1",
- "run_id": "run_001",
- "policy_run_id": "policy_run_001",
- "walk_action_id": "wa_001",
- },
- }
- def test_walk_actions_file_is_formal_runtime_contract(tmp_path):
- runtime = LocalRuntimeFileStore(tmp_path / "runtime")
- runtime.prepare_run("run_001")
- runtime.append_jsonl("run_001", "walk_actions.jsonl", [_walk_action_row()])
- assert "walk_actions.jsonl" in RUNTIME_FILENAMES
- rows = runtime.read_jsonl("run_001", "walk_actions.jsonl")
- assert rows[0]["walk_action_id"] == "wa_001"
- assert rows[0]["record_schema_version"] == "runtime_record.v1"
- def test_database_runtime_writes_walk_actions_to_formal_table():
- connection = FakeConnection()
- store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
- store.append_jsonl("run_001", "walk_actions.jsonl", [_walk_action_row()])
- sql, params = connection.statements[-1]
- values = _insert_values(sql, params)
- assert RUNTIME_FILE_TABLES["walk_actions.jsonl"] == "content_agent_walk_actions"
- assert "INSERT INTO `content_agent_walk_actions`" in sql
- assert values["schema_version"] == "content_agent.v1"
- assert values["walk_action_id"] == "wa_001"
- assert values["edge_id"] == "query_next_page"
- assert values["walk_status"] == "success"
- assert json.loads(values["raw_payload"])["record_schema_version"] == "runtime_record.v1"
- def _m4_decision(decision_id, target_id, action, effect_status):
- return {
- "record_schema_version": "runtime_record.v1",
- "run_id": "run_001",
- "policy_run_id": "policy_run_001",
- "decision_id": decision_id,
- "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
- "rule_pack_version": "1.0.0",
- "decision_target_type": "content",
- "decision_target_id": target_id,
- "decision_action": action,
- "decision_reason_code": effect_status,
- "search_query_effect_status": effect_status,
- "decision_input_snapshot_id": f"evidence:{target_id}",
- }
- def _m4_item(content_id):
- return {
- "platform_content_id": content_id,
- "search_query_id": "q_001",
- "discovery_start_source": "pattern_itemset",
- "previous_discovery_step": "search_query_direct",
- }
- def _run_terminal_stage(action, effect_status):
- from content_agent.business_modules.walk_engine import _terminal_stage
- from content_agent.integrations.walk_strategy_json import WalkStrategyStore
- return _terminal_stage(
- {"pattern_execution_id": 581},
- [],
- [_m4_item("content_001")],
- [_m4_decision("d_001", "content_001", action, effect_status)],
- WalkStrategyStore().load_walk_strategy(),
- "2026-06-11T00:00:00+00:00",
- )
- def test_walk_action_missing_binding_falls_back_to_content_pack():
- # M4 砍包受控变化:扩展边 binding 已删,_walk_action 经 fallback_rule_pack 回退内容包。
- from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action
- from content_agent.integrations.walk_strategy_json import WalkStrategyStore
- walk_strategy = WalkStrategyStore().load_walk_strategy()
- binding, missing_reason = _resolve_edge_binding("query_next_page", walk_strategy)
- assert binding == {}
- assert missing_reason == "edge_binding_missing"
- row = _walk_action(
- "run_001", "policy_run_001", "wa_test", "query_next_page", "pagination",
- "SearchQuery", "q_001", "SearchPage", "page_002", "fetch_next_page", "success",
- "2026-06-10T00:00:00+00:00",
- rule_pack_binding=binding,
- rule_pack_execution={"executed": True, "executed_rule_pack_id": "douyin_content_discovery_rule_pack_v1",
- "reason": "content_decision_reused_for_walk_gate"},
- fallback_rule_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"},
- )
- assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- assert row["rule_pack_version"] == "1.0.0"
- assert row["raw_payload"]["rule_pack_binding"] == {}
- def test_missing_decision_records_not_executed():
- from content_agent.business_modules.walk_engine import _execution_record, _walk_action
- execution = _execution_record(None, content_pack_id="douyin_content_discovery_rule_pack_v1")
- row = _walk_action(
- "run_001", "policy_run_001", "wa_test", "author_to_works", "author",
- "Author", "a_001", "AuthorWorksPage", "a_001", "fetch_author_works", "skipped",
- "2026-06-10T00:00:00+00:00",
- rule_pack_binding={},
- rule_pack_execution=execution,
- fallback_rule_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"},
- )
- assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- assert row["raw_payload"]["rule_pack_execution"] == {
- "executed": False,
- "executed_rule_pack_id": None,
- "reason": "future_pack_not_enabled",
- }
- def test_content_decision_execution_stamp_is_consistent_after_pack_cut():
- # M4 砍包受控变化:path_stop 的 future 归属包已删,戳回退到执行包(内容包),两者一致。
- result = _run_terminal_stage("REJECT_CONTENT", "rule_blocked")
- action = result["walk_actions"][0]
- assert action["edge_id"] == "path_stop"
- assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- execution = action["raw_payload"]["rule_pack_execution"]
- assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- assert action["raw_payload"]["rule_pack_binding"] == {}
- def test_unknown_edge_binding_leaves_rule_pack_id_null_with_reason():
- from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action
- from content_agent.integrations.walk_strategy_json import WalkStrategyStore
- walk_strategy = WalkStrategyStore().load_walk_strategy()
- binding, missing_reason = _resolve_edge_binding("unknown_edge", walk_strategy)
- assert binding == {}
- assert missing_reason == "edge_binding_missing"
- row = _walk_action(
- "run_001", "policy_run_001", "wa_test", "unknown_edge", "unknown",
- "Node", "n_001", "Node", "n_002", "noop", "skipped",
- "2026-06-10T00:00:00+00:00",
- reason_code=missing_reason,
- rule_pack_binding=binding,
- )
- assert row["rule_pack_id"] is None
- assert row["reason_code"] == "edge_binding_missing"
- def test_path_stop_stamps_content_pack_after_cut():
- # M4 砍包受控变化:Path 包已删,path_stop 戳=decision 的内容包。
- result = _run_terminal_stage("REJECT_CONTENT", "rule_blocked")
- action = result["walk_actions"][0]
- assert action["edge_id"] == "path_stop"
- assert action["walk_action"] == "stop_path"
- assert action["walk_status"] == "skipped"
- assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- def test_budget_downgrade_stamps_content_pack_after_cut():
- # M4 砍包受控变化:Budget 包已删,budget_downgrade 戳=decision 的内容包。
- result = _run_terminal_stage("KEEP_CONTENT_FOR_REVIEW", "pending")
- action = result["walk_actions"][0]
- assert action["edge_id"] == "budget_downgrade"
- assert action["budget_tier"] == "low_budget"
- assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- def test_decision_to_asset_keeps_content_execution_and_asset_edge_owner():
- result = _run_terminal_stage("ADD_TO_CONTENT_POOL", "success")
- action = result["walk_actions"][0]
- assert action["edge_id"] == "decision_to_asset"
- assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- # 唯一保留的 binding(content)仍随终端边落 raw_payload。
- assert action["raw_payload"]["rule_pack_binding"]["target_entity"] == "Content"
- execution = action["raw_payload"]["rule_pack_execution"]
- assert execution["executed"] is True
- assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- def test_walk_action_and_source_path_share_binding_contract():
- result = _run_terminal_stage("ADD_TO_CONTENT_POOL", "success")
- action = result["walk_actions"][0]
- linked_paths = [
- basis for basis in result["source_path_record_basis"]
- if basis.get("walk_action_id") == action["walk_action_id"]
- ]
- assert linked_paths
- for basis in linked_paths:
- assert basis["rule_pack_binding"] == action["raw_payload"]["rule_pack_binding"]
- assert basis["rule_pack_execution"] == action["raw_payload"]["rule_pack_execution"]
|