from content_agent.business_modules.walk_engine import _terminal_stage
from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
from content_agent.integrations.walk_strategy_json import WalkStrategyStore


def test_terminal_stage_maps_p5_decisions_to_p6_budget_and_stop_actions():
    """Check the walk actions `_terminal_stage` returns for three decisions.

    One content item is accepted, one is kept for review and one is rejected;
    the test asserts the walk action (and budget tier / status where relevant)
    recorded for each decision target.
    """
    result = _terminal_stage(
        {"pattern_execution_id": 581},
        [],
        [
            {
                "platform_content_id": "content_success",
                "search_query_id": "q_001",
                "discovery_start_source": "pattern_itemset",
                "previous_discovery_step": "search_query_direct",
            },
            {
                "platform_content_id": "content_pending",
                "search_query_id": "q_001",
                "discovery_start_source": "pattern_itemset",
                "previous_discovery_step": "search_query_direct",
            },
            {
                "platform_content_id": "content_blocked",
                "search_query_id": "q_001",
                "discovery_start_source": "pattern_itemset",
                "previous_discovery_step": "search_query_direct",
            },
        ],
        [
            _decision("d_001", "content_success", "ADD_TO_CONTENT_POOL", "success"),
            _decision("d_002", "content_pending", "KEEP_CONTENT_FOR_REVIEW", "pending"),
            _decision("d_003", "content_blocked", "REJECT_CONTENT", "rule_blocked"),
        ],
        WalkStrategyStore().load_walk_strategy(),
        "2026-06-11T00:00:00+00:00",
    )

    # Index the emitted actions by the content id they were decided for.
    by_target = {row["decision_target_id"]: row for row in result["walk_actions"]}

    assert by_target["content_success"]["walk_action"] == "commit_asset"
    assert by_target["content_pending"]["walk_action"] == "downgrade_budget"
    assert by_target["content_pending"]["budget_tier"] == "low_budget"
    assert by_target["content_blocked"]["walk_action"] == "stop_path"
    assert by_target["content_blocked"]["walk_status"] == "skipped"


def _decision(decision_id, target_id, action, effect_status):
    """Build a content-level rule decision record for use as test input.

    `effect_status` is written to both ``decision_reason_code`` and
    ``search_query_effect_status``.
    """
    return {
        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "decision_id": decision_id,
        "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
        "rule_pack_version": "1.0.0",
        "decision_target_type": "content",
        "decision_target_id": target_id,
        "decision_action": action,
        "decision_reason_code": effect_status,
        "search_query_effect_status": effect_status,
        "decision_input_snapshot_id": f"evidence:{target_id}",
    }


def test_keep_review_author_edge_uses_low_budget(tmp_path):
    """When every decision is KEEP_CONTENT_FOR_REVIEW, author actions are low budget."""
    from content_agent.business_modules.walk_engine import run_bounded_walk
    from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context

    context = build_initial_walk_context(tmp_path)
    # Rewrite every prepared decision into a "keep for review / pending" one.
    for decision in context["rule_decisions"]:
        decision["decision_action"] = "KEEP_CONTENT_FOR_REVIEW"
        decision["search_query_effect_status"] = "pending"

    client = FakeWalkPlatformClient()
    result = run_bounded_walk(platform_client=client, **context)

    author_actions = [
        row for row in result["walk_actions"] if row["edge_id"] == "author_to_works"
    ]
    # Guard against a vacuous pass: all() over an empty list is True.
    assert author_actions
    assert all(row["budget_tier"] == "low_budget" for row in author_actions)


def _walk_configs():
    """Load and return ``(policy, "douyin" profile, walk strategy)`` from the stores."""
    from content_agent.integrations.walk_graph_json import WalkGraphStore

    store = WalkGraphStore()
    return (
        store.load_policy(),
        store.load_profile("douyin"),
        WalkStrategyStore().load_walk_strategy(),
    )


def _item(content_id, author_id, *, search_query_id="q_001", has_more=False, cursor=None):
    """Build a discovered-content item record for use as test input."""
    return {
        "platform_content_id": content_id,
        "platform_author_id": author_id,
        "search_query_id": search_query_id,
        "discovery_start_source": "pattern_itemset",
        "previous_discovery_step": "search_query_direct",
        "has_more": has_more,
        "next_cursor": cursor,
        "tags": [],
    }


class _EmptyAuthorWorksClient:
    """Platform-client stand-in whose author-works fetch always returns nothing."""

    def fetch_author_works(self, query):
        """Return an empty list regardless of `query`."""
        return []


