"""Tests for the ``author_to_works`` edge of the bounded walk engine."""

from content_agent.business_modules.walk_engine import run_bounded_walk
from tests.p6_walk_helpers import (
    FakeWalkPlatformClient,
    _platform_result,
    build_initial_walk_context,
)


def _override_decisions(context, action, effect_status):
    """Force every seed rule decision in ``context`` to ``action`` / ``effect_status``.

    Fails loudly when the context carries no decisions: the override would
    otherwise be a silent no-op and the calling test would pass vacuously.
    """
    decisions = context["rule_decisions"]
    assert decisions, "initial walk context has no rule decisions to override"
    for decision in decisions:
        decision["decision_action"] = action
        decision["search_query_effect_status"] = effect_status


def _author_edge_actions(result, walk_status):
    """Return the ``author_to_works`` rows of ``result["walk_actions"]`` with ``walk_status``."""
    return [
        row
        for row in result["walk_actions"]
        if row["edge_id"] == "author_to_works" and row["walk_status"] == walk_status
    ]


def _dup_platform_result(query, platform_content_id):
    """Build a fake author-works platform result for ``platform_content_id``."""
    return _platform_result(query, platform_content_id, "作者作品", [])


def test_walk_engine_author_works_reenter_content_decision_flow(tmp_path):
    """Author works are fetched, discovered, and decided on as content."""
    context = build_initial_walk_context(tmp_path)
    client = FakeWalkPlatformClient()

    result = run_bounded_walk(platform_client=client, **context)

    assert client.author_calls
    assert any(
        item["previous_discovery_step"] == "author_works"
        for item in result["discovered_content_items"]
    )
    # Guard against all() being vacuously true over an empty decision list.
    assert result["rule_decisions"]
    assert all(
        decision["decision_target_type"] == "content"
        for decision in result["rule_decisions"]
    )
    assert any(row["edge_id"] == "author_to_works" for row in result["walk_actions"])


def test_walk_engine_author_edge_skips_missing_author_id(tmp_path):
    """No author-works fetch happens when the seed item has an empty author id."""
    context = build_initial_walk_context(tmp_path)
    seed = context["discovered_content_items"][0]
    seed["platform_author_id"] = ""
    seed["has_more"] = False
    seed["tags"] = []
    client = FakeWalkPlatformClient()

    run_bounded_walk(platform_client=client, **context)

    assert client.author_calls == []


def test_author_edge_skips_rejected_content(tmp_path):
    """A REJECT_CONTENT decision yields exactly one skipped, blocked author edge."""
    context = build_initial_walk_context(tmp_path)
    _override_decisions(context, "REJECT_CONTENT", "rule_blocked")
    client = FakeWalkPlatformClient()

    result = run_bounded_walk(platform_client=client, **context)

    assert client.author_calls == []
    skipped = _author_edge_actions(result, "skipped")
    assert len(skipped) == 1
    assert skipped[0]["reason_code"] == "blocked_by_rule_decision"
    assert skipped[0]["budget_tier"] == "blocked"
    assert skipped[0]["raw_payload"]["decision_action"] == "REJECT_CONTENT"


def test_author_edge_allows_add_content_pool(tmp_path):
    """An ADD_TO_CONTENT_POOL decision runs the author edge at the normal budget tier."""
    context = build_initial_walk_context(tmp_path)
    # M3: mock judgment scores 60 (review/low_budget); promote the seed decision to
    # pool so the author edge runs at normal budget, which is what this test asserts.
    _override_decisions(context, "ADD_TO_CONTENT_POOL", "success")
    client = FakeWalkPlatformClient()

    result = run_bounded_walk(platform_client=client, **context)

    assert client.author_calls
    author_actions = _author_edge_actions(result, "success")
    assert author_actions
    assert author_actions[0]["budget_tier"] == "normal"
    # M4 controlled change from pack pruning: the future-pack binding was removed,
    # so the stamp falls back to the content pack (= executed_rule_pack_id).
    assert author_actions[0]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"


