test_case_replay.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
"""Real + synthetic case replay tests (V2-M0D).

- real_id45: the harvested production baseline (demand_content.id=45).
  M3 controlled change: the portrait gates (missing_content_portrait /
  pattern_recall_required, etc.) are retired as a whole and replaced by a
  Gemini relevance (max 60) + platform heat (max 40) score:
  >=70 pooled / 60-69 review / <60 rejected.
  The default FakeGeminiVideoClient returns relevance_score=0.85
  (-> relevance 60) and heat was log-normalised from each item's digg_count,
  which put real_id45 at 2 pooled + 2 review (previously all KEEP).
  After the later R3 step-2 change (heat as a four-field composite) the
  baseline asserted in this file is 3 pooled + 1 review.
- syn_pool / syn_review: synthetic corpora (authored with high/low engagement)
  exercise the ADD / KEEP paths via the same relevance + platform-heat scoring.

Snapshots lock the deterministic replay output; regenerate with UPDATE_SNAPSHOTS=1.
"""
  11. from __future__ import annotations
  12. import copy
  13. import json
  14. from collections import Counter
  15. from pathlib import Path
  16. from typing import Any
  17. from tests.replay_harness import CASES_DIR, replay_case
  18. from tests.snapshot import assert_matches
  19. _SUMMARY_KEYS = [
  20. "pooled_content_count",
  21. "review_content_count",
  22. "rejected_content_count",
  23. "pending_content_count",
  24. ]
  25. def _decision_counts(artifacts) -> dict[str, int]:
  26. return dict(Counter(d.get("decision_action") for d in artifacts.decisions))
  27. def _build_synthetic_corpus(cases_dir: Path, case_id: str, items: list[dict[str, Any]]) -> None:
  28. """Author a minimal corpus: real (validated) source_context + given items."""
  29. source_context = json.loads(
  30. (CASES_DIR / "real_id45" / "input" / "source_context.json").read_text(encoding="utf-8")
  31. )
  32. dest = cases_dir / case_id / "input"
  33. dest.mkdir(parents=True, exist_ok=True)
  34. (dest / "source_context.json").write_text(
  35. json.dumps(source_context, ensure_ascii=False, indent=2), encoding="utf-8"
  36. )
  37. (dest / "discovered_content_items.jsonl").write_text(
  38. json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8"
  39. )
  40. def _synthetic_item(content_id: str, *, digg: int) -> dict[str, Any]:
  41. return {
  42. "content_discovery_id": f"syn_{content_id}",
  43. "search_query_id": "q_001",
  44. "platform": "douyin",
  45. "platform_content_id": content_id,
  46. "platform_content_format": "video",
  47. "description": "中医养生合成内容",
  48. "platform_author_id": "syn_author",
  49. "author_display_name": "养生作者",
  50. "statistics": {"digg_count": digg, "comment_count": 800, "share_count": 600},
  51. "tags": ["#中医养生"],
  52. "score": 85,
  53. "risk_level": "low",
  54. "availability": "available",
  55. "discovery_start_source": "pattern_itemset",
  56. "previous_discovery_step": "search_query_direct",
  57. "content_metadata_source": "synthetic",
  58. }
  59. def test_replay_id45_baseline_gemini_score(tmp_path):
  60. # M3 受控变化: 画像门槛退役,改 Gemini 相关性 + 平台热度打分。
  61. # 默认 FakeGeminiVideoClient 返回 fit_senior_50plus=true / relevance_score=0.85
  62. # → relevance=60(满分)。平台热度按各 item digg_count 对数归一化:
  63. # R3 第二步(2026-06-12): 抖音热度改 赞+评+转+藏 四字段复合后,
  64. # 高转发高收藏的 content_732018(赞仅 2.1万 但转 3689/藏 1.5万)heat 0.38→总分 70,
  65. # 从复看升进池 → 3 进池(491098/72459/20801)+1 复看(content_907506 赞 24 全低)。
  66. artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
  67. assert artifacts.state["status"] == "success"
  68. assert artifacts.summary["pooled_content_count"] == 3
  69. assert artifacts.summary["review_content_count"] == 1
  70. assert artifacts.summary["rejected_content_count"] == 0
  71. assert artifacts.summary["pending_content_count"] == 0
  72. assert _decision_counts(artifacts) == {
  73. "ADD_TO_CONTENT_POOL": 3,
  74. "KEEP_CONTENT_FOR_REVIEW": 1,
  75. }
  76. # 全部命中相关性+热度打分门(旧画像 reason_code 已退役)。
  77. assert {d.get("decision_reason_code") for d in artifacts.decisions} == {
  78. "content_score_pool",
  79. "content_score_review",
  80. }
  81. assert_matches("real_id45/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)
  82. def test_replay_synthetic_pool_case(tmp_path):
  83. _build_synthetic_corpus(tmp_path / "cases", "syn_pool", [_synthetic_item("9000000000000000001", digg=50000)])
  84. artifacts = replay_case("syn_pool", runtime_root=tmp_path / "rt", cases_dir=tmp_path / "cases")
  85. assert artifacts.state["status"] == "success"
  86. assert artifacts.summary["pooled_content_count"] >= 1
  87. assert artifacts.summary["rejected_content_count"] == 0
  88. assert_matches("syn_pool/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)
  89. def test_replay_synthetic_review_case(tmp_path):
  90. # Low engagement scores into the review band (60-69).
  91. _build_synthetic_corpus(tmp_path / "cases", "syn_review", [_synthetic_item("9000000000000000002", digg=500)])
  92. artifacts = replay_case("syn_review", runtime_root=tmp_path / "rt", cases_dir=tmp_path / "cases")
  93. assert artifacts.state["status"] == "success"
  94. assert artifacts.summary["review_content_count"] >= 1
  95. assert artifacts.summary["pooled_content_count"] == 0
  96. assert_matches("syn_review/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)
  97. def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
  98. # R3 第二步受控变化: 四字段热度复合后 real_id45 = 3 进池 + 1 复看。
  99. # 进池内容驱动正常预算扩散——query 翻页、tag 扩词、作者抓作品均 success/normal;
  100. # 仅 1 条复看内容触发 budget_downgrade(low_budget)。动作仍全部带归属包与执行事实。
  101. artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
  102. walk_actions = artifacts.files["walk_actions.jsonl"]
  103. next_page = [row for row in walk_actions if row["edge_id"] == "query_next_page"]
  104. assert next_page
  105. assert all(row["walk_status"] == "success" for row in next_page)
  106. tag_actions = [row for row in walk_actions if row["edge_id"] == "hashtag_to_query"]
  107. executed_tags = [row for row in tag_actions if row["walk_status"] == "success"]
  108. skipped_tags = [row for row in tag_actions if row["walk_status"] == "skipped"]
  109. assert executed_tags
  110. assert all(row["budget_tier"] == "normal" for row in executed_tags)
  111. # R8/R7 + R3 第二步: tag 预算 3 个名额被进池内容占满,executed 3;
  112. # 现在 3 进池内容,2 条排不上队(budget_exhausted)、1 条复看内容无资格(deny)。
  113. assert sorted(row["reason_code"] for row in skipped_tags) == [
  114. "budget_exhausted",
  115. "budget_exhausted",
  116. "review_tag_expansion_disabled",
  117. ]
  118. assert len(executed_tags) == 3
  119. author_actions = [row for row in walk_actions if row["edge_id"] == "author_to_works"]
  120. assert author_actions
  121. assert all(row["walk_status"] == "success" for row in author_actions)
  122. assert all(row["budget_tier"] == "normal" for row in author_actions)
  123. downgrades = [row for row in walk_actions if row["edge_id"] == "budget_downgrade"]
  124. assert len(downgrades) == 1
  125. assert all(row["budget_tier"] == "low_budget" for row in downgrades)
  126. assert all(row["reason_code"] == "content_score_review" for row in downgrades)
  127. # M4 砍包受控变化:Budget 包及 binding 已删,KEEP 的戳回退内容包(=executed_rule_pack_id)。
  128. assert all(row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" for row in downgrades)
  129. for row in walk_actions:
  130. execution = row["raw_payload"]["rule_pack_execution"]
  131. assert execution["executed"] is True
  132. assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"