"""Real + synthetic case replay tests (V2-M0D).

- real_id45: the harvested production baseline (demand_content.id=45).
  M3 controlled change: the portrait gates (missing_content_portrait /
  pattern_recall_required, etc.) are retired as a whole and replaced by a
  Gemini relevance (max 60) + platform heat (max 40) score:
  ≥70 goes into the pool / 60-69 is kept for review / <60 is rejected.
  The default FakeGeminiVideoClient gives relevance_score=0.85
  (→ relevance 60). With heat log-normalised from each item's digg_count,
  real_id45 landed at 2 pooled + 2 review (previously all KEEP); since the
  R3 step-2 four-field heat composite the tests below expect
  3 pooled + 1 review.
- syn_pool / syn_review: synthetic corpora (authored with high/low
  engagement) exercise the ADD / KEEP paths via the same relevance +
  platform-heat scoring.

Snapshots lock the deterministic replay output; regenerate with
UPDATE_SNAPSHOTS=1.
"""

from __future__ import annotations

import copy
import json
from collections import Counter
from pathlib import Path
from typing import Any

from tests.replay_harness import CASES_DIR, replay_case
from tests.snapshot import assert_matches

# Summary fields compared against the stored snapshots.
_SUMMARY_KEYS = [
    "pooled_content_count",
    "review_content_count",
    "rejected_content_count",
    "pending_content_count",
]


def _decision_counts(artifacts) -> dict[str, int]:
    """Tally ``artifacts.decisions`` by their ``decision_action`` value."""
    tally: Counter = Counter()
    for decision in artifacts.decisions:
        tally[decision.get("decision_action")] += 1
    return dict(tally)


def _build_synthetic_corpus(cases_dir: Path, case_id: str, items: list[dict[str, Any]]) -> None:
    """Author a minimal corpus: real (validated) source_context + given items.

    The source context is copied from the ``real_id45`` case under
    ``CASES_DIR``; both files are written to ``cases_dir/<case_id>/input``.
    """
    real_context_path = CASES_DIR / "real_id45" / "input" / "source_context.json"
    source_context = json.loads(real_context_path.read_text(encoding="utf-8"))

    input_dir = cases_dir / case_id / "input"
    input_dir.mkdir(parents=True, exist_ok=True)

    # NOTE(review): the ``.jsonl`` file receives one indented JSON array, not
    # one object per line — presumably the replay loader accepts this; confirm.
    payload_by_filename = {
        "source_context.json": source_context,
        "discovered_content_items.jsonl": items,
    }
    for filename, payload in payload_by_filename.items():
        serialized = json.dumps(payload, ensure_ascii=False, indent=2)
        (input_dir / filename).write_text(serialized, encoding="utf-8")


def _synthetic_item(content_id: str, *, digg: int) -> dict[str, Any]:
    """Return one synthetic douyin video item whose ``digg_count`` is ``digg``."""
    engagement = {"digg_count": digg, "comment_count": 800, "share_count": 600}
    return {
        "content_discovery_id": f"syn_{content_id}",
        "search_query_id": "q_001",
        "platform": "douyin",
        "platform_content_id": content_id,
        "platform_content_format": "video",
        "description": "中医养生合成内容",
        "platform_author_id": "syn_author",
        "author_display_name": "养生作者",
        "statistics": engagement,
        "tags": ["#中医养生"],
        "score": 85,
        "risk_level": "low",
        "availability": "available",
        "discovery_start_source": "pattern_itemset",
        "previous_discovery_step": "search_query_direct",
        "content_metadata_source": "synthetic",
    }


