| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- from content_agent.business_modules.walk_engine import run_bounded_walk
- from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context
- def test_walk_engine_author_works_reenter_content_decision_flow(tmp_path):
- context = build_initial_walk_context(tmp_path)
- client = FakeWalkPlatformClient()
- result = run_bounded_walk(platform_client=client, **context)
- assert client.author_calls
- assert any(item["previous_discovery_step"] == "author_works" for item in result["discovered_content_items"])
- assert all(decision["decision_target_type"] == "content" for decision in result["rule_decisions"])
- assert any(row["edge_id"] == "author_to_works" for row in result["walk_actions"])
- def test_walk_engine_author_edge_skips_missing_author_id(tmp_path):
- context = build_initial_walk_context(tmp_path)
- context["discovered_content_items"][0]["platform_author_id"] = ""
- context["discovered_content_items"][0]["has_more"] = False
- context["discovered_content_items"][0]["tags"] = []
- client = FakeWalkPlatformClient()
- run_bounded_walk(platform_client=client, **context)
- assert client.author_calls == []
- def _override_decisions(context, action, effect_status):
- for decision in context["rule_decisions"]:
- decision["decision_action"] = action
- decision["search_query_effect_status"] = effect_status
- def test_author_edge_skips_rejected_content(tmp_path):
- context = build_initial_walk_context(tmp_path)
- _override_decisions(context, "REJECT_CONTENT", "rule_blocked")
- client = FakeWalkPlatformClient()
- result = run_bounded_walk(platform_client=client, **context)
- assert client.author_calls == []
- skipped = [
- row for row in result["walk_actions"]
- if row["edge_id"] == "author_to_works" and row["walk_status"] == "skipped"
- ]
- assert len(skipped) == 1
- assert skipped[0]["reason_code"] == "blocked_by_rule_decision"
- assert skipped[0]["budget_tier"] == "blocked"
- assert skipped[0]["raw_payload"]["decision_action"] == "REJECT_CONTENT"
- def test_author_edge_allows_add_content_pool(tmp_path):
- context = build_initial_walk_context(tmp_path)
- # M3: mock judgment scores 60 (review/low_budget); promote the seed decision to
- # pool so the author edge runs at normal budget, which is what this test asserts.
- _override_decisions(context, "ADD_TO_CONTENT_POOL", "success")
- client = FakeWalkPlatformClient()
- result = run_bounded_walk(platform_client=client, **context)
- assert client.author_calls
- author_actions = [
- row for row in result["walk_actions"]
- if row["edge_id"] == "author_to_works" and row["walk_status"] == "success"
- ]
- assert author_actions
- assert author_actions[0]["budget_tier"] == "normal"
- # M4 砍包受控变化:future 包 binding 已删,戳回退内容包(=executed_rule_pack_id)。
- assert author_actions[0]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
- def test_author_edge_keeps_review_low_budget(tmp_path):
- context = build_initial_walk_context(tmp_path)
- _override_decisions(context, "KEEP_CONTENT_FOR_REVIEW", "pending")
- client = FakeWalkPlatformClient()
- result = run_bounded_walk(platform_client=client, **context)
- assert client.author_calls
- author_actions = [
- row for row in result["walk_actions"]
- if row["edge_id"] == "author_to_works" and row["walk_status"] == "success"
- ]
- assert author_actions
- assert author_actions[0]["budget_tier"] == "low_budget"
- assert author_actions[0]["raw_payload"]["rule_pack_execution"]["executed"] is True
- def test_author_works_skip_already_discovered_content(tmp_path):
- # 真实 E2E(v1_run_e6ba21f7543b)实证:作者近期作品包含首轮已发现的同一条视频,
- # 不去重会撞 DB 唯一索引 uk_ca_items_run_policy_content。
- context = build_initial_walk_context(tmp_path)
- context["discovered_content_items"][0]["has_more"] = False
- context["discovered_content_items"][0]["tags"] = []
- first_round_id = context["discovered_content_items"][0]["platform_content_id"]
- class OverlappingWorksClient(FakeWalkPlatformClient):
- def fetch_author_works(self, query):
- self.author_calls.append(dict(query))
- duplicate = _dup_platform_result(query, first_round_id)
- fresh = _dup_platform_result(query, "7390000000000000399")
- return [duplicate, fresh]
- client = OverlappingWorksClient()
- result = run_bounded_walk(platform_client=client, **context)
- assert client.author_calls
- content_ids = [item["platform_content_id"] for item in result["discovered_content_items"]]
- # 重复视频不得二次进入 discovered;新作品正常进入。
- assert content_ids.count(first_round_id) == 1
- assert "7390000000000000399" in content_ids
- def _dup_platform_result(query, platform_content_id):
- from tests.p6_walk_helpers import _platform_result
- return _platform_result(query, platform_content_id, "作者作品", [])
- def test_author_works_have_search_query_lineage(tmp_path):
- # 真实 E2E(v1_run_3a3bc9f0d72d)实证:作者作品内容引用合成 query id 但
- # search_queries 无此行,validate_run 报 missing_search_query_ref 等 8 条 fail。
- # 血缘补全后:query 行落盘 → pattern_to_search_query 路径与 search_clue 经既有机制生成。
- from content_agent.business_modules import run_record
- from content_agent.business_modules.run_record.validation import validate_run
- context = build_initial_walk_context(tmp_path)
- context["discovered_content_items"][0]["has_more"] = False
- context["discovered_content_items"][0]["tags"] = []
- client = FakeWalkPlatformClient()
- result = run_bounded_walk(platform_client=client, **context)
- author_queries = [
- q for q in result["search_queries"]
- if q["search_query_generation_method"] == "author_works"
- ]
- assert len(author_queries) == 1
- author_query = author_queries[0]
- assert author_query["search_query_id"].startswith("author_")
- assert author_query["previous_discovery_step"] == "author_to_works"
- # 作者作品内容引用的 query id 必须真实存在。
- author_items = [
- i for i in result["discovered_content_items"]
- if i.get("previous_discovery_step") == "author_works"
- ]
- assert author_items
- assert all(i["search_query_id"] == author_query["search_query_id"] for i in author_items)
- # 端到端血缘:M4 后终端边+血缘由 run_bounded_walk 一体产出,record_run 后 validate_run 必须 pass。
- record = run_record.run(
- context["run_id"],
- context["policy_run_id"],
- result["search_queries"],
- result["discovered_content_items"],
- result["rule_decisions"],
- result["source_path_record_basis"],
- context["policy_bundle"],
- context["runtime"],
- walk_actions=result["walk_actions"],
- )
- from content_agent.business_modules import learning_review, result_source_lookup
- result_source_lookup.run(
- context["run_id"],
- context["policy_run_id"],
- context["policy_bundle"],
- result["discovered_content_items"],
- result["content_media_records"],
- result["rule_decisions"],
- record["source_path_records"],
- record["search_clues"],
- context["runtime"],
- )
- learning_review.run(context["run_id"], context["policy_run_id"], context["runtime"])
- validation = validate_run(context["run_id"], context["runtime"])
- fails = [f for f in validation["findings"] if f["level"] == "fail"]
- assert validation["status"] == "pass", fails
|