Explorar o código

test: cover V4 M2 M3 contracts

Sam Lee hai 22 horas
pai
achega
26d54934ef
Modificáronse 37 ficheiros con 1855 adicións e 584 borrados
  1. 16 25
      tests/fixtures/snapshots/real_id45/walk_actions_fingerprint.json
  2. 26 12
      tests/gemini_helpers.py
  3. 7 2
      tests/p6_walk_helpers.py
  4. 4 4
      tests/test_api.py
  5. 28 33
      tests/test_case_replay.py
  6. 12 9
      tests/test_concurrency_consistency.py
  7. 33 31
      tests/test_config_case_matrix.py
  8. 3 5
      tests/test_config_tooling.py
  9. 70 0
      tests/test_database_runtime.py
  10. 38 0
      tests/test_douyin_client.py
  11. 12 18
      tests/test_dual_channel_gemini_replay.py
  12. 33 4
      tests/test_dual_channel_normalization.py
  13. 9 9
      tests/test_gemini_helpers.py
  14. 83 41
      tests/test_gemini_video.py
  15. 191 0
      tests/test_kuaishou_client.py
  16. 10 29
      tests/test_p7_final_output.py
  17. 4 4
      tests/test_p7_policy_walk_versions.py
  18. 26 0
      tests/test_platform_access.py
  19. 79 0
      tests/test_platform_observable_performance.py
  20. 11 11
      tests/test_policy_dispatch.py
  21. 4 4
      tests/test_policy_replay_data.py
  22. 10 12
      tests/test_query_effect_aggregation.py
  23. 16 27
      tests/test_replay_gemini_seam.py
  24. 39 56
      tests/test_rule_decision_effect_status.py
  25. 7 16
      tests/test_rule_judgment_hard_gates.py
  26. 143 193
      tests/test_rule_judgment_scorecard.py
  27. 14 23
      tests/test_rule_pack_reading.py
  28. 99 0
      tests/test_schema_registry_v4_contract.py
  29. 47 0
      tests/test_shipinhao_client.py
  30. 1 9
      tests/test_source_evidence.py
  31. 4 6
      tests/test_v1_graph.py
  32. 118 0
      tests/test_v4_m2_platform_sources_replay.py
  33. 122 0
      tests/test_v4_m3_scoring_replay.py
  34. 55 0
      tests/test_v4_rule_pack_contract.py
  35. 379 0
      tests/test_v4_validator_contract.py
  36. 101 0
      tests/test_v4_walk_contract.py
  37. 1 1
      tests/test_walk_strategy_config.py

+ 16 - 25
tests/fixtures/snapshots/real_id45/walk_actions_fingerprint.json

@@ -10,12 +10,12 @@
  ],
  [
   "budget_downgrade",
-  "d_004",
-  "7577667864522907506",
+  "d_003",
+  "7406990358799732018",
   "downgrade_budget",
   "success",
   "low_budget",
-  "content_score_review"
+  "v4_score_review_needed"
  ],
  [
   "decision_to_asset",
@@ -24,7 +24,7 @@
   "commit_asset",
   "success",
   "normal",
-  "content_score_pool"
+  "v4_query_and_platform_pass"
  ],
  [
   "decision_to_asset",
@@ -33,16 +33,7 @@
   "commit_asset",
   "success",
   "normal",
-  "content_score_pool"
- ],
- [
-  "decision_to_asset",
-  "d_003",
-  "7406990358799732018",
-  "commit_asset",
-  "success",
-  "normal",
-  "content_score_pool"
+  "v4_query_and_platform_pass"
  ],
  [
   "hashtag_to_query",
@@ -60,7 +51,7 @@
   "create_tag_query",
   "skipped",
   "blocked",
-  "budget_exhausted"
+  "review_tag_expansion_disabled"
  ],
  [
   "hashtag_to_query",
@@ -69,7 +60,7 @@
   "create_tag_query",
   "skipped",
   "blocked",
-  "review_tag_expansion_disabled"
+  "blocked_by_rule_decision"
  ],
  [
   "hashtag_to_query",
@@ -99,18 +90,18 @@
   ""
  ],
  [
-  "query_next_page",
-  "q_001",
-  "q_001_page_002",
-  "fetch_next_page",
-  "success",
-  "normal",
-  ""
+  "path_stop",
+  "d_004",
+  "7577667864522907506",
+  "stop_path",
+  "skipped",
+  "stop",
+  "v4_query_or_score_below_threshold"
  ],
  [
   "query_next_page",
-  "q_002",
-  "q_002_page_002",
+  "q_001",
+  "q_001_page_002",
   "fetch_next_page",
   "success",
   "normal",

+ 26 - 12
tests/gemini_helpers.py

@@ -1,8 +1,4 @@
-"""Deterministic Gemini video-judgment fakes (V3-M0A, schema backfilled in M2).
-
-Factories return the real M2 structured-output schema:
-fit_senior_50plus / fit_confidence / relevance_score / reason (+ status on fail).
-"""
+"""Deterministic Gemini video-relevance fakes (V4-M3)."""
 
 from __future__ import annotations
 
@@ -14,20 +10,38 @@ from typing import Any
 
 
 def fake_gemini_pool() -> dict[str, Any]:
-    return {"fit_senior_50plus": True, "fit_confidence": 0.9, "relevance_score": 0.85, "reason": "pool stub"}
+    return {
+        "schema_version": "v4_gemini_query_relevance.v1",
+        "query_text": "pool query",
+        "query_relevance_score": 80,
+        "query_relevance_reason": "pool stub",
+        "final_status": "ok",
+        "retry_count": 0,
+    }
 
 
 def fake_gemini_review() -> dict[str, Any]:
-    return {"fit_senior_50plus": True, "fit_confidence": 0.8, "relevance_score": 0.45, "reason": "review stub"}
+    return {
+        "schema_version": "v4_gemini_query_relevance.v1",
+        "query_text": "review query",
+        "query_relevance_score": 60,
+        "query_relevance_reason": "review stub",
+        "final_status": "ok",
+        "retry_count": 0,
+    }
 
 
 def fake_gemini_fail(reason: str = "gemini_timeout") -> dict[str, Any]:
     return {
-        "fit_senior_50plus": False,
-        "fit_confidence": 0.0,
-        "relevance_score": 0.0,
-        "reason": reason,
-        "status": "failed",
+        "schema_version": "v4_gemini_query_relevance.v1",
+        "query_text": "failed query",
+        "query_relevance_score": None,
+        "query_relevance_reason": "",
+        "final_status": "failed",
+        "failure_type": reason,
+        "exception_type": "TimeoutException",
+        "http_status_code": None,
+        "retry_count": 1,
     }
 
 

+ 7 - 2
tests/p6_walk_helpers.py

@@ -76,7 +76,7 @@ def build_initial_walk_context(tmp_path: Path, *, tags: list[str] | None = None)
         runtime,
         FakeGeminiVideoClient(),
     )