def test_replay_id45_baseline_gemini_score(tmp_path):
    """real_id45 under relevance + heat scoring: 3 pooled, 1 kept for review."""
    # M3 controlled change: the portrait gate is retired in favour of Gemini
    # relevance + platform heat scoring.
    # The default FakeGeminiVideoClient returns fit_senior_50plus=true /
    # relevance_score=0.85 → relevance=60 (full marks). Platform heat is
    # log-normalised from each item's digg_count.
    # R3 step 2 (2026-06-12): after Douyin heat switched to a four-field
    # composite (likes + comments + shares + favourites), content_732018
    # (only ~21k likes, but 3689 shares / ~15k favourites) gets heat 0.38
    # → total score 70 and is promoted from review into the pool
    # → 3 pooled (491098/72459/20801) + 1 review (content_907506, 24 likes,
    # everything low).
    artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
    assert artifacts.state["status"] == "success"

    summary = artifacts.summary
    expected_counts = {
        "pooled_content_count": 3,
        "review_content_count": 1,
        "rejected_content_count": 0,
        "pending_content_count": 0,
    }
    for key, expected in expected_counts.items():
        assert summary[key] == expected, key

    assert _decision_counts(artifacts) == {
        "ADD_TO_CONTENT_POOL": 3,
        "KEEP_CONTENT_FOR_REVIEW": 1,
    }

    # Every decision goes through the relevance + heat scoring gate (the old
    # portrait reason codes are retired).
    reason_codes = {d.get("decision_reason_code") for d in artifacts.decisions}
    assert reason_codes == {"content_score_pool", "content_score_review"}

    assert_matches("real_id45/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)


def test_replay_synthetic_pool_case(tmp_path):
    """A high-engagement synthetic item is pooled and nothing is rejected."""
    cases_root = tmp_path / "cases"
    high_engagement_item = _synthetic_item("9000000000000000001", digg=50000)
    _build_synthetic_corpus(cases_root, "syn_pool", [high_engagement_item])

    artifacts = replay_case("syn_pool", runtime_root=tmp_path / "rt", cases_dir=cases_root)

    assert artifacts.state["status"] == "success"
    summary = artifacts.summary
    assert summary["pooled_content_count"] >= 1
    assert summary["rejected_content_count"] == 0
    assert_matches("syn_pool/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)


def test_replay_synthetic_review_case(tmp_path):
    """A low-engagement synthetic item is kept for review and not pooled."""
    # Low engagement scores into the review band (60-69).
    cases_root = tmp_path / "cases"
    low_engagement_item = _synthetic_item("9000000000000000002", digg=500)
    _build_synthetic_corpus(cases_root, "syn_review", [low_engagement_item])

    artifacts = replay_case("syn_review", runtime_root=tmp_path / "rt", cases_dir=cases_root)

    assert artifacts.state["status"] == "success"
    summary = artifacts.summary
    assert summary["review_content_count"] >= 1
    assert summary["pooled_content_count"] == 0
    assert_matches("syn_review/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)


def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
    """Walk actions of real_id45 follow the pool / review decisions."""
    # R3 step-2 controlled change: with the four-field heat composite,
    # real_id45 = 3 pooled + 1 review.
    # Pooled content drives normal-budget expansion — query next-page, tag
    # expansion and author-works fetches are all success/normal; only the one
    # review item triggers budget_downgrade (low_budget). Every action still
    # carries its attribution pack and execution facts.
    content_pack_id = "douyin_content_discovery_rule_pack_v1"

    artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
    walk_actions = artifacts.files["walk_actions.jsonl"]

    def rows_on_edge(edge_id):
        # Walk rows recorded for a single edge, in file order.
        return [row for row in walk_actions if row["edge_id"] == edge_id]

    next_page = rows_on_edge("query_next_page")
    assert next_page
    assert all(row["walk_status"] == "success" for row in next_page)

    tag_actions = rows_on_edge("hashtag_to_query")
    executed_tags = [row for row in tag_actions if row["walk_status"] == "success"]
    skipped_tags = [row for row in tag_actions if row["walk_status"] == "skipped"]
    assert executed_tags
    assert all(row["budget_tier"] == "normal" for row in executed_tags)
    # R8/R7 + R3 step 2: the tag budget's 3 slots are all taken by pooled
    # content, so 3 are executed; with 3 pooled items now, 2 cannot get a slot
    # (budget_exhausted) and the 1 review item is not eligible (deny).
    skipped_reasons = sorted(row["reason_code"] for row in skipped_tags)
    assert skipped_reasons == [
        "budget_exhausted",
        "budget_exhausted",
        "review_tag_expansion_disabled",
    ]
    assert len(executed_tags) == 3

    author_actions = rows_on_edge("author_to_works")
    assert author_actions
    assert all(row["walk_status"] == "success" for row in author_actions)
    assert all(row["budget_tier"] == "normal" for row in author_actions)

    downgrades = rows_on_edge("budget_downgrade")
    assert len(downgrades) == 1
    assert all(row["budget_tier"] == "low_budget" for row in downgrades)
    assert all(row["reason_code"] == "content_score_review" for row in downgrades)
    # M4 pack-cut controlled change: the Budget pack and its binding were
    # deleted; the KEEP stamp falls back to the content pack
    # (= executed_rule_pack_id).
    assert all(row["rule_pack_id"] == content_pack_id for row in downgrades)

    for row in walk_actions:
        execution = row["raw_payload"]["rule_pack_execution"]
        assert execution["executed"] is True
        assert execution["executed_rule_pack_id"] == content_pack_id