| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- from content_agent.business_modules.walk_engine import _terminal_stage
- from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
- from content_agent.integrations.walk_strategy_json import WalkStrategyStore
def test_terminal_stage_maps_p5_decisions_to_p6_budget_and_stop_actions():
    """Check the walk action `_terminal_stage` emits for each decision outcome.

    Expected mapping, keyed by decision target:
    ADD_TO_CONTENT_POOL -> commit_asset,
    KEEP_CONTENT_FOR_REVIEW -> downgrade_budget at the low_budget tier,
    REJECT_CONTENT -> stop_path with a skipped status.
    """
    content_ids = ("content_success", "content_pending", "content_blocked")
    # All three items share the same discovery provenance; only the id differs.
    discovered_items = [
        {
            "platform_content_id": content_id,
            "search_query_id": "q_001",
            "discovery_start_source": "pattern_itemset",
            "previous_discovery_step": "search_query_direct",
        }
        for content_id in content_ids
    ]
    rule_decisions = [
        _decision("d_001", "content_success", "ADD_TO_CONTENT_POOL", "success"),
        _decision("d_002", "content_pending", "KEEP_CONTENT_FOR_REVIEW", "pending"),
        _decision("d_003", "content_blocked", "REJECT_CONTENT", "rule_blocked"),
    ]
    walk_strategy = WalkStrategyStore().load_walk_strategy()

    result = _terminal_stage(
        {"pattern_execution_id": 581},
        [],
        discovered_items,
        rule_decisions,
        walk_strategy,
        "2026-06-11T00:00:00+00:00",
    )

    actions_by_target = {}
    for action_row in result["walk_actions"]:
        actions_by_target[action_row["decision_target_id"]] = action_row

    # (decision target, field, expected value), checked in this order.
    expectations = (
        ("content_success", "walk_action", "commit_asset"),
        ("content_pending", "walk_action", "downgrade_budget"),
        ("content_pending", "budget_tier", "low_budget"),
        ("content_blocked", "walk_action", "stop_path"),
        ("content_blocked", "walk_status", "skipped"),
    )
    for target_id, field, expected in expectations:
        assert actions_by_target[target_id][field] == expected
def _decision(decision_id, target_id, action, effect_status):
    """Build a content-targeted rule-decision record for the tests.

    `effect_status` is written to both `decision_reason_code` and
    `search_query_effect_status`; the snapshot id is derived from `target_id`.
    """
    # Run / rule-pack identity shared by every decision these tests create.
    record = {
        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "decision_id": decision_id,
        "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
        "rule_pack_version": "1.0.0",
    }
    # What was decided, and about which content item.
    record.update(
        {
            "decision_target_type": "content",
            "decision_target_id": target_id,
            "decision_action": action,
            "decision_reason_code": effect_status,
            "search_query_effect_status": effect_status,
        }
    )
    record["decision_input_snapshot_id"] = f"evidence:{target_id}"
    return record
def test_keep_review_author_edge_uses_low_budget(tmp_path):
    """With every decision set to keep-for-review, author_to_works actions must be low_budget."""
    from content_agent.business_modules.walk_engine import run_bounded_walk
    from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context

    walk_context = build_initial_walk_context(tmp_path)
    review_overrides = {
        "decision_action": "KEEP_CONTENT_FOR_REVIEW",
        "search_query_effect_status": "pending",
    }
    # Rewrite every seeded decision in place so the walk only sees "pending" content.
    for rule_decision in walk_context["rule_decisions"]:
        for field, value in review_overrides.items():
            rule_decision[field] = value

    fake_client = FakeWalkPlatformClient()
    outcome = run_bounded_walk(platform_client=fake_client, **walk_context)

    author_rows = []
    for action_row in outcome["walk_actions"]:
        if action_row["edge_id"] == "author_to_works":
            author_rows.append(action_row)

    # Guard against a vacuous pass: at least one author edge action must exist.
    assert author_rows
    for action_row in author_rows:
        assert action_row["budget_tier"] == "low_budget"
def _walk_configs():
    """Return the `(policy, douyin profile, walk strategy)` triple loaded from the stores."""
    from content_agent.integrations.walk_graph_json import WalkGraphStore

    graph_store = WalkGraphStore()
    policy = graph_store.load_policy()
    profile = graph_store.load_profile("douyin")
    walk_strategy = WalkStrategyStore().load_walk_strategy()
    return policy, profile, walk_strategy