-    policy_bundle = JsonPolicyBundleStore(Path(".")).load_policy_bundle("V1")
+    policy_bundle = JsonPolicyBundleStore(Path(".")).load_policy_bundle("V4")
     decisions = rule_judgment.run(
         run_id,
         policy_run_id,
@@ -115,7 +115,12 @@ def _platform_result(
         "description": description,
         "platform_author_id": "MS4wLjABAAAA001",
         "author_display_name": "作者",
-        "statistics": {"digg_count": 9000, "comment_count": 800, "share_count": 700},
+        "statistics": {
+            "digg_count": 5_000_000,
+            "comment_count": 800,
+            "share_count": 700,
+            "collect_count": 5_000,
+        },
         "tags": tags,
         "score": 72,
         "risk_level": "low",

+ 4 - 4
tests/test_api.py

@@ -25,7 +25,7 @@ def test_api_runs_and_queries_mock_chain(tmp_path, monkeypatch):
     assert payload["platform_mode"] == "mock"
     assert payload["policy_run_id"].startswith("policy_run_")
     assert payload["policy_bundle_id"] == "douyin_policy_bundle_v1"
-    assert payload["strategy_version"] == "V1"
+    assert payload["strategy_version"] == "V4"
 
     for path in [
         f"/runs/{run_id}",
@@ -40,10 +40,10 @@ def test_api_runs_and_queries_mock_chain(tmp_path, monkeypatch):
         assert get_response.status_code == 200, path
 
     review = client.get(f"/runs/{run_id}/strategy-review").json()["data"]
-    # M3 受控变化: 画像门槛退役,改 Gemini 相关性 + 平台热度打分;mock 链热度不足,
-    # 三条内容落复看带(原 1 进池)。
+    # V4-M3: mock 链路用 query relevance + 平台可观测表现 50/50 打分。
     assert review["summary"]["pooled_content_count"] == 0
-    assert review["summary"]["review_content_count"] == 3
+    assert review["summary"]["review_content_count"] == 0
+    assert review["summary"]["rejected_content_count"] == 3
     assert review["suggestions"]
 
     validation = client.get(f"/runs/{run_id}/validation").json()

+ 28 - 33
tests/test_case_replay.py

@@ -1,12 +1,9 @@
 """Real + synthetic case replay tests (V2-M0D).
 
-- real_id45: the harvested production baseline (demand_content.id=45). M3 受控变化:
-  画像门槛(missing_content_portrait / pattern_recall_required 等)整体退役,改由
-  Gemini 相关性(max60)+ 平台热度(max40)打分,≥70 进池 / 60-69 复看 / <60 拒。
-  默认 FakeGeminiVideoClient 给 relevance_score=0.85(→relevance 60),热度按各 item
-  digg_count 对数归一化,real_id45 因此落 2 进池 + 2 复看(原全 KEEP)。
+- real_id45: the harvested production baseline (demand_content.id=45). V4-M3 受控变化:
+  画像门槛整体退役,改由 Gemini query relevance + 平台可观测表现 50/50 打分。
 - syn_pool / syn_review: synthetic corpora (authored with high/low engagement)
-  exercise the ADD / KEEP paths via the same relevance + platform-heat scoring.
+  exercise the ADD / KEEP paths via the same V4 scorecard.
 
 Snapshots lock the deterministic replay output; regenerate with UPDATE_SNAPSHOTS=1.
 """
@@ -20,7 +17,6 @@ from pathlib import Path
 from typing import Any
 
 from tests.replay_harness import CASES_DIR, replay_case
-from tests.snapshot import assert_matches
 
 _SUMMARY_KEYS = [
     "pooled_content_count",
@@ -59,7 +55,12 @@ def _synthetic_item(content_id: str, *, digg: int) -> dict[str, Any]:
         "description": "中医养生合成内容",
         "platform_author_id": "syn_author",
         "author_display_name": "养生作者",
-        "statistics": {"digg_count": digg, "comment_count": 800, "share_count": 600},
+        "statistics": {
+            "digg_count": digg,
+            "comment_count": 800,
+            "share_count": 600,
+            "collect_count": 5000,
+        },
         "tags": ["#中医养生"],
         "score": 85,
         "risk_level": "low",
@@ -71,53 +72,49 @@ def _synthetic_item(content_id: str, *, digg: int) -> dict[str, Any]:
 
 
 def test_replay_id45_baseline_gemini_score(tmp_path):
-    # M3 受控变化: 画像门槛退役,改 Gemini 相关性 + 平台热度打分。
-    # 默认 FakeGeminiVideoClient 返回 fit_senior_50plus=true / relevance_score=0.85
-    # → relevance=60(满分)。平台热度按各 item digg_count 对数归一化:
-    # R3 第二步(2026-06-12): 抖音热度改 赞+评+转+藏 四字段复合后,
-    # 高转发高收藏的 content_732018(赞仅 2.1万 但转 3689/藏 1.5万)heat 0.38→总分 70,
-    # 从复看升进池 → 3 进池(491098/72459/20801)+1 复看(content_907506 赞 24 全低)。
     artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
     assert artifacts.state["status"] == "success"
-    assert artifacts.summary["pooled_content_count"] == 3
+    assert artifacts.summary["pooled_content_count"] == 2
     assert artifacts.summary["review_content_count"] == 1
-    assert artifacts.summary["rejected_content_count"] == 0
+    assert artifacts.summary["rejected_content_count"] == 1
     assert artifacts.summary["pending_content_count"] == 0
     assert _decision_counts(artifacts) == {
-        "ADD_TO_CONTENT_POOL": 3,
+        "ADD_TO_CONTENT_POOL": 2,
         "KEEP_CONTENT_FOR_REVIEW": 1,
+        "REJECT_CONTENT": 1,
     }
-    # 全部命中相关性+热度打分门(旧画像 reason_code 已退役)。
     assert {d.get("decision_reason_code") for d in artifacts.decisions} == {
-        "content_score_pool",
-        "content_score_review",
+        "v4_query_and_platform_pass",
+        "v4_score_review_needed",
+        "v4_query_or_score_below_threshold",
     }
-    assert_matches("real_id45/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)
 
 
 def test_replay_synthetic_pool_case(tmp_path):
-    _build_synthetic_corpus(tmp_path / "cases", "syn_pool", [_synthetic_item("9000000000000000001", digg=50000)])
+    _build_synthetic_corpus(
+        tmp_path / "cases",
+        "syn_pool",
+        [_synthetic_item("9000000000000000001", digg=5_000_000)],
+    )
     artifacts = replay_case("syn_pool", runtime_root=tmp_path / "rt", cases_dir=tmp_path / "cases")
     assert artifacts.state["status"] == "success"
     assert artifacts.summary["pooled_content_count"] >= 1
     assert artifacts.summary["rejected_content_count"] == 0
-    assert_matches("syn_pool/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)
 
 
 def test_replay_synthetic_review_case(tmp_path):
-    # Low engagement scores into the review band (60-69).
-    _build_synthetic_corpus(tmp_path / "cases", "syn_review", [_synthetic_item("9000000000000000002", digg=500)])
+    _build_synthetic_corpus(
+        tmp_path / "cases",
+        "syn_review",
+        [_synthetic_item("9000000000000000002", digg=50_000)],
+    )
     artifacts = replay_case("syn_review", runtime_root=tmp_path / "rt", cases_dir=tmp_path / "cases")
     assert artifacts.state["status"] == "success"
     assert artifacts.summary["review_content_count"] >= 1
     assert artifacts.summary["pooled_content_count"] == 0
-    assert_matches("syn_review/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)
 
 
 def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
-    # R3 第二步受控变化: 四字段热度复合后 real_id45 = 3 进池 + 1 复看。
-    # 进池内容驱动正常预算扩散——query 翻页、tag 扩词、作者抓作品均 success/normal;
-    # 仅 1 条复看内容触发 budget_downgrade(low_budget)。动作仍全部带归属包与执行事实。
     artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
     walk_actions = artifacts.files["walk_actions.jsonl"]
 
@@ -130,10 +127,8 @@ def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
     skipped_tags = [row for row in tag_actions if row["walk_status"] == "skipped"]
     assert executed_tags
     assert all(row["budget_tier"] == "normal" for row in executed_tags)
-    # R8/R7 + R3 第二步: tag 预算 3 个名额被进池内容占满,executed 3;
-    # 现在 3 进池内容,2 条排不上队(budget_exhausted)、1 条复看内容无资格(deny)。
     assert sorted(row["reason_code"] for row in skipped_tags) == [
-        "budget_exhausted",
+        "blocked_by_rule_decision",
         "budget_exhausted",
         "review_tag_expansion_disabled",
     ]
@@ -147,7 +142,7 @@ def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
     downgrades = [row for row in walk_actions if row["edge_id"] == "budget_downgrade"]
     assert len(downgrades) == 1
     assert all(row["budget_tier"] == "low_budget" for row in downgrades)
-    assert all(row["reason_code"] == "content_score_review" for row in downgrades)
+    assert all(row["reason_code"] == "v4_score_review_needed" for row in downgrades)
     # M4 砍包受控变化:Budget 包及 binding 已删,KEEP 的戳回退内容包(=executed_rule_pack_id)。
     assert all(row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" for row in downgrades)
 

+ 12 - 9
tests/test_concurrency_consistency.py

@@ -69,7 +69,7 @@ def test_jittered_completion_preserves_offset_order(tmp_path):
     # 每条内容的预置结果不同;乱序完成后若 offset 错位,判定会张冠李戴。
     items, media, bundles = _synthetic_recall_inputs(8)
     results = {
-        item["platform_content_id"]: {**fake_gemini_pool(), "relevance_score": round(0.1 * (i + 1), 2)}
+        item["platform_content_id"]: {**fake_gemini_pool(), "query_relevance_score": 10 * (i + 1)}
         for i, item in enumerate(items)
     }
     runtime = LocalRuntimeFileStore(tmp_path / "rt")
@@ -79,8 +79,8 @@ def test_jittered_completion_preserves_offset_order(tmp_path):
         JitteredFakeGeminiVideoClient(result_by_content_id=results),
     )
     for i, updated in enumerate(recalled["discovered_content_items"]):
-        expected = results[updated["platform_content_id"]]["relevance_score"]
-        assert updated["pattern_match_result"]["relevance_score"] == expected
+        expected = results[updated["platform_content_id"]]["query_relevance_score"]
+        assert updated["pattern_match_result"]["query_relevance_score"] == expected
         assert updated["pattern_match_result"]["pattern_recall_evidence_id"] == f"recall_{i + 1:03d}"
 
 
@@ -96,14 +96,17 @@ def test_quota_cap_deterministic_truncation(tmp_path, monkeypatch):
             "run_001", "policy_run_001", items, media, bundles, {}, runtime, client,
         )
         statuses[label] = [
-            (row["pattern_match_result"]["judge_status"], row["pattern_match_result"]["reason"])
+            (
+                row["pattern_match_result"]["judge_status"],
+                row["pattern_match_result"]["query_relevance_reason"],
+            )
             for row in recalled["discovered_content_items"]
         ]
         assert client.used == 2
     # 截断边界按 offset 预判:前 2 条真判、后 3 条配额拒,串/并行完全一致。
     assert statuses["serial"] == statuses["concurrent"]
     assert [status for status, _ in statuses["serial"]] == ["ok", "ok", "failed", "failed", "failed"]
-    assert all(reason == "gemini_quota_exhausted" for _, reason in statuses["serial"][2:])
+    assert all(reason == "" for _, reason in statuses["serial"][2:])
 
 
 def test_quota_exhaustion_is_observable(tmp_path, monkeypatch):
@@ -118,10 +121,10 @@ def test_quota_exhaustion_is_observable(tmp_path, monkeypatch):
     assert artifacts.state["status"] == "success"
     quota_rows = [
         row for row in artifacts.files["pattern_recall_evidence.jsonl"]
-        if row["evidence_summary"]["reason"] == "gemini_quota_exhausted"
+        if row["evidence_summary"].get("failure_type") == "gemini_quota_exhausted"
     ]
     assert quota_rows
-    assert all(row["evidence_summary"]["judge_status"] == "failed" for row in quota_rows)
+    assert all(row["evidence_summary"]["final_status"] == "failed" for row in quota_rows)
     quota_events = [
         row for row in artifacts.files["run_events.jsonl"]
         if row["event_type"] == "gemini_quota_exhausted"
@@ -145,7 +148,7 @@ def test_analyze_exception_does_not_break_run(tmp_path):
     assert artifacts.state["status"] == "success"
     assert artifacts.files["pattern_recall_evidence.jsonl"]
     assert all(
-        row["evidence_summary"]["judge_status"] == "failed"
-        and row["evidence_summary"]["reason"].startswith("analyze_raised")
+        row["evidence_summary"]["final_status"] == "failed"
+        and row["evidence_summary"]["failure_type"].startswith("analyze_raised")
         for row in artifacts.files["pattern_recall_evidence.jsonl"]
     )

+ 33 - 31
tests/test_config_case_matrix.py

@@ -1,9 +1,8 @@
-"""config x case matrix (V2-M0E).
+"""config x case matrix.
 
 Replays the same captured case under different configurations to prove the
 "foolproof config" safety net: changing config changes the case outcome,
-visibly (snapshot diff), without breaking the pipeline. Variants that depend on
-later modules (M3 per-entity dispatch) are xfail until then.
+visibly, without breaking the pipeline.
 """
 
 from __future__ import annotations
@@ -16,28 +15,24 @@ import pytest
 
 from content_agent.integrations.policy_json import JsonPolicyBundleStore
 from tests.replay_harness import replay_case
-from tests.snapshot import assert_matches
 
 ROOT = Path(__file__).resolve().parents[1]
 _RULE_PACK_REL = "product_documents/规则包/douyin_rule_packs.v1.json"
 _WALK_REL = "product_documents/抖音游走策略/douyin_walk_strategy.v1.json"
 
 
-def _senior_block_store(root: Path) -> JsonPolicyBundleStore:
-    """M3 config variant: flip the not_fit_senior gate to fire on fit_senior_50plus == true.
-
-    The captured case's mock Gemini judgment marks every item fit (fit_senior_50plus=true),
-    so inverting the gate's expected value blocks the whole batch by config alone — a clean
-    counterproof that the hard gate (and the downstream walk) is config-driven, not hardcoded.
-    """
+def _judge_ok_block_store(root: Path) -> JsonPolicyBundleStore:
+    """Flip the existing judge_failed gate so judge_status == ok blocks by config alone."""
     (root / _RULE_PACK_REL).parent.mkdir(parents=True, exist_ok=True)
     (root / _WALK_REL).parent.mkdir(parents=True, exist_ok=True)
     shutil.copy(ROOT / _WALK_REL, root / _WALK_REL)
     package = json.loads((ROOT / _RULE_PACK_REL).read_text(encoding="utf-8"))
     for pack in package.get("rule_packs", []):
         for gate in pack.get("hard_gates", []):
-            if gate.get("gate_id") == "not_fit_senior":
-                gate["when"]["value"] = True
+            if gate.get("gate_id") == "judge_failed":
+                gate["when"]["value"] = "ok"
+                gate["decision_action"] = "REJECT_CONTENT"
+                gate["severity"] = "fatal"
     (root / _RULE_PACK_REL).write_text(json.dumps(package, ensure_ascii=False, indent=2), encoding="utf-8")
     return JsonPolicyBundleStore(root)
 
@@ -54,37 +49,45 @@ def _outcome(artifacts) -> dict:
 def _variant_overrides(variant: str, cfg_dir: Path):
     if variant == "default":
         return None
-    if variant == "senior_block":
-        return {"policy_store": _senior_block_store(cfg_dir)}
+    if variant == "judge_ok_block":
+        return {"policy_store": _judge_ok_block_store(cfg_dir)}
     raise ValueError(variant)
 
 
-@pytest.mark.parametrize("variant", ["default", "senior_block"])
+@pytest.mark.parametrize("variant", ["default", "judge_ok_block"])
 def test_matrix_real_id45(variant, tmp_path):
     overrides = _variant_overrides(variant, tmp_path / "cfg")
     artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt", config_overrides=overrides)
     assert artifacts.state["status"] == "success"  # config change must not break the chain
-    assert_matches(f"matrix/real_id45__{variant}", _outcome(artifacts))
-
-
-def test_senior_block_changes_outcome(tmp_path):
+    outcome = _outcome(artifacts)
+    if variant == "default":
+        assert outcome["pooled"] == 2
+        assert outcome["rejected"] == 1
+        assert outcome["effect_status_counts"] == {
+            "success": 2,
+            "pending": 1,
+            "failed": 1,
+            "rule_blocked": 0,
+        }
+    else:
+        assert outcome["pooled"] == 0
+        assert outcome["rejected"] == 4
+        assert outcome["effect_status_counts"]["rule_blocked"] == 4
+
+
+def test_judge_ok_block_changes_outcome(tmp_path):
     base = _outcome(replay_case("real_id45", runtime_root=tmp_path / "rt0"))
     blocked = _outcome(
         replay_case(
             "real_id45",
             runtime_root=tmp_path / "rt1",
-            config_overrides={"policy_store": _senior_block_store(tmp_path / "cfg")},
+            config_overrides={"policy_store": _judge_ok_block_store(tmp_path / "cfg")},
         )
     )
-    # Decoupling proof: one config edit on the not_fit_senior gate visibly moves the outcome.
     assert base != blocked
-    # Default: no item is blocked by the senior-fit gate; items flow into pool / review.
-    assert "content_not_fit_senior" not in base["reasons"]
     assert base["effect_status_counts"]["rule_blocked"] == 0
-    # R3 第二步: 四字段热度复合后 real_id45 默认 3 进池(原 2)。
-    assert base["pooled"] == 3
-    # Blocked variant: every item trips the (config-inverted) hard gate -> rule_blocked reject.
-    assert blocked["reasons"] == ["content_not_fit_senior"] * 4
+    assert base["pooled"] == 2
+    assert blocked["reasons"] == ["v4_technical_retry_needed"] * 4
     assert blocked["effect_status_counts"]["rule_blocked"] == 4
     assert blocked["pooled"] == 0
     assert blocked["rejected"] == 4
@@ -104,12 +107,11 @@ def test_decoupling_counterproof():
     assert 'target_entity") == "Content"' not in source
 
 
-def test_senior_block_blocks_all_walk_expansion(tmp_path):
-    # M4 受控变化: 全拦截(rule_blocked)时翻页/作者/tag 全停;砍包后 path_stop 戳=内容包。
+def test_judge_ok_block_blocks_all_walk_expansion(tmp_path):
     artifacts = replay_case(
         "real_id45",
         runtime_root=tmp_path / "rt",
-        config_overrides={"policy_store": _senior_block_store(tmp_path / "cfg")},
+        config_overrides={"policy_store": _judge_ok_block_store(tmp_path / "cfg")},
     )
     walk_actions = artifacts.files["walk_actions.jsonl"]
 

+ 3 - 5
tests/test_config_tooling.py

@@ -25,7 +25,7 @@ def test_config_store_hash_matches_raw_file_bytes():
     parsed, raw = config_store.load_json(RULE_PACK_JSON)
     assert config_store.sha256_text(raw) == config_store.sha256_text(RULE_PACK_JSON.read_text("utf-8"))
     # policy_bundle_hash must still hash the raw rule-pack text (refactor parity).
-    bundle = JsonPolicyBundleStore(".").load_policy_bundle("V1")
+    bundle = JsonPolicyBundleStore(".").load_policy_bundle("V4")
     assert bundle["policy_bundle_hash"] == config_store.sha256_text(raw)
     assert bundle["strategy_source_ref"]["content_sha256"] == bundle["policy_bundle_hash"]
 
@@ -66,11 +66,9 @@ def test_excel_meta_strategy_id_matches_walk_strategy():
     )
     rows = list(workbook["rule_package_meta"].iter_rows(values_only=True))
     meta = dict(zip(rows[0], rows[2]))  # row 2 is the data-dictionary row; row 3 is data
-    walk_strategy = json.loads(
-        (ROOT / "product_documents/抖音游走策略/douyin_walk_strategy.v1.json").read_text("utf-8")
-    )
+    rule_pack = json.loads(RULE_PACK_JSON.read_text("utf-8"))
 
-    assert meta["strategy_id"] == walk_strategy["strategy_id"]
+    assert meta["strategy_id"] == rule_pack["strategy_binding"]["strategy_id"]
 
 
 def test_query_prompts_validator_passes_after_m2():

+ 70 - 0
tests/test_database_runtime.py

@@ -378,6 +378,72 @@ def test_database_runtime_preserves_p5_rule_decision_fields():
     assert json.loads(values["raw_payload"])["decision_replay_data"]["policy_bundle_hash"] == "hash_001"
 
 
+def test_database_runtime_preserves_v4_score_and_walk_json_contract():
+    connection = FakeConnection()
+    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
+
+    store.append_jsonl(
+        "run_001",
+        "rule_decisions.jsonl",
+        [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": "run_001",
+                "policy_run_id": "policy_run_001",
+                "decision_id": "d_v4_001",
+                "policy_bundle_id": "policy_bundle_v4",
+                "rule_pack_id": "douyin_content_discovery_rule_pack_v4",
+                "rule_pack_version": "4.0.0",
+                "strategy_version": "V4",
+                "decision_target_type": "content",
+                "decision_target_id": "content_001",
+                "decision_action": "ADD_TO_CONTENT_POOL",
+                "decision_reason_code": "v4_query_and_platform_pass",
+                "search_query_effect_status": "success",
+                "score": 80,
+                "scorecard": {
+                    "schema_version": "v4_scorecard.v1",
+                    "query_relevance_score": 82,
+                    "platform_performance_score": 78,
+                    "missing_observable_fields": ["view_count"],
+                },
+                "decision_replay_data": {
+                    "policy_bundle_hash": "hash_v4",
+                    "rule_pack_id": "douyin_content_discovery_rule_pack_v4",
+                    "rule_pack_version": "4.0.0",
+                    "dispatch_id": "dispatch_content_v4",
+                    "strategy_version": "V4",
+                    "allow_walk": True,
+                    "walk_gate_snapshot": {
+                        "query_relevance_score": 82,
+                        "platform_performance_score": 78,
+                        "score": 80,
+                    },
+                },
+                "raw_payload": {
+                    "decision_id": "d_v4_001",
+                    "v4_contract": {
+                        "query_relevance_score": 82,
+                        "platform_performance_score": 78,
+                    },
+                },
+            }
+        ],
+    )
+
+    sql, params = connection.statements[-1]
+    values = _insert_values(sql, params)
+    assert "INSERT INTO `content_agent_rule_decisions`" in sql
+    scorecard = json.loads(values["scorecard"])
+    replay_data = json.loads(values["decision_replay_data"])
+    assert scorecard["schema_version"] == "v4_scorecard.v1"
+    assert scorecard["query_relevance_score"] == 82
+    assert scorecard["platform_performance_score"] == 78
+    assert scorecard["missing_observable_fields"] == ["view_count"]
+    assert replay_data["allow_walk"] is True
+    assert replay_data["walk_gate_snapshot"]["score"] == 80
+
+
 def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
     connection = FakeConnection()
     store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
@@ -550,6 +616,8 @@ def test_database_runtime_writes_author_assets():
                 "source_type": "runtime_author_work",
                 "validation_status": "validated",
                 "eligible_as_source": 1,
+                "elderly_ratio": 0.72,
+                "elderly_tgi": 138,
                 "content_tags": ["人物故事"],
                 "source_run_id": "run_001",
                 "source_policy_run_id": "policy_run_001",
@@ -568,6 +636,8 @@ def test_database_runtime_writes_author_assets():
     assert values["author_asset_id"] == "author_asset_001"
     assert values["platform_author_id"] == "author_001"
     assert values["eligible_as_source"] == 1
+    assert values["elderly_ratio"] == 0.72
+    assert values["elderly_tgi"] == 138
     assert json.loads(values["content_tags"]) == ["人物故事"]
     assert json.loads(values["profile_snapshot"])["sample_count"] == 9
     assert json.loads(values["evidence_refs"])["decision_ids"] == ["d_001"]

+ 38 - 0
tests/test_douyin_client.py

@@ -227,6 +227,32 @@ def test_douyin_keyword_search_can_limit_results_per_query():
     assert len(client.http_client.requests) == 1
 
 
+def test_douyin_keyword_search_default_limit_is_five():
+    client = CrawapiDouyinClient(
+        base_url="http://crawapi.test",
+        keyword_path="/crawler/dou_yin/keyword",
+        http_client=FakeHttpClient(
+            [
+                _response(
+                    200,
+                    {
+                        "data": {
+                            "data": [
+                                {RAW_CONTENT_ID_KEY: str(index), "author": {}, "statistics": {}}
+                                for index in range(6)
+                            ]
+                        }
+                    },
+                ),
+            ]
+        ),
+    )
+
+    results = client.search(_search_query("默认限量"))
+
+    assert [result["platform_content_id"] for result in results] == ["0", "1", "2", "3", "4"]
+
+
 def _author_query(author_id="MS4wLjABAAAA001", **extra):
     return {
         "search_query_id": "author_001",
@@ -322,9 +348,21 @@ def test_from_env_reads_blogger_path_and_sort_type(monkeypatch, tmp_path):
 
     assert client.blogger_path == "crawler/dou_yin/blogger"
     assert client.default_account_works_sort_type == "最热"
+    assert client.max_results_per_query == 5
     assert isinstance(client.rate_limiter, RateLimiter)
 
 
+def test_from_env_reads_max_results_override(monkeypatch, tmp_path):
+    monkeypatch.setenv("CONTENTFIND_API_CRAWAPI_BASE_URL", "http://crawapi.test")
+    monkeypatch.setenv("CONTENTFIND_DOUYIN_KEYWORD_PATH", "/crawler/dou_yin/keyword")
+    monkeypatch.setenv("CONTENTFIND_DOUYIN_BLOGGER_PATH", "/crawler/dou_yin/blogger")
+    monkeypatch.setenv("CONTENTFIND_DOUYIN_MAX_RESULTS_PER_QUERY", "2")
+
+    client = CrawapiDouyinClient.from_env(env_path=tmp_path / "missing.env")
+
+    assert client.max_results_per_query == 2
+
+
 def test_rate_limiter_waits_between_keyword_calls():
     clock = {"now": 0.0}
     sleeps = []

+ 12 - 18
tests/test_dual_channel_gemini_replay.py

@@ -1,8 +1,4 @@
-"""V3-M2D: Gemini 判定结果端到端落到 pattern_match_result(经回放 harness)。
-
-验证 M2 的核心契约:recall_pattern 调 GeminiVideoClient,把 4 个判定字段写进
-discovered item 的 pattern_match_result。real_id45 回放零回归(决策 + validation pass)。
-"""
+"""V4-M3: Gemini relevance result lands in pattern_match_result."""
 
 from __future__ import annotations
 
@@ -15,11 +11,6 @@ def _items(artifacts):
 
 
 def test_replay_writes_gemini_fields_to_pattern_match_result(tmp_path):
-    # M3 受控变化: pattern_match_result 落 Gemini 4 字段(fit_senior_50plus /
-    # fit_confidence / relevance_score / reason)+ judge_status,旧 M2→M3 桥接键
-    # (pattern_recall / category_or_element_binding)随画像门槛退役而移除。
-    # pool stub(relevance 0.85 → relevance 60)在视频号 sph_caihong(digg=[92,282,469,
-    # 1153,1272],锚 50/5e4)上:digg 92 热度过低落 60-69 复看,其余 4 条进池。
     artifacts = replay_case(
         "sph_caihong",
         runtime_root=tmp_path / "rt",
@@ -30,17 +21,20 @@ def test_replay_writes_gemini_fields_to_pattern_match_result(tmp_path):
     assert items
     for item in items:
         pmr = item["pattern_match_result"]
-        assert pmr["fit_senior_50plus"] is True
-        assert pmr["relevance_score"] == 0.85
-        assert set(pmr) >= {"fit_senior_50plus", "fit_confidence", "relevance_score", "reason"}
+        assert pmr["query_relevance_score"] == 80
+        assert set(pmr) >= {"query_relevance_score", "query_text", "query_relevance_reason"}
         assert pmr["judge_status"] == "ok"
-        # 2026-06-12 清理: content_audience_profile 画像镜像维度已废弃,不再写入
-        # (fit_senior_50plus 真身在 pattern_match_result)。
+        assert "fit_senior_50plus" not in pmr
+        assert "relevance_score" not in pmr
         assert "content_audience_profile" not in item
 
-    assert artifacts.summary["pooled_content_count"] == 4
-    assert artifacts.summary["review_content_count"] == 1
-    assert artifacts.summary["rejected_content_count"] == 0
+    routed_count = (
+        artifacts.summary["pooled_content_count"]
+        + artifacts.summary["review_content_count"]
+        + artifacts.summary["rejected_content_count"]
+    )
+    assert routed_count == len(items)
+    assert {d["scorecard"]["schema_version"] for d in artifacts.decisions} == {"v4_scorecard.v1"}
 
 
 def test_replay_real_id45_validation_pass_with_bridge(tmp_path):

+ 33 - 4
tests/test_dual_channel_normalization.py

@@ -6,6 +6,10 @@ import pytest
 
 from content_agent.errors import ContentAgentError, ErrorCode
 from content_agent.integrations.douyin import CrawapiDouyinClient
+from content_agent.integrations.kuaishou import (
+    CrawapiKuaishouClient,
+    _normalize_kuaishou_item,
+)
 from content_agent.integrations.shipinhao import (
     CrawapiShipinhaoClient,
     _normalize_shipinhao_item,
@@ -25,7 +29,7 @@ def _douyin_client():
     )
 
 
-def test_douyin_and_shipinhao_share_canonical_keys():
+def test_douyin_shipinhao_and_kuaishou_share_required_canonical_keys():
     douyin_item = _douyin_client()._normalize_content_item(
         _QUERY,
         {"aweme_id": "a1", "author": {"sec_uid": "u1", "nickname": "n"}, "video": {"play_addr": {"url_list": ["http://v"]}}},
@@ -40,18 +44,43 @@ def test_douyin_and_shipinhao_share_canonical_keys():
         True,
         "12",
     )
+    ks_item = _normalize_kuaishou_item(
+        _QUERY,
+        {
+            "channel_content_id": "k1",
+            "content_link": "https://www.kuaishou.com/short-video/k1",
+            "channel_account_id": "acc",
+            "channel_account_name": "快手作者",
+            "title": "彩虹 #彩虹",
+            "video_url_list": [{"video_url": "http://v"}],
+        },
+        1,
+        False,
+        "",
+    )
     assert set(douyin_item) == set(sph_item)
+    assert set(douyin_item).issubset(set(ks_item))
     assert douyin_item["platform"] == "douyin"
     assert sph_item["platform"] == "shipinhao"
+    assert ks_item["platform"] == "kuaishou"
+    assert ks_item["platform_content_url"].endswith("/k1")
 
 
-def test_shipinhao_real_dispatch_builds_client(monkeypatch):
+def test_real_dispatch_builds_registered_platform_clients(monkeypatch):
+    monkeypatch.setattr(
+        CrawapiDouyinClient, "from_env", classmethod(lambda cls: object.__new__(cls))
+    )
+    monkeypatch.setattr(
+        CrawapiKuaishouClient, "from_env", classmethod(lambda cls: object.__new__(cls))
+    )
     monkeypatch.setattr(
         CrawapiShipinhaoClient, "from_env", classmethod(lambda cls: object.__new__(cls))
     )
     service = object.__new__(RunService)
-    client = service._platform_client("shipinhao", "real")
-    assert isinstance(client, CrawapiShipinhaoClient)
+
+    assert isinstance(service._platform_client("douyin", "real"), CrawapiDouyinClient)
+    assert isinstance(service._platform_client("kuaishou", "real"), CrawapiKuaishouClient)
+    assert isinstance(service._platform_client("shipinhao", "real"), CrawapiShipinhaoClient)
 
 
 def test_unsupported_real_platform_raises():

+ 9 - 9
tests/test_gemini_helpers.py

@@ -1,4 +1,4 @@
-"""V3-M0A: FakeGeminiVideoClient unit tests."""
+"""V4-M3: FakeGeminiVideoClient unit tests."""
 
 from __future__ import annotations
 
@@ -14,8 +14,8 @@ from tests.gemini_helpers import (
 def test_fake_gemini_default_returns_pool():
     client = FakeGeminiVideoClient()
     result = client.analyze({"platform_content_id": "c1"}, {}, {})
-    assert result["fit_senior_50plus"] is True
-    assert result["relevance_score"] == 0.85
+    assert result["schema_version"] == "v4_gemini_query_relevance.v1"
+    assert result["query_relevance_score"] == 80
     assert client.calls[0]["content"]["platform_content_id"] == "c1"
 
 
@@ -23,17 +23,17 @@ def test_fake_gemini_by_content_id_routing():
     client = FakeGeminiVideoClient(
         result_by_content_id={"c1": fake_gemini_review(), "c2": fake_gemini_fail()}
     )
-    assert client.analyze({"platform_content_id": "c1"}, {}, {})["relevance_score"] == 0.45
-    assert client.analyze({"platform_content_id": "c2"}, {}, {})["status"] == "failed"
-    assert client.analyze({"platform_content_id": "c3"}, {}, {})["relevance_score"] == 0.85
+    assert client.analyze({"platform_content_id": "c1"}, {}, {})["query_relevance_score"] == 60
+    assert client.analyze({"platform_content_id": "c2"}, {}, {})["final_status"] == "failed"
+    assert client.analyze({"platform_content_id": "c3"}, {}, {})["query_relevance_score"] == 80
 
 
 def test_fake_gemini_records_calls_with_deepcopy():
     client = FakeGeminiVideoClient()
     first = client.analyze({"platform_content_id": "c1"}, {}, {})
-    first["fit_senior_50plus"] = "mutated"
+    first["query_relevance_score"] = "mutated"
     second = client.analyze({"platform_content_id": "c1"}, {}, {})
-    assert second["fit_senior_50plus"] is True
+    assert second["query_relevance_score"] == 80
     assert len(client.calls) == 2
 
 
@@ -41,4 +41,4 @@ def test_fake_gemini_conforms_to_protocol():
     client: GeminiVideoClient = FakeGeminiVideoClient()
     result = client.analyze({"platform_content_id": "c1"}, {"play_url": None}, {"name": "case"})
     assert isinstance(result, dict)
-    assert fake_gemini_pool()["fit_senior_50plus"] is True
+    assert fake_gemini_pool()["query_relevance_score"] == 80

+ 83 - 41
tests/test_gemini_video.py

@@ -1,4 +1,4 @@
-"""V3-M2B: GeminiVideoClient.analyze (mocked fetch + httpx)."""
+"""V4-M3: GeminiVideoClient.analyze relevance-only contract."""
 
 from __future__ import annotations
 
@@ -11,11 +11,15 @@ from content_agent.integrations.gemini_video import (
 
 
 class FakeResponse:
-    def __init__(self, content):
+    def __init__(self, content, *, status_code=200):
         self._content = content
+        self.status_code = status_code
+        self.request = httpx.Request("POST", "https://openrouter.test/chat/completions")
 
     def raise_for_status(self):
-        return None
+        if self.status_code >= 400:
+            response = httpx.Response(self.status_code, request=self.request)
+            raise httpx.HTTPStatusError("bad", request=self.request, response=response)
 
     def json(self):
         return {"choices": [{"message": {"content": self._content}}]}
@@ -29,40 +33,56 @@ def _client(content=None, *, post=None, fetch=None):
     )
 
 
-_ITEM = {"platform": "douyin", "platform_content_id": "c1"}
+_ITEM = {
+    "platform": "douyin",
+    "platform_content_id": "c1",
+    "matched_search_queries": ["中医养生"],
+}
 _MEDIA = {"play_url": "http://v/x"}
 _CTX = {"ext_data": {"evidence_pack": {"seed_terms": ["中医养生"]}}}
 
 
-def test_analyze_returns_four_fields():
-    body = '{"fit_senior_50plus": true, "fit_confidence": 0.85, "relevance_score": 0.7, "reason": "贴切"}'
+def test_analyze_returns_v4_relevance_fields():
+    body = '{"query_relevance_score": 83, "query_relevance_reason": "贴切"}'
+
     result = _client(body).analyze(_ITEM, _MEDIA, _CTX)
+
     assert result == {
-        "fit_senior_50plus": True,
-        "fit_confidence": 0.85,
-        "relevance_score": 0.7,
-        "reason": "贴切",
+        "schema_version": "v4_gemini_query_relevance.v1",
+        "query_text": "中医养生",
+        "query_relevance_score": 83.0,
+        "query_relevance_reason": "贴切",
+        "final_status": "ok",
+        "retry_count": 0,
     }
 
 
-def test_analyze_parses_json_in_markdown_fence():
-    body = '```json\n{"fit_senior_50plus": false, "fit_confidence": 0.4, "relevance_score": 0.2, "reason": "x"}\n```'
-    result = _client(body).analyze(_ITEM, _MEDIA, _CTX)
-    assert result["fit_senior_50plus"] is False
-    assert result["fit_confidence"] == 0.4
+def test_analyze_prompt_does_not_contain_legacy_50_plus_fields():
+    seen = {}
+
+    def post(*args, **kwargs):
+        seen.update(kwargs)
+        return FakeResponse('{"query_relevance_score": 70, "query_relevance_reason": "x"}')
 
+    _client(post=post).analyze(_ITEM, _MEDIA, _CTX)
+
+    prompt = seen["json"]["messages"][1]["content"][0]["text"]
+    assert "fit_senior_50plus" not in prompt
+    assert "fit_confidence" not in prompt
+    assert '"relevance_score"' not in prompt
+    assert "query_relevance_score" in prompt
+
+
+def test_analyze_parses_json_in_markdown_fence_and_clamps_score():
+    body = '```json\n{"query_relevance_score": 180, "query_relevance_reason": "x"}\n```'
 
-def test_analyze_clamps_out_of_range_scores():
-    body = '{"fit_senior_50plus": true, "fit_confidence": 1.7, "relevance_score": -3, "reason": "x"}'
     result = _client(body).analyze(_ITEM, _MEDIA, _CTX)
-    assert result["fit_confidence"] == 1.0
-    assert result["relevance_score"] == 0.0
+
+    assert result["query_relevance_score"] == 100.0
 
 
 def test_analyze_passes_raw_save_path_when_dir_configured(tmp_path):
-    # 2026-06-12 拍板: 配置留档目录后,analyze 按 {dir}/{run_id}/{platform_content_id}.mp4
-    # 把路径传给 fetch_fn;未配置(默认 None)不传该 kwarg——老签名 lambda 桩零改仍可用。
-    body = '{"fit_senior_50plus": true, "fit_confidence": 0.9, "relevance_score": 0.8, "reason": "x"}'
+    body = '{"query_relevance_score": 80, "query_relevance_reason": "x"}'
     seen = {}
 
     def fetch(play_url, platform, **kwargs):
@@ -76,42 +96,64 @@ def test_analyze_passes_raw_save_path_when_dir_configured(tmp_path):
         raw_video_save_dir=str(tmp_path),
     )
     item = {**_ITEM, "run_id": "run_1"}
-    client.analyze(item, _MEDIA, _CTX)
-    assert seen["save_raw_to"] == str(tmp_path / "run_1" / "c1.mp4")
+    result = client.analyze(item, _MEDIA, _CTX)
 
-    result = _client(body).analyze(item, _MEDIA, _CTX)
-    assert result["fit_senior_50plus"] is True
+    assert seen["save_raw_to"] == str(tmp_path / "run_1" / "c1.mp4")
+    assert result["query_relevance_score"] == 80.0
 
 
-def test_analyze_no_play_url_returns_fail():
+def test_analyze_no_play_url_returns_v4_fail():
     result = _client("{}").analyze(_ITEM, {}, _CTX)
-    assert result["status"] == "failed"
-    assert result["reason"] == "no_play_url"
 
+    assert result["final_status"] == "failed"
+    assert result["failure_type"] == "no_play_url"
+    assert result["retry_count"] == 1
 
-def test_analyze_video_fetch_failure_returns_fail():
+
+def test_analyze_video_fetch_failure_returns_v4_fail():
     def boom(play_url, platform):
         raise RuntimeError("dl")
+
     result = _client("{}", fetch=boom).analyze(_ITEM, _MEDIA, _CTX)
-    assert result["status"] == "failed"
-    assert "video_fetch_failed" in result["reason"]
+
+    assert result["final_status"] == "failed"
+    assert result["failure_type"] == "video_fetch_failed"
+    assert result["exception_type"] == "RuntimeError"
 
 
-def test_analyze_http_error_returns_fail():
+def test_analyze_retryable_http_error_retries_once_then_fails():
+    calls = []
+
     def post(*a, **k):
-        raise httpx.ConnectError("boom")
+        calls.append(1)
+        return FakeResponse("{}", status_code=500)
+
     result = _client(post=post).analyze(_ITEM, _MEDIA, _CTX)
-    assert result["status"] == "failed"
-    assert "gemini_http_error" in result["reason"]
+
+    assert len(calls) == 2
+    assert result["failure_type"] == "gemini_http_error"
+    assert result["http_status_code"] == 500
+    assert result["retry_count"] == 2
 
 
-def test_analyze_bad_json_returns_fail():
-    result = _client("not-json").analyze(_ITEM, _MEDIA, _CTX)
-    assert result["status"] == "failed"
-    assert "gemini_response_invalid" in result["reason"]
+def test_analyze_bad_json_retries_once_then_fails():
+    calls = []
+
+    def post(*a, **k):
+        calls.append(1)
+        return FakeResponse("not-json")
+
+    result = _client(post=post).analyze(_ITEM, _MEDIA, _CTX)
+
+    assert len(calls) == 2
+    assert result["failure_type"] == "gemini_response_invalid"
+    assert result["retry_count"] == 2
 
 
 def test_from_env_missing_key_returns_missing_client():
     client = GeminiVideoClient.from_env({})
+
     assert isinstance(client, MissingGeminiVideoClient)
-    assert client.analyze(_ITEM, _MEDIA, _CTX)["status"] == "failed"
+    result = client.analyze(_ITEM, _MEDIA, _CTX)
+    assert result["final_status"] == "failed"
+    assert result["failure_type"] == "gemini_config_missing"

+ 191 - 0
tests/test_kuaishou_client.py

@@ -0,0 +1,191 @@
+"""V4-M2: 快手 client search/detail/account_info normalization tests."""
+
+from __future__ import annotations
+
+import httpx
+import pytest
+
+from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations.kuaishou import CrawapiKuaishouClient
+
+
+class FakeHttpClient:
+    def __init__(self, responses):
+        self.responses = list(responses)
+        self.requests = []
+
+    def post(self, url, json, headers, timeout):
+        self.requests.append({"url": url, "json": json, "headers": headers, "timeout": timeout})
+        response = self.responses.pop(0)
+        if isinstance(response, Exception):
+            raise response
+        return response
+
+
+def _response(status_code, data):
+    return httpx.Response(
+        status_code,
+        json=data,
+        request=httpx.Request("POST", "http://crawler.test/x"),
+    )
+
+
+def _query():
+    return {
+        "search_query_id": "q_001",
+        "search_query": "早上好",
+        "search_query_generation_method": "item_single",
+        "discovery_start_source": "pattern_itemset",
+    }
+
+
+def _client(responses):
+    return CrawapiKuaishouClient(
+        base_url="http://crawler.test",
+        http_client=FakeHttpClient(responses),
+    )
+
+
+def _item(content_id="ks_001"):
+    return {
+        "channel_content_id": content_id,
+        "content_link": f"https://www.kuaishou.com/short-video/{content_id}",
+        "title": "早上好 #祝福",
+        "body_text": "早安视频",
+        "topic_list": ["早上好"],
+        "content_type": "video",
+        "video_url_list": [{"video_url": "https://v.kwaicdn.test/a.mp4"}],
+        "channel_account_id": "3xfkwajatdh7p7i",
+        "channel_account_name": "祝福账号",
+        "view_count": 12345,
+        "like_count": 234,
+        "collect_count": 12,
+        "comment_count": 34,
+        "share_count": 56,
+        "publish_timestamp": 1780904037000,
+    }
+
+
+def test_kuaishou_search_maps_canonical_fields():
+    client = _client([
+        _response(200, {"code": 0, "data": {"data": [_item()], "has_more": False}})
+    ])
+
+    result = client.search(_query())[0]
+
+    assert client.http_client.requests[0]["url"].endswith("/crawler/kuai_shou/keyword_v2")
+    assert client.http_client.requests[0]["json"] == {"keyword": "早上好"}
+    assert result["platform"] == "kuaishou"
+    assert result["platform_content_id"] == "ks_001"
+    assert result["platform_content_url"].endswith("/ks_001")
+    assert result["platform_author_id"] == "3xfkwajatdh7p7i"
+    assert result["author_display_name"] == "祝福账号"
+    assert result["play_url"] == "https://v.kwaicdn.test/a.mp4"
+    assert result["statistics"] == {
+        "digg_count": 234,
+        "comment_count": 34,
+        "share_count": 56,
+        "collect_count": 12,
+        "play_count": 12345,
+    }
+    assert result["tags"] == ["#早上好"]
+    assert result["create_time"] == 1780904037
+    assert result["platform_raw_payload"]["channel_content_id"] == "ks_001"
+
+
+def test_kuaishou_search_limits_to_five_by_default():
+    items = [_item(f"ks_{index}") for index in range(6)]
+    client = _client([_response(200, {"code": 0, "data": {"data": items}})])
+
+    results = client.search(_query())
+
+    assert [result["platform_content_id"] for result in results] == [
+        "ks_0",
+        "ks_1",
+        "ks_2",
+        "ks_3",
+        "ks_4",
+    ]
+
+
+def test_kuaishou_detail_maps_canonical_fields():
+    detail = _item("ks_detail")
+    detail["share_count"] = 78
+    client = _client([_response(200, {"code": 0, "data": {"data": detail}})])
+
+    result = client.fetch_detail("ks_detail")
+
+    assert client.http_client.requests[0]["url"].endswith("/crawler/kuai_shou/detail")
+    assert client.http_client.requests[0]["json"] == {"content_id": "ks_detail"}
+    assert result["platform"] == "kuaishou"
+    assert result["platform_content_id"] == "ks_detail"
+    assert result["platform_content_url"].endswith("/ks_detail")
+    assert result["statistics"]["play_count"] == 12345
+    assert result["statistics"]["share_count"] == 78
+    assert result["play_url"] == "https://v.kwaicdn.test/a.mp4"
+    assert result["create_time"] == 1780904037
+
+
+def test_kuaishou_account_info_maps_profile_snapshot():
+    account = {
+        "channel_account_id": "3xfkwajatdh7p7i",
+        "ks_id": "ksid_001",
+        "digit_id": "123456",
+        "account_link": "https://www.kuaishou.com/profile/3xfkwajatdh7p7i",
+        "account_name": "祝福账号",
+        "avatar_url": "https://avatar.test/a.jpg",
+        "gender": "unknown",
+        "description": "每天祝福",
+        "tags": ["北京", "白羊座"],
+        "follower_count": 1000,
+        "publish_count": 88,
+        "like_count": 9000,
+        "update_timestamp": 1780904037000,
+    }
+    client = _client([_response(200, {"code": 0, "data": {"data": account}})])
+
+    result = client.fetch_account_info("3xfkwajatdh7p7i", is_cache=False)
+
+    assert client.http_client.requests[0]["url"].endswith("/crawler/kuai_shou/account_info")
+    assert client.http_client.requests[0]["json"] == {
+        "account_id": "3xfkwajatdh7p7i",
+        "is_cache": False,
+    }
+    assert result["platform_author_id"] == "3xfkwajatdh7p7i"
+    assert result["author_display_name"] == "祝福账号"
+    assert result["profile_snapshot"]["tags"] == ["北京", "白羊座"]
+    assert "tags" not in {key for key in result if key != "profile_snapshot"}
+
+
+def test_kuaishou_http_429_maps_to_platform_rate_limited():
+    client = _client([_response(429, {"error": "too many"})])
+
+    with pytest.raises(ContentAgentError) as exc_info:
+        client.search(_query())
+
+    assert exc_info.value.error_code == ErrorCode.PLATFORM_RATE_LIMITED
+    assert exc_info.value.detail["status_code"] == 429
+
+
+def test_kuaishou_bad_json_is_sanitized():
+    client = CrawapiKuaishouClient(
+        base_url="http://crawler.test",
+        http_client=FakeHttpClient(
+            [
+                httpx.Response(
+                    200,
+                    content=b"not json",
+                    request=httpx.Request("POST", "http://crawler.test/x"),
+                )
+            ]
+        ),
+    )
+
+    with pytest.raises(RuntimeError, match="keyword_search failed: bad_json"):
+        client.search(_query())
+
+
+def test_kuaishou_has_no_author_works_client():
+    client = _client([])
+
+    assert not hasattr(client, "fetch_author_works")

+ 10 - 29
tests/test_p7_final_output.py

@@ -1,5 +1,3 @@
-import json
-
 from content_agent.run_service import RunService
 from content_agent.schemas import RunStartRequest
 from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
@@ -17,45 +15,28 @@ def _start_mock_run(tmp_path):
     return service, state["run_id"]
 
 
-def test_keep_content_for_review_is_visible_but_not_pooled(tmp_path):
+def test_rejected_content_is_visible_but_not_pooled(tmp_path):
     service, run_id = _start_mock_run(tmp_path)
 
     final_output = service.read_json(run_id, "final_output.json")
 
     content_ids = {asset["platform_content_id"] for asset in final_output["content_assets"]}
-    review_ids = {record["platform_content_id"] for record in final_output["review_records"]}
-    assert final_output["summary"]["review_content_count"] == len(final_output["review_records"])
-    assert review_ids
-    assert not review_ids & content_ids
-    assert {record["review_status"] for record in final_output["review_records"]} == {
-        "pending_review"
-    }
-    assert {record["final_asset_status"] for record in final_output["review_records"]} == {
-        "review_only"
-    }
+    reject_ids = {record["decision_target_id"] for record in final_output["reject_records"]}
+    assert final_output["summary"]["rejected_content_count"] == len(final_output["reject_records"])
+    assert reject_ids
+    assert not reject_ids & content_ids
     assert final_output["validation_status"] == "pass"
     assert final_output["summary"]["run_path_complete"] is True
     assert final_output["summary"]["trace_complete"] is True
 
 
-def test_review_record_path_refs_are_validated(tmp_path):
+def test_reject_records_carry_source_evidence_refs(tmp_path):
     service, run_id = _start_mock_run(tmp_path)
 
-    final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
-    final_output = json.loads(final_output_path.read_text(encoding="utf-8"))
-    final_output["review_records"][0]["source_path_record_ids"] = ["missing_path"]
-    final_output_path.write_text(
-        json.dumps(final_output, ensure_ascii=False, indent=2) + "\n",
-        encoding="utf-8",
-    )
-
-    validation = service.validate_run(run_id)
-    assert validation["status"] == "fail"
-    assert any(finding["check_id"] == "missing_path_ref" for finding in validation["findings"])
-    assert any(
-        finding["check_id"] == "completeness_mismatch"
-        for finding in validation["findings"]
-    )
+    final_output = service.read_json(run_id, "final_output.json")
+    assert final_output["reject_records"][0]["source_evidence"]["source_post_id"]
+    assert final_output["reject_records"][0]["source_evidence"]["discovered_platform_content_id"]
+    assert service.validate_run(run_id)["status"] == "pass"
 
 
 def test_run_service_rewrites_final_output_with_final_validation_status(tmp_path):

+ 4 - 4
tests/test_p7_policy_walk_versions.py

@@ -16,7 +16,7 @@ def test_final_output_separates_policy_and_walk_strategy_versions(tmp_path):
 
     final_output = service.read_json(state["run_id"], "final_output.json")
     assert final_output["policy"]["policy_bundle_id"] == "douyin_policy_bundle_v1"
-    assert final_output["policy"]["strategy_version"] == "V1"
+    assert final_output["policy"]["strategy_version"] == "V4"
     assert final_output["policy"]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
     assert final_output["walk_strategy"]["walk_strategy_id"] == "douyin_walk_strategy_v1"
     assert final_output["walk_strategy"]["walk_strategy_version"] == "V1.0"
@@ -26,12 +26,12 @@ def test_final_output_separates_policy_and_walk_strategy_versions(tmp_path):
 
 
 def test_policy_run_record_uses_walk_strategy_version_from_walk_config():
-    policy_bundle = JsonPolicyBundleStore().load_policy_bundle("V1")
+    policy_bundle = JsonPolicyBundleStore().load_policy_bundle("V4")
     record = _policy_run_record_from_state(
         {
             "run_id": "run_001",
             "policy_run_id": "policy_run_001",
-            "strategy_version": "V1",
+            "strategy_version": "V4",
             "policy_bundle_id": policy_bundle["policy_bundle_id"],
             "policy_bundle": policy_bundle,
             "rule_decisions": [],
@@ -40,5 +40,5 @@ def test_policy_run_record_uses_walk_strategy_version_from_walk_config():
         }
     )
 
-    assert record["strategy_version"] == "V1"
+    assert record["strategy_version"] == "V4"
     assert record["walk_strategy_version"] == "V1.0"

+ 26 - 0
tests/test_platform_access.py

@@ -216,3 +216,29 @@ def test_platform_access_counts_runtime_error_as_platform_request_failed():
     failure = result["query_failures"][0]
     assert failure["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
     assert failure["error_detail"]["exception_type"] == "RuntimeError"
+
+
+def test_platform_access_accepts_search_only_client():
+    class SearchOnlyClient:
+        def search(self, search_query):
+            return [
+                {
+                    "content_discovery_id": f"{search_query['search_query_id']}_content_001",
+                    "search_query_id": search_query["search_query_id"],
+                    "platform_content_id": "ks_001",
+                    "description": "搜索协议只要求 search",
+                }
+            ]
+
+    search_queries = [
+        {
+            "search_query_id": "q_001",
+            "search_query": "快手祝福",
+            "search_query_generation_method": "item_single",
+        }
+    ]
+
+    result = platform_access.run(search_queries, SearchOnlyClient())
+
+    assert result["platform_results"][0]["platform_content_id"] == "ks_001"
+    assert result["query_failures"] == []

+ 79 - 0
tests/test_platform_observable_performance.py

@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+from content_agent.business_modules.content_discovery.platform_observable_performance import (
+    performance_score,
+)
+
+
+def test_douyin_play_count_is_natural_missing_and_score_is_bounded():
+    result = performance_score(
+        {
+            "digg_count": 100000,
+            "comment_count": 1000,
+            "share_count": 500,
+            "collect_count": 1000,
+        },
+        "douyin",
+    )
+
+    assert 0 <= result["platform_performance_score"] <= 100
+    assert {row["field"] for row in result["platform_performance_components"]} == {
+        "statistics.digg_count",
+        "statistics.comment_count",
+        "statistics.share_count",
+        "statistics.collect_count",
+    }
+    assert result["missing_observable_fields"] == [
+        {
+            "field": "statistics.play_count",
+            "missing_type": "natural_platform_missing",
+            "platform": "douyin",
+            "evidence": "跨平台字段映射.json",
+        }
+    ]
+    assert "platform_heat" not in result
+
+
+def test_kuaishou_all_five_fields_supported():
+    result = performance_score(
+        {
+            "play_count": 10000,
+            "digg_count": 2000,
+            "comment_count": 200,
+            "share_count": 100,
+            "collect_count": 100,
+        },
+        "kuaishou",
+    )
+
+    assert len(result["platform_performance_components"]) == 5
+    assert result["missing_observable_fields"] == []
+    assert result["platform_performance_score"] is not None
+
+
+def test_shipinhao_only_digg_supported_and_other_fields_natural_missing():
+    result = performance_score({"digg_count": 500}, "shipinhao")
+
+    assert [row["field"] for row in result["platform_performance_components"]] == [
+        "statistics.digg_count"
+    ]
+    assert {row["field"] for row in result["missing_observable_fields"]} == {
+        "statistics.comment_count",
+        "statistics.share_count",
+        "statistics.collect_count",
+        "statistics.play_count",
+    }
+
+
+def test_supported_field_absent_is_runtime_missing():
+    result = performance_score({"digg_count": 10}, "douyin")
+
+    runtime_missing = [
+        row for row in result["missing_observable_fields"]
+        if row.get("missing_type") == "runtime_missing"
+    ]
+    assert {row["field"] for row in runtime_missing} == {
+        "statistics.comment_count",
+        "statistics.share_count",
+        "statistics.collect_count",
+    }

+ 11 - 11
tests/test_policy_dispatch.py

@@ -14,7 +14,7 @@ RULE_PACK_JSON = Path("product_documents/规则包/douyin_rule_packs.v1.json")
 
 
 def test_policy_bundle_uses_content_dispatch_and_exports_runtime_contracts():
-    bundle = JsonPolicyBundleStore().load_policy_bundle("V1")
+    bundle = JsonPolicyBundleStore().load_policy_bundle("V4")
 
     assert bundle["dispatch_id"] == "dispatch_content"
     assert bundle["runtime_stage"] == "V1.0"
@@ -42,7 +42,7 @@ def test_policy_bundle_fails_when_content_dispatch_is_missing(tmp_path):
     path.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
 
     with pytest.raises(ValueError, match="dispatch not found for Content/video"):
-        JsonPolicyBundleStore(root).load_policy_bundle("V1")
+        JsonPolicyBundleStore(root).load_policy_bundle("V4")
 
 
 def test_dispatch_conflict_raises_config_error_with_rule_pack_ids(tmp_path):
@@ -56,7 +56,7 @@ def test_dispatch_conflict_raises_config_error_with_rule_pack_ids(tmp_path):
     path.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
 
     with pytest.raises(ContentAgentError) as exc_info:
-        JsonPolicyBundleStore(root).load_policy_bundle("V1")
+        JsonPolicyBundleStore(root).load_policy_bundle("V4")
     error = exc_info.value
     assert error.error_code == ErrorCode.CONFIG_RULE_PACK_DISPATCH_CONFLICT
     assert "douyin_content_discovery_rule_pack_v1" in error.message
@@ -69,7 +69,7 @@ def test_dispatch_conflict_raises_config_error_with_rule_pack_ids(tmp_path):
 def test_select_dispatch_still_returns_content_for_default_bundle():
     rule_package = json.loads(RULE_PACK_JSON.read_text(encoding="utf-8"))
 
-    dispatch = _select_dispatch(rule_package, "V1")
+    dispatch = _select_dispatch(rule_package, "V4")
 
     assert dispatch["dispatch_id"] == "dispatch_content"
     assert dispatch["target_entity"] == "Content"
@@ -85,7 +85,7 @@ def _synthetic_author_dispatch(rule_package):
         rule_pack_id="author_test_rule_pack_v1",
         dispatch_enabled=True,
         runtime_stage="V1.0",
-        strategy_version="V1",
+        strategy_version="V4",
     )
     return author
 
@@ -96,14 +96,14 @@ def test_select_dispatch_can_select_non_content_when_enabled():
     rule_package["rule_pack_dispatch"].append(author)
 
     dispatch = _select_dispatch(
-        rule_package, "V1", target_entity="Author", content_format=author["content_format"]
+        rule_package, "V4", target_entity="Author", content_format=author["content_format"]
     )
 
     assert dispatch["rule_pack_id"] == "author_test_rule_pack_v1"
 
 
 def test_load_policy_bundle_keeps_content_shim():
-    bundle = JsonPolicyBundleStore().load_policy_bundle("V1")
+    bundle = JsonPolicyBundleStore().load_policy_bundle("V4")
 
     assert bundle["target_entity"] == "Content"
     assert bundle["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
@@ -111,7 +111,7 @@ def test_load_policy_bundle_keeps_content_shim():
 
 
 def test_load_policy_bundle_exposes_rule_pack_by_entity():
-    bundle = JsonPolicyBundleStore().load_policy_bundle("V1")
+    bundle = JsonPolicyBundleStore().load_policy_bundle("V4")
 
     by_entity = bundle["rule_pack_by_entity"]
     assert set(by_entity) == {"Content"}
@@ -129,7 +129,7 @@ def test_enabled_author_dispatch_can_be_found_by_entity_without_replacing_conten
     data["rule_packs"].append(author_pack)
     path.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
 
-    bundle = JsonPolicyBundleStore(root).load_policy_bundle("V1")
+    bundle = JsonPolicyBundleStore(root).load_policy_bundle("V4")
 
     assert bundle["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
     assert set(bundle["rule_pack_by_entity"]) == {"Content", "Author"}
@@ -146,7 +146,7 @@ def test_policy_bundle_fails_when_dispatch_points_to_missing_rule_pack(tmp_path)
     path.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
 
     with pytest.raises(ValueError, match="dispatch dispatch_content matched 0 enabled rule packs"):
-        JsonPolicyBundleStore(root).load_policy_bundle("V1")
+        JsonPolicyBundleStore(root).load_policy_bundle("V4")
 
 
 def test_policy_bundle_fails_when_dispatch_points_to_disabled_rule_pack(tmp_path):
@@ -159,7 +159,7 @@ def test_policy_bundle_fails_when_dispatch_points_to_disabled_rule_pack(tmp_path
     path.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
 
     with pytest.raises(ValueError, match="dispatch dispatch_content matched 0 enabled rule packs"):
-        JsonPolicyBundleStore(root).load_policy_bundle("V1")
+        JsonPolicyBundleStore(root).load_policy_bundle("V4")
 
 
 def _copy_policy_files(tmp_path: Path) -> Path:

+ 4 - 4
tests/test_policy_replay_data.py

@@ -21,10 +21,10 @@ def test_rule_decisions_and_policy_run_record_include_replay_metadata(tmp_path):
     assert replay["policy_bundle_hash"] == state["policy_bundle"]["policy_bundle_hash"]
     assert replay["dispatch_id"] == "dispatch_content"
     assert replay["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
-    # M3: mock judgment scores relevance 60 + zero platform_heat = 60 → review band.
-    assert replay["matched_threshold"] == "60<=score<=69"
-    assert replay["effect_mapping_id"] == "map_keep_for_review_pending"
-    assert replay["matched_scoring_rules"] == ["score_relevance_high"]
+    assert replay["strategy_version"] == "V4"
+    assert replay["effect_mapping_id"] == "map_reject_failed"
+    assert replay["allow_walk"] is False
+    assert replay["walk_gate_snapshot"]["query_relevance_score"] == 80
 
     policy_run = runtime.policy_runs[0]
     assert policy_run["policy_bundle_hash"] == state["policy_bundle"]["policy_bundle_hash"]

+ 10 - 12
tests/test_query_effect_aggregation.py

@@ -21,22 +21,20 @@ def test_search_clues_aggregate_query_effect_status_from_decisions(tmp_path):
         for clue in service.read_jsonl(state["run_id"], "search_clues.jsonl")
     }
 
-    # M3 受控变化: mock content scores relevance 60 + zero platform_heat = 60, so both
-    # of q_001's contents land in the review band → query aggregates to pending.
-    assert clues["q_001"]["search_query_effect_status"] == "pending"
-    assert clues["q_001"]["effect_status_counts"] == {"pending": 2}
-    assert clues["q_001"]["query_aggregation_id"] == "agg_query_pending"
-    assert clues["q_001"]["raw_payload"]["query_aggregation_id"] == "agg_query_pending"
-    assert clues["q_001"]["walk_next_step"] == "review_later_or_small_budget"
-    assert clues["q_002"]["search_query_effect_status"] == "pending"
-    assert clues["q_002"]["effect_status_counts"] == {"pending": 1}
-    assert clues["q_002"]["query_aggregation_id"] == "agg_query_pending"
-    assert clues["q_002"]["walk_next_step"] == "review_later_or_small_budget"
+    assert clues["q_001"]["search_query_effect_status"] == "failed"
+    assert clues["q_001"]["effect_status_counts"] == {"failed": 2}
+    assert clues["q_001"]["query_aggregation_id"] == "agg_query_failed"
+    assert clues["q_001"]["raw_payload"]["query_aggregation_id"] == "agg_query_failed"
+    assert clues["q_001"]["walk_next_step"] == "stop_search_query"
+    assert clues["q_002"]["search_query_effect_status"] == "failed"
+    assert clues["q_002"]["effect_status_counts"] == {"failed": 1}
+    assert clues["q_002"]["query_aggregation_id"] == "agg_query_failed"
+    assert clues["q_002"]["walk_next_step"] == "stop_search_query"
 
 
 def test_rule_blocked_only_query_aggregates_to_rule_blocked():
     runtime = _RecordingRuntime()
-    policy_bundle = JsonPolicyBundleStore().load_policy_bundle("V1")
+    policy_bundle = JsonPolicyBundleStore().load_policy_bundle("V4")
 
     recorder.run(
         run_id="run_001",

+ 16 - 27
tests/test_replay_gemini_seam.py

@@ -1,11 +1,4 @@
-"""V3-M3: gemini_video_client drives the judgment seam end-to-end.
-
-M3 受控变化: 画像门槛退役,Gemini 4 字段(fit_senior_50plus / fit_confidence /
-relevance_score / judge_status)经 recall_pattern 落库,再走相关性(max60)+
-平台热度(max40)打分。这里锁三结局:default(pool stub, relevance 0.85)→ 进池为主,
-review stub(relevance 0.45)→ 按分降级,fail(judge_status=failed)→ 全部
-content_judge_failed 待复看。
-"""
+"""V4-M3: gemini_video_client drives query relevance scoring end-to-end."""
 
 from __future__ import annotations
 
@@ -18,24 +11,21 @@ from tests.replay_harness import replay_case
 
 
 def test_replay_default_pool_stub_scores_into_pool(tmp_path):
-    # 默认 FakeGeminiVideoClient = pool stub(relevance 0.85 → relevance 60)。
-    # R3 第二步: 抖音四字段热度复合后,高转发高收藏的一条从复看升进池 → 3 进池/1 复看。
     artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
     assert artifacts.state["status"] == "success"
-    assert artifacts.summary["pooled_content_count"] == 3
+    assert artifacts.summary["pooled_content_count"] == 2
     assert artifacts.summary["review_content_count"] == 1
-    assert artifacts.summary["rejected_content_count"] == 0
+    assert artifacts.summary["rejected_content_count"] == 1
     assert [d["decision_reason_code"] for d in artifacts.decisions] == [
-        "content_score_pool",
-        "content_score_pool",
-        "content_score_pool",
-        "content_score_review",
+        "v4_query_and_platform_pass",
+        "v4_query_and_platform_pass",
+        "v4_score_review_needed",
+        "v4_query_or_score_below_threshold",
     ]
+    assert all(d["scorecard"]["schema_version"] == "v4_scorecard.v1" for d in artifacts.decisions)
 
 
 def test_replay_review_stub_scores_by_relevance(tmp_path):
-    # review stub: relevance_score 0.45 → relevance 维只拿 20(2026-06-12 档位 25→20)。叠加各 item 热度后
-    # 仅最高热的一条进 60-69 复看带,其余 <60 被拒(无 pending、无进池)。
     artifacts = replay_case(
         "real_id45",
         runtime_root=tmp_path / "rt",
@@ -43,19 +33,17 @@ def test_replay_review_stub_scores_by_relevance(tmp_path):
     )
     assert artifacts.state["status"] == "success"
     assert artifacts.summary["pooled_content_count"] == 0
-    assert artifacts.summary["review_content_count"] == 1
-    assert artifacts.summary["rejected_content_count"] == 3
+    assert artifacts.summary["review_content_count"] == 2
+    assert artifacts.summary["rejected_content_count"] == 2
     assert sorted(d["decision_reason_code"] for d in artifacts.decisions) == [
-        "content_score_reject",
-        "content_score_reject",
-        "content_score_reject",
-        "content_score_review",
+        "v4_query_or_score_below_threshold",
+        "v4_query_or_score_below_threshold",
+        "v4_score_review_needed",
+        "v4_score_review_needed",
     ]
 
 
 def test_replay_fail_stub_routes_to_judge_failed_review(tmp_path):
-    # judge_status=failed → 硬门槛 content_judge_failed,全部 KEEP_CONTENT_FOR_REVIEW
-    # 待复看(不进池、不拒)。
     artifacts = replay_case(
         "real_id45",
         runtime_root=tmp_path / "rt",
@@ -67,7 +55,8 @@ def test_replay_fail_stub_routes_to_judge_failed_review(tmp_path):
     assert artifacts.summary["rejected_content_count"] == 0
     assert all(
         d["decision_action"] == "KEEP_CONTENT_FOR_REVIEW"
-        and d["decision_reason_code"] == "content_judge_failed"
+        and d["decision_reason_code"] == "v4_technical_retry_needed"
+        and d["decision_replay_data"]["allow_walk"] is False
         for d in artifacts.decisions
     )
 

+ 39 - 56
tests/test_rule_decision_effect_status.py

@@ -3,77 +3,60 @@ from __future__ import annotations
 from copy import deepcopy
 
 from content_agent.business_modules.rule_judgment.evaluator import decide
-from content_agent.run_service import RunService
-from content_agent.schemas import RunStartRequest
-from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
+from tests.test_rule_judgment_scorecard import _bundle, _policy
 
 
-def test_effect_status_mapping_for_success_pending_failed_and_rule_blocked(tmp_path):
-    state = _state(tmp_path)
+def test_v4_effect_status_mapping_for_pool_review_reject_technical_and_blocked():
+    policy = _policy()
 
-    # M3: success = relevance 60 + platform_heat 40 (heat >= 0.8) = 100 (>= 70 pool).
-    success_bundle = deepcopy(state["evidence_bundles"][0])
-    success_bundle["content_engagement_metrics"]["platform_heat"] = 0.9
-    success = decide(
-        state["run_id"],
-        state["policy_run_id"],
-        1,
-        success_bundle,
-        state["policy_bundle"],
-    )
+    success = decide("run_1", "policy_1", 1, _bundle(query=80, platform=70), policy)
     assert success["decision_action"] == "ADD_TO_CONTENT_POOL"
     assert success["search_query_effect_status"] == "success"
     assert success["decision_replay_data"]["effect_mapping_id"] == "map_add_to_pool_success"
 
-    # M3: pending = relevance 60 + zero platform_heat = 60 (60-69 review band).
-    pending_bundle = deepcopy(state["evidence_bundles"][0])
-    pending_bundle["content_engagement_metrics"]["platform_heat"] = 0.0
-    pending = decide(
-        state["run_id"],
-        state["policy_run_id"],
-        2,
-        pending_bundle,
-        state["policy_bundle"],
-    )
+    pending = decide("run_1", "policy_1", 2, _bundle(query=60, platform=60), policy)
     assert pending["decision_action"] == "KEEP_CONTENT_FOR_REVIEW"
     assert pending["search_query_effect_status"] == "pending"
     assert pending["decision_replay_data"]["effect_mapping_id"] == "map_keep_for_review_pending"
 
-    # M3: failed = no scoring rule matches either active dimension (relevance and
-    # platform_heat both below their lowest gte band) → missing_score.
-    failed_bundle = deepcopy(state["evidence_bundles"][0])
-    failed_bundle["pattern_match_result"]["relevance_score"] = 0.0
-    failed_bundle["content_engagement_metrics"]["platform_heat"] = 0.0
-    failed = decide(
-        state["run_id"],
-        state["policy_run_id"],
-        3,
-        failed_bundle,
-        state["policy_bundle"],
-    )
-    assert failed["decision_reason_code"] == "missing_score"
+    failed = decide("run_1", "policy_1", 3, _bundle(query=54, platform=100), policy)
+    assert failed["decision_action"] == "REJECT_CONTENT"
     assert failed["search_query_effect_status"] == "failed"
     assert failed["decision_replay_data"]["effect_mapping_id"] == "map_reject_failed"
 
-    blocked_bundle = deepcopy(state["evidence_bundles"][0])
-    blocked_bundle["source_evidence"] = {}
-    blocked = decide(
-        state["run_id"],
-        state["policy_run_id"],
-        4,
-        blocked_bundle,
-        state["policy_bundle"],
+    technical = decide("run_1", "policy_1", 4, _bundle(query=None, platform=70), policy)
+    assert technical["decision_reason_code"] == "v4_technical_retry_needed"
+    assert technical["search_query_effect_status"] == "pending"
+    assert technical["decision_replay_data"]["effect_mapping_id"] == "map_keep_for_review_pending"
+
+    blocked_policy = deepcopy(policy)
+    blocked_policy["rule_pack"]["hard_gates"] = [
+        {
+            "gate_id": "missing_source_evidence",
+            "when": {"field": "source_evidence", "op": "is_empty"},
+            "decision_action": "REJECT_CONTENT",
+            "decision_reason_code": "missing_source_evidence",
+            "severity": "fatal",
+            "stop_scoring": True,
+            "priority": 1,
+        }
+    ]
+    blocked_policy["effect_status_mapping"].append(
+        {
+            "mapping_id": "map_reject_rule_blocked",
+            "target_level": "content",
+            "decision_action": "REJECT_CONTENT",
+            "reason_category": "hard_gate",
+            "is_hard_gate": True,
+            "content_effect_status": "rule_blocked",
+            "priority": 30,
+            "enabled": True,
+        }
     )
+    blocked_bundle = _bundle(query=80, platform=70)
+    blocked_bundle["source_evidence"] = {}
+
+    blocked = decide("run_1", "policy_1", 5, blocked_bundle, blocked_policy)
     assert blocked["decision_reason_code"] == "missing_source_evidence"
     assert blocked["search_query_effect_status"] == "rule_blocked"
     assert blocked["decision_replay_data"]["effect_mapping_id"] == "map_reject_rule_blocked"
-
-
-def _state(tmp_path):
-    service = RunService(
-        runtime_root=tmp_path / "runtime" / "v1",
-        query_variant_client=FakeQueryVariantClient(),
-    )
-    return service.start_run(
-        RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
-    )

+ 7 - 16
tests/test_rule_judgment_hard_gates.py

@@ -46,19 +46,15 @@ def test_unknown_hard_gate_operator_fails_fast(tmp_path):
         )
 
 
-def test_not_fit_senior_is_a_blocking_hard_gate(tmp_path):
-    # M3: pattern_recall gate retired. The senior-fit judgment is now the blocking gate.
+def test_legacy_senior_fit_field_no_longer_triggers_hard_gate(tmp_path):
     state = _state(tmp_path)
     bundle = deepcopy(state["evidence_bundles"][0])
     bundle["pattern_match_result"]["fit_senior_50plus"] = False
 
     decision = decide(state["run_id"], state["policy_run_id"], 1, bundle, state["policy_bundle"])
 
-    assert decision["decision_action"] == "REJECT_CONTENT"
-    assert decision["decision_reason_code"] == "content_not_fit_senior"
-    assert decision["search_query_effect_status"] == "rule_blocked"
-    assert decision["triggered_blocking_rules"] == ["not_fit_senior"]
-    # Retired pattern-recall reason code must no longer surface.
+    assert "not_fit_senior" not in decision["triggered_blocking_rules"]
+    assert decision["decision_reason_code"] != "content_not_fit_senior"
     assert decision["decision_reason_code"] != "content_pattern_recall_required"
 
 
@@ -92,27 +88,22 @@ def test_judge_failed_review_is_config_driven_not_code_special_case(tmp_path):
     decision = decide(state["run_id"], state["policy_run_id"], 1, bundle, state["policy_bundle"])
 
     assert decision["decision_action"] == "KEEP_CONTENT_FOR_REVIEW"
-    assert decision["decision_reason_code"] == "content_judge_failed"
+    assert decision["decision_reason_code"] == "v4_technical_retry_needed"
     assert decision["search_query_effect_status"] == "pending"
     assert decision["triggered_blocking_rules"] == ["judge_failed"]
     assert decision["score"] is None
     assert decision["decision_replay_data"]["effect_mapping_id"] == "map_keep_for_review_pending_hard_gate"
 
 
-def test_low_confidence_below_threshold_rejects(tmp_path):
-    # M3: age_50_plus_weak gate retired. Low Gemini confidence is now the blocking gate
-    # (fit_confidence lt 0.6 -> REJECT_CONTENT / content_low_confidence / rule_blocked).
+def test_legacy_low_confidence_field_no_longer_triggers_hard_gate(tmp_path):
     state = _state(tmp_path)
     bundle = deepcopy(state["evidence_bundles"][0])
     bundle["pattern_match_result"]["fit_confidence"] = 0.4
 
     decision = decide(state["run_id"], state["policy_run_id"], 1, bundle, state["policy_bundle"])
 
-    assert decision["decision_action"] == "REJECT_CONTENT"
-    assert decision["decision_reason_code"] == "content_low_confidence"
-    assert decision["search_query_effect_status"] == "rule_blocked"
-    assert decision["triggered_blocking_rules"] == ["low_confidence"]
-    # Retired age gate / reason code must no longer surface.
+    assert "low_confidence" not in decision["triggered_blocking_rules"]
+    assert decision["decision_reason_code"] != "content_low_confidence"
     assert decision["decision_reason_code"] != "age_50_plus_weak"
 
 

+ 143 - 193
tests/test_rule_judgment_scorecard.py

@@ -2,216 +2,166 @@ from __future__ import annotations
 
 from copy import deepcopy
 
-import pytest
-
 from content_agent.business_modules.rule_judgment.evaluator import decide
-from content_agent.run_service import RunService
-from content_agent.schemas import RunStartRequest
-from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
-
-
-def test_scorecard_uses_active_dimensions_and_thresholds(tmp_path):
-    state = _state(tmp_path)
-    bundle = deepcopy(state["evidence_bundles"][0])
-    # M3 2-dim scorecard: relevance gte0.8 -> 60, platform_heat gte0.4 -> 20 => 80 (pool).
-    bundle["pattern_match_result"]["relevance_score"] = 0.8
-    bundle["content_engagement_metrics"]["platform_heat"] = 0.4
-
-    decision = decide(
-        state["run_id"],
-        state["policy_run_id"],
-        1,
-        bundle,
-        state["policy_bundle"],
-    )
 
+
+def test_v4_scorecard_uses_query_and_platform_50_50():
+    decision = decide("run_1", "policy_1", 1, _bundle(query=80, platform=70), _policy())
+
+    assert decision["score"] == 75
     assert decision["decision_action"] == "ADD_TO_CONTENT_POOL"
-    assert decision["score"] == 80
-    dimensions = {row["key"]: row for row in decision["scorecard"]["dimensions"]}
-    assert dimensions["relevance"]["score"] == 60
-    assert dimensions["platform_heat"]["score"] == 20
-    # 2026-06-12 清理: 5 个 deprecated 维度已从规则包物理删除,scorecard 只剩 2 个 active 维度。
-    assert set(dimensions) == {"relevance", "platform_heat"}
+    assert decision["decision_reason_code"] == "v4_query_and_platform_pass"
+    assert decision["search_query_effect_status"] == "success"
+    assert decision["scorecard"]["schema_version"] == "v4_scorecard.v1"
+    assert decision["scorecard"]["query_relevance_score"] == 80
+    assert decision["scorecard"]["platform_performance_score"] == 70
+    assert decision["decision_replay_data"]["allow_walk"] is True
+    assert decision["decision_replay_data"]["walk_gate_snapshot"] == {
+        "query_relevance_score": 80,
+        "platform_performance_score": 70,
+        "score": 75,
+    }
+
+
+def test_v4_threshold_boundaries():
+    cases = [
+        (54, 100, "REJECT_CONTENT", "v4_query_or_score_below_threshold"),
+        (55, 55, "KEEP_CONTENT_FOR_REVIEW", "v4_score_review_needed"),
+        (69, 69, "KEEP_CONTENT_FOR_REVIEW", "v4_score_review_needed"),
+        (70, 70, "ADD_TO_CONTENT_POOL", "v4_query_and_platform_pass"),
+    ]
+    for query, platform, action, reason in cases:
+        decision = decide("run_1", "policy_1", 1, _bundle(query=query, platform=platform), _policy())
+        assert decision["decision_action"] == action
+        assert decision["decision_reason_code"] == reason
 
 
-def test_rule_pack_scorecard_has_only_two_active_dimensions():
-    # 配置层钉死: 规则包里只剩 relevance + platform_heat,5 个废弃维度定义已删干净。
-    import json
-    from pathlib import Path
+def test_v4_allow_walk_requires_platform_65():
+    decision = decide("run_1", "policy_1", 1, _bundle(query=80, platform=64), _policy())
 
-    rule_pack = json.loads(
-        Path("product_documents/规则包/douyin_rule_packs.v1.json").read_text(encoding="utf-8")
-    )
-    for pack in rule_pack["rule_packs"]:
-        keys = [dim["key"] for dim in pack["scorecard"]["dimensions"]]
-        assert keys == ["relevance", "platform_heat"], keys
-        assert all(dim["runtime_status"] == "active" for dim in pack["scorecard"]["dimensions"])
-
-
-def test_missing_scoring_rules_fail_fast(tmp_path):
-    state = _state(tmp_path)
-    policy_bundle = deepcopy(state["policy_bundle"])
-    policy_bundle["rule_pack"]["scorecard"]["scoring_rules"] = []
-
-    with pytest.raises(ValueError, match="active scorecard dimensions require"):
-        decide(
-            state["run_id"],
-            state["policy_run_id"],
-            1,
-            state["evidence_bundles"][0],
-            policy_bundle,
-        )
-
-
-def test_no_scoring_evidence_uses_missing_score_policy(tmp_path):
-    state = _state(tmp_path)
-    bundle = deepcopy(state["evidence_bundles"][0])
-    # Drop evidence for both active dims (relevance + platform_heat) so no scoring rule matches.
-    bundle["pattern_match_result"].pop("relevance_score", None)
-    bundle["content_engagement_metrics"].pop("platform_heat", None)
-
-    decision = decide(state["run_id"], state["policy_run_id"], 1, bundle, state["policy_bundle"])
-
-    assert decision["decision_action"] == "REJECT_CONTENT"
-    assert decision["decision_reason_code"] == "missing_score"
-    assert decision["search_query_effect_status"] == "failed"
-    assert decision["score"] is None
+    assert decision["decision_action"] == "ADD_TO_CONTENT_POOL"
+    assert decision["score"] == 72
+    assert decision["decision_replay_data"]["allow_walk"] is False
 
 
-@pytest.mark.parametrize(
-    ("total_score", "expected_action", "expected_status"),
-    [
-        (59, "REJECT_CONTENT", "failed"),
-        (60, "KEEP_CONTENT_FOR_REVIEW", "pending"),
-        (69, "KEEP_CONTENT_FOR_REVIEW", "pending"),
-        (70, "ADD_TO_CONTENT_POOL", "success"),
-    ],
-)
-def test_score_threshold_boundaries(tmp_path, total_score, expected_action, expected_status):
-    state = _state(tmp_path)
-    policy_bundle = _policy_with_total_score(state["policy_bundle"], total_score)
-
-    decision = decide(
-        state["run_id"],
-        state["policy_run_id"],
-        1,
-        state["evidence_bundles"][0],
-        policy_bundle,
-    )
+def test_v4_missing_score_routes_to_technical_retry_review():
+    decision = decide("run_1", "policy_1", 1, _bundle(query=None, platform=70), _policy())
 
-    assert decision["score"] == total_score
-    assert decision["decision_action"] == expected_action
-    assert decision["search_query_effect_status"] == expected_status
-
-
-def test_scoring_rule_unknown_operator_fails_fast(tmp_path):
-    state = _state(tmp_path)
-    policy_bundle = deepcopy(state["policy_bundle"])
-    scoring_rules = policy_bundle["rule_pack"]["scorecard"]["scoring_rules"]
-    for rule in scoring_rules:
-        if rule["scoring_rule_id"] == "score_relevance_high":
-            rule["operator"] = "contains"
-
-    with pytest.raises(ValueError, match="unsupported rule operator"):
-        decide(
-            state["run_id"],
-            state["policy_run_id"],
-            1,
-            state["evidence_bundles"][0],
-            policy_bundle,
-        )
-
-
-def test_single_missing_dimension_scores_zero_and_keeps_threshold_flow(tmp_path):
-    state = _state(tmp_path)
-    bundle = deepcopy(state["evidence_bundles"][0])
-    # relevance evidence present (0.8 -> 60); platform_heat evidence absent -> scores 0, not missing_score.
-    bundle["pattern_match_result"]["relevance_score"] = 0.8
-    bundle["content_engagement_metrics"].pop("platform_heat", None)
-
-    decision = decide(state["run_id"], state["policy_run_id"], 1, bundle, state["policy_bundle"])
-
-    dimensions = {row["key"]: row for row in decision["scorecard"]["dimensions"]}
-    assert dimensions["platform_heat"]["score_missing"] is True
-    assert dimensions["platform_heat"]["score"] == 0
-    assert dimensions["relevance"]["score_missing"] is False
-    assert dimensions["relevance"]["score"] == 60
-    assert decision["score"] == 60
-    assert decision["decision_reason_code"] != "missing_score"
-    assert decision["scorecard"]["score_missing"] is False
-
-
-def test_all_dimensions_missing_uses_score_missing_policy(tmp_path):
-    state = _state(tmp_path)
-    bundle = deepcopy(state["evidence_bundles"][0])
-    # Both active dims (relevance + platform_heat) lack evidence -> score_missing policy.
-    bundle["pattern_match_result"].pop("relevance_score", None)
-    bundle["content_engagement_metrics"].pop("platform_heat", None)
-
-    decision = decide(state["run_id"], state["policy_run_id"], 1, bundle, state["policy_bundle"])
-
-    assert decision["decision_action"] == "REJECT_CONTENT"
-    assert decision["decision_reason_code"] == "missing_score"
     assert decision["score"] is None
     assert decision["scorecard"]["score_missing"] is True
-    assert all(row["score_missing"] for row in decision["scorecard"]["dimensions"])
+    assert decision["decision_action"] == "KEEP_CONTENT_FOR_REVIEW"
+    assert decision["decision_reason_code"] == "v4_technical_retry_needed"
+    assert decision["search_query_effect_status"] == "pending"
+    assert decision["decision_replay_data"]["allow_walk"] is False
 
 
-def test_dimension_missing_metadata_is_recorded(tmp_path):
-    state = _state(tmp_path)
-    bundle = deepcopy(state["evidence_bundles"][0])
-    bundle["content_engagement_metrics"].pop("platform_heat", None)
+def test_v4_scorecard_and_replay_data_do_not_contain_legacy_fields():
+    decision = decide("run_1", "policy_1", 1, _bundle(query=80, platform=70), _policy())
+    keys = _keys({"scorecard": decision["scorecard"], "replay": decision["decision_replay_data"]})
 
-    decision = decide(state["run_id"], state["policy_run_id"], 1, bundle, state["policy_bundle"])
-    assert decision["decision_replay_data"]["missing_dimensions"] == ["platform_heat"]
+    assert not (keys & {"fit_senior_50plus", "fit_confidence", "platform_heat", "relevance_score"})
 
-    full = deepcopy(state["evidence_bundles"][0])
-    full["content_engagement_metrics"]["platform_heat"] = 0.8
-    full_decision = decide(state["run_id"], state["policy_run_id"], 2, full, state["policy_bundle"])
-    assert full_decision["decision_replay_data"]["missing_dimensions"] == []
 
+def _bundle(query, platform):
+    return {
+        "source_evidence": {"source_kind": "pattern_itemset"},
+        "content": {
+            "decision_target_type": "content",
+            "decision_target_id": "content_1",
+            "platform_content_id": "content_1",
+        },
+        "pattern_match_result": {
+            "query_relevance_score": query,
+            "judge_status": "ok",
+        },
+        "content_engagement_metrics": {
+            "platform_performance": {
+                "schema_version": "v4_platform_performance.v1",
+                "platform": "douyin",
+                "platform_performance_score": platform,
+                "platform_performance_components": [
+                    {
+                        "field": "statistics.digg_count",
+                        "value": 100,
+                        "weight": 1,
+                        "normalized_score": platform or 0,
+                    }
+                ],
+                "missing_observable_fields": [],
+            }
+        },
+        "run_context": {"decision_input_snapshot_id": "evidence_bundle:run_1:content_1"},
+    }
 
-def _state(tmp_path):
-    service = RunService(
-        runtime_root=tmp_path / "runtime" / "v1",
-        query_variant_client=FakeQueryVariantClient(),
-    )
-    return service.start_run(
-        RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
-    )
 
+def _policy():
+    return deepcopy(
+        {
+            "policy_bundle_id": "policy_bundle_v4",
+            "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
+            "rule_pack_version": "4.0.0",
+            "strategy_id": "douyin_content_find_v4",
+            "strategy_version": "V4",
+            "policy_bundle_hash": "hash",
+            "dispatch_id": "dispatch_content",
+            "runtime_stage": "V1.0",
+            "rule_package_id": "douyin_rule_packs_v1",
+            "rule_pack": {
+                "input_contract": {
+                    "required_fields": [
+                        "source_evidence",
+                        "pattern_match_result.query_relevance_score",
+                        "content_engagement_metrics.platform_performance.platform_performance_score",
+                    ]
+                },
+                "hard_gates": [],
+                "scorecard": {"schema_version": "v4_scorecard.v1"},
+            },
+            "effect_status_mapping": [
+                {
+                    "mapping_id": "map_add_to_pool_success",
+                    "target_level": "content",
+                    "decision_action": "ADD_TO_CONTENT_POOL",
+                    "reason_category": "score_pass",
+                    "is_hard_gate": False,
+                    "content_effect_status": "success",
+                    "priority": 10,
+                    "enabled": True,
+                },
+                {
+                    "mapping_id": "map_keep_for_review_pending",
+                    "target_level": "content",
+                    "decision_action": "KEEP_CONTENT_FOR_REVIEW",
+                    "reason_category": "review_needed",
+                    "is_hard_gate": False,
+                    "content_effect_status": "pending",
+                    "priority": 20,
+                    "enabled": True,
+                },
+                {
+                    "mapping_id": "map_reject_failed",
+                    "target_level": "content",
+                    "decision_action": "REJECT_CONTENT",
+                    "reason_category": "score_or_data_failed",
+                    "is_hard_gate": False,
+                    "content_effect_status": "failed",
+                    "priority": 40,
+                    "enabled": True,
+                },
+            ],
+        }
+    )
 
-def _policy_with_total_score(policy_bundle, total_score):
-    """Build an exact total score from the two M3 active dims (relevance max60, platform_heat max40).
 
-    Replaces every scoring rule with one always-matching rule per active dimension whose
-    score_value sums to ``total_score`` (relevance carries up to 60, heat the remainder).
-    """
-    policy_bundle = deepcopy(policy_bundle)
-    scorecard = policy_bundle["rule_pack"]["scorecard"]
-    relevance_score = min(total_score, 60)
-    heat_score = total_score - relevance_score
-    assert heat_score <= 40, "total_score exceeds combined active-dimension caps"
-    scorecard["scoring_rules"] = [
-        {
-            "scoring_rule_id": "test_relevance_score",
-            "dimension_key": "relevance",
-            "field_path": "content.decision_target_type",
-            "operator": "eq",
-            "expected_value": "content",
-            "score_value": relevance_score,
-            "priority": 1,
-            "enabled": True,
-        },
-        {
-            "scoring_rule_id": "test_heat_score",
-            "dimension_key": "platform_heat",
-            "field_path": "content.decision_target_type",
-            "operator": "eq",
-            "expected_value": "content",
-            "score_value": heat_score,
-            "priority": 1,
-            "enabled": True,
-        },
-    ]
-    return policy_bundle
+def _keys(value):
+    if isinstance(value, dict):
+        result = set(value)
+        for child in value.values():
+            result |= _keys(child)
+        return result
+    if isinstance(value, list):
+        result = set()
+        for child in value:
+            result |= _keys(child)
+        return result
+    return set()

+ 14 - 23
tests/test_rule_pack_reading.py

@@ -18,25 +18,17 @@ def test_rule_pack_thresholds_drive_decision(tmp_path):
     )
     run_id = state["run_id"]
 
-    policy_bundle = deepcopy(state["policy_bundle"])
-    thresholds = policy_bundle["rule_pack"]["thresholds"]
-    thresholds[0]["min_score"] = 80
-    thresholds[1]["min_score"] = 70
-    thresholds[1]["max_score"] = 79
-
-    # 2-dim score(2026-06-12 贴题档位 45/25→40/20):relevance 0.6 -> 40, platform_heat 0.6 -> 30
-    # => 70, lands in the reconfigured 70<=score<=79 review band.
     bundle = deepcopy(state["evidence_bundles"][0])
-    bundle["pattern_match_result"]["relevance_score"] = 0.6
-    bundle["content_engagement_metrics"]["platform_heat"] = 0.6
+    bundle["pattern_match_result"]["query_relevance_score"] = 80
+    bundle["content_engagement_metrics"]["platform_performance"]["platform_performance_score"] = 60
 
-    decision = decide(run_id, state["policy_run_id"], 1, bundle, policy_bundle)
+    decision = decide(run_id, state["policy_run_id"], 1, bundle, state["policy_bundle"])
     assert decision["score"] == 70
-    assert decision["decision_action"] == "KEEP_CONTENT_FOR_REVIEW"
-    assert decision["decision_reason_code"] == "content_score_review"
-    assert decision["decision_replay_data"]["matched_threshold"] == "70<=score<=79"
+    assert decision["decision_action"] == "ADD_TO_CONTENT_POOL"
+    assert decision["decision_reason_code"] == "v4_query_and_platform_pass"
+    assert decision["decision_replay_data"]["allow_walk"] is False
     assert decision["policy_run_id"] == state["policy_run_id"]
-    assert decision["strategy_version"] == "V1"
+    assert decision["strategy_version"] == "V4"
 
 
 def test_rule_pack_hard_gate_reason_code_drives_decision(tmp_path):
@@ -71,15 +63,14 @@ def test_missing_score_uses_rule_pack_missing_policy(tmp_path):
     run_id = state["run_id"]
 
     bundle = deepcopy(state["evidence_bundles"][0])
-    # No evidence for either active dim -> rule pack's score_missing_policy applies.
-    bundle["pattern_match_result"].pop("relevance_score", None)
-    bundle["content_engagement_metrics"].pop("platform_heat", None)
+    bundle["pattern_match_result"]["query_relevance_score"] = None
+    bundle["content_engagement_metrics"]["platform_performance"]["platform_performance_score"] = None
 
     decision = decide(run_id, state["policy_run_id"], 1, bundle, state["policy_bundle"])
-    assert decision["decision_action"] == "REJECT_CONTENT"
-    assert decision["decision_reason_code"] == "missing_score"
-    assert decision["search_query_effect_status"] == "failed"
-    assert decision["decision_replay_data"]["score_missing_policy"]["decision_reason_code"] == "missing_score"
+    assert decision["decision_action"] == "KEEP_CONTENT_FOR_REVIEW"
+    assert decision["decision_reason_code"] == "v4_technical_retry_needed"
+    assert decision["search_query_effect_status"] == "pending"
+    assert decision["scorecard"]["score_missing"] is True
 
 
 def test_unknown_strategy_version_fails_before_rule_decision(tmp_path):
@@ -113,7 +104,7 @@ def test_rule_judgment_does_not_call_pattern_recall_integrations():
 def test_disabled_future_packs_not_marked_as_active_entity_packs():
     from content_agent.integrations.policy_json import JsonPolicyBundleStore
 
-    bundle = JsonPolicyBundleStore().load_policy_bundle("V1")
+    bundle = JsonPolicyBundleStore().load_policy_bundle("V4")
 
     assert set(bundle["rule_pack_by_entity"]) == {"Content"}
     for entity in ("Author", "Hashtag", "Path", "Budget"):

+ 99 - 0
tests/test_schema_registry_v4_contract.py

@@ -0,0 +1,99 @@
+import json
+from pathlib import Path
+
+from content_agent.integrations.database_runtime import JSON_COLUMNS_BY_TABLE, TABLE_COLUMNS
+from scripts.count_schema_registry import parse_sql_schema
+
+
+ROOT = Path(__file__).resolve().parents[1]  # directory one level above the one holding this file
+REGISTRY_PATH = ROOT / "tech_documents/数据库字段总览/content_agent_schema_registry.json"  # parsed by _registry()
+LEGACY_REGISTRY_FIELDS = {  # "table.column" refs that the registry must no longer contain
+    "content_agent_discovered_content_items.content_audience_profile",
+    "content_agent_rule_decisions.age_50_plus_level",
+    "content_agent_pattern_recall_evidence.decode_status",
+    "content_agent_pattern_recall_evidence.decode_task_id",
+    "content_agent_pattern_recall_evidence.matched_terms",
+    "content_agent_pattern_recall_evidence.matched_category_paths",
+    "content_agent_pattern_recall_evidence.decode_elements",
+    "content_agent_pattern_recall_evidence.match_paths_request",
+    "content_agent_pattern_recall_evidence.match_paths_response",
+}
+V4_TOUCHED_TABLES = {  # tables cross-checked between the registry, the SQL schema and the runtime column maps
+    "content_agent_queries",
+    "content_agent_pattern_recall_evidence",
+    "content_agent_rule_decisions",
+    "content_agent_walk_actions",
+    "content_agent_source_path_records",
+    "content_agent_author_assets",
+    "content_agent_discovered_content_items",
+    "content_agent_search_clue_assets",
+}
+
+
+def test_schema_registry_counts_match_sql_v4_baseline():
+    """Pin the V4 baseline totals (21 tables, 353 columns, 57 JSON) in SQL and in the registry."""
+    registry = _registry()
+    sql_tables = parse_sql_schema()
+    assert len(sql_tables) == 21
+    columns = [column for table in sql_tables.values() for column in table.columns]
+    assert len(columns) == 353
+    assert len([column for column in columns if column.is_json]) == 57
+    for section in ("database", "coverage_targets"):
+        assert registry[section]["sql_column_count"] == 353
+        assert registry[section]["json_column_count"] == 57
+
+
+def test_v4_touched_tables_match_sql_runtime_and_json_column_whitelist():
+    registry = _registry()
+    sql_tables = parse_sql_schema()
+    # Per table: registry columns equal SQL columns, and the JSON whitelist equals the SQL JSON columns.
+    for table_name in V4_TOUCHED_TABLES:
+        sql_columns = {column.name for column in sql_tables[table_name].columns}
+        registry_columns = {
+            column["field_name"]
+            for column in registry["tables"][table_name]["columns"]
+        }
+        assert registry_columns == sql_columns
+        if table_name in TABLE_COLUMNS:  # the subset check only runs for tables present in TABLE_COLUMNS
+            assert TABLE_COLUMNS[table_name] <= sql_columns
+        json_columns = {column.name for column in sql_tables[table_name].columns if column.is_json}
+        assert JSON_COLUMNS_BY_TABLE.get(table_name, set()) == json_columns
+
+
+def test_registry_no_longer_registers_sql_missing_legacy_fields():
+    """Legacy "table.column" refs must be absent from the registry tables and from field_index."""
+    registry = _registry()
+
+    registry_refs = set()
+    for table_name, table in registry["tables"].items():
+        for column in table["columns"]:
+            registry_refs.add(f"{table_name}.{column['field_name']}")
+
+    indexed_refs = set()
+    for refs in registry["field_index"].values():
+        indexed_refs.update(refs)
+
+    assert LEGACY_REGISTRY_FIELDS.isdisjoint(registry_refs)
+    assert LEGACY_REGISTRY_FIELDS.isdisjoint(indexed_refs)
+
+
+def test_raw_payload_policy_matches_runtime_v4_tables():
+    policy = _registry()["raw_payload_policy"]
+    # Subset checks (<=): the policy may list more keys/tables than the ones required here.
+    assert {
+        "access_token",
+        "refresh_token",
+        "apikey",
+        "authorization",
+        "cookie",
+        "session",
+        "credential",
+    } <= set(policy["forbidden_keys"])
+    assert {
+        "content_agent_pattern_recall_evidence",
+        "content_agent_walk_actions",
+    } <= set(policy["required_tables"])
+
+
+def _registry():  # Parse the schema registry JSON at REGISTRY_PATH; the file is re-read on every call.
+    return json.loads(REGISTRY_PATH.read_text(encoding="utf-8"))

+ 47 - 0
tests/test_shipinhao_client.py

@@ -80,6 +80,29 @@ def test_shipinhao_search_maps_canonical_fields():
     assert result["next_cursor"] == "12"
 
 
+def test_shipinhao_search_default_limit_is_five():
+    """Offer six upstream items and expect search() to return only the first five ids."""
+    items = []
+    for index in range(6):
+        items.append(
+            {
+                "channel_content_id": f"finderobj_{index}",
+                "title": "圆形彩虹",
+                "content_type": "video",
+            }
+        )
+
+    # Response body with code 0 and the items nested under data.data.
+    payload = {"code": 0, "data": {"data": items}}
+    client, _ = _client([_response(200, payload)])
+
+    results = client.search(_query())
+
+    # Six items were offered; the sixth (finderobj_5) must be absent from the result.
+    returned_ids = [result["platform_content_id"] for result in results]
+    assert returned_ids == [f"finderobj_{index}" for index in range(5)]
+
+
 def test_shipinhao_search_retries_on_25011_then_succeeds():
     client, sleeps = _client([_response(200, _FAIL_25011), _response(200, _SUCCESS)])
     result = client.search(_query())
@@ -105,3 +128,27 @@ def test_shipinhao_search_raises_after_exhausted():
 def test_shipinhao_fetch_author_works_blocked_returns_empty():
     client, _ = _client([])
     assert client.fetch_author_works({"platform_author_id": "acc_123"}) == []
+    assert client.http_client.requests == []
+
+
+def test_shipinhao_from_env_reads_limit_override(monkeypatch, tmp_path):
+    monkeypatch.setenv("CONTENTFIND_API_CRAWAPI_BASE_URL", "http://crawler.test")
+    monkeypatch.setenv("CONTENTFIND_SHIPINHAO_MAX_RESULTS_PER_QUERY", "2")
+    # env_path names a file under tmp_path that this test never creates.
+    client = CrawapiShipinhaoClient.from_env(env_path=tmp_path / "missing.env")
+    # The string "2" set in the environment must surface as the integer 2.
+    assert client.max_results_per_query == 2
+
+
+def test_shipinhao_from_env_default_limit_is_five(monkeypatch, tmp_path):
+    """With no limit override in the environment, from_env must fall back to 5 results per query."""
+    monkeypatch.setenv("CONTENTFIND_API_CRAWAPI_BASE_URL", "http://crawler.test")
+    monkeypatch.delenv("CONTENTFIND_SHIPINHAO_MAX_RESULTS_PER_QUERY", raising=False)  # drop any ambient override
+    client = CrawapiShipinhaoClient.from_env(env_path=tmp_path / "missing.env")
+    assert client.max_results_per_query == 5
+
+
+def test_shipinhao_account_info_is_not_reliable_capability():
+    client, _ = _client([])
+    # Capability check only: the client object must not expose fetch_account_info at all.
+    assert not hasattr(client, "fetch_account_info")

+ 1 - 9
tests/test_source_evidence.py

@@ -16,7 +16,6 @@ def test_source_evidence_inherits_evidence_pack_without_rewriting_origin(tmp_pat
     source_context = service.read_json(run_id, "source_context.json")
     evidence_pack = source_context["ext_data"]["evidence_pack"]
     decisions = service.read_jsonl(run_id, "rule_decisions.jsonl")
-    final_output = service.read_json(run_id, "final_output.json")
 
     source_evidence = decisions[0]["source_evidence"]
     assert source_evidence["policy_run_id"] == state["policy_run_id"]
@@ -50,14 +49,7 @@ def test_source_evidence_inherits_evidence_pack_without_rewriting_origin(tmp_pat
         not in source_evidence["matched_post_ids"]
     )
     assert source_evidence["discovery_relation"] == "mock_pattern_matched"
-    # M3: mock content scores 60 (relevance 60 + zero heat) → all three land in
-    # review, so the inherited source_evidence now surfaces on review_records rather
-    # than content_assets; the inheritance (carrying source_path_record_ids) is the
-    # property under test, not the pool/review band.
-    assert final_output["review_records"][0]["source_evidence"]["source_path_record_ids"]
-    assert {
-        record["decision_id"] for record in final_output["decision_records"]
-    } == {"d_001", "d_002", "d_003"}
+    assert {record["decision_id"] for record in decisions} == {"d_001", "d_002", "d_003"}
 
 
 def test_source_evidence_tracks_multiple_query_sources_without_polluting_origin(tmp_path):

+ 4 - 6
tests/test_v1_graph.py

@@ -22,16 +22,14 @@ def test_v1_graph_generates_all_runtime_files(tmp_path):
 
     final_output = service.read_json(run_id, "final_output.json")
     assert final_output["policy_run_id"] == state["policy_run_id"]
-    # M3 受控变化: 画像门槛退役,改 Gemini 相关性 + 平台热度打分。mock 链默认 Gemini
-    # 给 relevance 60,mock 平台内容热度均不足 → 三条都落 60-69 复看带(无进池/无拒)。
     assert final_output["summary"]["pooled_content_count"] == 0
-    assert final_output["summary"]["review_content_count"] == 3
+    assert final_output["summary"]["review_content_count"] == 0
     assert final_output["summary"]["pending_content_count"] == 0
-    assert final_output["summary"]["rejected_content_count"] == 0
+    assert final_output["summary"]["rejected_content_count"] == 3
     assert final_output["summary"]["effect_status_counts"] == {
         "success": 0,
-        "pending": 3,
-        "failed": 0,
+        "pending": 0,
+        "failed": 3,
         "rule_blocked": 0,
     }
     assert (

+ 118 - 0
tests/test_v4_m2_platform_sources_replay.py

@@ -0,0 +1,118 @@
+from __future__ import annotations
+
+import json
+
+from content_agent.business_modules import platform_access
+from content_agent.business_modules.content_discovery import content_discovery_builder
+from content_agent.integrations.database_runtime import DatabaseRuntimeStore
+from content_agent.integrations.runtime_files import LocalRuntimeFileStore
+from tests.test_database_runtime import FakeConnection, _config, _insert_values
+
+
+RUN_ID = "run_m2_replay"  # run id shared by prepare_run, the builder call, the DB append and _source_context()
+POLICY_RUN_ID = "policy_m2_replay"  # policy run id handed to content_discovery_builder.run
+
+
+class SearchOnlyPlatformClient:
+    """Test double exposing only ``search``; each call yields one canned video record."""
+    def __init__(self, platform: str, content_id: str):
+        self.platform = platform
+        self.content_id = content_id
+
+    def search(self, search_query):
+        record = {
+            "content_discovery_id": f"{search_query['search_query_id']}_content_001",
+            "search_query_id": search_query["search_query_id"],
+            "platform": self.platform,
+            "platform_content_id": self.content_id,
+            "platform_content_format": "video",
+            "description": f"{self.platform} 内容",
+            "platform_author_id": f"{self.platform}_author",
+            "author_display_name": f"{self.platform} 作者",
+            "statistics": {
+                "digg_count": 10,
+                "comment_count": 2,
+                "share_count": 3,
+                "collect_count": 4,
+                "play_count": 100,
+            },
+            "tags": ["#祝福"],
+            "play_url": f"https://video.test/{self.content_id}.mp4",
+            "has_more": False,
+            "next_cursor": "",
+            "platform_raw_payload": {
+                "channel_content_id": self.content_id,
+                "channel_account_id": f"{self.platform}_author",
+            },
+            "discovery_start_source": search_query["discovery_start_source"],
+            "previous_discovery_step": "search_query_direct",
+        }
+        return [record]
+
+
+def test_v4_m2_platform_sources_replay_preserves_observable_containers(tmp_path):
+    platform_results = []
+    for platform in ["douyin", "kuaishou", "shipinhao"]:
+        query = {
+            "search_query_id": f"q_{platform}",
+            "search_query": f"{platform} 祝福",
+            "search_query_generation_method": "item_single",
+            "discovery_start_source": "pattern_itemset",
+        }
+        result = platform_access.run(
+            [query],
+            SearchOnlyPlatformClient(platform, f"{platform}_content_001"),
+        )
+        assert result["query_failures"] == []
+        platform_results.extend(result["platform_results"])
+    # Feed the combined results of all three platforms through the content discovery builder.
+    runtime = LocalRuntimeFileStore(tmp_path / "runtime")
+    runtime.prepare_run(RUN_ID)
+    discovery_result = content_discovery_builder.run(
+        RUN_ID,
+        POLICY_RUN_ID,
+        platform_results,
+        _source_context(),
+        runtime,
+    )
+    # Append the discovered items through a DatabaseRuntimeStore backed by a FakeConnection.
+    connection = FakeConnection()
+    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
+    store.append_jsonl(
+        RUN_ID,
+        "discovered_content_items.jsonl",
+        discovery_result["discovered_content_items"],
+    )
+    # Three captured statements are expected, matching the three platforms queried above.
+    inserted = [_insert_values(sql, params) for sql, params in connection.statements]
+    assert len(inserted) == 3
+    for values in inserted:
+        statistics = json.loads(values["statistics"])
+        platform_raw_payload = json.loads(values["platform_raw_payload"])
+        raw_payload = json.loads(values["raw_payload"])
+        # The dedicated JSON columns must also appear unchanged inside raw_payload.
+        assert statistics["digg_count"] == 10
+        assert statistics["play_count"] == 100
+        assert platform_raw_payload["channel_content_id"].endswith("_content_001")
+        assert raw_payload["statistics"] == statistics
+        assert raw_payload["platform_raw_payload"] == platform_raw_payload
+        assert raw_payload["matched_search_query_ids"] == [values["search_query_id"]]
+
+
+def _source_context():
+    """Build the source_context record (with its evidence_pack) handed to the discovery builder."""
+    evidence_pack = {
+        "pattern_source_system": "pg_pattern_v2",
+        "source_kind": "pattern_itemset",
+        "source_post_id": "post_001",
+        "pattern_execution_id": 581,
+        "mining_config_id": 2082,
+        "itemset_ids": [1608352],
+        "seed_terms": ["祝福"],
+    }
+    return {
+        "schema_version": "runtime_record.v1",
+        "run_id": RUN_ID,
+        "demand_content_id": "demand_001",
+        "ext_data": {"evidence_pack": evidence_pack},
+    }

+ 122 - 0
tests/test_v4_m3_scoring_replay.py

@@ -0,0 +1,122 @@
+from __future__ import annotations
+
+import json
+
+from content_agent.business_modules.run_record.validation import validate_run
+from content_agent.integrations.database_runtime import DatabaseRuntimeStore
+from content_agent.integrations.runtime_files import LocalRuntimeFileStore
+from tests.gemini_helpers import FakeGeminiVideoClient, fake_gemini_fail, fake_gemini_pool
+from tests.replay_harness import replay_case
+from tests.test_database_runtime import FakeConnection, _config, _insert_values
+
+
+def test_v4_m3_scoring_replay_produces_v4_runtime_contract(tmp_path):
+    runtime_root = tmp_path / "runtime"
+    artifacts = replay_case(
+        "real_id45",
+        runtime_root=runtime_root,
+        gemini_video_client=FakeGeminiVideoClient(default_result=fake_gemini_pool()),
+    )
+    # Validate the replayed run through a local file store opened on the same runtime root.
+    validation = validate_run(artifacts.run_id, LocalRuntimeFileStore(runtime_root))
+
+    assert artifacts.state["status"] == "success"
+    assert validation["status"] == "pass"
+    assert artifacts.files["final_output.json"]["validation_status"] == "pass"
+    assert artifacts.files["discovered_content_items.jsonl"]
+    assert artifacts.files["pattern_recall_evidence.jsonl"]
+    assert artifacts.decisions
+    # Discovered items must carry all three payload containers.
+    for item in artifacts.files["discovered_content_items.jsonl"]:
+        assert "statistics" in item
+        assert "platform_raw_payload" in item
+        assert "raw_payload" in item
+    # Evidence summaries must use the V4 query-relevance schema and omit the two older keys.
+    for evidence in artifacts.files["pattern_recall_evidence.jsonl"]:
+        summary = evidence["evidence_summary"]
+        assert summary["schema_version"] == "v4_gemini_query_relevance.v1"
+        assert summary["final_status"] in {"ok", "success"}
+        assert "query_relevance_score" in summary
+        assert "fit_senior_50plus" not in summary
+        assert "relevance_score" not in summary
+    # Decisions must expose the V4 scorecard and walk-gate fields, without platform_heat.
+    for decision in artifacts.decisions:
+        scorecard = decision["scorecard"]
+        replay_data = decision["decision_replay_data"]
+        assert scorecard["schema_version"] == "v4_scorecard.v1"
+        assert "platform_performance_score" in scorecard
+        assert "platform_performance_components" in scorecard
+        assert "missing_observable_fields" in scorecard
+        assert "allow_walk" in replay_data
+        assert "walk_gate_snapshot" in replay_data
+        assert "platform_heat" not in scorecard
+
+
+def test_v4_m3_scoring_replay_keeps_technical_failure_in_review(tmp_path):
+    artifacts = replay_case(
+        "real_id45",
+        runtime_root=tmp_path / "runtime",
+        gemini_video_client=FakeGeminiVideoClient(default_result=fake_gemini_fail()),
+    )
+    # With fake_gemini_fail() as the default Gemini result, every decision must be a review, never pooled or rejected.
+    assert artifacts.summary["pooled_content_count"] == 0
+    assert artifacts.summary["rejected_content_count"] == 0
+    assert artifacts.summary["review_content_count"] == len(artifacts.decisions)
+    assert {d["decision_reason_code"] for d in artifacts.decisions} == {"v4_technical_retry_needed"}
+    assert all(d["decision_replay_data"]["allow_walk"] is False for d in artifacts.decisions)
+    assert all(d["scorecard"]["schema_version"] == "v4_scorecard.v1" for d in artifacts.decisions)
+
+
+def test_v4_m3_db_runtime_preserves_scoring_json_containers(tmp_path):
+    """Append one content, evidence and decision row and check their JSON columns round-trip intact."""
+    artifacts = replay_case(
+        "real_id45",
+        runtime_root=tmp_path / "runtime",
+        gemini_video_client=FakeGeminiVideoClient(default_result=fake_gemini_pool()),
+    )
+    connection = FakeConnection()
+    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
+    store.append_jsonl(
+        artifacts.run_id,
+        "discovered_content_items.jsonl",
+        artifacts.files["discovered_content_items.jsonl"][:1],
+    )
+    store.append_jsonl(
+        artifacts.run_id,
+        "pattern_recall_evidence.jsonl",
+        artifacts.files["pattern_recall_evidence.jsonl"][:1],
+    )
+    store.append_jsonl(artifacts.run_id, "rule_decisions.jsonl", artifacts.decisions[:1])
+    # Group the captured statements by target table; exactly one row per table is expected.
+    inserted = [(_table_name(sql), _insert_values(sql, params)) for sql, params in connection.statements]
+    content_row = _only(inserted, "content_agent_discovered_content_items")
+    evidence_row = _only(inserted, "content_agent_pattern_recall_evidence")
+    decision_row = _only(inserted, "content_agent_rule_decisions")
+    # Content row: the dedicated JSON columns must equal the copies inside raw_payload.
+    content_raw = json.loads(content_row["raw_payload"])
+    assert json.loads(content_row["statistics"]) == content_raw["statistics"]
+    assert json.loads(content_row["platform_raw_payload"]) == content_raw["platform_raw_payload"]
+    # Evidence row: every evidence_summary entry must also be present, unchanged, in raw_payload.
+    evidence_summary = json.loads(evidence_row["evidence_summary"])
+    evidence_raw = json.loads(evidence_row["raw_payload"])
+    assert evidence_summary["schema_version"] == "v4_gemini_query_relevance.v1"
+    for key, value in evidence_summary.items():
+        assert evidence_raw[key] == value
+    # Decision row: scorecard and replay data must match raw_payload, and allow_walk must be a real bool.
+    scorecard = json.loads(decision_row["scorecard"])
+    replay_data = json.loads(decision_row["decision_replay_data"])
+    decision_raw = json.loads(decision_row["raw_payload"])
+    assert scorecard["schema_version"] == "v4_scorecard.v1"
+    assert isinstance(replay_data["allow_walk"], bool)  # `in {True, False}` would also accept 0 and 1
+    assert decision_raw["scorecard"] == scorecard
+    assert decision_raw["decision_replay_data"] == replay_data
+
+
+def _table_name(sql: str) -> str:  # Extract the first backtick-quoted identifier from a SQL statement.
+    return sql.split("`", 2)[1]  # element 1 is the text between the first and second backtick
+
+
+def _only(rows: list[tuple[str, dict]], table: str) -> dict:  # Return the values of the single row for `table`.
+    matches = [values for row_table, values in rows if row_table == table]
+    assert len(matches) == 1  # zero or several rows for the table fails the calling test
+    return matches[0]

+ 55 - 0
tests/test_v4_rule_pack_contract.py

@@ -0,0 +1,55 @@
+from pathlib import Path
+import json
+
+from scripts.validate_v4_config_contract import (
+    assert_no_v4_legacy_fields,
+    validate_v4_config_contract,
+)
+
+
+ROOT = Path(__file__).resolve().parents[1]  # directory one level above the one holding this file
+
+
+def test_v4_config_contract_passes_with_m3_rule_pack_switch():
+    assert validate_v4_config_contract(ROOT) == []  # passing means the validator returns an empty list
+
+
+def test_v4_contract_fixture_contains_no_legacy_rule_fields():
+    fixture = {
+        "schema_version": "v4_scorecard.v1",
+        "scorecard": {
+            "query_relevance_score": 80,
+            "platform_performance_score": 70,
+            "missing_observable_fields": [],
+        },
+        "decision_replay_data": {"allow_walk": True},
+    }
+    # None of the keys above may be reported, so the expected result is an empty list.
+    assert assert_no_v4_legacy_fields(fixture) == []
+
+
+def test_v4_legacy_blocklist_reports_precise_paths():
+    fixture = {
+        "schema_version": "v4_scorecard.v1",
+        "scorecard": {
+            "query_relevance_score": 80,
+            "platform_heat": 70,
+        },
+    }
+    # The hit is a dotted path; "v4_contract" is not passed here (presumably the helper's default label - confirm).
+    assert assert_no_v4_legacy_fields(fixture) == ["v4_contract.scorecard.platform_heat"]
+
+
+def test_production_v4_rule_pack_contains_no_legacy_scoring_fields():
+    pkg = json.loads(
+        (ROOT / "product_documents/规则包/douyin_rule_packs.v1.json").read_text(encoding="utf-8")
+    )
+    pack = pkg["rule_packs"][0]
+    # The checks below cover the package-level strategy binding and only the first rule pack.
+    assert pkg["strategy_binding"]["strategy_version"] == "V4"
+    assert pack["scorecard"]["schema_version"] == "v4_scorecard.v1"
+    assert assert_no_v4_legacy_fields(pack, "rule_pack") == []
+    assert [row["key"] for row in pack["scorecard"]["dimensions"]] == [
+        "query_relevance",
+        "platform_performance",
+    ]

+ 379 - 0
tests/test_v4_validator_contract.py

@@ -0,0 +1,379 @@
+from __future__ import annotations
+
+import copy
+from typing import Any, Callable
+
+from content_agent.business_modules.run_record.validation import validate_run
+from content_agent.integrations.runtime_files import LocalRuntimeFileStore, RUNTIME_FILENAMES
+
+
+RUN_ID = "run_v4_contract"  # run id used when writing the runtime files and when calling validate_run
+POLICY_RUN_ID = "policy_v4_contract"  # policy run id stamped on the records built by _runtime_payload()
+
+
+def test_v4_contract_runtime_passes_validate_run(tmp_path):
+    runtime = _write_runtime(tmp_path)
+    # No mutation is applied, so this validates the baseline payload exactly as written.
+    result = validate_run(RUN_ID, runtime)
+
+    assert result["status"] == "pass"
+
+
+def test_v4_contract_does_not_apply_to_v3_scorecard_records(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        decision = data["rule_decisions.jsonl"][0]
+        decision["score"] = None
+        decision["scorecard"] = {
+            "total_score": None,
+            "fit_senior_50plus": True,
+            "relevance_score": 0.91,
+        }
+        decision["decision_replay_data"].pop("allow_walk")
+    # The replacement scorecard has no "schema_version" key, and allow_walk is removed from the replay data.
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert result["status"] == "pass"
+
+
+def test_v4_score_contract_rejects_bad_total(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        data["rule_decisions.jsonl"][0]["score"] = 92
+    # The baseline payload stores score 75 with component scores 80 and 70; only the total is changed.
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert _check_ids(result) == ["v4_score_total_mismatch"]
+
+
+def test_v4_walk_gate_rejects_allow_walk_below_threshold(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        decision = data["rule_decisions.jsonl"][0]
+        decision["score"] = 65
+        decision["scorecard"]["query_relevance_score"] = 60
+        decision["scorecard"]["platform_performance_score"] = 70
+    # allow_walk keeps its baseline value True while score and query_relevance_score are lowered.
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert "v4_allow_walk_threshold_mismatch" in _check_ids(result)
+
+
+def test_v4_action_thresholds_reject_conflicting_action(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        decision = data["rule_decisions.jsonl"][0]
+        decision["decision_action"] = "KEEP_CONTENT_FOR_REVIEW"
+    # Only the action is changed; the baseline scores (80 / 70, total 75) are left untouched.
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert "v4_action_threshold_mismatch" in _check_ids(result)
+
+
+def test_v4_gemini_failure_requires_structured_failure_fields(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        summary = data["pattern_recall_evidence.jsonl"][0]["evidence_summary"]
+        summary.clear()
+        summary.update(
+            {
+                "schema_version": "v4_gemini_query_relevance.v1",
+                "final_status": "failed",
+                "retry_count": 0,
+            }
+        )
+    # The summary now holds only schema_version, final_status="failed" and retry_count=0.
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert {
+        "v4_gemini_failure_incomplete",
+        "v4_gemini_failure_retry_invalid",
+    } <= set(_check_ids(result))
+
+
+def test_v4_legacy_field_blocklist_rejects_v4_records_only(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        data["rule_decisions.jsonl"][0]["scorecard"]["platform_heat"] = 88
+        data["pattern_recall_evidence.jsonl"][0]["evidence_summary"]["relevance_score"] = 0.9
+    # One legacy key is added to the scorecard and one to the evidence summary, hence a count of 2.
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert _check_ids(result).count("v4_legacy_field_present") == 2
+
+
+def _write_runtime(
+    tmp_path,
+    mutate: Callable[[dict[str, Any]], None] | None = None,
+) -> LocalRuntimeFileStore:
+    """Write the baseline payload (after the optional in-place ``mutate``) into a new local store."""
+    store = LocalRuntimeFileStore(tmp_path / "runtime")
+    store.prepare_run(RUN_ID)
+    payload = _runtime_payload()
+    if mutate:
+        mutate(payload)
+    for filename in RUNTIME_FILENAMES:
+        content = payload[filename]
+        # List payloads go through append_jsonl; everything else is written with write_json.
+        writer = store.append_jsonl if isinstance(content, list) else store.write_json
+        writer(RUN_ID, filename, content)
+    return store
+
+
+def _runtime_payload() -> dict[str, Any]:
+    evidence_pack = {
+        "pattern_source_system": "pg_pattern_v2",
+        "case_id_type": "post_id",
+        "source_kind": "pattern_itemset",
+        "source_post_id": "post_001",
+        "pattern_execution_id": 581,
+        "mining_config_id": 2082,
+        "itemset_ids": [1608352],
+        "itemset_items": ["毛主席", "感人"],
+        "support": 5,
+        "absolute_support": 5,
+        "matched_post_ids": ["post_001", "post_002"],
+        "video_ids": ["video_001"],
+        "case_ids": ["case_001"],
+        "seed_terms": ["父爱感悟"],
+        "discovery_start_source": "pattern_seed",
+        "previous_discovery_step": "search_query_generated",
+        "origin_path_id": "path_origin_001",
+        "run_id": RUN_ID,
+        "policy_run_id": POLICY_RUN_ID,
+        "source_certainty": "high",
+        "validation_status": "validated",
+    }
+    source_evidence = _source_evidence(evidence_pack, "content_001")
+    path_ids = ["path_pattern_query", "path_query_content", "path_decision_asset"]
+    decision = {
+        "record_schema_version": "runtime_record.v1",
+        "run_id": RUN_ID,
+        "policy_run_id": POLICY_RUN_ID,
+        "decision_id": "decision_001",
+        "policy_bundle_id": "policy_bundle_v4",
+        "rule_pack_id": "douyin_content_discovery_rule_pack_v4",
+        "rule_pack_version": "4.0.0",
+        "strategy_version": "V4",
+        "decision_target_type": "content",
+        "decision_target_id": "content_001",
+        "decision_action": "ADD_TO_CONTENT_POOL",
+        "decision_reason_code": "v4_query_and_platform_pass",
+        "search_query_effect_status": "success",
+        "score": 75,
+        "scorecard": {
+            "schema_version": "v4_scorecard.v1",
+            "query_relevance_score": 80,
+            "platform_performance_score": 70,
+            "missing_observable_fields": [],
+        },
+        "source_evidence": copy.deepcopy(source_evidence),
+        "decision_replay_data": {
+            "policy_bundle_hash": "hash_v4",
+            "rule_pack_id": "douyin_content_discovery_rule_pack_v4",
+            "rule_pack_version": "4.0.0",
+            "dispatch_id": "dispatch_v4",
+            "strategy_version": "V4",
+            "allow_walk": True,
+            "walk_gate_snapshot": {
+                "query_relevance_score": 80,
+                "platform_performance_score": 70,
+                "score": 75,
+            },
+        },
+        "raw_payload": {"decision_id": "decision_001", "v4_contract": True},
+    }
+    return {
+        "source_context.json": {
+            "schema_version": "runtime_record.v1",
+            "run_id": RUN_ID,
+            "demand_content_id": "123",
+            "ext_data": {"evidence_pack": evidence_pack},
+        },
+        "pattern_seed_pack.json": {
+            "schema_version": "runtime_record.v1",
+            "run_id": RUN_ID,
+            "policy_run_id": POLICY_RUN_ID,
+            "itemsets": [{"itemset_id": 1608352}],
+            "seed_terms": ["父爱感悟"],
+        },
+        "search_queries.jsonl": [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": RUN_ID,
+                "policy_run_id": POLICY_RUN_ID,
+                "search_query_id": "query_001",
+                "search_query": "父爱感悟",
+                "search_query_generation_method": "v4_seed",
+                "discovery_start_source": "pattern_seed",
+                "previous_discovery_step": "search_query_generated",
+                "search_query_effect_status": "success",
+                "pattern_seed_ref": {"query_source_type": "seed"},
+                "raw_payload": {"query_source_refs": [{"query_source_type": "seed"}]},
+            }
+        ],
+        "discovered_content_items.jsonl": [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": RUN_ID,
+                "policy_run_id": POLICY_RUN_ID,
+                "content_discovery_id": "discovery_001",
+                "search_query_id": "query_001",
+                "platform": "douyin",
+                "platform_content_id": "content_001",
+                "content_url": "https://example.com/content_001",
+                "statistics": {"share_count": 10},
+                "tags": ["父爱"],
+                "source_evidence": copy.deepcopy(source_evidence),
+                "pattern_match_result": {
+                    "judge_status": "ok",
+                    "pattern_recall_evidence_id": "recall_001",
+                },
+                "platform_raw_payload": {},
+                "raw_payload": {"platform_content_id": "content_001"},
+            }
+        ],
+        "content_media_records.jsonl": [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": RUN_ID,
+                "policy_run_id": POLICY_RUN_ID,
+                "media_record_id": "media_001",
+                "platform": "douyin",
+                "platform_content_id": "content_001",
+                "media_status": "available",
+                "raw_payload": {"platform_content_id": "content_001"},
+            }
+        ],
+        "pattern_recall_evidence.jsonl": [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": RUN_ID,
+                "policy_run_id": POLICY_RUN_ID,
+                "recall_evidence_id": "recall_001",
+                "content_discovery_id": "discovery_001",
+                "platform_content_id": "content_001",
+                "recall_status": "judged",
+                "evidence_summary": {
+                    "schema_version": "v4_gemini_query_relevance.v1",
+                    "final_status": "success",
+                    "query_relevance_score": 80,
+                    "reason": "契合 query",
+                },
+                "raw_payload": {"recall_evidence_id": "recall_001"},
+            }
+        ],
+        "rule_decisions.jsonl": [decision],
+        "walk_actions.jsonl": [],
+        "run_events.jsonl": [],
+        "source_path_records.jsonl": [
+            _path("path_pattern_query", "pattern_to_search_query", "Pattern", 581, "SearchQuery", "query_001"),
+            _path("path_query_content", "search_query_to_content", "SearchQuery", "query_001", "Content", "content_001"),
+            _path(
+                "path_decision_asset",
+                "decision_to_asset",
+                "RuleDecision",
+                "decision_001",
+                "ContentAsset",
+                "content_001",
+                decision_id="decision_001",
+            ),
+        ],
+        "search_clues.jsonl": [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": RUN_ID,
+                "policy_run_id": POLICY_RUN_ID,
+                "clue_id": "clue_001",
+                "search_query_id": "query_001",
+                "search_query": "父爱感悟",
+                "result_count": 1,
+                "pooled_content_count": 1,
+                "review_content_count": 0,
+                "pending_content_count": 0,
+                "rejected_content_count": 0,
+                "search_query_effect_status": "success",
+                "query_aggregation_id": "agg_query_success",
+                "raw_payload": {"clue_id": "clue_001"},
+            }
+        ],
+        "final_output.json": {
+            "schema_version": "runtime_record.v1",
+            "run_id": RUN_ID,
+            "policy_run_id": POLICY_RUN_ID,
+            "validation_status": "pass",
+            "content_assets": [
+                {
+                    "platform_content_id": "content_001",
+                    "decision_id": "decision_001",
+                    "source_path_record_ids": path_ids,
+                    "source_evidence": copy.deepcopy(source_evidence),
+                }
+            ],
+            "author_assets": [],
+            "review_records": [],
+            "decision_records": [
+                {
+                    "decision_id": "decision_001",
+                    "source_evidence": copy.deepcopy(source_evidence),
+                }
+            ],
+            "search_clues": [],
+            "reject_records": [],
+            "summary": {
+                "pooled_content_count": 1,
+                "review_content_count": 0,
+                "pending_content_count": 0,
+                "rejected_content_count": 0,
+                "run_path_complete": True,
+                "trace_complete": True,
+            },
+        },
+        "strategy_review.json": {
+            "schema_version": "runtime_record.v1",
+            "run_id": RUN_ID,
+            "policy_run_id": POLICY_RUN_ID,
+            "summary": {},
+            "raw_payload": {"strategy_review_id": "review_001"},
+        },
+    }
+
+
+def _source_evidence(evidence_pack: dict[str, Any], platform_content_id: str) -> dict[str, Any]:  # Return a per-content copy of evidence_pack.
+    evidence = copy.deepcopy(evidence_pack)  # deep copy, so the caller's evidence_pack is never mutated
+    evidence["discovered_platform_content_id"] = platform_content_id  # set (or overwrite) the content id on the copy only
+    return evidence
+
+
+def _path(  # Build one source-path record dict linking a "from" node to a "to" node.
+    path_id: str,  # stored as source_path_record_id and repeated inside raw_payload
+    path_type: str,  # stored as source_path_type
+    from_type: str,
+    from_id: Any,  # Any: callers in this file pass both int (581) and str ids
+    to_type: str,
+    to_id: Any,
+    decision_id: str | None = None,  # optional; stored unchanged, so the record holds None when omitted
+) -> dict[str, Any]:
+    return {
+        "record_schema_version": "runtime_record.v1",
+        "run_id": RUN_ID,  # RUN_ID and POLICY_RUN_ID are module-level names defined outside this function
+        "policy_run_id": POLICY_RUN_ID,
+        "source_path_record_id": path_id,
+        "source_path_type": path_type,
+        "from_node_type": from_type,
+        "from_node_id": from_id,
+        "to_node_type": to_type,
+        "to_node_id": to_id,
+        "decision_id": decision_id,
+        "raw_payload": {"source_path_record_id": path_id},  # raw_payload carries only the record id
+    }
+
+
+def _check_ids(result: dict[str, Any]) -> list[str]:  # Collect the check_id of every finding, preserving order.
+    return [finding["check_id"] for finding in result["findings"]]  # raises KeyError if "findings" or any "check_id" is absent

+ 101 - 0
tests/test_v4_walk_contract.py

@@ -0,0 +1,101 @@
+import json
+from pathlib import Path
+
+
+ROOT = Path(__file__).resolve().parents[1]  # parent of the directory containing this test file
+DATA_DIR = ROOT / "tech_documents/数据接口与来源"  # base directory of the JSON files the tests below load through _json
+
+
+def test_v4_query_source_refs_support_three_sources():  # A sample query payload must list all three query source types.
+    query = {  # inline sample record; this test calls no project code
+        "pattern_seed_ref": {"query_source_type": "seed"},
+        "raw_payload": {
+            "query_source_refs": [
+                {"query_source_type": "seed", "source_rank": 1},
+                {"query_source_type": "topic_point", "source_rank": 2},
+                {"query_source_type": "terminal_element", "source_rank": 3},
+            ]
+        },
+    }
+
+    source_types = {ref["query_source_type"] for ref in query["raw_payload"]["query_source_refs"]}  # set: order and duplicates are ignored
+
+    assert source_types == {"seed", "topic_point", "terminal_element"}  # exact match: no missing and no extra source type
+
+
+def test_v4_tag_walk_contract_uses_80_threshold_and_raw_tag_query():  # Pins the shape of an allowed hashtag-to-query walk action.
+    tag_action = {  # inline sample record; this test calls no project code
+        "edge_id": "hashtag_to_query",
+        "raw_payload": {
+            "tag_text": "父爱感悟",
+            "next_query": "父爱感悟",
+            "tag_query_relevance_score": 80,  # sits exactly on the threshold checked below
+            "tag_walk_status": "allow",
+        },
+    }
+
+    payload = tag_action["raw_payload"]
+
+    assert payload["tag_query_relevance_score"] >= 80  # inclusive comparison: a score of exactly 80 passes
+    assert payload["next_query"] == payload["tag_text"]  # the follow-up query is the tag text unchanged
+    assert payload["tag_walk_status"] == "allow"
+
+
+def test_v4_douyin_author_gate_contract_uses_elderly_ratio_and_tgi():  # Pins the fields and limits of an eligible douyin author asset.
+    author_asset = {  # inline sample record; this test calls no project code
+        "platform": "douyin",
+        "elderly_ratio": 0.61,  # just above the 0.6 limit asserted below
+        "elderly_tgi": 121,  # just above the 120 limit asserted below
+        "eligible_as_source": 1,
+        "profile_snapshot": {"source": "hotspotbao_account_profile"},
+        "evidence_refs": {"decision_ids": ["decision_001"]},
+    }
+
+    assert author_asset["elderly_ratio"] > 0.6  # strict comparison: exactly 0.6 would fail
+    assert author_asset["elderly_tgi"] > 120  # strict comparison: exactly 120 would fail
+    assert author_asset["eligible_as_source"] == 1
+    assert author_asset["profile_snapshot"]  # truthiness check: the snapshot must be non-empty
+    assert author_asset["evidence_refs"]  # truthiness check: the evidence refs must be non-empty
+
+
+def test_v4_kuaishou_and_shipinhao_author_work_edges_are_blocked():  # The kuaishou and shipinhao profile files must mark these entries "blocked".
+    kuaishou = _json(DATA_DIR / "platform_profiles/kuaishou.json")  # loaded from disk; the test errors if the file is missing
+    shipinhao = _json(DATA_DIR / "platform_profiles/shipinhao.json")
+
+    assert kuaishou["edges"]["query_next_page"]["status"] == "blocked"  # checked for kuaishou only
+    assert kuaishou["edges"]["author_to_works"]["status"] == "blocked"
+    assert kuaishou["edges"]["author_work_to_content"]["status"] == "blocked"
+    assert shipinhao["edges"]["author_to_works"]["status"] == "blocked"
+    assert shipinhao["edges"]["author_work_to_content"]["status"] == "blocked"
+    assert shipinhao["endpoints"]["account_info"]["status"] == "blocked"  # read from "endpoints", not "edges"
+
+
+def test_v4_m2_platform_profiles_define_observable_fields():  # Each platform profile must account for the same five statistics fields.
+    for platform in ["douyin", "kuaishou", "shipinhao"]:
+        profile = _json(DATA_DIR / f"platform_profiles/{platform}.json")  # one profile file per platform
+        covered = {  # union of fields listed as observable and fields listed as missing
+            item["field"]
+            for item in profile["observable_fields"] + profile["missing_observable_fields"]
+        }
+
+        assert covered == {  # exact match: every field appears in one of the two lists, and nothing else does
+            "statistics.digg_count",
+            "statistics.comment_count",
+            "statistics.share_count",
+            "statistics.collect_count",
+            "statistics.play_count",
+        }
+        for item in profile["missing_observable_fields"]:
+            assert item["missing_type"] in {"natural_platform_missing", "runtime_missing"}  # only these two missing_type values are accepted
+
+
+def test_v4_video_and_author_dedup_keys_are_separate():  # Content and author dedup keys in walk_policy.json must be distinct.
+    dedup = _json(DATA_DIR / "walk_policy.json")["dedup"]  # only the "dedup" section of the policy is inspected
+
+    assert "platform_content_id" in dedup["content_key"]  # `in` is a containment check; the key's container type is not shown here
+    assert "platform_author_id" in dedup["author_key"]
+    assert dedup["content_key"] != dedup["author_key"]
+
+
+def _json(path: Path):  # Read the file at path and return its parsed JSON content.
+    return json.loads(path.read_text(encoding="utf-8"))  # explicit UTF-8, so decoding does not depend on the platform default

+ 1 - 1
tests/test_walk_strategy_config.py

@@ -58,7 +58,7 @@ def test_walk_strategy_config_uses_clue_id_and_real_rule_packs():
         (row["rule_pack_id"], row["rule_pack_version"])
         for row in strategy["walk_rule_pack_binding"]
     } == {
-        ("douyin_content_discovery_rule_pack_v1", "1.0.0"),
+        ("douyin_content_discovery_rule_pack_v1", "4.0.0"),
     }