Browse Source

feat(v3-m4): 边表驱动统一游走(删三段硬编码)+ 砍 4 future 包 + 行为等价收口

M4B 游走重写(行为等价,real_id45 决策/血缘/walk_actions 集合全等):
- walk_engine 删 _pagination_queries/_tag_queries/_execute_author_edges/_can_expand_from_decision/_walk_budget_for_decision;新 _expand_queries/_expand_authors/_terminal_stage 读 WalkGraphStore(profile supported→edge_permissions 通行证→edge_budgets 预算),复用 _execute_query_batch 等全部旧 helper
- 预算耗尽静默(对齐 v1);显式 skip 仅 permission-deny(旧 reason_code)+ profile blocked(新 edge_blocked_by_platform_profile,不调平台)
- 终端边+3类血缘并入 _terminal_stage(原 plan_walk 节点删除,graph 改 execute_walk→record_run 直连);walk_strategy.py 删 run 留 _action_for_decision/_walk_action_row/_walk_action_id;wa_id 两套后缀冻结
- rule_pack_id 戳统一 binding or content_pack 回退(fallback_rule_pack)

M4C 砍包(M3 推迟项,先删 binding 后删包防 fk 闭合性炸):
- rule_packs 5→1、dispatch 5→1、walk_rule_pack_binding 8→1(留 content);Excel 同步删 55 行/6 sheet;config gate 5 闸全过
- validation 删 source_evidence_missing_binding 分类树门槛 + SOURCE_EVIDENCE_FIELDS 去 category/element_bindings;decision_to_asset_* 保留

M4D 测试收口(受控变化均注明):
- KEEP/REJECT/扩展边 rule_pack_id 断言 future 包→内容包(test_case_replay/config_case_matrix/walk_engine_author/walk_actions_runtime/walk_strategy_config/api)
- 直调 walk_strategy.run 的 3 处改 _terminal_stage/run_bounded_walk 一体产物
- 新增 test_walk_profile_degradation:real_id45 walk_actions 指纹快照(实跑钉死,M5 并发一致性基线)+ 视频号 author 边 blocked 显式退化

基线 318→320 passed;只读验收岗 5/5 PASS。

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sam Lee 2 days ago
parent
commit
4738de0e2d

+ 1 - 4
content_agent/business_modules/run_record/validation.py

