Просмотр исходного кода

feat: enforce V4 allow_walk in walk runtime

Sam Lee 1 неделя назад
Родитель
Сommit
3b0cbdabbf

+ 55 - 3
content_agent/business_modules/run_record/validation.py

@@ -125,6 +125,7 @@ def validate_run(run_id: str, runtime: RuntimeFileStore) -> dict[str, Any]:
     _check_completeness(data, findings)
     _check_v4_score_contract(data, findings)
     _check_v4_walk_gate_contract(data, findings)
+    _check_v4_walk_action_consumption(data, findings)
     _check_v4_action_thresholds(data, findings)
     _check_v4_gemini_failure_contract(data, findings)
     _check_v4_legacy_field_blocklist(data, findings)
@@ -907,18 +908,69 @@ def _check_v4_walk_gate_contract(data: dict[str, Any], findings: list[dict[str,
             )
             continue
         query_score, platform_score, total_score = _v4_score_values(decision)
-        if replay_data["allow_walk"] and not (
+        expected_allow_walk = (
             _is_number(query_score)
             and _is_number(platform_score)
             and _is_number(total_score)
             and query_score >= 70
             and platform_score >= 65
             and total_score >= 70
-        ):
+        )
+        if replay_data["allow_walk"] != expected_allow_walk:
             _fail(
                 findings,
                 "v4_allow_walk_threshold_mismatch",
-                f"decision {decision.get('decision_id')} allow_walk=true without query>=70/platform>=65/score>=70",
+                f"decision {decision.get('decision_id')} allow_walk does not match query>=70/platform>=65/score>=70",
+            )
+
+
+def _check_v4_walk_action_consumption(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
+    v4_decisions_by_id = {
+        decision.get("decision_id"): decision
+        for decision in data.get("rule_decisions.jsonl", [])
+        if _is_v4_contract_record(decision) and decision.get("decision_id")
+    }
+    if not v4_decisions_by_id:
+        return
+    for action in data.get("walk_actions.jsonl", []):
+        edge_id = action.get("edge_id")
+        if edge_id not in {"query_next_page", "hashtag_to_query", "author_to_works"}:
+            continue
+        raw_payload = action.get("raw_payload") or {}
+        decision_id = raw_payload.get("decision_id") or action.get("decision_id")
+        decision = v4_decisions_by_id.get(decision_id)
+        if not decision:
+            continue
+        missing_fields = [
+            field
+            for field in ["decision_id", "allow_walk", "allow_walk_reason", "walk_gate_snapshot"]
+            if field not in raw_payload
+        ]
+        if missing_fields:
+            _fail(
+                findings,
+                "v4_walk_action_gate_context_missing",
+                f"walk action {action.get('walk_action_id')} missing V4 gate context fields: {missing_fields}",
+            )
+            continue
+        expected_allow_walk = (decision.get("decision_replay_data") or {}).get("allow_walk")
+        if raw_payload.get("allow_walk") != expected_allow_walk:
+            _fail(
+                findings,
+                "v4_walk_action_allow_walk_mismatch",
+                f"walk action {action.get('walk_action_id')} allow_walk does not match decision {decision_id}",
+            )
+        if action.get("walk_status") == "success" and expected_allow_walk is not True:
+            _fail(
+                findings,
+                "v4_walk_action_allow_walk_denied",
+                f"walk action {action.get('walk_action_id')} succeeded for allow_walk=false decision {decision_id}",
+            )
+        if action.get("reason_code") == "v4_allow_walk_denied" and expected_allow_walk is not False:
+            _fail(
+                findings,
+                "v4_walk_action_deny_mismatch",
+                f"walk action {action.get('walk_action_id')} denies walk but decision {decision_id} is not allow_walk=false",
             )
 
 

+ 149 - 8
content_agent/business_modules/walk_engine.py

@@ -31,6 +31,10 @@ from content_agent.interfaces import (
 from content_agent.record_payload import with_raw_payload
 
 
+V4_SCORECARD_SCHEMA_VERSION = "v4_scorecard.v1"
+V4_ALLOW_WALK_DENIED_REASON = "v4_allow_walk_denied"
+V4_WALK_GATE_EDGES = {"query_next_page", "hashtag_to_query", "author_to_works"}
+
 BLOCKED_TAG_TERMS = {
     "h" + "ot",
     "trend" + "ing",
@@ -250,18 +254,57 @@ def _expand_queries(
         page_binding, _ = _resolve_edge_binding("query_next_page", walk_strategy)
         query_by_id = {row["search_query_id"]: row for row in search_queries}
         query_effect_by_id = _query_effect_by_search_query_id(discovered_content_items, rule_decisions)
+        query_gate_by_id = _query_v4_walk_gate_by_search_query_id(discovered_content_items, rule_decisions)
         seen_queries: set[str] = set()
         for item in discovered_content_items:
             search_query_id = item.get("search_query_id")
             cursor = item.get("next_cursor")
             if not item.get("has_more") or not cursor or search_query_id in seen_queries:
                 continue
-            if not _can_fetch_next_page(search_query_id, query_effect_by_id):
+            if not _can_fetch_next_page(search_query_id, query_effect_by_id, query_gate_by_id):
+                denied_decision = (query_gate_by_id.get(search_query_id) or {}).get("decision")
+                if (
+                    query_effect_by_id.get(search_query_id) == "success"
+                    and denied_decision
+                    and _v4_walk_gate_denied(denied_decision)
+                ):
+                    skipped_actions.append(
+                        _walk_action(
+                            run_id,
+                            policy_run_id,
+                            _walk_action_id(
+                                run_id, policy_run_id, "query_next_page", search_query_id, "allow_walk"
+                            ),
+                            "query_next_page",
+                            "query",
+                            "SearchQuery",
+                            search_query_id,
+                            "SearchQuery",
+                            "next_page_skipped",
+                            "fetch_next_page",
+                            "skipped",
+                            created_at,
+                            reason_code=V4_ALLOW_WALK_DENIED_REASON,
+                            budget_tier="blocked",
+                            rule_pack_binding=page_binding,
+                            rule_pack_execution=_execution_record(
+                                denied_decision,
+                                content_pack_id=content_pack["rule_pack_id"],
+                            ),
+                            fallback_rule_pack=content_pack,
+                            raw_extra={
+                                "parent_search_query_id": search_query_id,
+                                **_decision_context(denied_decision, denied=True),
+                            },
+                        )
+                    )
+                    seen_queries.add(search_query_id)
                 continue
             source = query_by_id.get(search_query_id)
             if not source:
                 continue
             seen_queries.add(search_query_id)
+            gate_decision = (query_gate_by_id.get(search_query_id) or {}).get("decision")
             if len(page_rows) >= page_budget:
                 skipped_actions.append(
                     _walk_action(
@@ -302,7 +345,10 @@ def _expand_queries(
                     "query_next_page",
                     created_at,
                     page_cursor=str(cursor),
-                    raw_extra={"parent_search_query_id": search_query_id},
+                    raw_extra={
+                        "parent_search_query_id": search_query_id,
+                        **_decision_context(gate_decision),
+                    },
                 )
             )
 
@@ -319,6 +365,9 @@ def _expand_queries(
             if _edge_permission_for(decision, "video_to_hashtag", policy) == "deny":
                 if item.get("tags"):
                     reason_code = (
+                        V4_ALLOW_WALK_DENIED_REASON
+                        if _v4_walk_gate_denied(decision)
+                        else
                         "review_tag_expansion_disabled"
                         if decision.get("decision_action") == "KEEP_CONTENT_FOR_REVIEW"
                         else "blocked_by_rule_decision"
@@ -344,7 +393,10 @@ def _expand_queries(
                             rule_pack_binding=tag_binding,
                             rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]),
                             fallback_rule_pack=content_pack,
-                            raw_extra=_decision_context(decision),
+                            raw_extra=_decision_context(
+                                decision,
+                                denied=reason_code == V4_ALLOW_WALK_DENIED_REASON,
+                            ),
                         )
                     )
                 continue
@@ -394,6 +446,7 @@ def _expand_queries(
                         raw_extra={
                             "hashtag": normalized,
                             "source_content_id": item.get("platform_content_id"),
+                            **_decision_context(decision),
                         },
                     )
                 )
@@ -470,10 +523,15 @@ def _expand_authors(
         decision = decision_by_content_id.get(item.get("platform_content_id"))
         permission = _edge_permission_for(decision, "author_to_works", policy)
         if permission == "deny":
+            reason_code = (
+                V4_ALLOW_WALK_DENIED_REASON
+                if _v4_walk_gate_denied(decision)
+                else "blocked_by_rule_decision"
+            )
             walk_actions.append(
                 _author_walk_action(
                     run_id, policy_run_id, author_id, "skipped", created_at,
-                    reason_code="blocked_by_rule_decision",
+                    reason_code=reason_code,
                     budget_tier="blocked",
                     binding=binding,
                     decision=decision,
@@ -792,6 +850,7 @@ def _query_actions(
                     "reason": "content_decision_reused_for_walk_gate",
                 },
                 fallback_rule_pack=content_pack,
+                raw_extra=_walk_gate_context_from_query_row(row),
             )
         )
     return actions
@@ -911,8 +970,41 @@ def _query_effect_by_search_query_id(
     return effects
 
 
-def _can_fetch_next_page(search_query_id: str, query_effect_by_id: dict[str, str]) -> bool:
-    return query_effect_by_id.get(search_query_id) == "success"
+def _query_v4_walk_gate_by_search_query_id(
+    discovered_content_items: list[dict[str, Any]],
+    rule_decisions: list[dict[str, Any]],
+) -> dict[str, dict[str, Any]]:
+    decision_by_content_id = _decision_by_content_id(rule_decisions)
+    gates: dict[str, dict[str, Any]] = {}
+    for item in discovered_content_items:
+        decision = decision_by_content_id.get(item.get("platform_content_id"))
+        if not _is_v4_decision(decision):
+            continue
+        query_sources = item.get("query_sources") or [{"search_query_id": item.get("search_query_id")}]
+        for query_source in query_sources:
+            search_query_id = query_source.get("search_query_id")
+            if not search_query_id:
+                continue
+            gate = gates.setdefault(search_query_id, {"has_v4": True, "allow_walk": False, "decision": decision})
+            if _v4_allow_walk_allowed(decision):
+                gate["allow_walk"] = True
+                gate["decision"] = decision
+            elif not gate.get("allow_walk"):
+                gate["decision"] = decision
+    return gates
+
+
+def _can_fetch_next_page(
+    search_query_id: str,
+    query_effect_by_id: dict[str, str],
+    query_gate_by_id: dict[str, dict[str, Any]] | None = None,
+) -> bool:
+    if query_effect_by_id.get(search_query_id) != "success":
+        return False
+    gate = (query_gate_by_id or {}).get(search_query_id)
+    if gate and gate.get("has_v4"):
+        return bool(gate.get("allow_walk"))
+    return True
 
 
 def _edge_permission_for(
@@ -921,9 +1013,28 @@ def _edge_permission_for(
     """判定→边通行证:无判定 / 查询 rule_blocked 一律 deny,其余查 edge_permissions。"""
     if not decision or decision.get("search_query_effect_status") == "rule_blocked":
         return "deny"
+    if edge_id in V4_WALK_GATE_EDGES or edge_id == "video_to_hashtag":
+        if _is_v4_decision(decision) and not _v4_allow_walk_allowed(decision):
+            return "deny"
     return edge_permission(policy, decision.get("decision_action"), edge_id)
 
 
+def _is_v4_decision(decision: dict[str, Any] | None) -> bool:
+    scorecard = (decision or {}).get("scorecard") or {}
+    return isinstance(scorecard, dict) and scorecard.get("schema_version") == V4_SCORECARD_SCHEMA_VERSION
+
+
+def _v4_allow_walk_allowed(decision: dict[str, Any] | None) -> bool:
+    if not _is_v4_decision(decision):
+        return False
+    replay_data = (decision or {}).get("decision_replay_data") or {}
+    return replay_data.get("allow_walk") is True
+
+
+def _v4_walk_gate_denied(decision: dict[str, Any] | None) -> bool:
+    return _is_v4_decision(decision) and not _v4_allow_walk_allowed(decision)
+
+
 def _binding_by_edge_id(walk_strategy: dict[str, Any]) -> dict[str, dict[str, Any]]:
     return {row["edge_id"]: row for row in walk_strategy.get("walk_rule_pack_binding", [])}
 
@@ -951,13 +1062,43 @@ def _execution_record(decision: dict[str, Any] | None, *, content_pack_id: str)
     }
 
 
-def _decision_context(decision: dict[str, Any] | None) -> dict[str, Any]:
+def _decision_context(decision: dict[str, Any] | None, *, denied: bool = False) -> dict[str, Any]:
     if not decision:
         return {"decision_action": None, "search_query_effect_status": None}
-    return {
+    context = {
+        "decision_id": decision.get("decision_id"),
         "decision_action": decision.get("decision_action"),
         "search_query_effect_status": decision.get("search_query_effect_status"),
     }
+    if _is_v4_decision(decision):
+        context.update(_v4_walk_gate_context(decision, denied=denied))
+    return context
+
+
+def _v4_walk_gate_context(decision: dict[str, Any], *, denied: bool = False) -> dict[str, Any]:
+    replay_data = decision.get("decision_replay_data") or {}
+    allow_walk = replay_data.get("allow_walk")
+    is_denied = denied or allow_walk is not True
+    return {
+        "allow_walk": allow_walk,
+        "allow_walk_reason": replay_data.get("allow_walk_reason"),
+        "walk_gate_snapshot": replay_data.get("walk_gate_snapshot"),
+        "walk_gate_status": "denied" if is_denied else "allowed",
+        "walk_gate_reason_code": V4_ALLOW_WALK_DENIED_REASON if is_denied else None,
+    }
+
+
+def _walk_gate_context_from_query_row(row: dict[str, Any]) -> dict[str, Any]:
+    payload = row.get("raw_payload") or {}
+    fields = [
+        "decision_id",
+        "allow_walk",
+        "allow_walk_reason",
+        "walk_gate_snapshot",
+        "walk_gate_status",
+        "walk_gate_reason_code",
+    ]
+    return {field: payload[field] for field in fields if field in payload}
 
 
 def _merge_batch(context: dict[str, list[dict[str, Any]]], batch: dict[str, list[dict[str, Any]]]) -> None:

+ 29 - 0
content_agent/integrations/walk_strategy_json.py

@@ -17,6 +17,7 @@ RULE_PACK_PATH = Path("product_documents/规则包/douyin_rule_packs.v1.json")
 REQUIRED_SECTIONS = [
     "walk_edge_catalog",
     "walk_rule_pack_binding",
+    "v4_walk_gate",
     "walk_fact_contract",
 ]
 
@@ -68,6 +69,7 @@ def validate_walk_strategy_config(
 
     edge_ids = _ids(strategy["walk_edge_catalog"], "edge_id")
     _check_edge_refs(strategy, edge_ids, findings)
+    _check_v4_walk_gate(strategy["v4_walk_gate"], edge_ids, findings)
     _check_fact_contract(strategy["walk_fact_contract"], findings)
     _check_rule_pack_bindings(
         strategy["walk_rule_pack_binding"],
@@ -130,6 +132,33 @@ def _check_fact_contract(
         _fail(findings, "search_clues_unique_key", "search_clues unique key must use clue_id")
 
 
+def _check_v4_walk_gate(
+    gates: list[dict[str, Any]],
+    edge_ids: set[str],
+    findings: list[dict[str, Any]],
+) -> None:
+    by_id = {gate.get("gate_id"): gate for gate in gates}
+    gate = by_id.get("allow_walk_required")
+    if not gate:
+        _fail(findings, "v4_walk_gate_missing", "allow_walk_required gate is required")
+        return
+    if gate.get("requires_allow_walk") is not True:
+        _fail(findings, "v4_walk_gate_requires_allow_walk", "allow_walk_required must require allow_walk")
+    if gate.get("deny_reason_code") != "v4_allow_walk_denied":
+        _fail(findings, "v4_walk_gate_deny_reason", "allow_walk_required deny reason is invalid")
+    applies_to = gate.get("applies_to_edges")
+    expected_edges = {"query_next_page", "hashtag_to_query", "author_to_works"}
+    if set(applies_to or []) != expected_edges:
+        _fail(findings, "v4_walk_gate_edges", "allow_walk_required must cover M4 expansion edges")
+    for edge_id in applies_to or []:
+        if edge_id not in edge_ids:
+            _fail(findings, "v4_walk_gate_edge_ref", f"allow_walk_required unknown edge_id: {edge_id}")
+    raw_payload_fields = set(gate.get("raw_payload_fields") or [])
+    expected_fields = {"decision_id", "allow_walk", "allow_walk_reason", "walk_gate_snapshot"}
+    if not expected_fields <= raw_payload_fields:
+        _fail(findings, "v4_walk_gate_raw_fields", "allow_walk_required raw payload fields incomplete")
+
+
 def _check_rule_pack_bindings(
     bindings: list[dict[str, Any]],
     rule_pack_path: Path,

+ 20 - 0
product_documents/抖音游走策略/douyin_walk_strategy.v1.json

@@ -150,6 +150,26 @@
       "notes": "Asset commit is owned by the Content pack decision."
     }
   ],
+  "v4_walk_gate": [
+    {
+      "gate_id": "allow_walk_required",
+      "requires_allow_walk": true,
+      "source_field": "rule_decisions.jsonl[].decision_replay_data.allow_walk",
+      "deny_reason_code": "v4_allow_walk_denied",
+      "applies_to_edges": [
+        "query_next_page",
+        "hashtag_to_query",
+        "author_to_works"
+      ],
+      "raw_payload_fields": [
+        "decision_id",
+        "allow_walk",
+        "allow_walk_reason",
+        "walk_gate_snapshot"
+      ],
+      "notes": "M4 V4 walk consumer gate. Runtime must write allow_walk evidence into walk_actions.raw_payload."
+    }
+  ],
   "walk_fact_contract": [
     {
       "runtime_file": "walk_actions.jsonl",

+ 1 - 0
scripts/build_config_from_excel.py

@@ -78,6 +78,7 @@ RULE_PACK_SPECS = [
 _WALK = {
     "walk_edge_catalog": "edge_id",
     "walk_rule_pack_binding": "binding_id",
+    "v4_walk_gate": "gate_id",
     "walk_fact_contract": "runtime_file",
 }
 WALK_SPECS = [SheetSpec(sheet, section=sheet, id_excel=idc) for sheet, idc in _WALK.items()]

+ 56 - 1
scripts/validate_v4_config_contract.py

@@ -11,6 +11,7 @@ from typing import Any
 ROOT = Path(__file__).resolve().parents[1]
 DATA_DIR = ROOT / "tech_documents/数据接口与来源"
 RULE_PACK_PATH = ROOT / "product_documents/规则包/douyin_rule_packs.v1.json"
+WALK_STRATEGY_PATH = ROOT / "product_documents/抖音游走策略/douyin_walk_strategy.v1.json"
 LEGACY_FIELD_BLOCKLIST = {
     "fit_senior_50plus",
     "fit_confidence",
@@ -36,6 +37,8 @@ OBSERVABLE_FIELDS = {
     "statistics.play_count",
 }
 MISSING_OBSERVABLE_TYPES = {"natural_platform_missing", "runtime_missing"}
+M4_WALK_GATE_EDGES = {"query_next_page", "hashtag_to_query", "author_to_works"}
+M4_WALK_GATE_RAW_FIELDS = {"decision_id", "allow_walk", "allow_walk_reason", "walk_gate_snapshot"}
 
 
 def main() -> int:
@@ -60,9 +63,19 @@ def validate_v4_config_contract(root: Path = ROOT) -> list[dict[str, str]]:
     if walk_policy:
         _check_no_legacy_fields(findings, walk_policy, "walk_policy.json")
         _check_value(findings, "walk_policy_schema", walk_policy.get("schema_version"), "walk_policy.v1")
-        for key in ["global", "edge_budgets", "dedup", "edge_permissions"]:
+        for key in ["global", "edge_budgets", "dedup", "edge_permissions", "v4_walk_gate"]:
             if key not in walk_policy:
                 _fail(findings, "walk_policy_missing_key", f"walk_policy.json missing {key}")
+        _check_v4_walk_gate_config(findings, walk_policy.get("v4_walk_gate"), "walk_policy.v4_walk_gate")
+        if walk_graph:
+            graph_edges = {
+                edge.get("edge_id")
+                for edge in walk_graph.get("edges", [])
+                if isinstance(edge, dict)
+            }
+            missing = sorted(M4_WALK_GATE_EDGES - graph_edges)
+            if missing:
+                _fail(findings, "v4_walk_gate_graph_edges_missing", f"walk_graph missing M4 gate edges: {missing}")
 
     endpoint_registry = _load_json(data_dir / "crawler_endpoints.registry.json", findings)
     if endpoint_registry:
@@ -147,6 +160,11 @@ def validate_v4_config_contract(root: Path = ROOT) -> list[dict[str, str]]:
     if rule_pack_pkg:
         _check_v4_rule_pack_contract(findings, rule_pack_pkg)
 
+    walk_strategy = _load_json(WALK_STRATEGY_PATH, findings)
+    if walk_strategy:
+        _check_no_legacy_fields(findings, walk_strategy, "douyin_walk_strategy.v1.json")
+        _check_v4_walk_strategy_contract(findings, walk_strategy)
+
     return findings
 
 
@@ -365,6 +383,43 @@ def _check_m2_platform_specifics(
         _check_nested_status(findings, label, edges, "author_work_to_content", "blocked")
 
 
+def _check_v4_walk_gate_config(
+    findings: list[dict[str, str]],
+    gate: Any,
+    label: str,
+) -> None:
+    if not isinstance(gate, dict):
+        _fail(findings, "v4_walk_gate_invalid", f"{label} must be an object")
+        return
+    if gate.get("requires_allow_walk") is not True:
+        _fail(findings, "v4_walk_gate_requires_allow_walk", f"{label}.requires_allow_walk must be true")
+    if gate.get("source_field") != "rule_decisions.jsonl[].decision_replay_data.allow_walk":
+        _fail(findings, "v4_walk_gate_source_field", f"{label}.source_field is invalid")
+    if gate.get("deny_reason_code") != "v4_allow_walk_denied":
+        _fail(findings, "v4_walk_gate_deny_reason", f"{label}.deny_reason_code is invalid")
+    if set(gate.get("applies_to_edges") or []) != M4_WALK_GATE_EDGES:
+        _fail(findings, "v4_walk_gate_edges", f"{label}.applies_to_edges must cover M4 expansion edges")
+    raw_fields = set(gate.get("raw_payload_fields") or [])
+    if not M4_WALK_GATE_RAW_FIELDS <= raw_fields:
+        _fail(findings, "v4_walk_gate_raw_fields", f"{label}.raw_payload_fields missing required fields")
+
+
+def _check_v4_walk_strategy_contract(
+    findings: list[dict[str, str]],
+    strategy: dict[str, Any],
+) -> None:
+    rows = strategy.get("v4_walk_gate")
+    if not isinstance(rows, list) or not rows:
+        _fail(findings, "v4_walk_strategy_gate_missing", "douyin_walk_strategy.v1.json missing v4_walk_gate")
+        return
+    by_id = {row.get("gate_id"): row for row in rows if isinstance(row, dict)}
+    gate = by_id.get("allow_walk_required")
+    if not gate:
+        _fail(findings, "v4_walk_strategy_gate_missing", "allow_walk_required gate missing")
+        return
+    _check_v4_walk_gate_config(findings, gate, "douyin_walk_strategy.v4_walk_gate.allow_walk_required")
+
+
 def _check_nested_status(
     findings: list[dict[str, str]],
     label: str,

+ 16 - 0
tech_documents/数据接口与来源/walk_policy.json

@@ -28,6 +28,22 @@
     "KEEP_CONTENT_FOR_REVIEW": { "author_to_works": "allow_low_budget", "video_to_hashtag": { "value": "deny", "tbd": false, "note": "拍板 2026-06-11:从严 deny(v1 等价,防漂移)" }, "decision_to_asset": "deny" },
     "REJECT_CONTENT":          { "author_to_works": "deny", "video_to_hashtag": "deny", "decision_to_asset": "deny", "note": "即 path_stop:rule_blocked/failed 不开任何出边" }
   },
+  "v4_walk_gate": {
+    "requires_allow_walk": true,
+    "source_field": "rule_decisions.jsonl[].decision_replay_data.allow_walk",
+    "deny_reason_code": "v4_allow_walk_denied",
+    "applies_to_edges": [
+      "query_next_page",
+      "hashtag_to_query",
+      "author_to_works"
+    ],
+    "raw_payload_fields": [
+      "decision_id",
+      "allow_walk",
+      "allow_walk_reason",
+      "walk_gate_snapshot"
+    ]
+  },
   "reseed_quality_gate": {
     "rule": "回灌边(reseed)仅从 gate=keep_only 放行的内容采集种子;V3 下 keep_only 的判据 = 判定 decision_action 经 edge_permissions 查表",
     "tag_quality": "仅强相关 tag 可扩散(沿用 hashtag node 现有约束);弱平台(快手 topic_list 常空、youtube 仅正文正则)允许正则兜底但同样过质量门"

BIN
tech_documents/游走策略/游走策略配置表.xlsx


+ 108 - 108
tests/fixtures/snapshots/real_id45/walk_actions_fingerprint.json

@@ -1,110 +1,110 @@
 [
- [
-  "author_to_works",
-  "<scrubbed>",
-  "<scrubbed>",
-  "fetch_author_works",
-  "success",
-  "normal",
-  ""
- ],
- [
-  "budget_downgrade",
-  "d_003",
-  "7406990358799732018",
-  "downgrade_budget",
-  "success",
-  "low_budget",
-  "v4_score_review_needed"
- ],
- [
-  "decision_to_asset",
-  "d_001",
-  "7590384169986572223",
-  "commit_asset",
-  "success",
-  "normal",
-  "v4_query_and_platform_pass"
- ],
- [
-  "decision_to_asset",
-  "d_002",
-  "7595957496808770826",
-  "commit_asset",
-  "success",
-  "normal",
-  "v4_query_and_platform_pass"
- ],
- [
-  "hashtag_to_query",
-  "2026养生",
-  "tag_b9984f678e",
-  "create_tag_query",
-  "success",
-  "normal",
-  ""
- ],
- [
-  "hashtag_to_query",
-  "7406990358799732018",
-  "tag_query_skipped",
-  "create_tag_query",
-  "skipped",
-  "blocked",
-  "review_tag_expansion_disabled"
- ],
- [
-  "hashtag_to_query",
-  "7577667864522907506",
-  "tag_query_skipped",
-  "create_tag_query",
-  "skipped",
-  "blocked",
-  "blocked_by_rule_decision"
- ],
- [
-  "hashtag_to_query",
-  "7595957496808770826",
-  "tag_query_skipped",
-  "create_tag_query",
-  "skipped",
-  "blocked",
-  "budget_exhausted"
- ],
- [
-  "hashtag_to_query",
-  "中医养生",
-  "tag_88fedc1c14",
-  "create_tag_query",
-  "success",
-  "normal",
-  ""
- ],
- [
-  "hashtag_to_query",
-  "黄帝内经",
-  "tag_838c83a538",
-  "create_tag_query",
-  "success",
-  "normal",
-  ""
- ],
- [
-  "path_stop",
-  "d_004",
-  "7577667864522907506",
-  "stop_path",
-  "skipped",
-  "stop",
-  "v4_query_or_score_below_threshold"
- ],
- [
-  "query_next_page",
-  "q_001",
-  "q_001_page_002",
-  "fetch_next_page",
-  "success",
-  "normal",
-  ""
- ]
+  [
+    "author_to_works",
+    "<scrubbed>",
+    "<scrubbed>",
+    "fetch_author_works",
+    "success",
+    "normal",
+    ""
+  ],
+  [
+    "budget_downgrade",
+    "d_003",
+    "7406990358799732018",
+    "downgrade_budget",
+    "success",
+    "low_budget",
+    "v4_score_review_needed"
+  ],
+  [
+    "decision_to_asset",
+    "d_001",
+    "7590384169986572223",
+    "commit_asset",
+    "success",
+    "normal",
+    "v4_query_and_platform_pass"
+  ],
+  [
+    "decision_to_asset",
+    "d_002",
+    "7595957496808770826",
+    "commit_asset",
+    "success",
+    "normal",
+    "v4_query_and_platform_pass"
+  ],
+  [
+    "hashtag_to_query",
+    "2026养生",
+    "tag_b9984f678e",
+    "create_tag_query",
+    "success",
+    "normal",
+    ""
+  ],
+  [
+    "hashtag_to_query",
+    "7406990358799732018",
+    "tag_query_skipped",
+    "create_tag_query",
+    "skipped",
+    "blocked",
+    "v4_allow_walk_denied"
+  ],
+  [
+    "hashtag_to_query",
+    "7577667864522907506",
+    "tag_query_skipped",
+    "create_tag_query",
+    "skipped",
+    "blocked",
+    "v4_allow_walk_denied"
+  ],
+  [
+    "hashtag_to_query",
+    "7595957496808770826",
+    "tag_query_skipped",
+    "create_tag_query",
+    "skipped",
+    "blocked",
+    "budget_exhausted"
+  ],
+  [
+    "hashtag_to_query",
+    "中医养生",
+    "tag_88fedc1c14",
+    "create_tag_query",
+    "success",
+    "normal",
+    ""
+  ],
+  [
+    "hashtag_to_query",
+    "黄帝内经",
+    "tag_838c83a538",
+    "create_tag_query",
+    "success",
+    "normal",
+    ""
+  ],
+  [
+    "path_stop",
+    "d_004",
+    "7577667864522907506",
+    "stop_path",
+    "skipped",
+    "stop",
+    "v4_query_or_score_below_threshold"
+  ],
+  [
+    "query_next_page",
+    "q_001",
+    "q_001_page_002",
+    "fetch_next_page",
+    "success",
+    "normal",
+    ""
+  ]
 ]

+ 26 - 0
tests/p6_walk_helpers.py

@@ -100,6 +100,32 @@ def build_initial_walk_context(tmp_path: Path, *, tags: list[str] | None = None)
     }
 
 
+def set_v4_allow_walk(decision: dict[str, Any], allow_walk: bool) -> None:
+    query_score = 80
+    platform_score = 66.84 if allow_walk else 60
+    total_score = round(query_score * 0.5 + platform_score * 0.5, 2)
+    decision["decision_action"] = "ADD_TO_CONTENT_POOL"
+    decision["decision_reason_code"] = "v4_query_and_platform_pass"
+    decision["search_query_effect_status"] = "success"
+    decision["score"] = total_score
+    decision.setdefault("scorecard", {})["schema_version"] = "v4_scorecard.v1"
+    decision["scorecard"]["query_relevance_score"] = query_score
+    decision["scorecard"]["platform_performance_score"] = platform_score
+    decision["scorecard"]["total_score"] = total_score
+    replay = decision.setdefault("decision_replay_data", {})
+    replay["allow_walk"] = allow_walk
+    replay["allow_walk_reason"] = (
+        "query>=70/platform>=65/score>=70"
+        if allow_walk
+        else "v4_query_and_platform_pass"
+    )
+    replay["walk_gate_snapshot"] = {
+        "query_relevance_score": query_score,
+        "platform_performance_score": platform_score,
+        "score": total_score,
+    }
+
+
 def _platform_result(
     query: dict[str, Any],
     platform_content_id: str,

+ 2 - 2
tests/test_case_replay.py

@@ -128,9 +128,9 @@ def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
     assert executed_tags
     assert all(row["budget_tier"] == "normal" for row in executed_tags)
     assert sorted(row["reason_code"] for row in skipped_tags) == [
-        "blocked_by_rule_decision",
         "budget_exhausted",
-        "review_tag_expansion_disabled",
+        "v4_allow_walk_denied",
+        "v4_allow_walk_denied",
     ]
     assert len(executed_tags) == 3
 

+ 2 - 1
tests/test_config_case_matrix.py

@@ -121,7 +121,8 @@ def test_judge_ok_block_blocks_all_walk_expansion(tmp_path):
     ]
     assert expansions
     assert all(row["walk_status"] == "skipped" for row in expansions)
-    assert all(row["reason_code"] == "blocked_by_rule_decision" for row in expansions)
+    assert all(row["reason_code"] == "v4_allow_walk_denied" for row in expansions)
+    assert all(row["raw_payload"]["allow_walk"] is False for row in expansions)
 
     path_stops = [row for row in walk_actions if row["edge_id"] == "path_stop"]
     assert len(path_stops) == 4

+ 114 - 0
tests/test_v4_m4_walk_replay.py

@@ -0,0 +1,114 @@
+import json
+
+from content_agent.business_modules import learning_review, result_source_lookup, run_record
+from content_agent.business_modules.run_record.validation import validate_run
+from content_agent.business_modules.walk_engine import run_bounded_walk
+from content_agent.integrations.database_runtime import DatabaseRuntimeStore
+from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context, set_v4_allow_walk
+from tests.test_database_runtime import FakeConnection, _config, _insert_values
+
+
+def test_v4_m4_walk_replay_blocks_allow_walk_false_expansions(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    set_v4_allow_walk(context["rule_decisions"][0], False)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert client.author_calls == []
+    assert not [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") in {"query_next_page", "tag_query"}
+    ]
+    denied = [
+        row for row in result["walk_actions"]
+        if row["reason_code"] == "v4_allow_walk_denied"
+    ]
+    assert {row["edge_id"] for row in denied} == {
+        "query_next_page",
+        "hashtag_to_query",
+        "author_to_works",
+    }
+    assert all(row["raw_payload"]["allow_walk"] is False for row in denied)
+    assert all(row["raw_payload"]["walk_gate_snapshot"] for row in denied)
+
+
+def test_v4_m4_walk_replay_allows_true_and_validates_runtime(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    set_v4_allow_walk(context["rule_decisions"][0], True)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert client.author_calls
+    assert [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "query_next_page"
+    ]
+    assert [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "tag_query"
+    ]
+    expansion_success = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] in {"query_next_page", "hashtag_to_query", "author_to_works"}
+        and row["walk_status"] == "success"
+    ]
+    assert expansion_success
+    assert all(row["raw_payload"]["allow_walk"] is True for row in expansion_success)
+    assert all(row["raw_payload"]["walk_gate_snapshot"] for row in expansion_success)
+
+    _record_runtime(context, result)
+    validation = validate_run(context["run_id"], context["runtime"])
+    fails = [finding for finding in validation["findings"] if finding["level"] == "fail"]
+    assert validation["status"] == "pass", fails
+
+
+def test_v4_m4_database_runtime_preserves_walk_gate_payload(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    set_v4_allow_walk(context["rule_decisions"][0], False)
+    result = run_bounded_walk(platform_client=FakeWalkPlatformClient(), **context)
+    denied = next(
+        row for row in result["walk_actions"]
+        if row["reason_code"] == "v4_allow_walk_denied"
+    )
+    connection = FakeConnection()
+    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
+
+    store.append_jsonl(context["run_id"], "rule_decisions.jsonl", context["rule_decisions"])
+    store.append_jsonl(context["run_id"], "walk_actions.jsonl", [denied])
+
+    sql, params = connection.statements[-1]
+    values = _insert_values(sql, params)
+    assert "INSERT INTO `content_agent_walk_actions`" in sql
+    payload = json.loads(values["raw_payload"])
+    assert payload["allow_walk"] is False
+    assert payload["allow_walk_reason"] == "v4_query_and_platform_pass"
+    assert payload["walk_gate_snapshot"]
+    assert payload["walk_gate_reason_code"] == "v4_allow_walk_denied"
+
+
+def _record_runtime(context, result):
+    record = run_record.run(
+        context["run_id"],
+        context["policy_run_id"],
+        result["search_queries"],
+        result["discovered_content_items"],
+        result["rule_decisions"],
+        result["source_path_record_basis"],
+        context["policy_bundle"],
+        context["runtime"],
+        walk_actions=result["walk_actions"],
+    )
+    result_source_lookup.run(
+        context["run_id"],
+        context["policy_run_id"],
+        context["policy_bundle"],
+        result["discovered_content_items"],
+        result["content_media_records"],
+        result["rule_decisions"],
+        record["source_path_records"],
+        record["search_clues"],
+        context["runtime"],
+    )
+    learning_review.run(context["run_id"], context["policy_run_id"], context["runtime"])

+ 92 - 0
tests/test_v4_validator_contract.py

@@ -62,6 +62,72 @@ def test_v4_walk_gate_rejects_allow_walk_below_threshold(tmp_path):
     assert "v4_allow_walk_threshold_mismatch" in _check_ids(result)
 
 
+def test_v4_walk_gate_rejects_allow_walk_false_at_passing_threshold(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        data["rule_decisions.jsonl"][0]["decision_replay_data"]["allow_walk"] = False
+
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert "v4_allow_walk_threshold_mismatch" in _check_ids(result)
+
+
+def test_v4_walk_action_rejects_success_when_allow_walk_false(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        decision = data["rule_decisions.jsonl"][0]
+        decision["score"] = 70
+        decision["scorecard"]["platform_performance_score"] = 60
+        decision["decision_replay_data"]["allow_walk"] = False
+        decision["decision_replay_data"]["allow_walk_reason"] = "v4_query_and_platform_pass"
+        decision["decision_replay_data"]["walk_gate_snapshot"] = {
+            "query_relevance_score": 80,
+            "platform_performance_score": 60,
+            "score": 70,
+        }
+        data["walk_actions.jsonl"].append(
+            _walk_action(
+                "walk_page_001",
+                "query_next_page",
+                "success",
+                {
+                    "decision_id": "decision_001",
+                    "allow_walk": False,
+                    "allow_walk_reason": "v4_query_and_platform_pass",
+                    "walk_gate_snapshot": decision["decision_replay_data"]["walk_gate_snapshot"],
+                },
+            )
+        )
+
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert "v4_walk_action_allow_walk_denied" in _check_ids(result)
+
+
+def test_v4_walk_action_rejects_missing_gate_snapshot(tmp_path):
+    def mutate(data: dict[str, Any]) -> None:
+        data["walk_actions.jsonl"].append(
+            _walk_action(
+                "walk_tag_001",
+                "hashtag_to_query",
+                "success",
+                {
+                    "decision_id": "decision_001",
+                    "allow_walk": True,
+                    "allow_walk_reason": "query>=70/platform>=65/score>=70",
+                },
+            )
+        )
+
+    runtime = _write_runtime(tmp_path, mutate)
+
+    result = validate_run(RUN_ID, runtime)
+
+    assert "v4_walk_action_gate_context_missing" in _check_ids(result)
+
+
 def test_v4_action_thresholds_reject_conflicting_action(tmp_path):
     def mutate(data: dict[str, Any]) -> None:
         decision = data["rule_decisions.jsonl"][0]
@@ -375,5 +441,31 @@ def _path(
     }
 
 
+def _walk_action(
+    action_id: str,
+    edge_id: str,
+    status: str,
+    raw_payload: dict[str, Any],
+) -> dict[str, Any]:
+    return {
+        "record_schema_version": "runtime_record.v1",
+        "run_id": RUN_ID,
+        "policy_run_id": POLICY_RUN_ID,
+        "walk_action_id": action_id,
+        "edge_id": edge_id,
+        "edge_type": "query",
+        "from_node_type": "SearchQuery",
+        "from_node_id": "query_001",
+        "to_node_type": "SearchQuery",
+        "to_node_id": "query_002",
+        "walk_action": "fetch_next_page",
+        "walk_status": status,
+        "budget_tier": "normal",
+        "depth": 1,
+        "reason_code": None,
+        "raw_payload": raw_payload,
+    }
+
+
 def _check_ids(result: dict[str, Any]) -> list[str]:
     return [finding["check_id"] for finding in result["findings"]]

+ 45 - 0
tests/test_v4_walk_contract.py

@@ -97,5 +97,50 @@ def test_v4_video_and_author_dedup_keys_are_separate():
     assert dedup["content_key"] != dedup["author_key"]
 
 
+def test_v4_walk_policy_defines_allow_walk_gate():
+    gate = _json(DATA_DIR / "walk_policy.json")["v4_walk_gate"]
+
+    assert gate["requires_allow_walk"] is True
+    assert gate["deny_reason_code"] == "v4_allow_walk_denied"
+    assert set(gate["applies_to_edges"]) == {
+        "query_next_page",
+        "hashtag_to_query",
+        "author_to_works",
+    }
+    assert {
+        "decision_id",
+        "allow_walk",
+        "allow_walk_reason",
+        "walk_gate_snapshot",
+    } <= set(gate["raw_payload_fields"])
+
+
+def test_v4_walk_strategy_json_and_excel_define_allow_walk_gate():
+    import pytest
+
+    openpyxl = pytest.importorskip("openpyxl")
+    strategy = _json(ROOT / "product_documents/抖音游走策略/douyin_walk_strategy.v1.json")
+    gate = strategy["v4_walk_gate"][0]
+
+    assert gate["gate_id"] == "allow_walk_required"
+    assert gate["deny_reason_code"] == "v4_allow_walk_denied"
+
+    workbook = openpyxl.load_workbook(
+        ROOT / "tech_documents/游走策略/游走策略配置表.xlsx",
+        read_only=True,
+        data_only=True,
+    )
+    assert "v4_walk_gate" in workbook.sheetnames
+    rows = list(workbook["v4_walk_gate"].iter_rows(values_only=True))
+    assert rows[0][:4] == (
+        "gate_id",
+        "requires_allow_walk",
+        "source_field",
+        "deny_reason_code",
+    )
+    assert rows[2][0] == "allow_walk_required"
+    assert rows[2][3] == "v4_allow_walk_denied"
+
+
 def _json(path: Path):
     return json.loads(path.read_text(encoding="utf-8"))

+ 33 - 1
tests/test_walk_engine_author.py

@@ -1,5 +1,5 @@
 from content_agent.business_modules.walk_engine import run_bounded_walk
-from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context
+from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context, set_v4_allow_walk
 
 
 def test_walk_engine_author_works_reenter_content_decision_flow(tmp_path):
@@ -70,6 +70,38 @@ def test_author_edge_allows_add_content_pool(tmp_path):
     assert author_actions[0]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
 
 
+def test_v4_author_edge_denies_allow_walk_false(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    set_v4_allow_walk(context["rule_decisions"][0], False)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert client.author_calls == []
+    skipped = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "author_to_works" and row["walk_status"] == "skipped"
+    ]
+    assert skipped
+    assert skipped[0]["reason_code"] == "v4_allow_walk_denied"
+    assert skipped[0]["raw_payload"]["decision_id"] == context["rule_decisions"][0]["decision_id"]
+    assert skipped[0]["raw_payload"]["allow_walk"] is False
+    assert skipped[0]["raw_payload"]["walk_gate_snapshot"]
+
+
+def test_non_v4_author_edge_keeps_legacy_permission(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    decision = context["rule_decisions"][0]
+    decision["scorecard"]["schema_version"] = "v3_scorecard.v1"
+    decision["decision_replay_data"].pop("allow_walk", None)
+    _override_decisions(context, "ADD_TO_CONTENT_POOL", "success")
+    client = FakeWalkPlatformClient()
+
+    run_bounded_walk(platform_client=client, **context)
+
+    assert client.author_calls
+
+
 def test_author_edge_keeps_review_low_budget(tmp_path):
     context = build_initial_walk_context(tmp_path)
     _override_decisions(context, "KEEP_CONTENT_FOR_REVIEW", "pending")

+ 38 - 1
tests/test_walk_engine_pagination.py

@@ -1,5 +1,5 @@
 from content_agent.business_modules.walk_engine import run_bounded_walk
-from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context
+from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context, set_v4_allow_walk
 
 
 def test_walk_engine_pagination_uses_explicit_cursor(tmp_path):
@@ -19,6 +19,43 @@ def test_walk_engine_pagination_uses_explicit_cursor(tmp_path):
     assert any(row["walk_action"] == "fetch_next_page" for row in result["walk_actions"])
 
 
+def test_v4_pagination_denies_allow_walk_false(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    set_v4_allow_walk(context["rule_decisions"][0], False)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert not [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "query_next_page"
+    ]
+    skipped = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "query_next_page" and row["walk_status"] == "skipped"
+    ]
+    assert skipped
+    assert skipped[0]["reason_code"] == "v4_allow_walk_denied"
+    assert skipped[0]["raw_payload"]["allow_walk"] is False
+    assert skipped[0]["raw_payload"]["walk_gate_snapshot"]
+
+
+def test_v4_pagination_success_carries_walk_gate_context(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    set_v4_allow_walk(context["rule_decisions"][0], True)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    page_actions = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "query_next_page" and row["walk_status"] == "success"
+    ]
+    assert page_actions
+    assert page_actions[0]["raw_payload"]["allow_walk"] is True
+    assert page_actions[0]["raw_payload"]["walk_gate_snapshot"]
+
+
 def test_walk_engine_pagination_skips_without_cursor(tmp_path):
     context = build_initial_walk_context(tmp_path)
     context["discovered_content_items"][0]["next_cursor"] = ""

+ 38 - 1
tests/test_walk_engine_tag.py

@@ -1,5 +1,5 @@
 from content_agent.business_modules.walk_engine import run_bounded_walk
-from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context
+from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context, set_v4_allow_walk
 
 
 def test_walk_engine_tag_query_is_created_only_inside_p6(tmp_path):
@@ -20,6 +20,43 @@ def test_walk_engine_tag_query_is_created_only_inside_p6(tmp_path):
     assert any(row["previous_discovery_step"] == "hashtag_to_query" for row in result["search_queries"])
 
 
+def test_v4_tag_query_denies_allow_walk_false(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    set_v4_allow_walk(context["rule_decisions"][0], False)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert not [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "tag_query"
+    ]
+    skipped = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "hashtag_to_query" and row["walk_status"] == "skipped"
+    ]
+    assert skipped
+    assert skipped[0]["reason_code"] == "v4_allow_walk_denied"
+    assert skipped[0]["raw_payload"]["allow_walk"] is False
+    assert skipped[0]["raw_payload"]["walk_gate_snapshot"]
+
+
+def test_v4_tag_query_success_carries_walk_gate_context(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    set_v4_allow_walk(context["rule_decisions"][0], True)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    success = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "hashtag_to_query" and row["walk_status"] == "success"
+    ]
+    assert success
+    assert success[0]["raw_payload"]["allow_walk"] is True
+    assert success[0]["raw_payload"]["walk_gate_snapshot"]
+
+
 def test_walk_engine_blocks_generic_tag_query(tmp_path):
     blocked = "#" + "热" + "点"
     context = build_initial_walk_context(tmp_path, tags=[blocked])

+ 5 - 2
tests/test_walk_strategy_config.py

@@ -32,16 +32,19 @@ def test_walk_strategy_config_sections_and_references_are_closed():
 
 
 def test_walk_strategy_config_keeps_only_consumed_sections():
-    # V3 清理受控变化: 13 段收窄到 3 个仍被运行时/校验消费的段;
+    # V4 M4 受控变化: 13 段收窄到 4 个仍被运行时/校验消费的段;
     # 其余 10 段(老预算/停止/重试/触发规则等)已被 walk_graph+walk_policy 取代并删除。
     strategy = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
 
     assert set(REQUIRED_SECTIONS) == {
         "walk_edge_catalog",
         "walk_rule_pack_binding",
+        "v4_walk_gate",
         "walk_fact_contract",
     }
-    assert {key for key in strategy if key.startswith("walk_")} == set(REQUIRED_SECTIONS)
+    assert {key for key in strategy if key.startswith("walk_") or key == "v4_walk_gate"} == set(
+        REQUIRED_SECTIONS
+    )
 
 
 def test_walk_strategy_config_uses_clue_id_and_real_rule_packs():