| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- """Real + synthetic case replay tests (V2-M0D).
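- real_id45: the harvested production baseline (demand_content.id=45). M3 controlled
  change: the portrait gates (missing_content_portrait / pattern_recall_required, etc.)
  are retired as a whole and replaced by a score built from Gemini relevance (max 60)
  plus platform heat (max 40): >=70 enters the pool / 60-69 goes to review / <60 is
  rejected. The default FakeGeminiVideoClient returns relevance_score=0.85
  (-> relevance 60). Heat was originally log-normalised from each item's digg_count,
  which put real_id45 at 2 pooled + 2 review (previously all KEEP); since R3 step two
  Douyin heat is a composite of likes + comments + shares + favourites, and the
  baseline asserted below is 3 pooled + 1 review.
- syn_pool / syn_review: synthetic corpora (authored with high/low engagement) that
  exercise the ADD / KEEP paths via the same relevance + platform-heat scoring.
Snapshots lock the deterministic replay output; regenerate with UPDATE_SNAPSHOTS=1.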
- """
- from __future__ import annotations
- import copy
- import json
- from collections import Counter
- from pathlib import Path
- from typing import Any
- from tests.replay_harness import CASES_DIR, replay_case
- from tests.snapshot import assert_matches
- _SUMMARY_KEYS = [
- "pooled_content_count",
- "review_content_count",
- "rejected_content_count",
- "pending_content_count",
- ]
- def _decision_counts(artifacts) -> dict[str, int]:
- return dict(Counter(d.get("decision_action") for d in artifacts.decisions))
def _build_synthetic_corpus(cases_dir: Path, case_id: str, items: list[dict[str, Any]]) -> None:
    """Write a minimal replay corpus for ``case_id`` below ``cases_dir``.

    The ``source_context.json`` of the ``real_id45`` case (under ``CASES_DIR``) is
    parsed and re-serialised into ``<cases_dir>/<case_id>/input``, and ``items`` is
    written next to it as ``discovered_content_items.jsonl``. The input directory
    is created if it does not exist yet.
    """
    baseline_path = CASES_DIR / "real_id45" / "input" / "source_context.json"
    baseline_context = json.loads(baseline_path.read_text(encoding="utf-8"))

    input_dir = cases_dir / case_id / "input"
    input_dir.mkdir(parents=True, exist_ok=True)

    # NOTE(review): despite the ``.jsonl`` suffix, the items are dumped as a single
    # indented JSON array, not one object per line. Presumably the replay loader
    # accepts this shape -- confirm before changing the serialisation.
    payload_by_filename = {
        "source_context.json": baseline_context,
        "discovered_content_items.jsonl": items,
    }
    for filename, payload in payload_by_filename.items():
        serialised = json.dumps(payload, ensure_ascii=False, indent=2)
        (input_dir / filename).write_text(serialised, encoding="utf-8")
- def _synthetic_item(content_id: str, *, digg: int) -> dict[str, Any]:
- return {
- "content_discovery_id": f"syn_{content_id}",
- "search_query_id": "q_001",
- "platform": "douyin",
- "platform_content_id": content_id,
- "platform_content_format": "video",
- "description": "中医养生合成内容",
- "platform_author_id": "syn_author",
- "author_display_name": "养生作者",
- "statistics": {"digg_count": digg, "comment_count": 800, "share_count": 600},
- "tags": ["#中医养生"],
- "score": 85,
- "risk_level": "low",
- "availability": "available",
- "discovery_start_source": "pattern_itemset",
- "previous_discovery_step": "search_query_direct",
- "content_metadata_source": "synthetic",
- }
def test_replay_id45_baseline_gemini_score(tmp_path):
    """Replay the real_id45 baseline and pin its relevance + heat scoring outcome."""
    # M3 controlled change: the portrait gates are retired in favour of Gemini
    # relevance + platform heat scoring.
    # The default FakeGeminiVideoClient returns fit_senior_50plus=true /
    # relevance_score=0.85 -> relevance=60 (full marks). Platform heat is
    # log-normalised per item:
    # R3 step two (2026-06-12): Douyin heat became a composite of four fields
    # (likes + comments + shares + favourites). content_732018, with high shares
    # and favourites (only ~21k likes but 3689 shares / ~15k favourites), gets
    # heat 0.38 -> total score 70 and moves from review into the pool
    # -> 3 pooled (491098/72459/20801) + 1 review (content_907506, 24 likes,
    # everything low).
    artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")

    assert artifacts.state["status"] == "success"

    expected_summary = {
        "pooled_content_count": 3,
        "review_content_count": 1,
        "rejected_content_count": 0,
        "pending_content_count": 0,
    }
    for counter_name, expected_value in expected_summary.items():
        assert artifacts.summary[counter_name] == expected_value

    expected_actions = {
        "ADD_TO_CONTENT_POOL": 3,
        "KEEP_CONTENT_FOR_REVIEW": 1,
    }
    assert _decision_counts(artifacts) == expected_actions

    # Every decision goes through the relevance + heat score gate (the old
    # portrait reason codes are retired).
    seen_reason_codes = set()
    for decision in artifacts.decisions:
        seen_reason_codes.add(decision.get("decision_reason_code"))
    assert seen_reason_codes == {
        "content_score_pool",
        "content_score_review",
    }

    assert_matches("real_id45/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)
def test_replay_synthetic_pool_case(tmp_path):
    """A single high-engagement synthetic item is pooled and nothing is rejected."""
    cases_root = tmp_path / "cases"
    high_engagement_item = _synthetic_item("9000000000000000001", digg=50000)
    _build_synthetic_corpus(cases_root, "syn_pool", [high_engagement_item])

    artifacts = replay_case("syn_pool", runtime_root=tmp_path / "rt", cases_dir=cases_root)

    assert artifacts.state["status"] == "success"
    summary = artifacts.summary
    assert summary["pooled_content_count"] >= 1
    assert summary["rejected_content_count"] == 0
    assert_matches("syn_pool/decision_summary", summary, subset_keys=_SUMMARY_KEYS)
def test_replay_synthetic_review_case(tmp_path):
    """A single low-engagement synthetic item is kept for review and not pooled."""
    cases_root = tmp_path / "cases"
    # Low engagement scores into the review band (60-69).
    low_engagement_item = _synthetic_item("9000000000000000002", digg=500)
    _build_synthetic_corpus(cases_root, "syn_review", [low_engagement_item])

    artifacts = replay_case("syn_review", runtime_root=tmp_path / "rt", cases_dir=cases_root)

    assert artifacts.state["status"] == "success"
    summary = artifacts.summary
    assert summary["review_content_count"] >= 1
    assert summary["pooled_content_count"] == 0
    assert_matches("syn_review/decision_summary", summary, subset_keys=_SUMMARY_KEYS)
def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
    """Check that the real_id45 walk actions follow the pool/review decisions."""
    # R3 step-two controlled change: with the four-field composite heat,
    # real_id45 = 3 pooled + 1 review.
    # Pooled content drives normal-budget expansion -- query next-page, tag
    # expansion and author-to-works are all success/normal; only the single
    # review item triggers budget_downgrade (low_budget). Every action still
    # carries its attributed rule pack and execution facts.
    content_pack_id = "douyin_content_discovery_rule_pack_v1"

    artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
    walk_actions = artifacts.files["walk_actions.jsonl"]

    def rows_on_edge(edge_id):
        """Return the walk actions recorded for ``edge_id``, in file order."""
        return [action for action in walk_actions if action["edge_id"] == edge_id]

    next_page = rows_on_edge("query_next_page")
    assert next_page
    for action in next_page:
        assert action["walk_status"] == "success"

    tag_actions = rows_on_edge("hashtag_to_query")
    executed_tags = []
    skipped_tags = []
    for action in tag_actions:
        status = action["walk_status"]
        if status == "success":
            executed_tags.append(action)
        elif status == "skipped":
            skipped_tags.append(action)
    assert executed_tags
    for action in executed_tags:
        assert action["budget_tier"] == "normal"

    # R8/R7 + R3 step two: the 3 tag-budget slots are all taken by pooled
    # content, so 3 execute; with 3 pooled items now, 2 tags miss the queue
    # (budget_exhausted) and 1 tag from the review item is ineligible (deny).
    skip_reasons = sorted(action["reason_code"] for action in skipped_tags)
    assert skip_reasons == [
        "budget_exhausted",
        "budget_exhausted",
        "review_tag_expansion_disabled",
    ]
    assert len(executed_tags) == 3

    author_actions = rows_on_edge("author_to_works")
    assert author_actions
    for action in author_actions:
        assert action["walk_status"] == "success"
    for action in author_actions:
        assert action["budget_tier"] == "normal"

    downgrades = rows_on_edge("budget_downgrade")
    assert len(downgrades) == 1
    for action in downgrades:
        assert action["budget_tier"] == "low_budget"
    for action in downgrades:
        assert action["reason_code"] == "content_score_review"
    # M4 pack-trimming controlled change: the Budget pack and its binding were
    # deleted, so the KEEP stamp falls back to the content pack
    # (= executed_rule_pack_id).
    for action in downgrades:
        assert action["rule_pack_id"] == content_pack_id

    for action in walk_actions:
        execution = action["raw_payload"]["rule_pack_execution"]
        assert execution["executed"] is True
        assert execution["executed_rule_pack_id"] == content_pack_id
|