Просмотр исходного кода

feat(M4): walk engine obeys rule decisions with edge binding ownership records

- M4A: query_next_page 只放行 search_query_effect_status=="success" 的 query,
  闸输入为内存 rule_decisions(经 content→query 关联,decision 无 search_query_id 字段)
- M4B: 作者/tag 边按 decision_action 分层——ADD 正常、KEEP 低预算扩作者不扩 tag、
  REJECT/rule_blocked 全停;被过滤动作写 skipped + 固定 reason 枚举
- M4C: walk_rule_pack_binding 增补 path_stop(Path 包)/decision_to_asset(Content 包)
  两条(JSON+Excel 双侧 byte-equal);walk action 顶层 rule_pack_id=边归属包,
  raw_payload.rule_pack_execution=实际执行事实,future 包不伪装已执行
- M4D: path_stop/budget_downgrade/decision_to_asset 归属修正(path_stop 不再错挂
  Content 包);source path 与 walk action 复用同一 binding/execution 对象;
  M4D brief 示例按 M4C 口径修正
- M4E: brief 逐字 19 个单测 + real_id45 回放受控断言(零翻页/低预算作者/tag 停)
  + portrait_reject 全停反证;真实事故三反例(幽灵翻页/作者失败/path_stop 错挂)回放中全部消除

256 passed;walk/config/schema 闸全 pass;快照零漂移;无 DB/runtime/graph 改动

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sam Lee 3 дней назад
Родитель
Сommit
410cd725bb

+ 215 - 12
content_agent/business_modules/walk_engine.py

@@ -8,6 +8,7 @@ from content_agent.business_modules import content_discovery, platform_access, r
 from content_agent.business_modules.content_discovery import pattern_recall
 from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
 from content_agent.errors import ContentAgentError