def test_author_budget_overflow_emits_budget_exhausted_skip():
    """Authors beyond the `author_to_works` budget must be recorded as skips."""
    # R8 (decided 2026-06-12): qualified authors beyond the budget no longer
    # vanish silently; each must leave a budget_exhausted skip.
    from content_agent.business_modules.walk_engine import _expand_authors

    policy, profile, walk_strategy = _walk_configs()
    budget = policy["edge_budgets_by_id"]["author_to_works"]["max_total_actions"]

    author_count = 4
    # Without this precondition a budget equal to the author count would make
    # `skipped` empty and let every assertion below pass without any overflow.
    assert budget < author_count, (
        f"author_to_works budget ({budget}) must be below {author_count} "
        "for this test to exercise overflow"
    )

    items = [
        _item(f"content_{i}", f"author_{i:03d}") for i in range(1, author_count + 1)
    ]
    decisions = [
        _decision(f"d_{i:03d}", f"content_{i}", "ADD_TO_CONTENT_POOL", "success")
        for i in range(1, author_count + 1)
    ]

    result = _expand_authors(
        run_id="run_001",
        policy_run_id="policy_run_001",
        source_context={},
        discovered_content_items=items,
        rule_decisions=decisions,
        policy=policy,
        profile=profile,
        walk_strategy=walk_strategy,
        policy_bundle={},
        content_pack={
            "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
            "rule_pack_version": "1.0.0",
        },
        platform_client=_EmptyAuthorWorksClient(),
        runtime=None,
        gemini_video_client=None,
        start_recall_index=0,
        start_decision_index=0,
        created_at="2026-06-12T00:00:00+00:00",
    )

    actions = result["walk_actions"]
    fetched = [row for row in actions if row["walk_status"] == "success"]
    skipped = [row for row in actions if row["reason_code"] == "budget_exhausted"]

    assert len(fetched) == budget
    assert len(skipped) == author_count - budget
    assert all(
        row["walk_status"] == "skipped" and row["budget_tier"] == "blocked"
        for row in skipped
    )
    # Every action id must be unique across fetched and skipped rows.
    assert len({row["walk_action_id"] for row in actions}) == len(actions)


def test_page_budget_overflow_emits_budget_exhausted_skip():
    """Queries beyond the `query_next_page` budget must be recorded as skips."""
    # R8: when the paging budget is exhausted, qualified queries that still
    # have a next page must leave a budget_exhausted skip.
    from content_agent.business_modules.walk_engine import _expand_queries

    policy, profile, walk_strategy = _walk_configs()
    budget = policy["edge_budgets_by_id"]["query_next_page"]["max_total_actions"]

    query_count = 5
    # Same guard as the author test: a budget that covers every query would
    # leave `page_skips` empty and make the assertions below vacuous.
    assert budget < query_count, (
        f"query_next_page budget ({budget}) must be below {query_count} "
        "for this test to exercise overflow"
    )

    search_queries = [
        {
            "search_query_id": f"q_{i:03d}",
            "search_query": f"词{i}",
            "discovery_start_source": "pattern_itemset",
        }
        for i in range(1, query_count + 1)
    ]
    # One item per query, each reporting a further page via has_more / cursor.
    items = [
        _item(
            f"content_{i}",
            f"author_{i:03d}",
            search_query_id=f"q_{i:03d}",
            has_more=True,
            cursor="10",
        )
        for i in range(1, query_count + 1)
    ]
    decisions = [
        _decision(f"d_{i:03d}", f"content_{i}", "ADD_TO_CONTENT_POOL", "success")
        for i in range(1, query_count + 1)
    ]

    rows, skipped_actions = _expand_queries(
        "run_001",
        "policy_run_001",
        search_queries,
        items,
        decisions,
        "2026-06-12T00:00:00+00:00",
        policy=policy,
        profile=profile,
        walk_strategy=walk_strategy,
        content_pack={
            "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
            "rule_pack_version": "1.0.0",
        },
    )

    page_rows = [
        row for row in rows if row["search_query_generation_method"] == "query_next_page"
    ]
    page_skips = [
        row
        for row in skipped_actions
        if row["edge_id"] == "query_next_page" and row["reason_code"] == "budget_exhausted"
    ]

    assert len(page_rows) == budget
    assert len(page_skips) == query_count - budget
    assert all(
        row["walk_status"] == "skipped" and row["budget_tier"] == "blocked"
        for row in page_skips
    )