"""有界游走(V3-M4):配置驱动的统一 frontier 流程,取代三段硬编码。 每条边走同一套闸:platform_profiles 是否 supported(blocked 显式 skip 不调用平台) → walk_policy.edge_permissions 按判定结果放行(取代 _can_expand_from_decision 硬编码) → walk_policy.edge_budgets 预算(取代 [:2]/[:3]/>=1 散落硬限;预算耗尽静默,对齐 v1 行为)。 终端边(commit/downgrade/stop)与 3 类血缘并入本模块终端阶段(原 plan_walk 节点已删)。 """ from __future__ import annotations import hashlib from datetime import datetime, timezone from typing import Any from content_agent.business_modules import content_discovery, platform_access, rule_judgment from content_agent.business_modules import walk_strategy as walk_terminal from content_agent.business_modules.content_discovery import pattern_recall from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION from content_agent.errors import ContentAgentError from content_agent.integrations.walk_graph_json import ( WalkGraphStore, edge_permission, edge_supported, ) from content_agent.integrations.walk_strategy_json import WalkStrategyStore from content_agent.interfaces import ( GeminiVideoClient, PlatformSearchClient, RuntimeFileStore, ) from content_agent.record_payload import with_raw_payload BLOCKED_TAG_TERMS = { "h" + "ot", "trend" + "ing", "热" + "点", "热" + "榜", "养" + "号", "无" + "限", } def run_bounded_walk( *, run_id: str, policy_run_id: str, pattern_seed_pack: dict[str, Any], source_context: dict[str, Any], search_queries: list[dict[str, Any]], discovered_content_items: list[dict[str, Any]], content_media_records: list[dict[str, Any]], evidence_bundles: list[dict[str, Any]], rule_decisions: list[dict[str, Any]], policy_bundle: dict[str, Any], platform_client: PlatformSearchClient, runtime: RuntimeFileStore, gemini_video_client: GeminiVideoClient, ) -> dict[str, list[dict[str, Any]]]: created_at = datetime.now(timezone.utc).isoformat() walk_strategy = WalkStrategyStore().load_walk_strategy() store = WalkGraphStore() policy = store.load_policy() platform = next( (item["platform"] for item in discovered_content_items if item.get("platform")), "douyin", ) profile = store.load_profile(platform) content_pack = { "rule_pack_id": policy_bundle["rule_pack_id"], "rule_pack_version": policy_bundle["rule_pack_version"], } context = { "search_queries": list(search_queries), "discovered_content_items": list(discovered_content_items), "content_media_records": list(content_media_records), "evidence_bundles": list(evidence_bundles), "rule_decisions": list(rule_decisions), "walk_actions": [], } next_decision_index = len(rule_decisions) + 1 next_recall_index = len(discovered_content_items) + 1 query_rows, query_skipped_actions = _expand_queries( run_id, policy_run_id, context["search_queries"], discovered_content_items, rule_decisions, created_at, policy=policy, profile=profile, walk_strategy=walk_strategy, content_pack=content_pack, ) context["walk_actions"].extend(query_skipped_actions) if query_rows: runtime.append_jsonl(run_id, "search_queries.jsonl", query_rows) context["search_queries"].extend(query_rows) batch = _execute_query_batch( run_id=run_id, policy_run_id=policy_run_id, pattern_seed_pack=pattern_seed_pack, source_context=source_context, search_queries=query_rows, policy_bundle=policy_bundle, platform_client=platform_client, runtime=runtime, gemini_video_client=gemini_video_client, start_recall_index=next_recall_index, start_decision_index=next_decision_index, existing_content_ids={ item["platform_content_id"] for item in context["discovered_content_items"] }, ) _merge_batch(context, batch) next_decision_index += len(batch["rule_decisions"]) next_recall_index += len(batch["discovered_content_items"]) context["walk_actions"].extend( _query_actions( query_rows, batch.get("query_failures", []), created_at, walk_strategy=walk_strategy, content_pack=content_pack, ) ) author_batch = _expand_authors( run_id=run_id, policy_run_id=policy_run_id, source_context=source_context, discovered_content_items=context["discovered_content_items"], rule_decisions=context["rule_decisions"], policy=policy, profile=profile, walk_strategy=walk_strategy, policy_bundle=policy_bundle, content_pack=content_pack, platform_client=platform_client, runtime=runtime, gemini_video_client=gemini_video_client, start_recall_index=next_recall_index, start_decision_index=next_decision_index, created_at=created_at, ) _merge_batch(context, author_batch) context["walk_actions"].extend(author_batch.get("walk_actions", [])) # 作者作品的血缘 query 行:落盘后终端阶段(pattern_to_search_query 路径)与 # recorder(search_clue 聚合)即可经既有机制覆盖作者内容,validate_run 不再断链。 author_queries = author_batch.get("search_queries", []) if author_queries: runtime.append_jsonl(run_id, "search_queries.jsonl", author_queries) context["search_queries"].extend(author_queries) terminal = _terminal_stage( pattern_seed_pack, context["search_queries"], context["discovered_content_items"], context["rule_decisions"], walk_strategy, created_at, ) context["walk_actions"].extend(terminal["walk_actions"]) context["source_path_record_basis"] = terminal["source_path_record_basis"] return context def _execute_query_batch( *, run_id: str, policy_run_id: str, pattern_seed_pack: dict[str, Any], source_context: dict[str, Any], search_queries: list[dict[str, Any]], policy_bundle: dict[str, Any], platform_client: PlatformSearchClient, runtime: RuntimeFileStore, gemini_video_client: GeminiVideoClient, start_recall_index: int, start_decision_index: int, existing_content_ids: set[str], ) -> dict[str, list[dict[str, Any]]]: try: result = platform_access.run(search_queries, platform_client) except ContentAgentError as exc: return { "discovered_content_items": [], "content_media_records": [], "evidence_bundles": [], "rule_decisions": [], "query_failures": exc.detail.get("query_failures", []), } platform_results = _new_platform_results(result["platform_results"], existing_content_ids) if not platform_results: return { "discovered_content_items": [], "content_media_records": [], "evidence_bundles": [], "rule_decisions": [], "query_failures": result.get("query_failures", []), } discovered = content_discovery.run( run_id, policy_run_id, platform_results, source_context, runtime, ) recalled = pattern_recall.run( run_id, policy_run_id, discovered["discovered_content_items"], discovered["content_media_records"], discovered["evidence_bundles"], source_context, runtime, gemini_video_client, start_index=start_recall_index, ) decisions = rule_judgment.run( run_id, policy_run_id, recalled["evidence_bundles"], policy_bundle, runtime, start_index=start_decision_index, ) return { "discovered_content_items": recalled["discovered_content_items"], "content_media_records": discovered["content_media_records"], "evidence_bundles": recalled["evidence_bundles"], "rule_decisions": decisions, "query_failures": result.get("query_failures", []), } def _expand_queries( run_id: str, policy_run_id: str, search_queries: list[dict[str, Any]], discovered_content_items: list[dict[str, Any]], rule_decisions: list[dict[str, Any]], created_at: str, *, policy: dict[str, Any], profile: dict[str, Any], walk_strategy: dict[str, Any], content_pack: dict[str, Any], ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: budgets = policy["edge_budgets_by_id"] decision_by_content_id = _decision_by_content_id(rule_decisions) skipped_actions: list[dict[str, Any]] = [] # 边 query_next_page:has_more+cursor、查询 effect=success 才翻页; # 预算耗尽不再静默(R8,2026-06-12 拍板):合格候选排不上队时补显式 skip 留痕。 page_rows: list[dict[str, Any]] = [] if edge_supported(profile, "query_next_page"): page_budget = budgets["query_next_page"]["max_total_actions"] page_binding, _ = _resolve_edge_binding("query_next_page", walk_strategy) query_by_id = {row["search_query_id"]: row for row in search_queries} query_effect_by_id = _query_effect_by_search_query_id(discovered_content_items, rule_decisions) seen_queries: set[str] = set() for item in discovered_content_items: search_query_id = item.get("search_query_id") cursor = item.get("next_cursor") if not item.get("has_more") or not cursor or search_query_id in seen_queries: continue if not _can_fetch_next_page(search_query_id, query_effect_by_id): continue source = query_by_id.get(search_query_id) if not source: continue seen_queries.add(search_query_id) if len(page_rows) >= page_budget: skipped_actions.append( _walk_action( run_id, policy_run_id, _walk_action_id( run_id, policy_run_id, "query_next_page", search_query_id, "page_budget" ), "query_next_page", "query", "SearchQuery", search_query_id, "SearchQuery", "next_page_skipped", "fetch_next_page", "skipped", created_at, reason_code="budget_exhausted", budget_tier="blocked", rule_pack_binding=page_binding, rule_pack_execution=_execution_record( decision_by_content_id.get(item.get("platform_content_id")), content_pack_id=content_pack["rule_pack_id"], ), fallback_rule_pack=content_pack, raw_extra={"parent_search_query_id": search_query_id}, ) ) continue page_rows.append( _search_query_row( run_id, policy_run_id, f"{search_query_id}_page_002", source["search_query"], "query_next_page", source.get("discovery_start_source", "pattern_itemset"), "query_next_page", created_at, page_cursor=str(cursor), raw_extra={"parent_search_query_id": search_query_id}, ) ) # 回灌链 video_to_hashtag → hashtag_to_query:keep_only 门由 edge_permissions 查表。 tag_rows: list[dict[str, Any]] = [] if edge_supported(profile, "video_to_hashtag") and edge_supported(profile, "hashtag_to_query"): tag_budget = budgets["hashtag_to_query"]["max_total_actions"] tag_binding, _ = _resolve_edge_binding("hashtag_to_query", walk_strategy) seen_tags: set[str] = set() for item in discovered_content_items: decision = decision_by_content_id.get(item.get("platform_content_id")) if not decision: continue if _edge_permission_for(decision, "video_to_hashtag", policy) == "deny": if item.get("tags"): reason_code = ( "review_tag_expansion_disabled" if decision.get("decision_action") == "KEEP_CONTENT_FOR_REVIEW" else "blocked_by_rule_decision" ) skipped_actions.append( _walk_action( run_id, policy_run_id, _walk_action_id( run_id, policy_run_id, "hashtag_to_query", item["platform_content_id"], "tags" ), "hashtag_to_query", "tag_query", "Content", item["platform_content_id"], "SearchQuery", "tag_query_skipped", "create_tag_query", "skipped", created_at, reason_code=reason_code, budget_tier="blocked", rule_pack_binding=tag_binding, rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]), fallback_rule_pack=content_pack, raw_extra=_decision_context(decision), ) ) continue for tag in item.get("tags") or []: normalized = str(tag).lstrip("#").strip() if not normalized or normalized in seen_tags or _blocked_tag(normalized): continue if len(tag_rows) >= tag_budget: # R8:该内容首个合格标签排不上队,补一条 budget_exhausted skip # (每内容至多一条;不占 seen_tags,不影响已执行集合)。 skipped_actions.append( _walk_action( run_id, policy_run_id, _walk_action_id( run_id, policy_run_id, "hashtag_to_query", item["platform_content_id"], "tag_budget" ), "hashtag_to_query", "tag_query", "Content", item["platform_content_id"], "SearchQuery", "tag_query_skipped", "create_tag_query", "skipped", created_at, reason_code="budget_exhausted", budget_tier="blocked", rule_pack_binding=tag_binding, rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]), fallback_rule_pack=content_pack, raw_extra={**_decision_context(decision), "hashtag": normalized}, ) ) break seen_tags.add(normalized) tag_rows.append( _search_query_row( run_id, policy_run_id, f"tag_{_short_hash(normalized)}", normalized, "tag_query", item.get("discovery_start_source", "pattern_itemset"), "hashtag_to_query", created_at, raw_extra={ "hashtag": normalized, "source_content_id": item.get("platform_content_id"), }, ) ) return [*page_rows, *tag_rows], skipped_actions def _expand_authors( *, run_id: str, policy_run_id: str, source_context: dict[str, Any], discovered_content_items: list[dict[str, Any]], rule_decisions: list[dict[str, Any]], policy: dict[str, Any], profile: dict[str, Any], walk_strategy: dict[str, Any], policy_bundle: dict[str, Any], content_pack: dict[str, Any], platform_client: PlatformSearchClient, runtime: RuntimeFileStore, gemini_video_client: GeminiVideoClient, start_recall_index: int, start_decision_index: int, created_at: str, ) -> dict[str, list[dict[str, Any]]]: budgets = policy["edge_budgets_by_id"]["author_to_works"] decision_by_content_id = _decision_by_content_id(rule_decisions) binding, _ = _resolve_edge_binding("author_to_works", walk_strategy) unique_author_items = _unique_authors(discovered_content_items) author_items = unique_author_items[: budgets["max_total_actions"]] # R8(2026-06-12 拍板):预算外作者不再静默消失,补 budget_exhausted skip 留痕。 overflow_author_items = unique_author_items[budgets["max_total_actions"]:] walk_actions: list[dict[str, Any]] = [] # 平台不支持作者边(如视频号 blogger blocked):显式 skip 留痕、不调用平台,游走自然退化。 if not edge_supported(profile, "author_to_works"): for item in author_items: author_id = item.get("platform_author_id") if not author_id: continue decision = decision_by_content_id.get(item.get("platform_content_id")) walk_actions.append( _author_walk_action( run_id, policy_run_id, author_id, "skipped", created_at, reason_code="edge_blocked_by_platform_profile", budget_tier="blocked", binding=binding, decision=decision, content_pack=content_pack, ) ) return {**_empty_batch(), "walk_actions": walk_actions} fetch_author_works = getattr(platform_client, "fetch_author_works", None) or getattr( platform_client, "author_works", None ) if not callable(fetch_author_works): return _empty_batch() # 作者近期作品天然可能包含首轮已发现的同一条视频;不去重会撞 # uk_ca_items_run_policy_content 唯一索引(真实 E2E v1_run_e6ba21f7543b 实证)。 seen_content_ids = { str(item.get("platform_content_id")) for item in discovered_content_items if item.get("platform_content_id") } platform_results: list[dict[str, Any]] = [] author_search_queries: list[dict[str, Any]] = [] for item in author_items: author_id = item.get("platform_author_id") if not author_id: continue decision = decision_by_content_id.get(item.get("platform_content_id")) permission = _edge_permission_for(decision, "author_to_works", policy) if permission == "deny": walk_actions.append( _author_walk_action( run_id, policy_run_id, author_id, "skipped", created_at, reason_code="blocked_by_rule_decision", budget_tier="blocked", binding=binding, decision=decision, content_pack=content_pack, ) ) continue budget_tier = "low_budget" if permission == "allow_low_budget" else "normal" try: works = fetch_author_works( { "platform_author_id": author_id, "search_query_id": f"author_{_short_hash(author_id)}", "discovery_start_source": item.get("discovery_start_source", "pattern_itemset"), } ) except Exception as exc: walk_actions.append( _author_walk_action( run_id, policy_run_id, author_id, "failed", created_at, reason_code=type(exc).__name__, budget_tier=budget_tier, binding=binding, decision=decision, content_pack=content_pack, ) ) continue walk_actions.append( _author_walk_action( run_id, policy_run_id, author_id, "success", created_at, budget_tier=budget_tier, binding=binding, decision=decision, content_pack=content_pack, ) ) new_works = [ work for work in works if str(work.get("platform_content_id") or "") and str(work.get("platform_content_id")) not in seen_content_ids ] if new_works: # 血缘补全:作者作品内容引用的合成 query id 必须真实存在于 search_queries, # 否则 validate_run 断链(真实 E2E v1_run_3a3bc9f0d72d 实证:missing_search_query_ref 等 8 条 fail)。 author_search_queries.append( _search_query_row( run_id, policy_run_id, f"author_{_short_hash(author_id)}", f"author:{author_id}", "author_works", item.get("discovery_start_source", "pattern_itemset"), "author_to_works", created_at, raw_extra={"platform_author_id": author_id}, ) ) for index, work in enumerate(new_works[: budgets["max_works_per_author"]], start=1): seen_content_ids.add(str(work["platform_content_id"])) platform_results.append( { **work, "search_query_id": work.get("search_query_id") or f"author_{_short_hash(author_id)}", "content_discovery_id": work.get("content_discovery_id") or f"author_{_short_hash(author_id)}_content_{index:03d}", "discovery_start_source": work.get("discovery_start_source") or item.get("discovery_start_source", "pattern_itemset"), "previous_discovery_step": "author_works", } ) for item in overflow_author_items: author_id = item.get("platform_author_id") if not author_id: continue walk_actions.append( _author_walk_action( run_id, policy_run_id, author_id, "skipped", created_at, reason_code="budget_exhausted", budget_tier="blocked", binding=binding, decision=decision_by_content_id.get(item.get("platform_content_id")), content_pack=content_pack, ) ) if not platform_results: return {**_empty_batch(), "walk_actions": walk_actions, "search_queries": author_search_queries} discovered = content_discovery.run(run_id, policy_run_id, platform_results, source_context, runtime) recalled = pattern_recall.run( run_id, policy_run_id, discovered["discovered_content_items"], discovered["content_media_records"], discovered["evidence_bundles"], source_context, runtime, gemini_video_client, start_index=start_recall_index, ) decisions = rule_judgment.run( run_id, policy_run_id, recalled["evidence_bundles"], policy_bundle, runtime, start_index=start_decision_index, ) return { "discovered_content_items": recalled["discovered_content_items"], "content_media_records": discovered["content_media_records"], "evidence_bundles": recalled["evidence_bundles"], "rule_decisions": decisions, "search_queries": author_search_queries, "query_failures": [], "walk_actions": walk_actions, } def _author_walk_action( run_id: str, policy_run_id: str, author_id: str, walk_status: str, created_at: str, *, budget_tier: str, binding: dict[str, Any], decision: dict[str, Any] | None, content_pack: dict[str, Any], reason_code: str | None = None, ) -> dict[str, Any]: return _walk_action( run_id, policy_run_id, _walk_action_id(run_id, policy_run_id, "author_to_works", author_id, "works"), "author_to_works", "author", "Author", author_id, "AuthorWorksPage", author_id, "fetch_author_works", walk_status, created_at, reason_code=reason_code, budget_tier=budget_tier, rule_pack_binding=binding, rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]), fallback_rule_pack=content_pack, raw_extra=_decision_context(decision), ) def _terminal_stage( pattern_seed_pack: dict[str, Any], search_queries: list[dict[str, Any]], discovered_content_items: list[dict[str, Any]], decisions: list[dict[str, Any]], walk_strategy: dict[str, Any], created_at: str, ) -> dict[str, list[dict[str, Any]]]: """终端边(decision_to_asset/budget_downgrade/path_stop)+ 3 类血缘(原 plan_walk 语义)。""" binding_by_edge = _binding_by_edge_id(walk_strategy) decision_by_target_id = {decision["decision_target_id"]: decision for decision in decisions} walk_actions: list[dict[str, Any]] = [] source_path_record_basis: list[dict[str, Any]] = [] for search_query in search_queries: source_path_record_basis.append( { "policy_run_id": search_query["policy_run_id"], "record_schema_version": search_query["record_schema_version"], "from_node_type": "PatternSeed", "from_node_id": pattern_seed_pack["pattern_execution_id"], "to_node_type": "SearchQuery", "to_node_id": search_query["search_query_id"], "source_path_type": "pattern_to_search_query", "rule_pack_id": None, "decision_id": None, "discovery_start_source": search_query["discovery_start_source"], "previous_discovery_step": search_query["previous_discovery_step"], "origin_path_id": f"pattern_to_search_query:{search_query['search_query_id']}", "source_evidence_ref": "source_context.json#ext_data.evidence_pack", } ) for item in discovered_content_items: # 无判定内容不产终端动作;判定覆盖完整性由 validate_run 把关,这里不掩盖。 decision = decision_by_target_id.get(item["platform_content_id"]) if not decision: continue decision_action = walk_terminal._action_for_decision(decision["decision_action"]) binding = binding_by_edge.get(decision_action["edge_id"]) or {} execution = { "executed": True, "executed_rule_pack_id": decision["rule_pack_id"], "reason": "content_decision_reused_for_walk_gate", } walk_action_id = walk_terminal._walk_action_id( decision["run_id"], decision["policy_run_id"], decision_action["edge_id"], item["platform_content_id"], decision["decision_id"], ) query_sources = item.get("query_sources") or [ { "search_query_id": item["search_query_id"], "search_query": item.get("search_query"), "search_query_generation_method": item.get("search_query_generation_method"), } ] for query_source in query_sources: search_query_id = query_source["search_query_id"] source_path_record_basis.append( { "policy_run_id": decision["policy_run_id"], "record_schema_version": decision["record_schema_version"], "from_node_type": "SearchQuery", "from_node_id": search_query_id, "to_node_type": "Content", "to_node_id": item["platform_content_id"], "source_path_type": "search_query_to_content", "rule_pack_id": decision["rule_pack_id"], "decision_id": decision["decision_id"], "discovery_start_source": item["discovery_start_source"], "previous_discovery_step": item["previous_discovery_step"], "origin_path_id": ( f"search_query_to_content:{search_query_id}:" f"{item['platform_content_id']}" ), "source_evidence_ref": decision["decision_input_snapshot_id"], "walk_action_id": walk_action_id, "rule_pack_binding": binding, "rule_pack_execution": execution, } ) walk_actions.append( walk_terminal._walk_action_row( decision, item, decision_action, walk_action_id, created_at, binding, execution ) ) if decision["decision_action"] == "ADD_TO_CONTENT_POOL": source_path_record_basis.append( { "policy_run_id": decision["policy_run_id"], "record_schema_version": decision["record_schema_version"], "from_node_type": "RuleDecision", "from_node_id": decision["decision_id"], "to_node_type": "ContentAsset", "to_node_id": item["platform_content_id"], "source_path_type": "decision_to_asset", "rule_pack_id": decision["rule_pack_id"], "decision_id": decision["decision_id"], "discovery_start_source": item["discovery_start_source"], "previous_discovery_step": "asset_commit", "origin_path_id": ( f"decision_to_asset:{decision['decision_id']}:" f"{item['platform_content_id']}" ), "source_evidence_ref": decision["decision_input_snapshot_id"], "walk_action_id": walk_action_id, "rule_pack_binding": binding, "rule_pack_execution": execution, } ) return {"walk_actions": walk_actions, "source_path_record_basis": source_path_record_basis} def _query_actions( query_rows: list[dict[str, Any]], query_failures: list[dict[str, Any]], created_at: str, *, walk_strategy: dict[str, Any], content_pack: dict[str, Any], ) -> list[dict[str, Any]]: failure_ids = {row["search_query_id"] for row in query_failures} actions: list[dict[str, Any]] = [] for row in query_rows: edge_id = ( "hashtag_to_query" if row.get("search_query_generation_method") == "tag_query" else "query_next_page" ) binding, _ = _resolve_edge_binding(edge_id, walk_strategy) actions.append( _walk_action( row["run_id"], row["policy_run_id"], _walk_action_id( row["run_id"], row["policy_run_id"], edge_id, row["search_query_id"], "query", ), edge_id, "query", "SearchQuery", row.get("raw_payload", {}).get("parent_search_query_id") or row.get("raw_payload", {}).get("hashtag") or row["search_query_id"], "SearchQuery", row["search_query_id"], "create_tag_query" if edge_id == "hashtag_to_query" else "fetch_next_page", "failed" if row["search_query_id"] in failure_ids else "success", created_at, page_cursor=row.get("page_cursor"), rule_pack_binding=binding, rule_pack_execution={ "executed": True, "executed_rule_pack_id": content_pack["rule_pack_id"], "reason": "content_decision_reused_for_walk_gate", }, fallback_rule_pack=content_pack, ) ) return actions def _search_query_row( run_id: str, policy_run_id: str, search_query_id: str, search_query: str, method: str, discovery_start_source: str, previous_discovery_step: str, created_at: str, *, page_cursor: str | None = None, raw_extra: dict[str, Any] | None = None, ) -> dict[str, Any]: row = { "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION, "run_id": run_id, "policy_run_id": policy_run_id, "search_query_id": search_query_id, "search_query": search_query, "search_query_generation_method": method, "discovery_start_source": discovery_start_source, "previous_discovery_step": previous_discovery_step, "pattern_seed_ref": {}, "page_cursor": page_cursor, "created_at": created_at, **(raw_extra or {}), } return with_raw_payload(row) def _walk_action( run_id: str, policy_run_id: str, walk_action_id: str, edge_id: str, edge_type: str, from_node_type: str, from_node_id: str, to_node_type: str, to_node_id: str, walk_action: str, walk_status: str, created_at: str, *, page_cursor: str | None = None, reason_code: str | None = None, budget_tier: str | None = None, rule_pack_binding: dict[str, Any] | None = None, rule_pack_execution: dict[str, Any] | None = None, fallback_rule_pack: dict[str, Any] | None = None, raw_extra: dict[str, Any] | None = None, ) -> dict[str, Any]: row = with_raw_payload( { "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION, "run_id": run_id, "policy_run_id": policy_run_id, "walk_action_id": walk_action_id, "edge_id": edge_id, "edge_type": edge_type, "from_node_type": from_node_type, "from_node_id": from_node_id, "to_node_type": to_node_type, "to_node_id": to_node_id, "walk_action": walk_action, "walk_status": walk_status, "budget_tier": budget_tier or ("normal" if walk_status == "success" else "low_budget"), "depth": 1, "page_cursor": page_cursor, "reason_code": reason_code, "rule_pack_id": (rule_pack_binding or {}).get("rule_pack_id") or (fallback_rule_pack or {}).get("rule_pack_id"), "rule_pack_version": (rule_pack_binding or {}).get("rule_pack_version") or (fallback_rule_pack or {}).get("rule_pack_version"), "created_at": created_at, } ) row["raw_payload"]["rule_pack_binding"] = rule_pack_binding or {} row["raw_payload"]["rule_pack_execution"] = rule_pack_execution or {} if raw_extra: row["raw_payload"].update(raw_extra) return row def _decision_by_content_id(rule_decisions: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: return {row["decision_target_id"]: row for row in rule_decisions} def _query_effect_by_search_query_id( discovered_content_items: list[dict[str, Any]], rule_decisions: list[dict[str, Any]], ) -> dict[str, str]: decision_by_content_id = _decision_by_content_id(rule_decisions) statuses_by_query: dict[str, set[str]] = {} for item in discovered_content_items: decision = decision_by_content_id.get(item.get("platform_content_id")) if not decision: continue query_sources = item.get("query_sources") or [{"search_query_id": item.get("search_query_id")}] for query_source in query_sources: search_query_id = query_source.get("search_query_id") if search_query_id: statuses_by_query.setdefault(search_query_id, set()).add( decision.get("search_query_effect_status") ) effects: dict[str, str] = {} for search_query_id, statuses in statuses_by_query.items(): for status in ("success", "pending", "rule_blocked", "failed"): if status in statuses: effects[search_query_id] = status break return effects def _can_fetch_next_page(search_query_id: str, query_effect_by_id: dict[str, str]) -> bool: return query_effect_by_id.get(search_query_id) == "success" def _edge_permission_for( decision: dict[str, Any] | None, edge_id: str, policy: dict[str, Any] ) -> str: """判定→边通行证:无判定 / 查询 rule_blocked 一律 deny,其余查 edge_permissions。""" if not decision or decision.get("search_query_effect_status") == "rule_blocked": return "deny" return edge_permission(policy, decision.get("decision_action"), edge_id) def _binding_by_edge_id(walk_strategy: dict[str, Any]) -> dict[str, dict[str, Any]]: return {row["edge_id"]: row for row in walk_strategy.get("walk_rule_pack_binding", [])} def _resolve_edge_binding( edge_id: str, walk_strategy: dict[str, Any] ) -> tuple[dict[str, Any], str | None]: binding = _binding_by_edge_id(walk_strategy).get(edge_id) if not binding: return {}, "edge_binding_missing" return binding, None def _execution_record(decision: dict[str, Any] | None, *, content_pack_id: str) -> dict[str, Any]: if decision: return { "executed": True, "executed_rule_pack_id": decision.get("rule_pack_id") or content_pack_id, "reason": "content_decision_reused_for_walk_gate", } return { "executed": False, "executed_rule_pack_id": None, "reason": "future_pack_not_enabled", } def _decision_context(decision: dict[str, Any] | None) -> dict[str, Any]: if not decision: return {"decision_action": None, "search_query_effect_status": None} return { "decision_action": decision.get("decision_action"), "search_query_effect_status": decision.get("search_query_effect_status"), } def _merge_batch(context: dict[str, list[dict[str, Any]]], batch: dict[str, list[dict[str, Any]]]) -> None: for key in [ "discovered_content_items", "content_media_records", "evidence_bundles", "rule_decisions", ]: context[key].extend(batch.get(key, [])) def _new_platform_results( platform_results: list[dict[str, Any]], existing_content_ids: set[str], ) -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] seen = set(existing_content_ids) for row in platform_results: content_id = row.get("platform_content_id") if content_id and content_id in seen: continue if content_id: seen.add(content_id) rows.append(row) return rows def _empty_batch() -> dict[str, list[dict[str, Any]]]: return { "discovered_content_items": [], "content_media_records": [], "evidence_bundles": [], "rule_decisions": [], "search_queries": [], "query_failures": [], "walk_actions": [], } def _unique_authors(items: list[dict[str, Any]]) -> list[dict[str, Any]]: seen: set[str] = set() unique: list[dict[str, Any]] = [] for item in items: author_id = item.get("platform_author_id") if not author_id or author_id in seen: continue seen.add(author_id) unique.append(item) return unique def _blocked_tag(tag: str) -> bool: lowered = tag.lower() return any(term in lowered for term in BLOCKED_TAG_TERMS) def _walk_action_id( run_id: str, policy_run_id: str, edge_id: str, target_id: str, suffix: str, ) -> str: raw = f"{run_id}:{policy_run_id}:{edge_id}:{target_id}:{suffix}" return f"wa_{hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16]}" def _short_hash(value: str) -> str: return hashlib.sha1(value.encode("utf-8")).hexdigest()[:10]