import json from content_agent.integrations.database_runtime import DatabaseRuntimeStore, RUNTIME_FILE_TABLES from content_agent.integrations.runtime_files import LocalRuntimeFileStore, RUNTIME_FILENAMES from tests.test_database_runtime import FakeConnection, _config, _insert_values def _walk_action_row(): return { "record_schema_version": "runtime_record.v1", "run_id": "run_001", "policy_run_id": "policy_run_001", "walk_action_id": "wa_001", "edge_id": "query_next_page", "edge_type": "pagination", "from_node_type": "SearchQuery", "from_node_id": "q_001", "to_node_type": "SearchPage", "to_node_id": "page_002", "walk_action": "fetch_next_page", "walk_status": "success", "budget_tier": "normal", "depth": 1, "page_cursor": "10", "next_cursor": "20", "decision_id": "d_001", "rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0", "reason_code": "has_more", "raw_payload": { "record_schema_version": "runtime_record.v1", "run_id": "run_001", "policy_run_id": "policy_run_001", "walk_action_id": "wa_001", }, } def test_walk_actions_file_is_formal_runtime_contract(tmp_path): runtime = LocalRuntimeFileStore(tmp_path / "runtime") runtime.prepare_run("run_001") runtime.append_jsonl("run_001", "walk_actions.jsonl", [_walk_action_row()]) assert "walk_actions.jsonl" in RUNTIME_FILENAMES rows = runtime.read_jsonl("run_001", "walk_actions.jsonl") assert rows[0]["walk_action_id"] == "wa_001" assert rows[0]["record_schema_version"] == "runtime_record.v1" def test_database_runtime_writes_walk_actions_to_formal_table(): connection = FakeConnection() store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection) store.append_jsonl("run_001", "walk_actions.jsonl", [_walk_action_row()]) sql, params = connection.statements[-1] values = _insert_values(sql, params) assert RUNTIME_FILE_TABLES["walk_actions.jsonl"] == "content_agent_walk_actions" assert "INSERT INTO `content_agent_walk_actions`" in sql assert values["schema_version"] == "content_agent.v1" assert values["walk_action_id"] == "wa_001" assert values["edge_id"] == "query_next_page" assert values["walk_status"] == "success" assert json.loads(values["raw_payload"])["record_schema_version"] == "runtime_record.v1" def _m4_decision(decision_id, target_id, action, effect_status): return { "record_schema_version": "runtime_record.v1", "run_id": "run_001", "policy_run_id": "policy_run_001", "decision_id": decision_id, "rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0", "decision_target_type": "content", "decision_target_id": target_id, "decision_action": action, "decision_reason_code": effect_status, "search_query_effect_status": effect_status, "decision_input_snapshot_id": f"evidence:{target_id}", } def _m4_item(content_id): return { "platform_content_id": content_id, "search_query_id": "q_001", "discovery_start_source": "pattern_itemset", "previous_discovery_step": "search_query_direct", } def _run_terminal_stage(action, effect_status): from content_agent.business_modules.walk_engine import _terminal_stage from content_agent.integrations.walk_strategy_json import WalkStrategyStore return _terminal_stage( {"pattern_execution_id": 581}, [], [_m4_item("content_001")], [_m4_decision("d_001", "content_001", action, effect_status)], WalkStrategyStore().load_walk_strategy(), "2026-06-11T00:00:00+00:00", ) def test_walk_action_missing_binding_falls_back_to_content_pack(): # M4 砍包受控变化:扩展边 binding 已删,_walk_action 经 fallback_rule_pack 回退内容包。 from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action from content_agent.integrations.walk_strategy_json import WalkStrategyStore walk_strategy = WalkStrategyStore().load_walk_strategy() binding, missing_reason = _resolve_edge_binding("query_next_page", walk_strategy) assert binding == {} assert missing_reason == "edge_binding_missing" row = _walk_action( "run_001", "policy_run_001", "wa_test", "query_next_page", "pagination", "SearchQuery", "q_001", "SearchPage", "page_002", "fetch_next_page", "success", "2026-06-10T00:00:00+00:00", rule_pack_binding=binding, rule_pack_execution={"executed": True, "executed_rule_pack_id": "douyin_content_discovery_rule_pack_v1", "reason": "content_decision_reused_for_walk_gate"}, fallback_rule_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"}, ) assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" assert row["rule_pack_version"] == "1.0.0" assert row["raw_payload"]["rule_pack_binding"] == {} def test_missing_decision_records_not_executed(): from content_agent.business_modules.walk_engine import _execution_record, _walk_action execution = _execution_record(None, content_pack_id="douyin_content_discovery_rule_pack_v1") row = _walk_action( "run_001", "policy_run_001", "wa_test", "author_to_works", "author", "Author", "a_001", "AuthorWorksPage", "a_001", "fetch_author_works", "skipped", "2026-06-10T00:00:00+00:00", rule_pack_binding={}, rule_pack_execution=execution, fallback_rule_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"}, ) assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" assert row["raw_payload"]["rule_pack_execution"] == { "executed": False, "executed_rule_pack_id": None, "reason": "future_pack_not_enabled", } def test_content_decision_execution_stamp_is_consistent_after_pack_cut(): # M4 砍包受控变化:path_stop 的 future 归属包已删,戳回退到执行包(内容包),两者一致。 result = _run_terminal_stage("REJECT_CONTENT", "rule_blocked") action = result["walk_actions"][0] assert action["edge_id"] == "path_stop" assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" execution = action["raw_payload"]["rule_pack_execution"] assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" assert action["raw_payload"]["rule_pack_binding"] == {} def test_unknown_edge_binding_leaves_rule_pack_id_null_with_reason(): from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action from content_agent.integrations.walk_strategy_json import WalkStrategyStore walk_strategy = WalkStrategyStore().load_walk_strategy() binding, missing_reason = _resolve_edge_binding("unknown_edge", walk_strategy) assert binding == {} assert missing_reason == "edge_binding_missing" row = _walk_action( "run_001", "policy_run_001", "wa_test", "unknown_edge", "unknown", "Node", "n_001", "Node", "n_002", "noop", "skipped", "2026-06-10T00:00:00+00:00", reason_code=missing_reason, rule_pack_binding=binding, ) assert row["rule_pack_id"] is None assert row["reason_code"] == "edge_binding_missing" def test_path_stop_stamps_content_pack_after_cut(): # M4 砍包受控变化:Path 包已删,path_stop 戳=decision 的内容包。 result = _run_terminal_stage("REJECT_CONTENT", "rule_blocked") action = result["walk_actions"][0] assert action["edge_id"] == "path_stop" assert action["walk_action"] == "stop_path" assert action["walk_status"] == "skipped" assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" def test_budget_downgrade_stamps_content_pack_after_cut(): # M4 砍包受控变化:Budget 包已删,budget_downgrade 戳=decision 的内容包。 result = _run_terminal_stage("KEEP_CONTENT_FOR_REVIEW", "pending") action = result["walk_actions"][0] assert action["edge_id"] == "budget_downgrade" assert action["budget_tier"] == "low_budget" assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" def test_decision_to_asset_keeps_content_execution_and_asset_edge_owner(): result = _run_terminal_stage("ADD_TO_CONTENT_POOL", "success") action = result["walk_actions"][0] assert action["edge_id"] == "decision_to_asset" assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" # 唯一保留的 binding(content)仍随终端边落 raw_payload。 assert action["raw_payload"]["rule_pack_binding"]["target_entity"] == "Content" execution = action["raw_payload"]["rule_pack_execution"] assert execution["executed"] is True assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" def test_walk_action_and_source_path_share_binding_contract(): result = _run_terminal_stage("ADD_TO_CONTENT_POOL", "success") action = result["walk_actions"][0] linked_paths = [ basis for basis in result["source_path_record_basis"] if basis.get("walk_action_id") == action["walk_action_id"] ] assert linked_paths for basis in linked_paths: assert basis["rule_pack_binding"] == action["raw_payload"]["rule_pack_binding"] assert basis["rule_pack_execution"] == action["raw_payload"]["rule_pack_execution"]