+from content_agent.integrations.walk_strategy_json import WalkStrategyStore
 from content_agent.interfaces import (
     CategoryMatchClient,
     DecodeClient,
@@ -47,6 +48,8 @@ def run_bounded_walk(
     poll_interval_seconds: float = 5.0,
 ) -> dict[str, list[dict[str, Any]]]:
     created_at = datetime.now(timezone.utc).isoformat()
+    walk_strategy = WalkStrategyStore().load_walk_strategy()
+    content_pack_id = policy_bundle["rule_pack_id"]
     context = {
         "search_queries": list(search_queries),
         "discovered_content_items": list(discovered_content_items),
@@ -58,10 +61,15 @@ def run_bounded_walk(
     next_decision_index = len(rule_decisions) + 1
     next_recall_index = len(discovered_content_items) + 1
 
-    query_rows = _pagination_queries(run_id, policy_run_id, context["search_queries"], discovered_content_items, created_at)
-    query_rows.extend(
-        _tag_queries(run_id, policy_run_id, discovered_content_items, rule_decisions, created_at)
+    query_rows = _pagination_queries(
+        run_id, policy_run_id, context["search_queries"], discovered_content_items, rule_decisions, created_at
     )
+    tag_rows, tag_skipped_actions = _tag_queries(
+        run_id, policy_run_id, discovered_content_items, rule_decisions, created_at,
+        walk_strategy=walk_strategy, content_pack_id=content_pack_id,
+    )
+    query_rows.extend(tag_rows)
+    context["walk_actions"].extend(tag_skipped_actions)
     if query_rows:
         runtime.append_jsonl(run_id, "search_queries.jsonl", query_rows)
         context["search_queries"].extend(query_rows)
@@ -88,7 +96,10 @@ def run_bounded_walk(
         next_decision_index += len(batch["rule_decisions"])
         next_recall_index += len(batch["discovered_content_items"])
         context["walk_actions"].extend(
-            _query_actions(query_rows, batch.get("query_failures", []), created_at)
+            _query_actions(
+                query_rows, batch.get("query_failures", []), created_at,
+                walk_strategy=walk_strategy, content_pack_id=content_pack_id,
+            )
         )
 
     author_batch = _execute_author_edges(
@@ -97,6 +108,8 @@ def run_bounded_walk(
         pattern_seed_pack=pattern_seed_pack,
         source_context=source_context,
         discovered_content_items=context["discovered_content_items"],
+        rule_decisions=context["rule_decisions"],
+        walk_strategy=walk_strategy,
         policy_bundle=policy_bundle,
         platform_client=platform_client,
         runtime=runtime,
@@ -197,6 +210,8 @@ def _execute_author_edges(
     pattern_seed_pack: dict[str, Any],
     source_context: dict[str, Any],
     discovered_content_items: list[dict[str, Any]],
+    rule_decisions: list[dict[str, Any]],
+    walk_strategy: dict[str, Any],
     policy_bundle: dict[str, Any],
     platform_client: PlatformSearchClient,
     runtime: RuntimeFileStore,
@@ -214,6 +229,8 @@ def _execute_author_edges(
     if not callable(fetch_author_works):
         return _empty_batch()
 
+    decision_by_content_id = _decision_by_content_id(rule_decisions)
+    binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
     author_items = _unique_authors(discovered_content_items)[:2]
     platform_results: list[dict[str, Any]] = []
     walk_actions: list[dict[str, Any]] = []
@@ -222,6 +239,31 @@ def _execute_author_edges(
         if not author_id:
             continue
         action_id = _walk_action_id(run_id, policy_run_id, "author_to_works", author_id, "works")
+        decision = decision_by_content_id.get(item.get("platform_content_id"))
+        if not _can_expand_from_decision(decision, "author_to_works"):
+            walk_actions.append(
+                _walk_action(
+                    run_id,
+                    policy_run_id,
+                    action_id,
+                    "author_to_works",
+                    "author",
+                    "Author",
+                    author_id,
+                    "AuthorWorksPage",
+                    author_id,
+                    "fetch_author_works",
+                    "skipped",
+                    created_at,
+                    reason_code="blocked_by_rule_decision",
+                    budget_tier="blocked",
+                    rule_pack_binding=binding,
+                    rule_pack_execution=_execution_record(decision, content_pack_id=policy_bundle["rule_pack_id"]),
+                    raw_extra=_decision_context(decision),
+                )
+            )
+            continue
+        budget_tier = _walk_budget_for_decision(decision)
         try:
             works = fetch_author_works(
                 {
@@ -246,6 +288,10 @@ def _execute_author_edges(
                     "failed",
                     created_at,
                     reason_code=type(exc).__name__,
+                    budget_tier=budget_tier,
+                    rule_pack_binding=binding,
+                    rule_pack_execution=_execution_record(decision, content_pack_id=policy_bundle["rule_pack_id"]),
+                    raw_extra=_decision_context(decision),
                 )
             )
             continue
@@ -263,6 +309,10 @@ def _execute_author_edges(
                 "fetch_author_works",
                 "success",
                 created_at,
+                budget_tier=budget_tier,
+                rule_pack_binding=binding,
+                rule_pack_execution=_execution_record(decision, content_pack_id=policy_bundle["rule_pack_id"]),
+                raw_extra=_decision_context(decision),
             )
         )
         for index, work in enumerate(works[:3], start=1):
@@ -319,9 +369,11 @@ def _pagination_queries(
     policy_run_id: str,
     search_queries: list[dict[str, Any]],
     discovered_content_items: list[dict[str, Any]],
+    rule_decisions: list[dict[str, Any]],
     created_at: str,
 ) -> list[dict[str, Any]]:
     query_by_id = {row["search_query_id"]: row for row in search_queries}
+    query_effect_by_id = _query_effect_by_search_query_id(discovered_content_items, rule_decisions)
     seen: set[str] = set()
     rows: list[dict[str, Any]] = []
     for item in discovered_content_items:
@@ -329,6 +381,8 @@ def _pagination_queries(
         cursor = item.get("next_cursor")
         if not item.get("has_more") or not cursor or search_query_id in seen:
             continue
+        if not _can_fetch_next_page(search_query_id, query_effect_by_id):
+            continue
         source = query_by_id.get(search_query_id)
         if not source:
             continue
@@ -356,13 +410,49 @@ def _tag_queries(
     items: list[dict[str, Any]],
     decisions: list[dict[str, Any]],
     created_at: str,
-) -> list[dict[str, Any]]:
-    decision_by_content_id = {row["decision_target_id"]: row for row in decisions}
+    *,
+    walk_strategy: dict[str, Any],
+    content_pack_id: str,
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
+    decision_by_content_id = _decision_by_content_id(decisions)
+    binding, _ = _resolve_edge_binding("hashtag_to_query", walk_strategy)
     rows: list[dict[str, Any]] = []
+    skipped_actions: list[dict[str, Any]] = []
     seen: set[str] = set()
     for item in items:
-        decision = decision_by_content_id.get(item.get("platform_content_id")) or {}
-        if decision.get("search_query_effect_status") not in {"success", "pending"}:
+        decision = decision_by_content_id.get(item.get("platform_content_id"))
+        if not decision:
+            continue
+        if not _can_expand_from_decision(decision, "hashtag_to_query"):
+            if item.get("tags"):
+                reason_code = (
+                    "review_tag_expansion_disabled"
+                    if decision.get("decision_action") == "KEEP_CONTENT_FOR_REVIEW"
+                    else "blocked_by_rule_decision"
+                )
+                skipped_actions.append(
+                    _walk_action(
+                        run_id,
+                        policy_run_id,
+                        _walk_action_id(
+                            run_id, policy_run_id, "hashtag_to_query", item["platform_content_id"], "tags"
+                        ),
+                        "hashtag_to_query",
+                        "tag_query",
+                        "Content",
+                        item["platform_content_id"],
+                        "SearchQuery",
+                        "tag_query_skipped",
+                        "create_tag_query",
+                        "skipped",
+                        created_at,
+                        reason_code=reason_code,
+                        budget_tier="blocked",
+                        rule_pack_binding=binding,
+                        rule_pack_execution=_execution_record(decision, content_pack_id=content_pack_id),
+                        raw_extra=_decision_context(decision),
+                    )
+                )
             continue
         for tag in item.get("tags") or []:
             normalized = str(tag).lstrip("#").strip()
@@ -386,14 +476,17 @@ def _tag_queries(
                 )
             )
             if len(rows) >= 1:
-                return rows
-    return rows
+                return rows, skipped_actions
+    return rows, skipped_actions
 
 
 def _query_actions(
     query_rows: list[dict[str, Any]],
     query_failures: list[dict[str, Any]],
     created_at: str,
+    *,
+    walk_strategy: dict[str, Any],
+    content_pack_id: str,
 ) -> list[dict[str, Any]]:
     failure_ids = {row["search_query_id"] for row in query_failures}
     actions: list[dict[str, Any]] = []
@@ -403,6 +496,7 @@ def _query_actions(
             if row.get("search_query_generation_method") == "tag_query"
             else "query_next_page"
         )
+        binding, _ = _resolve_edge_binding(edge_id, walk_strategy)
         actions.append(
             _walk_action(
                 row["run_id"],
@@ -426,6 +520,12 @@ def _query_actions(
                 "failed" if row["search_query_id"] in failure_ids else "success",
                 created_at,
                 page_cursor=row.get("page_cursor"),
+                rule_pack_binding=binding,
+                rule_pack_execution={
+                    "executed": True,
+                    "executed_rule_pack_id": content_pack_id,
+                    "reason": "content_decision_reused_for_walk_gate",
+                },
             )
         )
     return actions
@@ -477,8 +577,12 @@ def _walk_action(
     *,
     page_cursor: str | None = None,
     reason_code: str | None = None,
+    budget_tier: str | None = None,
+    rule_pack_binding: dict[str, Any] | None = None,
+    rule_pack_execution: dict[str, Any] | None = None,
+    raw_extra: dict[str, Any] | None = None,
 ) -> dict[str, Any]:
-    return with_raw_payload(
+    row = with_raw_payload(
         {
             "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
             "run_id": run_id,
@@ -492,13 +596,112 @@ def _walk_action(
             "to_node_id": to_node_id,
             "walk_action": walk_action,
             "walk_status": walk_status,
-            "budget_tier": "normal" if walk_status == "success" else "low_budget",
+            "budget_tier": budget_tier or ("normal" if walk_status == "success" else "low_budget"),
             "depth": 1,
             "page_cursor": page_cursor,
             "reason_code": reason_code,
+            "rule_pack_id": (rule_pack_binding or {}).get("rule_pack_id"),
+            "rule_pack_version": (rule_pack_binding or {}).get("rule_pack_version"),
             "created_at": created_at,
         }
     )
+    row["raw_payload"]["rule_pack_binding"] = rule_pack_binding or {}
+    row["raw_payload"]["rule_pack_execution"] = rule_pack_execution or {}
+    if raw_extra:
+        row["raw_payload"].update(raw_extra)
+    return row
+
+
+def _decision_by_content_id(rule_decisions: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
+    return {row["decision_target_id"]: row for row in rule_decisions}
+
+
+def _query_effect_by_search_query_id(
+    discovered_content_items: list[dict[str, Any]],
+    rule_decisions: list[dict[str, Any]],
+) -> dict[str, str]:
+    decision_by_content_id = _decision_by_content_id(rule_decisions)
+    statuses_by_query: dict[str, set[str]] = {}
+    for item in discovered_content_items:
+        decision = decision_by_content_id.get(item.get("platform_content_id"))
+        if not decision:
+            continue
+        query_sources = item.get("query_sources") or [{"search_query_id": item.get("search_query_id")}]
+        for query_source in query_sources:
+            search_query_id = query_source.get("search_query_id")
+            if search_query_id:
+                statuses_by_query.setdefault(search_query_id, set()).add(
+                    decision.get("search_query_effect_status")
+                )
+    effects: dict[str, str] = {}
+    for search_query_id, statuses in statuses_by_query.items():
+        for status in ("success", "pending", "rule_blocked", "failed"):
+            if status in statuses:
+                effects[search_query_id] = status
+                break
+    return effects
+
+
+def _can_fetch_next_page(search_query_id: str, query_effect_by_id: dict[str, str]) -> bool:
+    return query_effect_by_id.get(search_query_id) == "success"
+
+
+def _walk_budget_for_decision(decision: dict[str, Any] | None) -> str:
+    if not decision or decision.get("search_query_effect_status") == "rule_blocked":
+        return "blocked"
+    action = decision.get("decision_action")
+    if action == "ADD_TO_CONTENT_POOL":
+        return "normal"
+    if action == "KEEP_CONTENT_FOR_REVIEW":
+        return "low_budget"
+    return "blocked"
+
+
+def _can_expand_from_decision(decision: dict[str, Any] | None, edge_id: str) -> bool:
+    if not decision or decision.get("search_query_effect_status") == "rule_blocked":
+        return False
+    action = decision.get("decision_action")
+    if edge_id == "author_to_works":
+        return action in {"ADD_TO_CONTENT_POOL", "KEEP_CONTENT_FOR_REVIEW"}
+    if edge_id == "hashtag_to_query":
+        return action == "ADD_TO_CONTENT_POOL"
+    return False
+
+
+def _binding_by_edge_id(walk_strategy: dict[str, Any]) -> dict[str, dict[str, Any]]:
+    return {row["edge_id"]: row for row in walk_strategy.get("walk_rule_pack_binding", [])}
+
+
+def _resolve_edge_binding(
+    edge_id: str, walk_strategy: dict[str, Any]
+) -> tuple[dict[str, Any], str | None]:
+    binding = _binding_by_edge_id(walk_strategy).get(edge_id)
+    if not binding:
+        return {}, "edge_binding_missing"
+    return binding, None
+
+
+def _execution_record(decision: dict[str, Any] | None, *, content_pack_id: str) -> dict[str, Any]:
+    if decision:
+        return {
+            "executed": True,
+            "executed_rule_pack_id": decision.get("rule_pack_id") or content_pack_id,
+            "reason": "content_decision_reused_for_walk_gate",
+        }
+    return {
+        "executed": False,
+        "executed_rule_pack_id": None,
+        "reason": "future_pack_not_enabled",
+    }
+
+
+def _decision_context(decision: dict[str, Any] | None) -> dict[str, Any]:
+    if not decision:
+        return {"decision_action": None, "search_query_effect_status": None}
+    return {
+        "decision_action": decision.get("decision_action"),
+        "search_query_effect_status": decision.get("search_query_effect_status"),
+    }
 
 
 def _merge_batch(context: dict[str, list[dict[str, Any]]], batch: dict[str, list[dict[str, Any]]]) -> None:

+ 25 - 4
content_agent/business_modules/walk_strategy.py

@@ -5,6 +5,7 @@ from datetime import datetime, timezone
 from typing import Any
 
 from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
+from content_agent.integrations.walk_strategy_json import WalkStrategyStore
 from content_agent.record_payload import with_raw_payload
 
 
@@ -13,7 +14,10 @@ def run(
     search_queries: list[dict[str, Any]],
     discovered_content_items: list[dict[str, Any]],
     decisions: list[dict[str, Any]],
+    walk_strategy: dict[str, Any] | None = None,
 ) -> dict[str, list[dict[str, Any]]]:
+    walk_strategy = walk_strategy or WalkStrategyStore().load_walk_strategy()
+    binding_by_edge = {row["edge_id"]: row for row in walk_strategy.get("walk_rule_pack_binding", [])}
     decision_by_target_id = {decision["decision_target_id"]: decision for decision in decisions}
     walk_actions: list[dict[str, Any]] = []
     source_path_record_basis: list[dict[str, Any]] = []
@@ -41,6 +45,12 @@ def run(
     for item in discovered_content_items:
         decision = decision_by_target_id[item["platform_content_id"]]
         decision_action = _action_for_decision(decision["decision_action"])
+        binding = binding_by_edge.get(decision_action["edge_id"]) or {}
+        execution = {
+            "executed": True,
+            "executed_rule_pack_id": decision["rule_pack_id"],
+            "reason": "content_decision_reused_for_walk_gate",
+        }
         walk_action_id = _walk_action_id(
             decision["run_id"],
             decision["policy_run_id"],
@@ -78,9 +88,13 @@ def run(
                     ),
                     "source_evidence_ref": decision["decision_input_snapshot_id"],
                     "walk_action_id": walk_action_id,
+                    "rule_pack_binding": binding,
+                    "rule_pack_execution": execution,
                 }
             )
-        walk_actions.append(_walk_action_row(decision, item, decision_action, walk_action_id, created_at))
+        walk_actions.append(
+            _walk_action_row(decision, item, decision_action, walk_action_id, created_at, binding, execution)
+        )
         if decision["decision_action"] == "ADD_TO_CONTENT_POOL":
             source_path_record_basis.append(
                 {
@@ -101,6 +115,8 @@ def run(
                     ),
                     "source_evidence_ref": decision["decision_input_snapshot_id"],
                     "walk_action_id": walk_action_id,
+                    "rule_pack_binding": binding,
+                    "rule_pack_execution": execution,
                 }
             )
 
@@ -139,8 +155,10 @@ def _walk_action_row(
     decision_action: dict[str, str],
     walk_action_id: str,
     created_at: str,
+    binding: dict[str, Any],
+    execution: dict[str, Any],
 ) -> dict[str, Any]:
-    return with_raw_payload(
+    row = with_raw_payload(
         {
             "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
             "run_id": decision["run_id"],
@@ -159,8 +177,8 @@ def _walk_action_row(
             "page_cursor": item.get("page_cursor"),
             "next_cursor": item.get("next_cursor"),
             "decision_id": decision["decision_id"],
-            "rule_pack_id": decision["rule_pack_id"],
-            "rule_pack_version": decision["rule_pack_version"],
+            "rule_pack_id": binding.get("rule_pack_id") or decision["rule_pack_id"],
+            "rule_pack_version": binding.get("rule_pack_version") or decision["rule_pack_version"],
             "reason_code": decision["decision_reason_code"],
             "content_effect_status": decision["search_query_effect_status"],
             "decision_target_type": decision.get("decision_target_type"),
@@ -168,6 +186,9 @@ def _walk_action_row(
             "created_at": created_at,
         }
     )
+    row["raw_payload"]["rule_pack_binding"] = binding
+    row["raw_payload"]["rule_pack_execution"] = execution
+    return row
 
 
 def _walk_action_id(

+ 20 - 0
product_documents/抖音游走策略/douyin_walk_strategy.v1.json

@@ -341,6 +341,26 @@
       "required": false,
       "dispatch_policy": "advisory",
       "notes": "Budget pack controls downgrade."
+    },
+    {
+      "binding_id": "bind_path_stop_path_pack",
+      "edge_id": "path_stop",
+      "target_entity": "Path",
+      "rule_pack_id": "douyin_path_stop_rule_pack_v1",
+      "rule_pack_version": "1.0.0",
+      "required": false,
+      "dispatch_policy": "advisory",
+      "notes": "Path pack owns stop edges; execution facts live in rule_pack_execution."
+    },
+    {
+      "binding_id": "bind_decision_asset_content_pack",
+      "edge_id": "decision_to_asset",
+      "target_entity": "Content",
+      "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
+      "rule_pack_version": "1.0.0",
+      "required": false,
+      "dispatch_policy": "advisory",
+      "notes": "Asset commit is owned by the Content pack decision."
     }
   ],
   "walk_budget_policy": [

+ 5 - 3
tech_documents/工程落地/v2_implementation_briefs/M4/M4D_Source_Path_Records.md

@@ -63,14 +63,16 @@
       "target_entity": "Path"
     },
     "rule_pack_execution": {
-      "executed": false,
-      "executed_rule_pack_id": null,
-      "reason": "future_pack_not_enabled"
+      "executed": true,
+      "executed_rule_pack_id": "douyin_content_discovery_rule_pack_v1",
+      "reason": "content_decision_reused_for_walk_gate"
     }
   }
 }
 ```
 
+注:path_stop 由真实 Content decision 驱动,按 M4C 口径 execution 为 `executed=true` 复用 Content 包;`executed=false / future_pack_not_enabled` 只用于无 decision 参与门控的动作。
+
 当 M4B 因 RuleDecision 跳过扩展时,`walk_actions.status="skipped"`,`source_path_records` 只在当前已有映射能表达该边时写入,不新增文件。
 
 ## 施工步骤

BIN
tech_documents/游走策略/游走策略配置表.xlsx


+ 26 - 0
tests/test_case_replay.py

@@ -102,3 +102,29 @@ def test_replay_synthetic_review_case(tmp_path):
     assert artifacts.summary["review_content_count"] >= 1
     assert artifacts.summary["pooled_content_count"] == 0
     assert_matches("syn_review/decision_summary", artifacts.summary, subset_keys=_SUMMARY_KEYS)
+
+
+def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
+    # M4 受控变化: KEEP/pending 的 query 不翻页、tag 不扩、作者只走低预算,动作全部带归属包与执行事实。
+    artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
+    walk_actions = artifacts.files["walk_actions.jsonl"]
+
+    assert not [row for row in walk_actions if row["edge_id"] == "query_next_page"]
+
+    tag_actions = [row for row in walk_actions if row["edge_id"] == "hashtag_to_query"]
+    assert tag_actions
+    assert all(row["walk_status"] == "skipped" for row in tag_actions)
+    assert all(row["reason_code"] == "review_tag_expansion_disabled" for row in tag_actions)
+
+    author_actions = [row for row in walk_actions if row["edge_id"] == "author_to_works"]
+    assert author_actions
+    assert all(row["budget_tier"] == "low_budget" for row in author_actions)
+
+    downgrades = [row for row in walk_actions if row["edge_id"] == "budget_downgrade"]
+    assert len(downgrades) == 4
+    assert all(row["rule_pack_id"] == "douyin_budget_observe_rule_pack_v1" for row in downgrades)
+
+    for row in walk_actions:
+        execution = row["raw_payload"]["rule_pack_execution"]
+        assert execution["executed"] is True
+        assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"

+ 26 - 0
tests/test_config_case_matrix.py

@@ -111,3 +111,29 @@ def test_decoupling_counterproof():
     # so a non-Content (e.g. Author) pack can be routed without falling back.
     source = (ROOT / "content_agent/integrations/policy_json.py").read_text(encoding="utf-8")
     assert 'target_entity") == "Content"' not in source
+
+
+def test_portrait_reject_blocks_all_walk_expansion(tmp_path):
+    # M4 受控变化: 全拦截(rule_blocked)时翻页/作者/tag 全停,path_stop 归属 Path 包但执行包是 Content。
+    artifacts = replay_case(
+        "real_id45",
+        runtime_root=tmp_path / "rt",
+        config_overrides={"policy_store": _portrait_reject_store(tmp_path / "cfg")},
+    )
+    walk_actions = artifacts.files["walk_actions.jsonl"]
+
+    assert not [row for row in walk_actions if row["edge_id"] == "query_next_page"]
+    expansions = [
+        row for row in walk_actions if row["edge_id"] in {"author_to_works", "hashtag_to_query"}
+    ]
+    assert expansions
+    assert all(row["walk_status"] == "skipped" for row in expansions)
+    assert all(row["reason_code"] == "blocked_by_rule_decision" for row in expansions)
+
+    path_stops = [row for row in walk_actions if row["edge_id"] == "path_stop"]
+    assert len(path_stops) == 4
+    for row in path_stops:
+        assert row["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+        assert row["raw_payload"]["rule_pack_execution"]["executed_rule_pack_id"] == (
+            "douyin_content_discovery_rule_pack_v1"
+        )

+ 164 - 0
tests/test_walk_actions_runtime.py

@@ -63,3 +63,167 @@ def test_database_runtime_writes_walk_actions_to_formal_table():
     assert values["edge_id"] == "query_next_page"
     assert values["walk_status"] == "success"
     assert json.loads(values["raw_payload"])["record_schema_version"] == "runtime_record.v1"
+
+
+def _m4_decision(decision_id, target_id, action, effect_status):
+    return {
+        "record_schema_version": "runtime_record.v1",
+        "run_id": "run_001",
+        "policy_run_id": "policy_run_001",
+        "decision_id": decision_id,
+        "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
+        "rule_pack_version": "1.0.0",
+        "decision_target_type": "content",
+        "decision_target_id": target_id,
+        "decision_action": action,
+        "decision_reason_code": effect_status,
+        "search_query_effect_status": effect_status,
+        "decision_input_snapshot_id": f"evidence:{target_id}",
+    }
+
+
+def _m4_item(content_id):
+    return {
+        "platform_content_id": content_id,
+        "search_query_id": "q_001",
+        "discovery_start_source": "pattern_itemset",
+        "previous_discovery_step": "search_query_direct",
+    }
+
+
+def _run_walk_strategy(action, effect_status):
+    from content_agent.business_modules import walk_strategy
+
+    return walk_strategy.run(
+        pattern_seed_pack={"pattern_execution_id": 581},
+        search_queries=[],
+        discovered_content_items=[_m4_item("content_001")],
+        decisions=[_m4_decision("d_001", "content_001", action, effect_status)],
+    )
+
+
+def test_walk_action_records_edge_binding_rule_pack_id():
+    from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action
+    from content_agent.integrations.walk_strategy_json import WalkStrategyStore
+
+    walk_strategy = WalkStrategyStore().load_walk_strategy()
+    binding, missing_reason = _resolve_edge_binding("query_next_page", walk_strategy)
+
+    assert missing_reason is None
+    row = _walk_action(
+        "run_001", "policy_run_001", "wa_test", "query_next_page", "pagination",
+        "SearchQuery", "q_001", "SearchPage", "page_002", "fetch_next_page", "success",
+        "2026-06-10T00:00:00+00:00",
+        rule_pack_binding=binding,
+        rule_pack_execution={"executed": True, "executed_rule_pack_id": "douyin_content_discovery_rule_pack_v1",
+                             "reason": "content_decision_reused_for_walk_gate"},
+    )
+    assert row["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+    assert row["rule_pack_version"] == "1.0.0"
+    assert row["raw_payload"]["rule_pack_binding"]["edge_id"] == "query_next_page"
+
+
+def test_future_binding_records_not_executed():
+    from content_agent.business_modules.walk_engine import (
+        _execution_record,
+        _resolve_edge_binding,
+        _walk_action,
+    )
+    from content_agent.integrations.walk_strategy_json import WalkStrategyStore
+
+    walk_strategy = WalkStrategyStore().load_walk_strategy()
+    binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
+    execution = _execution_record(None, content_pack_id="douyin_content_discovery_rule_pack_v1")
+
+    row = _walk_action(
+        "run_001", "policy_run_001", "wa_test", "author_to_works", "author",
+        "Author", "a_001", "AuthorWorksPage", "a_001", "fetch_author_works", "skipped",
+        "2026-06-10T00:00:00+00:00",
+        rule_pack_binding=binding,
+        rule_pack_execution=execution,
+    )
+    assert row["rule_pack_id"] == "douyin_budget_observe_rule_pack_v1"
+    assert row["raw_payload"]["rule_pack_execution"] == {
+        "executed": False,
+        "executed_rule_pack_id": None,
+        "reason": "future_pack_not_enabled",
+    }
+
+
+def test_content_decision_execution_does_not_overwrite_edge_owner():
+    result = _run_walk_strategy("REJECT_CONTENT", "rule_blocked")
+
+    action = result["walk_actions"][0]
+    assert action["edge_id"] == "path_stop"
+    assert action["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+    execution = action["raw_payload"]["rule_pack_execution"]
+    assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
+    assert action["rule_pack_id"] != execution["executed_rule_pack_id"]
+
+
+def test_unknown_edge_binding_leaves_rule_pack_id_null_with_reason():
+    from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action
+    from content_agent.integrations.walk_strategy_json import WalkStrategyStore
+
+    walk_strategy = WalkStrategyStore().load_walk_strategy()
+    binding, missing_reason = _resolve_edge_binding("unknown_edge", walk_strategy)
+
+    assert binding == {}
+    assert missing_reason == "edge_binding_missing"
+    row = _walk_action(
+        "run_001", "policy_run_001", "wa_test", "unknown_edge", "unknown",
+        "Node", "n_001", "Node", "n_002", "noop", "skipped",
+        "2026-06-10T00:00:00+00:00",
+        reason_code=missing_reason,
+        rule_pack_binding=binding,
+    )
+    assert row["rule_pack_id"] is None
+    assert row["reason_code"] == "edge_binding_missing"
+
+
+def test_path_stop_records_path_owner_without_claiming_execution():
+    result = _run_walk_strategy("REJECT_CONTENT", "rule_blocked")
+
+    action = result["walk_actions"][0]
+    assert action["edge_id"] == "path_stop"
+    assert action["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+    binding = action["raw_payload"]["rule_pack_binding"]
+    assert binding["target_entity"] == "Path"
+    # 归属包是 Path 包,但实际执行判断的是 Content 包,不得伪装成 Path 包已执行。
+    assert action["raw_payload"]["rule_pack_execution"]["executed_rule_pack_id"] == (
+        "douyin_content_discovery_rule_pack_v1"
+    )
+
+
+def test_budget_downgrade_records_budget_owner():
+    result = _run_walk_strategy("KEEP_CONTENT_FOR_REVIEW", "pending")
+
+    action = result["walk_actions"][0]
+    assert action["edge_id"] == "budget_downgrade"
+    assert action["rule_pack_id"] == "douyin_budget_observe_rule_pack_v1"
+    assert action["raw_payload"]["rule_pack_binding"]["target_entity"] == "Budget"
+
+
+def test_decision_to_asset_keeps_content_execution_and_asset_edge_owner():
+    result = _run_walk_strategy("ADD_TO_CONTENT_POOL", "success")
+
+    action = result["walk_actions"][0]
+    assert action["edge_id"] == "decision_to_asset"
+    assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
+    execution = action["raw_payload"]["rule_pack_execution"]
+    assert execution["executed"] is True
+    assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
+
+
+def test_walk_action_and_source_path_share_binding_contract():
+    result = _run_walk_strategy("ADD_TO_CONTENT_POOL", "success")
+
+    action = result["walk_actions"][0]
+    linked_paths = [
+        basis for basis in result["source_path_record_basis"]
+        if basis.get("walk_action_id") == action["walk_action_id"]
+    ]
+    assert linked_paths
+    for basis in linked_paths:
+        assert basis["rule_pack_binding"] == action["raw_payload"]["rule_pack_binding"]
+        assert basis["rule_pack_execution"] == action["raw_payload"]["rule_pack_execution"]

+ 57 - 0
tests/test_walk_engine_author.py

@@ -24,3 +24,60 @@ def test_walk_engine_author_edge_skips_missing_author_id(tmp_path):
     run_bounded_walk(platform_client=client, **context)
 
     assert client.author_calls == []
+
+
+def _override_decisions(context, action, effect_status):
+    for decision in context["rule_decisions"]:
+        decision["decision_action"] = action
+        decision["search_query_effect_status"] = effect_status
+
+
+def test_author_edge_skips_rejected_content(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    _override_decisions(context, "REJECT_CONTENT", "rule_blocked")
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert client.author_calls == []
+    skipped = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "author_to_works" and row["walk_status"] == "skipped"
+    ]
+    assert len(skipped) == 1
+    assert skipped[0]["reason_code"] == "blocked_by_rule_decision"
+    assert skipped[0]["budget_tier"] == "blocked"
+    assert skipped[0]["raw_payload"]["decision_action"] == "REJECT_CONTENT"
+
+
+def test_author_edge_allows_add_content_pool(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert client.author_calls
+    author_actions = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "author_to_works" and row["walk_status"] == "success"
+    ]
+    assert author_actions
+    assert author_actions[0]["budget_tier"] == "normal"
+    assert author_actions[0]["rule_pack_id"] == "douyin_budget_observe_rule_pack_v1"
+
+
+def test_author_edge_keeps_review_low_budget(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    _override_decisions(context, "KEEP_CONTENT_FOR_REVIEW", "pending")
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert client.author_calls
+    author_actions = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "author_to_works" and row["walk_status"] == "success"
+    ]
+    assert author_actions
+    assert author_actions[0]["budget_tier"] == "low_budget"
+    assert author_actions[0]["raw_payload"]["rule_pack_execution"]["executed"] is True

+ 19 - 0
tests/test_walk_engine_budget.py

@@ -56,3 +56,22 @@ def _decision(decision_id, target_id, action, effect_status):
         "search_query_effect_status": effect_status,
         "decision_input_snapshot_id": f"evidence:{target_id}",
     }
+
+
+def test_keep_review_author_edge_uses_low_budget(tmp_path):
+    from content_agent.business_modules.walk_engine import run_bounded_walk
+    from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context
+
+    context = build_initial_walk_context(tmp_path)
+    for decision in context["rule_decisions"]:
+        decision["decision_action"] = "KEEP_CONTENT_FOR_REVIEW"
+        decision["search_query_effect_status"] = "pending"
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    author_actions = [
+        row for row in result["walk_actions"] if row["edge_id"] == "author_to_works"
+    ]
+    assert author_actions
+    assert all(row["budget_tier"] == "low_budget" for row in author_actions)

+ 67 - 0
tests/test_walk_engine_pagination.py

@@ -27,3 +27,70 @@ def test_walk_engine_pagination_skips_without_cursor(tmp_path):
         call for call in client.search_calls
         if call.get("search_query_generation_method") == "query_next_page"
     ]
+
+
+def _override_decisions(context, action, effect_status):
+    for decision in context["rule_decisions"]:
+        decision["decision_action"] = action
+        decision["search_query_effect_status"] = effect_status
+
+
+def test_pagination_requires_success_effect_status(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    _override_decisions(context, "KEEP_CONTENT_FOR_REVIEW", "pending")
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert not [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "query_next_page"
+    ]
+    assert not [row for row in result["walk_actions"] if row["edge_id"] == "query_next_page"]
+
+
+def test_rule_blocked_query_does_not_page_even_with_cursor(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    _override_decisions(context, "REJECT_CONTENT", "rule_blocked")
+    assert context["discovered_content_items"][0]["has_more"]
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert not [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "query_next_page"
+    ]
+    assert not [row for row in result["walk_actions"] if row["edge_id"] == "query_next_page"]
+
+
+def test_missing_decision_does_not_page(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    context["rule_decisions"] = []
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert not [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "query_next_page"
+    ]
+    assert not [row for row in result["walk_actions"] if row["edge_id"] == "query_next_page"]
+
+
+def test_success_query_with_cursor_pages_once(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    page_actions = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "query_next_page" and row["walk_status"] == "success"
+    ]
+    assert len(page_actions) == 1
+    assert page_actions[0]["raw_payload"]["rule_pack_execution"] == {
+        "executed": True,
+        "executed_rule_pack_id": "douyin_content_discovery_rule_pack_v1",
+        "reason": "content_decision_reused_for_walk_gate",
+    }

+ 44 - 0
tests/test_walk_engine_tag.py

@@ -28,3 +28,47 @@ def test_walk_engine_blocks_generic_tag_query(tmp_path):
         call for call in client.search_calls
         if call.get("search_query_generation_method") == "tag_query"
     ]
+
+
+def _override_decisions(context, action, effect_status):
+    for decision in context["rule_decisions"]:
+        decision["decision_action"] = action
+        decision["search_query_effect_status"] = effect_status
+
+
+def test_tag_query_skips_rejected_and_rule_blocked(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    _override_decisions(context, "REJECT_CONTENT", "rule_blocked")
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert not [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "tag_query"
+    ]
+    skipped = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "hashtag_to_query" and row["walk_status"] == "skipped"
+    ]
+    assert skipped
+    assert skipped[0]["reason_code"] == "blocked_by_rule_decision"
+
+
+def test_keep_review_does_not_expand_tag_by_default(tmp_path):
+    context = build_initial_walk_context(tmp_path)
+    _override_decisions(context, "KEEP_CONTENT_FOR_REVIEW", "pending")
+    client = FakeWalkPlatformClient()
+
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert not [
+        call for call in client.search_calls
+        if call.get("search_query_generation_method") == "tag_query"
+    ]
+    skipped = [
+        row for row in result["walk_actions"]
+        if row["edge_id"] == "hashtag_to_query" and row["walk_status"] == "skipped"
+    ]
+    assert skipped
+    assert skipped[0]["reason_code"] == "review_tag_expansion_disabled"

+ 23 - 0
tests/test_walk_strategy_config.py

@@ -40,6 +40,7 @@ def test_walk_strategy_config_uses_clue_id_and_real_rule_packs():
         "policy_run_id",
         "clue_id",
     ]
+    # M4 受控变化: path_stop/decision_to_asset binding 增补后,Content 包进入归属集合。
     assert {
         (row["rule_pack_id"], row["rule_pack_version"])
         for row in strategy["walk_rule_pack_binding"]
@@ -48,4 +49,26 @@ def test_walk_strategy_config_uses_clue_id_and_real_rule_packs():
         ("douyin_budget_observe_rule_pack_v1", "1.0.0"),
         ("douyin_tag_expansion_rule_pack_v1", "1.0.0"),
         ("douyin_path_stop_rule_pack_v1", "1.0.0"),
+        ("douyin_content_discovery_rule_pack_v1", "1.0.0"),
     }
+
+
+def test_walk_strategy_binding_is_loaded_from_walk_strategy():
+    from content_agent.business_modules.walk_engine import _binding_by_edge_id
+    from content_agent.integrations.walk_strategy_json import WalkStrategyStore
+
+    walk_strategy = WalkStrategyStore().load_walk_strategy()
+    binding_by_edge = _binding_by_edge_id(walk_strategy)
+
+    assert set(binding_by_edge) == {
+        "video_to_author",
+        "author_to_works",
+        "video_to_hashtag",
+        "hashtag_to_query",
+        "query_next_page",
+        "budget_downgrade",
+        "path_stop",
+        "decision_to_asset",
+    }
+    assert binding_by_edge["path_stop"]["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+    assert binding_by_edge["decision_to_asset"]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"