- def _item(content_id, author_id, *, search_query_id="q_001", has_more=False, cursor=None):
- return {
- "platform_content_id": content_id,
- "platform_author_id": author_id,
- "search_query_id": search_query_id,
- "discovery_start_source": "pattern_itemset",
- "previous_discovery_step": "search_query_direct",
- "has_more": has_more,
- "next_cursor": cursor,
- "tags": [],
- }
- class _EmptyAuthorWorksClient:
- def fetch_author_works(self, query):
- return []
def test_author_budget_overflow_emits_budget_exhausted_skip():
    # R8 (decided 2026-06-12): eligible authors beyond the budget must no longer
    # vanish silently; each one has to leave a budget_exhausted skip record.
    from content_agent.business_modules.walk_engine import _expand_authors

    policy, profile, walk_strategy = _walk_configs()
    author_budget = policy["edge_budgets_by_id"]["author_to_works"]["max_total_actions"]

    # Four accepted content items, each from a distinct author.
    indices = range(1, 5)
    items = [_item(f"content_{i}", f"author_{i:03d}") for i in indices]
    decisions = [
        _decision(f"d_{i:03d}", f"content_{i}", "ADD_TO_CONTENT_POOL", "success")
        for i in indices
    ]
    content_pack = {
        "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
        "rule_pack_version": "1.0.0",
    }

    result = _expand_authors(
        run_id="run_001",
        policy_run_id="policy_run_001",
        source_context={},
        discovered_content_items=items,
        rule_decisions=decisions,
        policy=policy,
        profile=profile,
        walk_strategy=walk_strategy,
        policy_bundle={},
        content_pack=content_pack,
        platform_client=_EmptyAuthorWorksClient(),
        runtime=None,
        gemini_video_client=None,
        start_recall_index=0,
        start_decision_index=0,
        created_at="2026-06-12T00:00:00+00:00",
    )

    actions = result["walk_actions"]
    fetched = [row for row in actions if row["walk_status"] == "success"]
    skipped = [row for row in actions if row["reason_code"] == "budget_exhausted"]

    assert len(fetched) == author_budget
    assert len(skipped) == len(indices) - author_budget
    for row in skipped:
        assert row["walk_status"] == "skipped"
        assert row["budget_tier"] == "blocked"

    # Every emitted action, fetched or skipped, must carry a distinct id.
    action_ids = [row["walk_action_id"] for row in actions]
    assert len(set(action_ids)) == len(action_ids)
def test_page_budget_overflow_emits_budget_exhausted_skip():
    # R8: when the pagination budget is exhausted, eligible queries that still
    # have a next page must leave a budget_exhausted skip record.
    from content_agent.business_modules.walk_engine import _expand_queries

    policy, profile, walk_strategy = _walk_configs()
    page_budget = policy["edge_budgets_by_id"]["query_next_page"]["max_total_actions"]

    # Five queries, each with one accepted item that reports a further page.
    indices = range(1, 6)
    search_queries = [
        {
            "search_query_id": f"q_{i:03d}",
            "search_query": f"词{i}",
            "discovery_start_source": "pattern_itemset",
        }
        for i in indices
    ]
    items = [
        _item(
            f"content_{i}",
            f"author_{i:03d}",
            search_query_id=f"q_{i:03d}",
            has_more=True,
            cursor="10",
        )
        for i in indices
    ]
    decisions = [
        _decision(f"d_{i:03d}", f"content_{i}", "ADD_TO_CONTENT_POOL", "success")
        for i in indices
    ]
    content_pack = {
        "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
        "rule_pack_version": "1.0.0",
    }

    rows, skipped_actions = _expand_queries(
        "run_001",
        "policy_run_001",
        search_queries,
        items,
        decisions,
        "2026-06-12T00:00:00+00:00",
        policy=policy,
        profile=profile,
        walk_strategy=walk_strategy,
        content_pack=content_pack,
    )

    page_rows = [
        row for row in rows if row["search_query_generation_method"] == "query_next_page"
    ]
    page_skips = [
        row
        for row in skipped_actions
        if row["edge_id"] == "query_next_page"
        if row["reason_code"] == "budget_exhausted"
    ]

    assert len(page_rows) == page_budget
    assert len(page_skips) == len(indices) - page_budget
    for row in page_skips:
        assert row["walk_status"] == "skipped"
        assert row["budget_tier"] == "blocked"
|