def test_author_edge_keeps_review_low_budget(tmp_path):
    """A KEEP_CONTENT_FOR_REVIEW decision runs the author edge at the low_budget tier."""
    context = build_initial_walk_context(tmp_path)
    _override_decisions(context, "KEEP_CONTENT_FOR_REVIEW", "pending")
    client = FakeWalkPlatformClient()

    result = run_bounded_walk(platform_client=client, **context)

    assert client.author_calls
    author_actions = _author_edge_actions(result, "success")
    assert author_actions
    assert author_actions[0]["budget_tier"] == "low_budget"
    assert author_actions[0]["raw_payload"]["rule_pack_execution"]["executed"] is True


def test_author_works_skip_already_discovered_content(tmp_path):
    """Author works that repeat a first-round item are not discovered twice."""
    # Observed in a real E2E run (v1_run_e6ba21f7543b): the author's recent works
    # included the same video already discovered in the first round; without
    # dedup this hits the DB unique index uk_ca_items_run_policy_content.
    context = build_initial_walk_context(tmp_path)
    seed = context["discovered_content_items"][0]
    seed["has_more"] = False
    seed["tags"] = []
    first_round_id = seed["platform_content_id"]
    fresh_id = "7390000000000000399"

    class OverlappingWorksClient(FakeWalkPlatformClient):
        """Fake client whose author works repeat the seed item plus one new item."""

        def fetch_author_works(self, query):
            self.author_calls.append(dict(query))
            duplicate = _dup_platform_result(query, first_round_id)
            fresh = _dup_platform_result(query, fresh_id)
            return [duplicate, fresh]

    client = OverlappingWorksClient()

    result = run_bounded_walk(platform_client=client, **context)

    assert client.author_calls
    content_ids = [item["platform_content_id"] for item in result["discovered_content_items"]]
    # The duplicate video must not enter discovered items a second time;
    # the new work enters normally.
    assert content_ids.count(first_round_id) == 1
    assert fresh_id in content_ids


def test_author_works_have_search_query_lineage(tmp_path):
    """Author-works content references a persisted search query and the run validates."""
    # Observed in a real E2E run (v1_run_3a3bc9f0d72d): author-works content
    # referenced a synthesized query id with no matching search_queries row, and
    # validate_run reported 8 failures including missing_search_query_ref.
    # With lineage completed, the query row is persisted, and the
    # pattern_to_search_query path and search_clue are produced by the existing
    # mechanisms.
    from content_agent.business_modules import (
        learning_review,
        result_source_lookup,
        run_record,
    )
    from content_agent.business_modules.run_record.validation import validate_run

    context = build_initial_walk_context(tmp_path)
    seed = context["discovered_content_items"][0]
    seed["has_more"] = False
    seed["tags"] = []
    client = FakeWalkPlatformClient()

    result = run_bounded_walk(platform_client=client, **context)

    author_queries = [
        query
        for query in result["search_queries"]
        if query["search_query_generation_method"] == "author_works"
    ]
    assert len(author_queries) == 1
    author_query = author_queries[0]
    assert author_query["search_query_id"].startswith("author_")
    assert author_query["previous_discovery_step"] == "author_to_works"

    # The query id referenced by author-works content must actually exist.
    author_items = [
        item
        for item in result["discovered_content_items"]
        if item.get("previous_discovery_step") == "author_works"
    ]
    assert author_items
    assert all(
        item["search_query_id"] == author_query["search_query_id"]
        for item in author_items
    )

    # End-to-end lineage: since M4, terminal edges and lineage are produced
    # together by run_bounded_walk; after record_run, validate_run must pass.
    record = run_record.run(
        context["run_id"],
        context["policy_run_id"],
        result["search_queries"],
        result["discovered_content_items"],
        result["rule_decisions"],
        result["source_path_record_basis"],
        context["policy_bundle"],
        context["runtime"],
        walk_actions=result["walk_actions"],
    )
    result_source_lookup.run(
        context["run_id"],
        context["policy_run_id"],
        context["policy_bundle"],
        result["discovered_content_items"],
        result["content_media_records"],
        result["rule_decisions"],
        record["source_path_records"],
        record["search_clues"],
        context["runtime"],
    )
    learning_review.run(context["run_id"], context["policy_run_id"], context["runtime"])

    validation = validate_run(context["run_id"], context["runtime"])
    fails = [finding for finding in validation["findings"] if finding["level"] == "fail"]
    assert validation["status"] == "pass", fails