@@ -79,8 +79,6 @@ SOURCE_EVIDENCE_FIELDS = {
     "mining_config_id",
     "itemset_ids",
     "itemset_items",
-    "category_bindings",
-    "element_bindings",
     "support",
     "absolute_support",
     "matched_post_ids",
@@ -592,8 +590,7 @@ def _check_one_source_evidence(
     missing_fields = sorted(field for field in SOURCE_EVIDENCE_FIELDS if field not in source_evidence)
     if missing_fields:
         _fail(findings, "source_evidence_missing_fields", f"{label} missing {missing_fields}")
-    if not source_evidence.get("category_bindings") and not source_evidence.get("element_bindings"):
-        _fail(findings, "source_evidence_missing_binding", f"{label} lacks category or element binding")
+    # V3(M4):分类树 category/element binding 已随 decode 链路退役,不再作为血缘门槛。
 
     scalar_fields = [
         "pattern_execution_id",

+ 373 - 199
content_agent/business_modules/walk_engine.py

@@ -1,3 +1,11 @@
+"""有界游走(V3-M4):配置驱动的统一 frontier 流程,取代三段硬编码。
+
+每条边走同一套闸:platform_profiles 是否 supported(blocked 显式 skip 不调用平台)
+→ walk_policy.edge_permissions 按判定结果放行(取代 _can_expand_from_decision 硬编码)
+→ walk_policy.edge_budgets 预算(取代 [:2]/[:3]/>=1 散落硬限;预算耗尽静默,对齐 v1 行为)。
+终端边(commit/downgrade/stop)与 3 类血缘并入本模块终端阶段(原 plan_walk 节点已删)。
+"""
+
 from __future__ import annotations
 
 import hashlib
@@ -5,9 +13,15 @@ from datetime import datetime, timezone
 from typing import Any
 
 from content_agent.business_modules import content_discovery, platform_access, rule_judgment
+from content_agent.business_modules import walk_strategy as walk_terminal
 from content_agent.business_modules.content_discovery import pattern_recall
 from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
 from content_agent.errors import ContentAgentError
+from content_agent.integrations.walk_graph_json import (
+    WalkGraphStore,
+    edge_permission,
+    edge_supported,
+)
 from content_agent.integrations.walk_strategy_json import WalkStrategyStore
 from content_agent.interfaces import (
     GeminiVideoClient,
@@ -45,7 +59,17 @@ def run_bounded_walk(
 ) -> dict[str, list[dict[str, Any]]]:
     created_at = datetime.now(timezone.utc).isoformat()
     walk_strategy = WalkStrategyStore().load_walk_strategy()
-    content_pack_id = policy_bundle["rule_pack_id"]
+    store = WalkGraphStore()
+    policy = store.load_policy()
+    platform = next(
+        (item["platform"] for item in discovered_content_items if item.get("platform")),
+        "douyin",
+    )
+    profile = store.load_profile(platform)
+    content_pack = {
+        "rule_pack_id": policy_bundle["rule_pack_id"],
+        "rule_pack_version": policy_bundle["rule_pack_version"],
+    }
     context = {
         "search_queries": list(search_queries),
         "discovered_content_items": list(discovered_content_items),
@@ -57,15 +81,12 @@ def run_bounded_walk(
     next_decision_index = len(rule_decisions) + 1
     next_recall_index = len(discovered_content_items) + 1
 
-    query_rows = _pagination_queries(
-        run_id, policy_run_id, context["search_queries"], discovered_content_items, rule_decisions, created_at
+    query_rows, query_skipped_actions = _expand_queries(
+        run_id, policy_run_id, context["search_queries"], discovered_content_items,
+        rule_decisions, created_at,
+        policy=policy, profile=profile, walk_strategy=walk_strategy, content_pack=content_pack,
     )
-    tag_rows, tag_skipped_actions = _tag_queries(
-        run_id, policy_run_id, discovered_content_items, rule_decisions, created_at,
-        walk_strategy=walk_strategy, content_pack_id=content_pack_id,
-    )
-    query_rows.extend(tag_rows)
-    context["walk_actions"].extend(tag_skipped_actions)
+    context["walk_actions"].extend(query_skipped_actions)
     if query_rows:
         runtime.append_jsonl(run_id, "search_queries.jsonl", query_rows)
         context["search_queries"].extend(query_rows)
@@ -91,19 +112,21 @@ def run_bounded_walk(
         context["walk_actions"].extend(
             _query_actions(
                 query_rows, batch.get("query_failures", []), created_at,
-                walk_strategy=walk_strategy, content_pack_id=content_pack_id,
+                walk_strategy=walk_strategy, content_pack=content_pack,
             )
         )
 
-    author_batch = _execute_author_edges(
+    author_batch = _expand_authors(
         run_id=run_id,
         policy_run_id=policy_run_id,
-        pattern_seed_pack=pattern_seed_pack,
         source_context=source_context,
         discovered_content_items=context["discovered_content_items"],
         rule_decisions=context["rule_decisions"],
+        policy=policy,
+        profile=profile,
         walk_strategy=walk_strategy,
         policy_bundle=policy_bundle,
+        content_pack=content_pack,
         platform_client=platform_client,
         runtime=runtime,
         gemini_video_client=gemini_video_client,
@@ -113,13 +136,23 @@ def run_bounded_walk(
     )
     _merge_batch(context, author_batch)
     context["walk_actions"].extend(author_batch.get("walk_actions", []))
-    # 作者作品的血缘 query 行:落盘后 walk_strategy(pattern_to_search_query 路径)与
+    # 作者作品的血缘 query 行:落盘后终端阶段(pattern_to_search_query 路径)与
     # recorder(search_clue 聚合)即可经既有机制覆盖作者内容,validate_run 不再断链。
     author_queries = author_batch.get("search_queries", [])
     if author_queries:
         runtime.append_jsonl(run_id, "search_queries.jsonl", author_queries)
         context["search_queries"].extend(author_queries)
 
+    terminal = _terminal_stage(
+        pattern_seed_pack,
+        context["search_queries"],
+        context["discovered_content_items"],
+        context["rule_decisions"],
+        walk_strategy,
+        created_at,
+    )
+    context["walk_actions"].extend(terminal["walk_actions"])
+    context["source_path_record_basis"] = terminal["source_path_record_basis"]
     return context
 
 
@@ -192,16 +225,141 @@ def _execute_query_batch(
     }
 
 
-def _execute_author_edges(
+def _expand_queries(
+    run_id: str,
+    policy_run_id: str,
+    search_queries: list[dict[str, Any]],
+    discovered_content_items: list[dict[str, Any]],
+    rule_decisions: list[dict[str, Any]],
+    created_at: str,
+    *,
+    policy: dict[str, Any],
+    profile: dict[str, Any],
+    walk_strategy: dict[str, Any],
+    content_pack: dict[str, Any],
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
+    budgets = policy["edge_budgets_by_id"]
+    decision_by_content_id = _decision_by_content_id(rule_decisions)
+    skipped_actions: list[dict[str, Any]] = []
+
+    # 边 query_next_page:has_more+cursor、查询 effect=success 才翻页;预算耗尽静默(v1 行为)。
+    page_rows: list[dict[str, Any]] = []
+    if edge_supported(profile, "query_next_page"):
+        page_budget = budgets["query_next_page"]["max_total_actions"]
+        query_by_id = {row["search_query_id"]: row for row in search_queries}
+        query_effect_by_id = _query_effect_by_search_query_id(discovered_content_items, rule_decisions)
+        seen_queries: set[str] = set()
+        for item in discovered_content_items:
+            if len(page_rows) >= page_budget:
+                break
+            search_query_id = item.get("search_query_id")
+            cursor = item.get("next_cursor")
+            if not item.get("has_more") or not cursor or search_query_id in seen_queries:
+                continue
+            if not _can_fetch_next_page(search_query_id, query_effect_by_id):
+                continue
+            source = query_by_id.get(search_query_id)
+            if not source:
+                continue
+            seen_queries.add(search_query_id)
+            page_rows.append(
+                _search_query_row(
+                    run_id,
+                    policy_run_id,
+                    f"{search_query_id}_page_002",
+                    source["search_query"],
+                    "query_next_page",
+                    source.get("discovery_start_source", "pattern_itemset"),
+                    "query_next_page",
+                    created_at,
+                    page_cursor=str(cursor),
+                    raw_extra={"parent_search_query_id": search_query_id},
+                )
+            )
+
+    # 回灌链 video_to_hashtag → hashtag_to_query:keep_only 门由 edge_permissions 查表。
+    tag_rows: list[dict[str, Any]] = []
+    if edge_supported(profile, "video_to_hashtag") and edge_supported(profile, "hashtag_to_query"):
+        tag_budget = budgets["hashtag_to_query"]["max_total_actions"]
+        tag_binding, _ = _resolve_edge_binding("hashtag_to_query", walk_strategy)
+        seen_tags: set[str] = set()
+        for item in discovered_content_items:
+            if len(tag_rows) >= tag_budget:
+                break
+            decision = decision_by_content_id.get(item.get("platform_content_id"))
+            if not decision:
+                continue
+            if _edge_permission_for(decision, "video_to_hashtag", policy) == "deny":
+                if item.get("tags"):
+                    reason_code = (
+                        "review_tag_expansion_disabled"
+                        if decision.get("decision_action") == "KEEP_CONTENT_FOR_REVIEW"
+                        else "blocked_by_rule_decision"
+                    )
+                    skipped_actions.append(
+                        _walk_action(
+                            run_id,
+                            policy_run_id,
+                            _walk_action_id(
+                                run_id, policy_run_id, "hashtag_to_query", item["platform_content_id"], "tags"
+                            ),
+                            "hashtag_to_query",
+                            "tag_query",
+                            "Content",
+                            item["platform_content_id"],
+                            "SearchQuery",
+                            "tag_query_skipped",
+                            "create_tag_query",
+                            "skipped",
+                            created_at,
+                            reason_code=reason_code,
+                            budget_tier="blocked",
+                            rule_pack_binding=tag_binding,
+                            rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]),
+                            fallback_rule_pack=content_pack,
+                            raw_extra=_decision_context(decision),
+                        )
+                    )
+                continue
+            for tag in item.get("tags") or []:
+                normalized = str(tag).lstrip("#").strip()
+                if not normalized or normalized in seen_tags or _blocked_tag(normalized):
+                    continue
+                seen_tags.add(normalized)
+                tag_rows.append(
+                    _search_query_row(
+                        run_id,
+                        policy_run_id,
+                        f"tag_{_short_hash(normalized)}",
+                        normalized,
+                        "tag_query",
+                        item.get("discovery_start_source", "pattern_itemset"),
+                        "hashtag_to_query",
+                        created_at,
+                        raw_extra={
+                            "hashtag": normalized,
+                            "source_content_id": item.get("platform_content_id"),
+                        },
+                    )
+                )
+                if len(tag_rows) >= tag_budget:
+                    break
+
+    return [*page_rows, *tag_rows], skipped_actions
+
+
+def _expand_authors(
     *,
     run_id: str,
     policy_run_id: str,
-    pattern_seed_pack: dict[str, Any],
     source_context: dict[str, Any],
     discovered_content_items: list[dict[str, Any]],
     rule_decisions: list[dict[str, Any]],
+    policy: dict[str, Any],
+    profile: dict[str, Any],
     walk_strategy: dict[str, Any],
     policy_bundle: dict[str, Any],
+    content_pack: dict[str, Any],
     platform_client: PlatformSearchClient,
     runtime: RuntimeFileStore,
     gemini_video_client: GeminiVideoClient,
@@ -209,15 +367,37 @@ def _execute_author_edges(
     start_decision_index: int,
     created_at: str,
 ) -> dict[str, list[dict[str, Any]]]:
+    budgets = policy["edge_budgets_by_id"]["author_to_works"]
+    decision_by_content_id = _decision_by_content_id(rule_decisions)
+    binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
+    author_items = _unique_authors(discovered_content_items)[: budgets["max_total_actions"]]
+    walk_actions: list[dict[str, Any]] = []
+
+    # 平台不支持作者边(如视频号 blogger blocked):显式 skip 留痕、不调用平台,游走自然退化。
+    if not edge_supported(profile, "author_to_works"):
+        for item in author_items:
+            author_id = item.get("platform_author_id")
+            if not author_id:
+                continue
+            decision = decision_by_content_id.get(item.get("platform_content_id"))
+            walk_actions.append(
+                _author_walk_action(
+                    run_id, policy_run_id, author_id, "skipped", created_at,
+                    reason_code="edge_blocked_by_platform_profile",
+                    budget_tier="blocked",
+                    binding=binding,
+                    decision=decision,
+                    content_pack=content_pack,
+                )
+            )
+        return {**_empty_batch(), "walk_actions": walk_actions}
+
     fetch_author_works = getattr(platform_client, "fetch_author_works", None) or getattr(
         platform_client, "author_works", None
     )
     if not callable(fetch_author_works):
         return _empty_batch()
 
-    decision_by_content_id = _decision_by_content_id(rule_decisions)
-    binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
-    author_items = _unique_authors(discovered_content_items)[:2]
     # 作者近期作品天然可能包含首轮已发现的同一条视频;不去重会撞
     # uk_ca_items_run_policy_content 唯一索引(真实 E2E v1_run_e6ba21f7543b 实证)。
     seen_content_ids = {
@@ -226,38 +406,26 @@ def _execute_author_edges(
         if item.get("platform_content_id")
     }
     platform_results: list[dict[str, Any]] = []
-    walk_actions: list[dict[str, Any]] = []
     author_search_queries: list[dict[str, Any]] = []
     for item in author_items:
         author_id = item.get("platform_author_id")
         if not author_id:
             continue
-        action_id = _walk_action_id(run_id, policy_run_id, "author_to_works", author_id, "works")
         decision = decision_by_content_id.get(item.get("platform_content_id"))
-        if not _can_expand_from_decision(decision, "author_to_works"):
+        permission = _edge_permission_for(decision, "author_to_works", policy)
+        if permission == "deny":
             walk_actions.append(
-                _walk_action(
-                    run_id,
-                    policy_run_id,
-                    action_id,
-                    "author_to_works",
-                    "author",
-                    "Author",
-                    author_id,
-                    "AuthorWorksPage",
-                    author_id,
-                    "fetch_author_works",
-                    "skipped",
-                    created_at,
+                _author_walk_action(
+                    run_id, policy_run_id, author_id, "skipped", created_at,
                     reason_code="blocked_by_rule_decision",
                     budget_tier="blocked",
-                    rule_pack_binding=binding,
-                    rule_pack_execution=_execution_record(decision, content_pack_id=policy_bundle["rule_pack_id"]),
-                    raw_extra=_decision_context(decision),
+                    binding=binding,
+                    decision=decision,
+                    content_pack=content_pack,
                 )
             )
             continue
-        budget_tier = _walk_budget_for_decision(decision)
+        budget_tier = "low_budget" if permission == "allow_low_budget" else "normal"
         try:
             works = fetch_author_works(
                 {
@@ -268,45 +436,23 @@ def _execute_author_edges(
             )
         except Exception as exc:
             walk_actions.append(
-                _walk_action(
-                    run_id,
-                    policy_run_id,
-                    action_id,
-                    "author_to_works",
-                    "author",
-                    "Author",
-                    author_id,
-                    "AuthorWorksPage",
-                    author_id,
-                    "fetch_author_works",
-                    "failed",
-                    created_at,
+                _author_walk_action(
+                    run_id, policy_run_id, author_id, "failed", created_at,
                     reason_code=type(exc).__name__,
                     budget_tier=budget_tier,
-                    rule_pack_binding=binding,
-                    rule_pack_execution=_execution_record(decision, content_pack_id=policy_bundle["rule_pack_id"]),
-                    raw_extra=_decision_context(decision),
+                    binding=binding,
+                    decision=decision,
+                    content_pack=content_pack,
                 )
             )
             continue
         walk_actions.append(
-            _walk_action(
-                run_id,
-                policy_run_id,
-                action_id,
-                "author_to_works",
-                "author",
-                "Author",
-                author_id,
-                "AuthorWorksPage",
-                author_id,
-                "fetch_author_works",
-                "success",
-                created_at,
+            _author_walk_action(
+                run_id, policy_run_id, author_id, "success", created_at,
                 budget_tier=budget_tier,
-                rule_pack_binding=binding,
-                rule_pack_execution=_execution_record(decision, content_pack_id=policy_bundle["rule_pack_id"]),
-                raw_extra=_decision_context(decision),
+                binding=binding,
+                decision=decision,
+                content_pack=content_pack,
             )
         )
         new_works = [
@@ -330,7 +476,7 @@ def _execute_author_edges(
                     raw_extra={"platform_author_id": author_id},
                 )
             )
-        for index, work in enumerate(new_works[:3], start=1):
+        for index, work in enumerate(new_works[: budgets["max_works_per_author"]], start=1):
             seen_content_ids.add(str(work["platform_content_id"]))
             platform_results.append(
                 {
@@ -344,7 +490,7 @@ def _execute_author_edges(
                 }
             )
     if not platform_results:
-        return {**_empty_batch(), "walk_actions": walk_actions}
+        return {**_empty_batch(), "walk_actions": walk_actions, "search_queries": author_search_queries}
 
     discovered = content_discovery.run(run_id, policy_run_id, platform_results, source_context, runtime)
     recalled = pattern_recall.run(
@@ -377,120 +523,156 @@ def _execute_author_edges(
     }
 
 
-def _pagination_queries(
+def _author_walk_action(
     run_id: str,
     policy_run_id: str,
-    search_queries: list[dict[str, Any]],
-    discovered_content_items: list[dict[str, Any]],
-    rule_decisions: list[dict[str, Any]],
+    author_id: str,
+    walk_status: str,
     created_at: str,
-) -> list[dict[str, Any]]:
-    query_by_id = {row["search_query_id"]: row for row in search_queries}
-    query_effect_by_id = _query_effect_by_search_query_id(discovered_content_items, rule_decisions)
-    seen: set[str] = set()
-    rows: list[dict[str, Any]] = []
-    for item in discovered_content_items:
-        search_query_id = item.get("search_query_id")
-        cursor = item.get("next_cursor")
-        if not item.get("has_more") or not cursor or search_query_id in seen:
-            continue
-        if not _can_fetch_next_page(search_query_id, query_effect_by_id):
-            continue
-        source = query_by_id.get(search_query_id)
-        if not source:
-            continue
-        seen.add(search_query_id)
-        rows.append(
-            _search_query_row(
-                run_id,
-                policy_run_id,
-                f"{search_query_id}_page_002",
-                source["search_query"],
-                "query_next_page",
-                source.get("discovery_start_source", "pattern_itemset"),
-                "query_next_page",
-                created_at,
-                page_cursor=str(cursor),
-                raw_extra={"parent_search_query_id": search_query_id},
-            )
-        )
-    return rows[:3]
+    *,
+    budget_tier: str,
+    binding: dict[str, Any],
+    decision: dict[str, Any] | None,
+    content_pack: dict[str, Any],
+    reason_code: str | None = None,
+) -> dict[str, Any]:
+    return _walk_action(
+        run_id,
+        policy_run_id,
+        _walk_action_id(run_id, policy_run_id, "author_to_works", author_id, "works"),
+        "author_to_works",
+        "author",
+        "Author",
+        author_id,
+        "AuthorWorksPage",
+        author_id,
+        "fetch_author_works",
+        walk_status,
+        created_at,
+        reason_code=reason_code,
+        budget_tier=budget_tier,
+        rule_pack_binding=binding,
+        rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]),
+        fallback_rule_pack=content_pack,
+        raw_extra=_decision_context(decision),
+    )
 
 
-def _tag_queries(
-    run_id: str,
-    policy_run_id: str,
-    items: list[dict[str, Any]],
+def _terminal_stage(
+    pattern_seed_pack: dict[str, Any],
+    search_queries: list[dict[str, Any]],
+    discovered_content_items: list[dict[str, Any]],
     decisions: list[dict[str, Any]],
-    created_at: str,
-    *,
     walk_strategy: dict[str, Any],
-    content_pack_id: str,
-) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
-    decision_by_content_id = _decision_by_content_id(decisions)
-    binding, _ = _resolve_edge_binding("hashtag_to_query", walk_strategy)
-    rows: list[dict[str, Any]] = []
-    skipped_actions: list[dict[str, Any]] = []
-    seen: set[str] = set()
-    for item in items:
-        decision = decision_by_content_id.get(item.get("platform_content_id"))
+    created_at: str,
+) -> dict[str, list[dict[str, Any]]]:
+    """终端边(decision_to_asset/budget_downgrade/path_stop)+ 3 类血缘(原 plan_walk 语义)。"""
+    binding_by_edge = _binding_by_edge_id(walk_strategy)
+    decision_by_target_id = {decision["decision_target_id"]: decision for decision in decisions}
+    walk_actions: list[dict[str, Any]] = []
+    source_path_record_basis: list[dict[str, Any]] = []
+
+    for search_query in search_queries:
+        source_path_record_basis.append(
+            {
+                "policy_run_id": search_query["policy_run_id"],
+                "record_schema_version": search_query["record_schema_version"],
+                "from_node_type": "PatternSeed",
+                "from_node_id": pattern_seed_pack["pattern_execution_id"],
+                "to_node_type": "SearchQuery",
+                "to_node_id": search_query["search_query_id"],
+                "source_path_type": "pattern_to_search_query",
+                "rule_pack_id": None,
+                "decision_id": None,
+                "discovery_start_source": search_query["discovery_start_source"],
+                "previous_discovery_step": search_query["previous_discovery_step"],
+                "origin_path_id": f"pattern_to_search_query:{search_query['search_query_id']}",
+                "source_evidence_ref": "source_context.json#ext_data.evidence_pack",
+            }
+        )
+
+    for item in discovered_content_items:
+        # 无判定内容不产终端动作;判定覆盖完整性由 validate_run 把关,这里不掩盖。
+        decision = decision_by_target_id.get(item["platform_content_id"])
         if not decision:
             continue
-        if not _can_expand_from_decision(decision, "hashtag_to_query"):
-            if item.get("tags"):
-                reason_code = (
-                    "review_tag_expansion_disabled"
-                    if decision.get("decision_action") == "KEEP_CONTENT_FOR_REVIEW"
-                    else "blocked_by_rule_decision"
-                )
-                skipped_actions.append(
-                    _walk_action(
-                        run_id,
-                        policy_run_id,
-                        _walk_action_id(
-                            run_id, policy_run_id, "hashtag_to_query", item["platform_content_id"], "tags"
-                        ),
-                        "hashtag_to_query",
-                        "tag_query",
-                        "Content",
-                        item["platform_content_id"],
-                        "SearchQuery",
-                        "tag_query_skipped",
-                        "create_tag_query",
-                        "skipped",
-                        created_at,
-                        reason_code=reason_code,
-                        budget_tier="blocked",
-                        rule_pack_binding=binding,
-                        rule_pack_execution=_execution_record(decision, content_pack_id=content_pack_id),
-                        raw_extra=_decision_context(decision),
-                    )
-                )
-            continue
-        for tag in item.get("tags") or []:
-            normalized = str(tag).lstrip("#").strip()
-            if not normalized or normalized in seen or _blocked_tag(normalized):
-                continue
-            seen.add(normalized)
-            rows.append(
-                _search_query_row(
-                    run_id,
-                    policy_run_id,
-                    f"tag_{_short_hash(normalized)}",
-                    normalized,
-                    "tag_query",
-                    item.get("discovery_start_source", "pattern_itemset"),
-                    "hashtag_to_query",
-                    created_at,
-                    raw_extra={
-                        "hashtag": normalized,
-                        "source_content_id": item.get("platform_content_id"),
-                    },
-                )
+        decision_action = walk_terminal._action_for_decision(decision["decision_action"])
+        binding = binding_by_edge.get(decision_action["edge_id"]) or {}
+        execution = {
+            "executed": True,
+            "executed_rule_pack_id": decision["rule_pack_id"],
+            "reason": "content_decision_reused_for_walk_gate",
+        }
+        walk_action_id = walk_terminal._walk_action_id(
+            decision["run_id"],
+            decision["policy_run_id"],
+            decision_action["edge_id"],
+            item["platform_content_id"],
+            decision["decision_id"],
+        )
+        query_sources = item.get("query_sources") or [
+            {
+                "search_query_id": item["search_query_id"],
+                "search_query": item.get("search_query"),
+                "search_query_generation_method": item.get("search_query_generation_method"),
+            }
+        ]
+        for query_source in query_sources:
+            search_query_id = query_source["search_query_id"]
+            source_path_record_basis.append(
+                {
+                    "policy_run_id": decision["policy_run_id"],
+                    "record_schema_version": decision["record_schema_version"],
+                    "from_node_type": "SearchQuery",
+                    "from_node_id": search_query_id,
+                    "to_node_type": "Content",
+                    "to_node_id": item["platform_content_id"],
+                    "source_path_type": "search_query_to_content",
+                    "rule_pack_id": decision["rule_pack_id"],
+                    "decision_id": decision["decision_id"],
+                    "discovery_start_source": item["discovery_start_source"],
+                    "previous_discovery_step": item["previous_discovery_step"],
+                    "origin_path_id": (
+                        f"search_query_to_content:{search_query_id}:"
+                        f"{item['platform_content_id']}"
+                    ),
+                    "source_evidence_ref": decision["decision_input_snapshot_id"],
+                    "walk_action_id": walk_action_id,
+                    "rule_pack_binding": binding,
+                    "rule_pack_execution": execution,
+                }
+            )
+        walk_actions.append(
+            walk_terminal._walk_action_row(
+                decision, item, decision_action, walk_action_id, created_at, binding, execution
             )
-            if len(rows) >= 1:
-                return rows, skipped_actions
-    return rows, skipped_actions
+        )
+        if decision["decision_action"] == "ADD_TO_CONTENT_POOL":
+            source_path_record_basis.append(
+                {
+                    "policy_run_id": decision["policy_run_id"],
+                    "record_schema_version": decision["record_schema_version"],
+                    "from_node_type": "RuleDecision",
+                    "from_node_id": decision["decision_id"],
+                    "to_node_type": "ContentAsset",
+                    "to_node_id": item["platform_content_id"],
+                    "source_path_type": "decision_to_asset",
+                    "rule_pack_id": decision["rule_pack_id"],
+                    "decision_id": decision["decision_id"],
+                    "discovery_start_source": item["discovery_start_source"],
+                    "previous_discovery_step": "asset_commit",
+                    "origin_path_id": (
+                        f"decision_to_asset:{decision['decision_id']}:"
+                        f"{item['platform_content_id']}"
+                    ),
+                    "source_evidence_ref": decision["decision_input_snapshot_id"],
+                    "walk_action_id": walk_action_id,
+                    "rule_pack_binding": binding,
+                    "rule_pack_execution": execution,
+                }
+            )
+
+    return {"walk_actions": walk_actions, "source_path_record_basis": source_path_record_basis}
 
 
 def _query_actions(
@@ -499,7 +681,7 @@ def _query_actions(
     created_at: str,
     *,
     walk_strategy: dict[str, Any],
-    content_pack_id: str,
+    content_pack: dict[str, Any],
 ) -> list[dict[str, Any]]:
     failure_ids = {row["search_query_id"] for row in query_failures}
     actions: list[dict[str, Any]] = []
@@ -536,9 +718,10 @@ def _query_actions(
                 rule_pack_binding=binding,
                 rule_pack_execution={
                     "executed": True,
-                    "executed_rule_pack_id": content_pack_id,
+                    "executed_rule_pack_id": content_pack["rule_pack_id"],
                     "reason": "content_decision_reused_for_walk_gate",
                 },
+                fallback_rule_pack=content_pack,
             )
         )
     return actions
@@ -593,6 +776,7 @@ def _walk_action(
     budget_tier: str | None = None,
     rule_pack_binding: dict[str, Any] | None = None,
     rule_pack_execution: dict[str, Any] | None = None,
+    fallback_rule_pack: dict[str, Any] | None = None,
     raw_extra: dict[str, Any] | None = None,
 ) -> dict[str, Any]:
     row = with_raw_payload(
@@ -613,8 +797,10 @@ def _walk_action(
             "depth": 1,
             "page_cursor": page_cursor,
             "reason_code": reason_code,
-            "rule_pack_id": (rule_pack_binding or {}).get("rule_pack_id"),
-            "rule_pack_version": (rule_pack_binding or {}).get("rule_pack_version"),
+            "rule_pack_id": (rule_pack_binding or {}).get("rule_pack_id")
+            or (fallback_rule_pack or {}).get("rule_pack_id"),
+            "rule_pack_version": (rule_pack_binding or {}).get("rule_pack_version")
+            or (fallback_rule_pack or {}).get("rule_pack_version"),
             "created_at": created_at,
         }
     )
@@ -659,26 +845,13 @@ def _can_fetch_next_page(search_query_id: str, query_effect_by_id: dict[str, str
     return query_effect_by_id.get(search_query_id) == "success"
 
 
-def _walk_budget_for_decision(decision: dict[str, Any] | None) -> str:
-    if not decision or decision.get("search_query_effect_status") == "rule_blocked":
-        return "blocked"
-    action = decision.get("decision_action")
-    if action == "ADD_TO_CONTENT_POOL":
-        return "normal"
-    if action == "KEEP_CONTENT_FOR_REVIEW":
-        return "low_budget"
-    return "blocked"
-
-
-def _can_expand_from_decision(decision: dict[str, Any] | None, edge_id: str) -> bool:
+def _edge_permission_for(
+    decision: dict[str, Any] | None, edge_id: str, policy: dict[str, Any]
+) -> str:
+    """判定→边通行证:无判定 / 查询 rule_blocked 一律 deny,其余查 edge_permissions。"""
     if not decision or decision.get("search_query_effect_status") == "rule_blocked":
-        return False
-    action = decision.get("decision_action")
-    if edge_id == "author_to_works":
-        return action in {"ADD_TO_CONTENT_POOL", "KEEP_CONTENT_FOR_REVIEW"}
-    if edge_id == "hashtag_to_query":
-        return action == "ADD_TO_CONTENT_POOL"
-    return False
+        return "deny"
+    return edge_permission(policy, decision.get("decision_action"), edge_id)
 
 
 def _binding_by_edge_id(walk_strategy: dict[str, Any]) -> dict[str, dict[str, Any]]:
@@ -749,6 +922,7 @@ def _empty_batch() -> dict[str, list[dict[str, Any]]]:
         "content_media_records": [],
         "evidence_bundles": [],
         "rule_decisions": [],
+        "search_queries": [],
         "query_failures": [],
         "walk_actions": [],
     }

+ 6 - 116
content_agent/business_modules/walk_strategy.py

@@ -1,128 +1,18 @@
+"""终端边映射与血缘模板(V3-M4 后被 walk_engine._terminal_stage 消费)。
+
+原 run(plan_walk 节点)的编排已并入 walk_engine 终端阶段;本模块只留
+决策→终端动作映射与终端边 wa_id(decision_id 后缀,与扩展边的语义后缀是两套约定,冻结)。
+"""
+
 from __future__ import annotations
 
 import hashlib
-from datetime import datetime, timezone
 from typing import Any
 
 from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
-from content_agent.integrations.walk_strategy_json import WalkStrategyStore
 from content_agent.record_payload import with_raw_payload
 
 
-def run(
-    pattern_seed_pack: dict[str, Any],
-    search_queries: list[dict[str, Any]],
-    discovered_content_items: list[dict[str, Any]],
-    decisions: list[dict[str, Any]],
-    walk_strategy: dict[str, Any] | None = None,
-) -> dict[str, list[dict[str, Any]]]:
-    walk_strategy = walk_strategy or WalkStrategyStore().load_walk_strategy()
-    binding_by_edge = {row["edge_id"]: row for row in walk_strategy.get("walk_rule_pack_binding", [])}
-    decision_by_target_id = {decision["decision_target_id"]: decision for decision in decisions}
-    walk_actions: list[dict[str, Any]] = []
-    source_path_record_basis: list[dict[str, Any]] = []
-    created_at = datetime.now(timezone.utc).isoformat()
-
-    for search_query in search_queries:
-        source_path_record_basis.append(
-            {
-                "policy_run_id": search_query["policy_run_id"],
-                "record_schema_version": search_query["record_schema_version"],
-                "from_node_type": "PatternSeed",
-                "from_node_id": pattern_seed_pack["pattern_execution_id"],
-                "to_node_type": "SearchQuery",
-                "to_node_id": search_query["search_query_id"],
-                "source_path_type": "pattern_to_search_query",
-                "rule_pack_id": None,
-                "decision_id": None,
-                "discovery_start_source": search_query["discovery_start_source"],
-                "previous_discovery_step": search_query["previous_discovery_step"],
-                "origin_path_id": f"pattern_to_search_query:{search_query['search_query_id']}",
-                "source_evidence_ref": "source_context.json#ext_data.evidence_pack",
-            }
-        )
-
-    for item in discovered_content_items:
-        decision = decision_by_target_id[item["platform_content_id"]]
-        decision_action = _action_for_decision(decision["decision_action"])
-        binding = binding_by_edge.get(decision_action["edge_id"]) or {}
-        execution = {
-            "executed": True,
-            "executed_rule_pack_id": decision["rule_pack_id"],
-            "reason": "content_decision_reused_for_walk_gate",
-        }
-        walk_action_id = _walk_action_id(
-            decision["run_id"],
-            decision["policy_run_id"],
-            decision_action["edge_id"],
-            item["platform_content_id"],
-            decision["decision_id"],
-        )
-        query_sources = item.get("query_sources") or [
-            {
-                "search_query_id": item["search_query_id"],
-                "search_query": item.get("search_query"),
-                "search_query_generation_method": item.get(
-                    "search_query_generation_method"
-                ),
-            }
-        ]
-        for query_source in query_sources:
-            search_query_id = query_source["search_query_id"]
-            source_path_record_basis.append(
-                {
-                    "policy_run_id": decision["policy_run_id"],
-                    "record_schema_version": decision["record_schema_version"],
-                    "from_node_type": "SearchQuery",
-                    "from_node_id": search_query_id,
-                    "to_node_type": "Content",
-                    "to_node_id": item["platform_content_id"],
-                    "source_path_type": "search_query_to_content",
-                    "rule_pack_id": decision["rule_pack_id"],
-                    "decision_id": decision["decision_id"],
-                    "discovery_start_source": item["discovery_start_source"],
-                    "previous_discovery_step": item["previous_discovery_step"],
-                    "origin_path_id": (
-                        f"search_query_to_content:{search_query_id}:"
-                        f"{item['platform_content_id']}"
-                    ),
-                    "source_evidence_ref": decision["decision_input_snapshot_id"],
-                    "walk_action_id": walk_action_id,
-                    "rule_pack_binding": binding,
-                    "rule_pack_execution": execution,
-                }
-            )
-        walk_actions.append(
-            _walk_action_row(decision, item, decision_action, walk_action_id, created_at, binding, execution)
-        )
-        if decision["decision_action"] == "ADD_TO_CONTENT_POOL":
-            source_path_record_basis.append(
-                {
-                    "policy_run_id": decision["policy_run_id"],
-                    "record_schema_version": decision["record_schema_version"],
-                    "from_node_type": "RuleDecision",
-                    "from_node_id": decision["decision_id"],
-                    "to_node_type": "ContentAsset",
-                    "to_node_id": item["platform_content_id"],
-                    "source_path_type": "decision_to_asset",
-                    "rule_pack_id": decision["rule_pack_id"],
-                    "decision_id": decision["decision_id"],
-                    "discovery_start_source": item["discovery_start_source"],
-                    "previous_discovery_step": "asset_commit",
-                    "origin_path_id": (
-                        f"decision_to_asset:{decision['decision_id']}:"
-                        f"{item['platform_content_id']}"
-                    ),
-                    "source_evidence_ref": decision["decision_input_snapshot_id"],
-                    "walk_action_id": walk_action_id,
-                    "rule_pack_binding": binding,
-                    "rule_pack_execution": execution,
-                }
-            )
-
-    return {"walk_actions": walk_actions, "source_path_record_basis": source_path_record_basis}
-
-
 def _action_for_decision(decision_action: str) -> dict[str, str]:
     if decision_action == "ADD_TO_CONTENT_POOL":
         return {

+ 1 - 14
content_agent/graph.py

@@ -20,7 +20,6 @@ from content_agent.business_modules import (
     search_intent,
     walk_engine,
     source_seed,
-    walk_strategy,
 )
 from content_agent.business_modules.content_discovery import pattern_recall
 from content_agent.interfaces import (
@@ -157,16 +156,6 @@ def build_run_graph(deps: RunDependencies):
         )
         return {**result, "current_step": "execute_walk"}
 
-    def plan_walk(state: RunState) -> dict[str, Any]:
-        result = walk_strategy.run(
-            state["pattern_seed_pack"],
-            state["search_queries"],
-            state["discovered_content_items"],
-            state["rule_decisions"],
-        )
-        result["walk_actions"] = [*state.get("walk_actions", []), *result["walk_actions"]]
-        return {**result, "current_step": "plan_walk"}
-
     def record_run(state: RunState) -> dict[str, Any]:
         result = run_record.run(
             state["run_id"],
@@ -209,7 +198,6 @@ def build_run_graph(deps: RunDependencies):
         "load_policy": load_policy,
         "evaluate_rules": evaluate_rules,
         "execute_walk": execute_walk,
-        "plan_walk": plan_walk,
         "record_run": record_run,
         "commit_results": commit_results,
         "review_strategy": review_strategy,
@@ -225,8 +213,7 @@ def build_run_graph(deps: RunDependencies):
     graph.add_edge("recall_pattern", "load_policy")
     graph.add_edge("load_policy", "evaluate_rules")
     graph.add_edge("evaluate_rules", "execute_walk")
-    graph.add_edge("execute_walk", "plan_walk")
-    graph.add_edge("plan_walk", "record_run")
+    graph.add_edge("execute_walk", "record_run")
     graph.add_edge("record_run", "commit_results")
     graph.add_edge("commit_results", "review_strategy")
     graph.add_edge("review_strategy", END)

+ 0 - 70
product_documents/抖音游走策略/douyin_walk_strategy.v1.json

@@ -282,76 +282,6 @@
     }
   ],
   "walk_rule_pack_binding": [
-    {
-      "binding_id": "bind_video_author_author_pack",
-      "edge_id": "video_to_author",
-      "target_entity": "Author",
-      "rule_pack_id": "douyin_author_expand_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "required": false,
-      "dispatch_policy": "optional",
-      "notes": "Future Author dispatch."
-    },
-    {
-      "binding_id": "bind_author_works_budget_pack",
-      "edge_id": "author_to_works",
-      "target_entity": "Budget",
-      "rule_pack_id": "douyin_budget_observe_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "required": false,
-      "dispatch_policy": "advisory",
-      "notes": "Budget may limit author works."
-    },
-    {
-      "binding_id": "bind_video_hashtag_hashtag_pack",
-      "edge_id": "video_to_hashtag",
-      "target_entity": "Hashtag",
-      "rule_pack_id": "douyin_tag_expansion_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "required": false,
-      "dispatch_policy": "optional",
-      "notes": "Future Hashtag dispatch."
-    },
-    {
-      "binding_id": "bind_hashtag_query_budget_pack",
-      "edge_id": "hashtag_to_query",
-      "target_entity": "Budget",
-      "rule_pack_id": "douyin_budget_observe_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "required": false,
-      "dispatch_policy": "advisory",
-      "notes": "Budget limits tag query."
-    },
-    {
-      "binding_id": "bind_query_next_page_path_pack",
-      "edge_id": "query_next_page",
-      "target_entity": "Path",
-      "rule_pack_id": "douyin_path_stop_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "required": false,
-      "dispatch_policy": "advisory",
-      "notes": "Path pack may stop pagination."
-    },
-    {
-      "binding_id": "bind_budget_downgrade_budget_pack",
-      "edge_id": "budget_downgrade",
-      "target_entity": "Budget",
-      "rule_pack_id": "douyin_budget_observe_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "required": false,
-      "dispatch_policy": "advisory",
-      "notes": "Budget pack controls downgrade."
-    },
-    {
-      "binding_id": "bind_path_stop_path_pack",
-      "edge_id": "path_stop",
-      "target_entity": "Path",
-      "rule_pack_id": "douyin_path_stop_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "required": false,
-      "dispatch_policy": "advisory",
-      "notes": "Path pack owns stop edges; execution facts live in rule_pack_execution."
-    },
     {
       "binding_id": "bind_decision_asset_content_pack",
       "edge_id": "decision_to_asset",

+ 0 - 628
product_documents/规则包/douyin_rule_packs.v1.json

@@ -32,62 +32,6 @@
       "priority": 1,
       "fallback_policy": "fail_if_not_matched_or_multi_matched",
       "notes": "V1.0 runtime only dispatches Content."
-    },
-    {
-      "dispatch_id": "dispatch_author_future",
-      "platform": "douyin",
-      "runtime_stage": "future",
-      "strategy_version": "V1",
-      "target_entity": "Author",
-      "content_format": "not_applicable",
-      "rule_pack_id": "douyin_author_expand_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "dispatch_enabled": false,
-      "priority": 100,
-      "fallback_policy": "skip_until_future_stage",
-      "notes": "Future rule pack retained but not dispatched in V1.0."
-    },
-    {
-      "dispatch_id": "dispatch_hashtag_future",
-      "platform": "douyin",
-      "runtime_stage": "future",
-      "strategy_version": "V1",
-      "target_entity": "Hashtag",
-      "content_format": "not_applicable",
-      "rule_pack_id": "douyin_tag_expansion_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "dispatch_enabled": false,
-      "priority": 100,
-      "fallback_policy": "skip_until_future_stage",
-      "notes": "Future tag expansion retained but not dispatched in V1.0."
-    },
-    {
-      "dispatch_id": "dispatch_path_future",
-      "platform": "douyin",
-      "runtime_stage": "future",
-      "strategy_version": "V1",
-      "target_entity": "Path",
-      "content_format": "not_applicable",
-      "rule_pack_id": "douyin_path_stop_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "dispatch_enabled": false,
-      "priority": 100,
-      "fallback_policy": "skip_until_future_stage",
-      "notes": "Future path rule pack retained but not dispatched in V1.0."
-    },
-    {
-      "dispatch_id": "dispatch_budget_future",
-      "platform": "douyin",
-      "runtime_stage": "future",
-      "strategy_version": "V1",
-      "target_entity": "Budget",
-      "content_format": "not_applicable",
-      "rule_pack_id": "douyin_budget_observe_rule_pack_v1",
-      "rule_pack_version": "1.0.0",
-      "dispatch_enabled": false,
-      "priority": 100,
-      "fallback_policy": "skip_until_future_stage",
-      "notes": "Future budget rule pack retained but not dispatched in V1.0."
     }
   ],
   "decision_action_catalog": [
@@ -609,578 +553,6 @@
         "content_score_reject",
         "high_risk_content"
       ]
-    },
-    {
-      "rule_pack_id": "douyin_author_expand_rule_pack_v1",
-      "pack_name": "作者扩展判断规则包",
-      "version": "1.0.0",
-      "enabled": true,
-      "applies_to": {
-        "target_entity": "Author",
-        "input_node": "EvidenceBundle",
-        "decision_node": "RuleDecision",
-        "strategy_link_ids": [
-          "e07",
-          "e08",
-          "e18",
-          "e19"
-        ],
-        "decision_output_keys": [
-          "author_expand_decision",
-          "author_asset_decision",
-          "path_stop_decision",
-          "budget_observe_decision"
-        ]
-      },
-      "input_contract": {
-        "required_fields": [
-          "content.author.platform_author_id",
-          "author_audience_profile",
-          "author_audience_profile.age_50_plus_level",
-          "content_risk_check",
-          "run_context.run_id",
-          "source_evidence"
-        ],
-        "missing_policy": "fail_hard_gate"
-      },
-      "hard_gates": [
-        {
-          "gate_id": "missing_platform_author_id",
-          "label": "无 platform_author_id",
-          "when": {
-            "field": "content.author.platform_author_id",
-            "op": "is_empty"
-          },
-          "decision_action": "DO_NOT_EXPAND_AUTHOR",
-          "decision_reason_code": "missing_platform_author_id",
-          "severity": "fatal",
-          "stop_scoring": true,
-          "priority": 90
-        },
-        {
-          "gate_id": "works_unavailable",
-          "label": "作品不可爬",
-          "when": {
-            "field": "content_risk_check.author_works_available",
-            "op": "eq",
-            "value": false
-          },
-          "decision_action": "DO_NOT_EXPAND_AUTHOR",
-          "decision_reason_code": "works_unavailable",
-          "severity": "fatal",
-          "stop_scoring": true,
-          "priority": 100
-        },
-        {
-          "gate_id": "author_high_risk",
-          "label": "账号风险明显",
-          "when": {
-            "field": "content_risk_check.author_risk_level",
-            "op": "in",
-            "value": [
-              "high",
-              "blocked"
-            ]
-          },
-          "decision_action": "DO_NOT_EXPAND_AUTHOR",
-          "decision_reason_code": "author_high_risk",
-          "severity": "fatal",
-          "stop_scoring": true,
-          "priority": 100
-        },
-        {
-          "gate_id": "author_homepage_drift",
-          "label": "主页明显跑偏",
-          "when": {
-            "field": "pattern_match_result.author_homepage_level",
-            "op": "in",
-            "value": [
-              "drift",
-              "unrelated"
-            ]
-          },
-          "decision_action": "DO_NOT_EXPAND_AUTHOR",
-          "decision_reason_code": "author_homepage_drift",
-          "severity": "fatal",
-          "stop_scoring": true,
-          "priority": 100
-        }
-      ],
-      "scorecard": {
-        "total_score": 100,
-        "dimensions": [
-          {
-            "key": "account_50_plus_portrait",
-            "label": "账号 50+ 画像",
-            "max_score": 35,
-            "evidence_paths": [
-              "author_audience_profile.age_50_plus_level",
-              "author_audience_profile.age_distribution",
-              "author_audience_profile.tgi"
-            ]
-          },
-          {
-            "key": "vertical_stability",
-            "label": "垂类稳定性",
-            "max_score": 20,
-            "evidence_paths": [
-              "pattern_match_result.vertical_stability"
-            ]
-          },
-          {
-            "key": "similar_work_count",
-            "label": "相似作品数量",
-            "max_score": 20,
-            "evidence_paths": [
-              "pattern_match_result.similar_work_count"
-            ]
-          },
-          {
-            "key": "hit_stability",
-            "label": "爆款稳定性",
-            "max_score": 10,
-            "evidence_paths": [
-              "content_engagement_metrics.author_hit_stability"
-            ]
-          },
-          {
-            "key": "update_frequency",
-            "label": "更新频率",
-            "max_score": 10,
-            "evidence_paths": [
-              "content_engagement_metrics.update_frequency"
-            ]
-          },
-          {
-            "key": "crawl_stability",
-            "label": "可爬稳定性",
-            "max_score": 5,
-            "evidence_paths": [
-              "content_risk_check.crawl_stability"
-            ]
-          }
-        ]
-      },
-      "thresholds": [
-        {
-          "min_score": 80,
-          "decision_action": "EXPAND_AUTHOR_WORKS",
-          "decision_reason_code": "author_score_expand"
-        },
-        {
-          "min_score": 60,
-          "max_score": 79,
-          "decision_action": "EXPAND_AUTHOR_WORKS_LOW_BUDGET",
-          "decision_reason_code": "author_score_small_budget"
-        },
-        {
-          "min_score": 45,
-          "max_score": 59,
-          "decision_action": "HOLD_AUTHOR_PENDING",
-          "decision_reason_code": "author_score_pending"
-        },
-        {
-          "max_score": 44,
-          "decision_action": "DO_NOT_EXPAND_AUTHOR",
-          "decision_reason_code": "author_score_no_expand"
-        }
-      ],
-      "author_asset_rule": {
-        "enabled": true,
-        "decision_output_key": "author_asset_decision",
-        "required_conditions": [
-          {
-            "field": "author_audience_profile.works_sample_count",
-            "op": "gte",
-            "value": 9,
-            "decision_reason_code": "author_sample_count_required"
-          },
-          {
-            "field": "pattern_match_result.pattern_recall_work_count",
-            "op": "gte",
-            "value": 3,
-            "decision_reason_code": "author_pattern_work_count_required"
-          },
-          {
-            "field": "pattern_match_result.pattern_recall_work_ratio",
-            "op": "gte",
-            "value": 0.3334,
-            "decision_reason_code": "author_pattern_ratio_required"
-          },
-          {
-            "field": "author_audience_profile.age_50_plus_level",
-            "op": "in",
-            "value": [
-              "strong",
-              "medium"
-            ],
-            "decision_reason_code": "author_50_plus_required"
-          },
-          {
-            "field": "content_risk_check.author_risk_level",
-            "op": "not_in",
-            "value": [
-              "high",
-              "blocked"
-            ],
-            "decision_reason_code": "author_low_risk_required"
-          }
-        ],
-        "actions": [
-          {
-            "when": "all_required_conditions_pass",
-            "decision_action": "STORE_AUTHOR_ASSET"
-          },
-          {
-            "when": "expandable_but_deposit_conditions_not_met",
-            "decision_action": "HOLD_AUTHOR_PENDING"
-          },
-          {
-            "when": "hard_gate_failed",
-            "decision_action": "REJECT_AUTHOR"
-          }
-        ]
-      },
-      "runtime_stage": "future",
-      "dispatch_enabled": false,
-      "future_status": "retained_not_dispatched_in_v1_0"
-    },
-    {
-      "rule_pack_id": "douyin_tag_expansion_rule_pack_v1",
-      "pack_name": "tag 扩散规则包",
-      "version": "1.0.0",
-      "enabled": true,
-      "applies_to": {
-        "target_entity": "Hashtag",
-        "input_node": "EvidenceBundle",
-        "decision_node": "RuleDecision",
-        "strategy_link_ids": [
-          "e12",
-          "e13",
-          "e18",
-          "e19"
-        ],
-        "decision_output_keys": [
-          "tag_search_query_decision",
-          "path_stop_decision",
-          "budget_observe_decision"
-        ]
-      },
-      "budget": {
-        "max_tag_expansion_hops": 10,
-        "hop_definition": "Video -> Hashtag -> Query"
-      },
-      "input_contract": {
-        "required_fields": [
-          "content.tag_text",
-          "pattern_match_result.tag_recall_level",
-          "content_risk_check",
-          "run_context.tag_expansion_hop_count",
-          "source_evidence"
-        ],
-        "missing_policy": "no_generate_search_query"
-      },
-      "hard_gates": [
-        {
-          "gate_id": "empty_tag",
-          "label": "空 tag",
-          "when": {
-            "field": "content.tag_text",
-            "op": "is_empty"
-          },
-          "decision_action": "DO_NOT_GENERATE_TAG_QUERY",
-          "decision_reason_code": "empty_tag",
-          "priority": 100
-        },
-        {
-          "gate_id": "generic_tag",
-          "label": "泛 tag",
-          "when": {
-            "field": "content.tag_text",
-            "op": "in",
-            "value": [
-              "#热门",
-              "#上热门",
-              "#搞笑段子",
-              "#推荐",
-              "#流量"
-            ]
-          },
-          "decision_action": "DO_NOT_GENERATE_TAG_QUERY",
-          "decision_reason_code": "generic_tag",
-          "priority": 100
-        },
-        {
-          "gate_id": "risk_tag",
-          "label": "风险 tag",
-          "when": {
-            "field": "content_risk_check.tag_risk_level",
-            "op": "in",
-            "value": [
-              "high",
-              "blocked"
-            ]
-          },
-          "decision_action": "DO_NOT_GENERATE_TAG_QUERY",
-          "decision_reason_code": "risk_tag",
-          "priority": 100
-        },
-        {
-          "gate_id": "tag_not_recall_pattern",
-          "label": "tag 必须回扣 Pattern / seed",
-          "when": {
-            "field": "pattern_match_result.tag_recall_level",
-            "op": "not_in",
-            "value": [
-              "matched"
-            ]
-          },
-          "decision_action": "DO_NOT_GENERATE_TAG_QUERY",
-          "decision_reason_code": "tag_not_recall_pattern",
-          "priority": 100
-        },
-        {
-          "gate_id": "similar_search_query_seen",
-          "label": "已生成过相同或近似 search_query",
-          "when": {
-            "field": "run_context.similar_search_query_seen",
-            "op": "eq",
-            "value": true
-          },
-          "decision_action": "DO_NOT_GENERATE_TAG_QUERY",
-          "decision_reason_code": "similar_search_query_seen",
-          "priority": 100
-        },
-        {
-          "gate_id": "tag_hop_limit_reached",
-          "label": "tag 扩散跳跃已达 10 次",
-          "when": {
-            "field": "run_context.tag_expansion_hop_count",
-            "op": "gte",
-            "value": 10
-          },
-          "decision_action": "STOP_DISCOVERY_PATH",
-          "decision_reason_code": "tag_hop_limit_reached",
-          "priority": 100
-        }
-      ],
-      "scorecard": {
-        "total_score": 100,
-        "dimensions": [
-          {
-            "key": "tag_specificity",
-            "label": "tag 具体度",
-            "max_score": 35,
-            "evidence_paths": [
-              "pattern_match_result.tag_specificity"
-            ]
-          },
-          {
-            "key": "tag_platform_fit",
-            "label": "抖音搜索可用性",
-            "max_score": 25,
-            "evidence_paths": [
-              "pattern_match_result.platform_fit"
-            ]
-          },
-          {
-            "key": "tag_risk_clean",
-            "label": "风险干净度",
-            "max_score": 20,
-            "evidence_paths": [
-              "content_risk_check.tag_risk_level"
-            ]
-          },
-          {
-            "key": "tag_run_context_value",
-            "label": "路径探索价值",
-            "max_score": 20,
-            "evidence_paths": [
-              "run_context.prior_tag_result_quality"
-            ]
-          }
-        ]
-      },
-      "thresholds": [
-        {
-          "min_score": 80,
-          "decision_action": "GENERATE_TAG_SEARCH_QUERY",
-          "decision_reason_code": "tag_strong_generate_search_query"
-        },
-        {
-          "min_score": 60,
-          "max_score": 79,
-          "decision_action": "HOLD_LOW_BUDGET_PENDING",
-          "decision_reason_code": "tag_low_budget_pending"
-        },
-        {
-          "max_score": 59,
-          "decision_action": "DO_NOT_GENERATE_TAG_QUERY",
-          "decision_reason_code": "tag_score_no_generate"
-        }
-      ],
-      "runtime_stage": "future",
-      "dispatch_enabled": false,
-      "future_status": "retained_not_dispatched_in_v1_0"
-    },
-    {
-      "rule_pack_id": "douyin_path_stop_rule_pack_v1",
-      "pack_name": "路径停止规则包",
-      "version": "1.0.0",
-      "enabled": true,
-      "applies_to": {
-        "target_entity": "Path",
-        "input_node": "EvidenceBundle",
-        "decision_node": "RuleDecision",
-        "strategy_link_ids": [
-          "e02",
-          "e04",
-          "e08",
-          "e10",
-          "e12",
-          "e13",
-          "e18",
-          "e19"
-        ],
-        "decision_output_keys": [
-          "path_stop_decision"
-        ]
-      },
-      "hard_gates": [
-        {
-          "gate_id": "continuous_low_quality_pages",
-          "label": "连续 3 页低质",
-          "when": {
-            "field": "run_context.continuous_low_quality_pages",
-            "op": "gte",
-            "value": 3
-          },
-          "decision_action": "STOP_DISCOVERY_PATH",
-          "decision_reason_code": "continuous_low_quality_pages",
-          "priority": 100
-        },
-        {
-          "gate_id": "continuous_empty_recall",
-          "label": "连续 3 页空召回",
-          "when": {
-            "field": "run_context.continuous_empty_recall",
-            "op": "gte",
-            "value": 3
-          },
-          "decision_action": "STOP_DISCOVERY_PATH",
-          "decision_reason_code": "continuous_empty_recall",
-          "priority": 100
-        },
-        {
-          "gate_id": "continuous_three_pages",
-          "label": "连续三页即停",
-          "when": {
-            "field": "run_context.continuous_pages",
-            "op": "gte",
-            "value": 3
-          },
-          "decision_action": "STOP_DISCOVERY_PATH",
-          "decision_reason_code": "continuous_three_pages",
-          "priority": 100
-        },
-        {
-          "gate_id": "author_works_low_relevance",
-          "label": "作者作品连续 3 条低相关",
-          "when": {
-            "field": "run_context.author_works_low_relevance_count",
-            "op": "gte",
-            "value": 3
-          },
-          "decision_action": "STOP_DISCOVERY_PATH",
-          "decision_reason_code": "author_works_low_relevance",
-          "priority": 100
-        },
-        {
-          "gate_id": "tag_hop_limit_reached",
-          "label": "tag 扩散达到 10 次",
-          "when": {
-            "field": "run_context.tag_expansion_hop_count",
-            "op": "gte",
-            "value": 10
-          },
-          "decision_action": "STOP_DISCOVERY_PATH",
-          "decision_reason_code": "tag_hop_limit_reached",
-          "priority": 100
-        },
-        {
-          "gate_id": "obvious_path_drift",
-          "label": "明显漂移",
-          "when": {
-            "field": "pattern_match_result.path_drift_level",
-            "op": "in",
-            "value": [
-              "high",
-              "blocked"
-            ]
-          },
-          "decision_action": "STOP_DISCOVERY_PATH",
-          "decision_reason_code": "obvious_path_drift",
-          "priority": 100
-        }
-      ],
-      "default_decision_action": "CONTINUE",
-      "runtime_stage": "future",
-      "dispatch_enabled": false,
-      "future_status": "retained_not_dispatched_in_v1_0"
-    },
-    {
-      "rule_pack_id": "douyin_budget_observe_rule_pack_v1",
-      "pack_name": "预算待观察规则包",
-      "version": "1.0.0",
-      "enabled": true,
-      "applies_to": {
-        "target_entity": "Budget",
-        "input_node": "EvidenceBundle",
-        "decision_node": "RuleDecision",
-        "strategy_link_ids": [
-          "e02",
-          "e04",
-          "e08",
-          "e10",
-          "e12",
-          "e13",
-          "e18",
-          "e19"
-        ],
-        "decision_output_keys": [
-          "budget_observe_decision"
-        ]
-      },
-      "hard_gates": [
-        {
-          "gate_id": "high_duplicate_rate",
-          "label": "重复率过高",
-          "when": {
-            "field": "run_context.duplicate_rate",
-            "op": "gte",
-            "value": 0.6
-          },
-          "decision_action": "HOLD_LOW_BUDGET_PENDING",
-          "decision_reason_code": "high_duplicate_rate",
-          "priority": 100
-        },
-        {
-          "gate_id": "near_tag_hop_limit",
-          "label": "接近 tag 扩散上限",
-          "when": {
-            "field": "run_context.tag_expansion_hop_count",
-            "op": "gte",
-            "value": 8
-          },
-          "decision_action": "HOLD_LOW_BUDGET_PENDING",
-          "decision_reason_code": "near_tag_hop_limit",
-          "priority": 100
-        }
-      ],
-      "default_decision_action": "KEEP_NORMAL_BUDGET",
-      "runtime_stage": "future",
-      "dispatch_enabled": false,
-      "future_status": "retained_not_dispatched_in_v1_0"
     }
   ],
   "run_record_policy": {

BIN
tech_documents/规则包映射/规则包映射配置表.xlsx


+ 65 - 0
tests/fixtures/snapshots/real_id45/walk_actions_fingerprint.json

@@ -0,0 +1,65 @@
+[
+  [
+    "author_to_works",
+    "<scrubbed>",
+    "<scrubbed>",
+    "fetch_author_works",
+    "success",
+    "normal",
+    ""
+  ],
+  [
+    "budget_downgrade",
+    "d_003",
+    "7406990358799732018",
+    "downgrade_budget",
+    "success",
+    "low_budget",
+    "content_score_review"
+  ],
+  [
+    "budget_downgrade",
+    "d_004",
+    "7577667864522907506",
+    "downgrade_budget",
+    "success",
+    "low_budget",
+    "content_score_review"
+  ],
+  [
+    "decision_to_asset",
+    "d_001",
+    "7590384169986572223",
+    "commit_asset",
+    "success",
+    "normal",
+    "content_score_pool"
+  ],
+  [
+    "decision_to_asset",
+    "d_002",
+    "7595957496808770826",
+    "commit_asset",
+    "success",
+    "normal",
+    "content_score_pool"
+  ],
+  [
+    "hashtag_to_query",
+    "黄帝内经",
+    "tag_838c83a538",
+    "create_tag_query",
+    "success",
+    "normal",
+    ""
+  ],
+  [
+    "query_next_page",
+    "q_001",
+    "q_001_page_002",
+    "fetch_next_page",
+    "success",
+    "normal",
+    ""
+  ]
+]

+ 3 - 2
tests/test_api.py

@@ -235,11 +235,12 @@ def test_api_config_readonly_endpoints():
 
     rule_packs = client.get("/config/rule-packs").json()
     assert rule_packs["source_file"].endswith("douyin_rule_packs.v1.json")
-    assert len(rule_packs["data"]["rule_pack_dispatch"]) == 5
+    # M4 砍包受控变化:4 future dispatch/binding 已删,仅剩 content。
+    assert len(rule_packs["data"]["rule_pack_dispatch"]) == 1
     assert len(rule_packs["data"]["effect_status_mapping"]) == 5
 
     walk = client.get("/config/walk-strategy").json()
-    assert len(walk["data"]["walk_rule_pack_binding"]) == 8
+    assert len(walk["data"]["walk_rule_pack_binding"]) == 1
     assert len(walk["data"]["walk_edge_catalog"]) == 10
 
     prompts = client.get("/config/query-prompts").json()

+ 2 - 1
tests/test_case_replay.py

@@ -143,7 +143,8 @@ def test_replay_id45_walk_obeys_decisions_after_m4(tmp_path):
     assert len(downgrades) == 2
     assert all(row["budget_tier"] == "low_budget" for row in downgrades)
     assert all(row["reason_code"] == "content_score_review" for row in downgrades)
-    assert all(row["rule_pack_id"] == "douyin_budget_observe_rule_pack_v1" for row in downgrades)
+    # M4 砍包受控变化:Budget 包及 binding 已删,KEEP 的戳回退内容包(=executed_rule_pack_id)。
+    assert all(row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1" for row in downgrades)
 
     for row in walk_actions:
         execution = row["raw_payload"]["rule_pack_execution"]

+ 2 - 2
tests/test_config_case_matrix.py

@@ -104,7 +104,7 @@ def test_decoupling_counterproof():
 
 
 def test_senior_block_blocks_all_walk_expansion(tmp_path):
-    # M4 受控变化: 全拦截(rule_blocked)时翻页/作者/tag 全停,path_stop 归属 Path 包但执行包是 Content
+    # M4 受控变化: 全拦截(rule_blocked)时翻页/作者/tag 全停;砍包后 path_stop 戳=内容包
     artifacts = replay_case(
         "real_id45",
         runtime_root=tmp_path / "rt",
@@ -123,7 +123,7 @@ def test_senior_block_blocks_all_walk_expansion(tmp_path):
     path_stops = [row for row in walk_actions if row["edge_id"] == "path_stop"]
     assert len(path_stops) == 4
     for row in path_stops:
-        assert row["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+        assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
         assert row["raw_payload"]["rule_pack_execution"]["executed_rule_pack_id"] == (
             "douyin_content_discovery_rule_pack_v1"
         )

+ 24 - 13
tests/test_policy_dispatch.py

@@ -75,19 +75,31 @@ def test_select_dispatch_still_returns_content_for_default_bundle():
     assert dispatch["target_entity"] == "Content"
 
 
+def _synthetic_author_dispatch(rule_package):
+    # M4 砍包后无 future dispatch 行;合成一条 Author dispatch 验证选择器仍是实体参数化的。
+    author = deepcopy(next(d for d in rule_package["rule_pack_dispatch"] if d["dispatch_id"] == "dispatch_content"))
+    author.update(
+        dispatch_id="dispatch_author_test",
+        target_entity="Author",
+        content_format="not_applicable",
+        rule_pack_id="author_test_rule_pack_v1",
+        dispatch_enabled=True,
+        runtime_stage="V1.0",
+        strategy_version="V1",
+    )
+    return author
+
+
 def test_select_dispatch_can_select_non_content_when_enabled():
-    rule_package = json.loads(RULE_PACK_JSON.read_text(encoding="utf-8"))
-    rule_package = deepcopy(rule_package)
-    author = next(d for d in rule_package["rule_pack_dispatch"] if d["target_entity"] == "Author")
-    author["dispatch_enabled"] = True
-    author["runtime_stage"] = "V1.0"
-    author["strategy_version"] = "V1"
+    rule_package = deepcopy(json.loads(RULE_PACK_JSON.read_text(encoding="utf-8")))
+    author = _synthetic_author_dispatch(rule_package)
+    rule_package["rule_pack_dispatch"].append(author)
 
     dispatch = _select_dispatch(
         rule_package, "V1", target_entity="Author", content_format=author["content_format"]
     )
 
-    assert dispatch["rule_pack_id"] == "douyin_author_expand_rule_pack_v1"
+    assert dispatch["rule_pack_id"] == "author_test_rule_pack_v1"
 
 
 def test_load_policy_bundle_keeps_content_shim():
@@ -111,18 +123,17 @@ def test_enabled_author_dispatch_can_be_found_by_entity_without_replacing_conten
     root = _copy_policy_files(tmp_path)
     path = root / "product_documents/规则包/douyin_rule_packs.v1.json"
     data = json.loads(path.read_text(encoding="utf-8"))
-    author = next(d for d in data["rule_pack_dispatch"] if d["target_entity"] == "Author")
-    author["dispatch_enabled"] = True
-    author["runtime_stage"] = "V1.0"
-    author["strategy_version"] = "V1"
+    data["rule_pack_dispatch"].append(_synthetic_author_dispatch(data))
+    author_pack = deepcopy(data["rule_packs"][0])
+    author_pack["rule_pack_id"] = "author_test_rule_pack_v1"
+    data["rule_packs"].append(author_pack)
     path.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
 
     bundle = JsonPolicyBundleStore(root).load_policy_bundle("V1")
 
     assert bundle["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
     assert set(bundle["rule_pack_by_entity"]) == {"Content", "Author"}
-    author_pack = bundle["rule_pack_by_entity"]["Author"]["rule_pack"]
-    assert author_pack["rule_pack_id"] == "douyin_author_expand_rule_pack_v1"
+    assert bundle["rule_pack_by_entity"]["Author"]["rule_pack"]["rule_pack_id"] == "author_test_rule_pack_v1"
 
 
 def test_policy_bundle_fails_when_dispatch_points_to_missing_rule_pack(tmp_path):

+ 43 - 42
tests/test_walk_actions_runtime.py

@@ -24,7 +24,7 @@ def _walk_action_row():
         "page_cursor": "10",
         "next_cursor": "20",
         "decision_id": "d_001",
-        "rule_pack_id": "douyin_path_stop_rule_pack_v1",
+        "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
         "rule_pack_version": "1.0.0",
         "reason_code": "has_more",
         "raw_payload": {
@@ -91,25 +91,30 @@ def _m4_item(content_id):
     }
 
 
-def _run_walk_strategy(action, effect_status):
-    from content_agent.business_modules import walk_strategy
+def _run_terminal_stage(action, effect_status):
+    from content_agent.business_modules.walk_engine import _terminal_stage
+    from content_agent.integrations.walk_strategy_json import WalkStrategyStore
 
-    return walk_strategy.run(
-        pattern_seed_pack={"pattern_execution_id": 581},
-        search_queries=[],
-        discovered_content_items=[_m4_item("content_001")],
-        decisions=[_m4_decision("d_001", "content_001", action, effect_status)],
+    return _terminal_stage(
+        {"pattern_execution_id": 581},
+        [],
+        [_m4_item("content_001")],
+        [_m4_decision("d_001", "content_001", action, effect_status)],
+        WalkStrategyStore().load_walk_strategy(),
+        "2026-06-11T00:00:00+00:00",
     )
 
 
-def test_walk_action_records_edge_binding_rule_pack_id():
+def test_walk_action_missing_binding_falls_back_to_content_pack():
+    # M4 砍包受控变化:扩展边 binding 已删,_walk_action 经 fallback_rule_pack 回退内容包。
     from content_agent.business_modules.walk_engine import _resolve_edge_binding, _walk_action
     from content_agent.integrations.walk_strategy_json import WalkStrategyStore
 
     walk_strategy = WalkStrategyStore().load_walk_strategy()
     binding, missing_reason = _resolve_edge_binding("query_next_page", walk_strategy)
 
-    assert missing_reason is None
+    assert binding == {}
+    assert missing_reason == "edge_binding_missing"
     row = _walk_action(
         "run_001", "policy_run_001", "wa_test", "query_next_page", "pagination",
         "SearchQuery", "q_001", "SearchPage", "page_002", "fetch_next_page", "success",
@@ -117,32 +122,27 @@ def test_walk_action_records_edge_binding_rule_pack_id():
         rule_pack_binding=binding,
         rule_pack_execution={"executed": True, "executed_rule_pack_id": "douyin_content_discovery_rule_pack_v1",
                              "reason": "content_decision_reused_for_walk_gate"},
+        fallback_rule_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"},
     )
-    assert row["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+    assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
     assert row["rule_pack_version"] == "1.0.0"
-    assert row["raw_payload"]["rule_pack_binding"]["edge_id"] == "query_next_page"
+    assert row["raw_payload"]["rule_pack_binding"] == {}
 
 
-def test_future_binding_records_not_executed():
-    from content_agent.business_modules.walk_engine import (
-        _execution_record,
-        _resolve_edge_binding,
-        _walk_action,
-    )
-    from content_agent.integrations.walk_strategy_json import WalkStrategyStore
+def test_missing_decision_records_not_executed():
+    from content_agent.business_modules.walk_engine import _execution_record, _walk_action
 
-    walk_strategy = WalkStrategyStore().load_walk_strategy()
-    binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
     execution = _execution_record(None, content_pack_id="douyin_content_discovery_rule_pack_v1")
 
     row = _walk_action(
         "run_001", "policy_run_001", "wa_test", "author_to_works", "author",
         "Author", "a_001", "AuthorWorksPage", "a_001", "fetch_author_works", "skipped",
         "2026-06-10T00:00:00+00:00",
-        rule_pack_binding=binding,
+        rule_pack_binding={},
         rule_pack_execution=execution,
+        fallback_rule_pack={"rule_pack_id": "douyin_content_discovery_rule_pack_v1", "rule_pack_version": "1.0.0"},
     )
-    assert row["rule_pack_id"] == "douyin_budget_observe_rule_pack_v1"
+    assert row["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
     assert row["raw_payload"]["rule_pack_execution"] == {
         "executed": False,
         "executed_rule_pack_id": None,
@@ -150,15 +150,16 @@ def test_future_binding_records_not_executed():
     }
 
 
-def test_content_decision_execution_does_not_overwrite_edge_owner():
-    result = _run_walk_strategy("REJECT_CONTENT", "rule_blocked")
+def test_content_decision_execution_stamp_is_consistent_after_pack_cut():
+    # M4 砍包受控变化:path_stop 的 future 归属包已删,戳回退到执行包(内容包),两者一致。
+    result = _run_terminal_stage("REJECT_CONTENT", "rule_blocked")
 
     action = result["walk_actions"][0]
     assert action["edge_id"] == "path_stop"
-    assert action["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+    assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
     execution = action["raw_payload"]["rule_pack_execution"]
     assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
-    assert action["rule_pack_id"] != execution["executed_rule_pack_id"]
+    assert action["raw_payload"]["rule_pack_binding"] == {}
 
 
 def test_unknown_edge_binding_leaves_rule_pack_id_null_with_reason():
@@ -181,42 +182,42 @@ def test_unknown_edge_binding_leaves_rule_pack_id_null_with_reason():
     assert row["reason_code"] == "edge_binding_missing"
 
 
-def test_path_stop_records_path_owner_without_claiming_execution():
-    result = _run_walk_strategy("REJECT_CONTENT", "rule_blocked")
+def test_path_stop_stamps_content_pack_after_cut():
+    # M4 砍包受控变化:Path 包已删,path_stop 戳=decision 的内容包。
+    result = _run_terminal_stage("REJECT_CONTENT", "rule_blocked")
 
     action = result["walk_actions"][0]
     assert action["edge_id"] == "path_stop"
-    assert action["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
-    binding = action["raw_payload"]["rule_pack_binding"]
-    assert binding["target_entity"] == "Path"
-    # 归属包是 Path 包,但实际执行判断的是 Content 包,不得伪装成 Path 包已执行。
-    assert action["raw_payload"]["rule_pack_execution"]["executed_rule_pack_id"] == (
-        "douyin_content_discovery_rule_pack_v1"
-    )
+    assert action["walk_action"] == "stop_path"
+    assert action["walk_status"] == "skipped"
+    assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
 
 
-def test_budget_downgrade_records_budget_owner():
-    result = _run_walk_strategy("KEEP_CONTENT_FOR_REVIEW", "pending")
+def test_budget_downgrade_stamps_content_pack_after_cut():
+    # M4 砍包受控变化:Budget 包已删,budget_downgrade 戳=decision 的内容包。
+    result = _run_terminal_stage("KEEP_CONTENT_FOR_REVIEW", "pending")
 
     action = result["walk_actions"][0]
     assert action["edge_id"] == "budget_downgrade"
-    assert action["rule_pack_id"] == "douyin_budget_observe_rule_pack_v1"
-    assert action["raw_payload"]["rule_pack_binding"]["target_entity"] == "Budget"
+    assert action["budget_tier"] == "low_budget"
+    assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
 
 
 def test_decision_to_asset_keeps_content_execution_and_asset_edge_owner():
-    result = _run_walk_strategy("ADD_TO_CONTENT_POOL", "success")
+    result = _run_terminal_stage("ADD_TO_CONTENT_POOL", "success")
 
     action = result["walk_actions"][0]
     assert action["edge_id"] == "decision_to_asset"
     assert action["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
+    # 唯一保留的 binding(content)仍随终端边落 raw_payload。
+    assert action["raw_payload"]["rule_pack_binding"]["target_entity"] == "Content"
     execution = action["raw_payload"]["rule_pack_execution"]
     assert execution["executed"] is True
     assert execution["executed_rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
 
 
 def test_walk_action_and_source_path_share_binding_contract():
-    result = _run_walk_strategy("ADD_TO_CONTENT_POOL", "success")
+    result = _run_terminal_stage("ADD_TO_CONTENT_POOL", "success")
 
     action = result["walk_actions"][0]
     linked_paths = [

+ 6 - 11
tests/test_walk_engine_author.py

@@ -66,7 +66,8 @@ def test_author_edge_allows_add_content_pool(tmp_path):
     ]
     assert author_actions
     assert author_actions[0]["budget_tier"] == "normal"
-    assert author_actions[0]["rule_pack_id"] == "douyin_budget_observe_rule_pack_v1"
+    # M4 砍包受控变化:future 包 binding 已删,戳回退内容包(=executed_rule_pack_id)。
+    assert author_actions[0]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
 
 
 def test_author_edge_keeps_review_low_budget(tmp_path):
@@ -121,7 +122,7 @@ def test_author_works_have_search_query_lineage(tmp_path):
     # 真实 E2E(v1_run_3a3bc9f0d72d)实证:作者作品内容引用合成 query id 但
     # search_queries 无此行,validate_run 报 missing_search_query_ref 等 8 条 fail。
     # 血缘补全后:query 行落盘 → pattern_to_search_query 路径与 search_clue 经既有机制生成。
-    from content_agent.business_modules import run_record, walk_strategy
+    from content_agent.business_modules import run_record
     from content_agent.business_modules.run_record.validation import validate_run
 
     context = build_initial_walk_context(tmp_path)
@@ -147,23 +148,17 @@ def test_author_works_have_search_query_lineage(tmp_path):
     assert author_items
     assert all(i["search_query_id"] == author_query["search_query_id"] for i in author_items)
 
-    # 端到端血缘:plan_walk + record_run 后 validate_run 必须 pass。
-    strategy_result = walk_strategy.run(
-        context["pattern_seed_pack"],
-        result["search_queries"],
-        result["discovered_content_items"],
-        result["rule_decisions"],
-    )
+    # 端到端血缘:M4 后终端边+血缘由 run_bounded_walk 一体产出,record_run 后 validate_run 必须 pass。
     record = run_record.run(
         context["run_id"],
         context["policy_run_id"],
         result["search_queries"],
         result["discovered_content_items"],
         result["rule_decisions"],
-        strategy_result["source_path_record_basis"],
+        result["source_path_record_basis"],
         context["policy_bundle"],
         context["runtime"],
-        walk_actions=[*result["walk_actions"], *strategy_result["walk_actions"]],
+        walk_actions=result["walk_actions"],
     )
     from content_agent.business_modules import learning_review, result_source_lookup
 

+ 10 - 7
tests/test_walk_engine_budget.py

@@ -1,12 +1,13 @@
-from content_agent.business_modules import walk_strategy
+from content_agent.business_modules.walk_engine import _terminal_stage
 from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
+from content_agent.integrations.walk_strategy_json import WalkStrategyStore
 
 
-def test_walk_strategy_maps_p5_decisions_to_p6_budget_and_stop_actions():
-    result = walk_strategy.run(
-        pattern_seed_pack={"pattern_execution_id": 581},
-        search_queries=[],
-        discovered_content_items=[
+def test_terminal_stage_maps_p5_decisions_to_p6_budget_and_stop_actions():
+    result = _terminal_stage(
+        {"pattern_execution_id": 581},
+        [],
+        [
             {
                 "platform_content_id": "content_success",
                 "search_query_id": "q_001",
@@ -26,11 +27,13 @@ def test_walk_strategy_maps_p5_decisions_to_p6_budget_and_stop_actions():
                 "previous_discovery_step": "search_query_direct",
             },
         ],
-        decisions=[
+        [
             _decision("d_001", "content_success", "ADD_TO_CONTENT_POOL", "success"),
             _decision("d_002", "content_pending", "KEEP_CONTENT_FOR_REVIEW", "pending"),
             _decision("d_003", "content_blocked", "REJECT_CONTENT", "rule_blocked"),
         ],
+        WalkStrategyStore().load_walk_strategy(),
+        "2026-06-11T00:00:00+00:00",
     )
 
     by_target = {row["decision_target_id"]: row for row in result["walk_actions"]}

+ 53 - 0
tests/test_walk_profile_degradation.py

@@ -0,0 +1,53 @@
+"""V3-M4: profile 驱动的平台退化 + walk_actions 指纹基线。
+
+视频号 author_to_works 在 platform_profiles 标 blocked → loop 产显式 skip、不调平台、
+不抛错,游走退化为"搜索→内容→tag 回灌"。real_id45 指纹快照由实跑钉死,
+是 M5 并发改造"结果与串行一致"的对照基线。
+"""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+from tests.gemini_helpers import FakeGeminiVideoClient
+from tests.replay_harness import replay_case
+
+_FINGERPRINT_PATH = Path("tests/fixtures/snapshots/real_id45/walk_actions_fingerprint.json")
+
+
+def _fingerprint(walk_actions):
+    # wa_id 含随机 run_id 不能跨次钉死;其确定性由 sha1(run:policy:edge:target:suffix)算法保证。
+    return sorted(
+        [row["edge_id"], row["from_node_id"], row["to_node_id"], row["walk_action"], row["walk_status"], row["budget_tier"], row.get("reason_code") or ""]
+        for row in walk_actions
+    )
+
+
+def test_replay_id45_walk_actions_fingerprint_is_stable(tmp_path):
+    # 同输入必须产出同 walk_actions 集合(M5 并发一致性的对照基线;快照由实跑钉死)。
+    artifacts = replay_case("real_id45", runtime_root=tmp_path / "rt")
+    walk_actions = artifacts.files["walk_actions.jsonl"]
+    expected = json.loads(_FINGERPRINT_PATH.read_text(encoding="utf-8"))
+    assert _fingerprint(walk_actions) == expected
+    assert len({row["walk_action_id"] for row in walk_actions}) == len(walk_actions)
+
+
+def test_shipinhao_author_edge_blocked_emits_explicit_skip(tmp_path):
+    artifacts = replay_case(
+        "sph_caihong",
+        runtime_root=tmp_path / "rt",
+        gemini_video_client=FakeGeminiVideoClient(),
+    )
+    assert artifacts.state["status"] == "success"
+    walk_actions = artifacts.files["walk_actions.jsonl"]
+
+    author_actions = [row for row in walk_actions if row["edge_id"] == "author_to_works"]
+    assert author_actions
+    assert all(row["walk_status"] == "skipped" for row in author_actions)
+    assert all(row["reason_code"] == "edge_blocked_by_platform_profile" for row in author_actions)
+    # 退化不是漏抓:无作者作品内容混入。
+    assert not [
+        item for item in artifacts.files["discovered_content_items.jsonl"]
+        if item.get("previous_discovery_step") == "author_works"
+    ]

+ 3 - 16
tests/test_walk_strategy_config.py

@@ -40,15 +40,11 @@ def test_walk_strategy_config_uses_clue_id_and_real_rule_packs():
         "policy_run_id",
         "clue_id",
     ]
-    # M4 受控变化: path_stop/decision_to_asset binding 增补后,Content 包进入归属集合
+    # M4 砍包受控变化:4 future 包及其 7 条 binding 已物理删除,仅剩内容包归属
     assert {
         (row["rule_pack_id"], row["rule_pack_version"])
         for row in strategy["walk_rule_pack_binding"]
     } == {
-        ("douyin_author_expand_rule_pack_v1", "1.0.0"),
-        ("douyin_budget_observe_rule_pack_v1", "1.0.0"),
-        ("douyin_tag_expansion_rule_pack_v1", "1.0.0"),
-        ("douyin_path_stop_rule_pack_v1", "1.0.0"),
         ("douyin_content_discovery_rule_pack_v1", "1.0.0"),
     }
 
@@ -60,15 +56,6 @@ def test_walk_strategy_binding_is_loaded_from_walk_strategy():
     walk_strategy = WalkStrategyStore().load_walk_strategy()
     binding_by_edge = _binding_by_edge_id(walk_strategy)
 
-    assert set(binding_by_edge) == {
-        "video_to_author",
-        "author_to_works",
-        "video_to_hashtag",
-        "hashtag_to_query",
-        "query_next_page",
-        "budget_downgrade",
-        "path_stop",
-        "decision_to_asset",
-    }
-    assert binding_by_edge["path_stop"]["rule_pack_id"] == "douyin_path_stop_rule_pack_v1"
+    # M4 砍包受控变化:仅 decision_to_asset 保留 content binding,其余边走 fallback 戳。
+    assert set(binding_by_edge) == {"decision_to_asset"}
     assert binding_by_edge["decision_to_asset"]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"