| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030 |
- """有界游走(V3-M4):配置驱动的统一 frontier 流程,取代三段硬编码。
- 每条边走同一套闸:platform_profiles 是否 supported(blocked 显式 skip 不调用平台)
- → walk_policy.edge_permissions 按判定结果放行(取代 _can_expand_from_decision 硬编码)
- → walk_policy.edge_budgets 预算(取代 [:2]/[:3]/>=1 散落硬限;v1 行为是预算耗尽静默,R8(2026-06-12 拍板)起不再静默:预算外的合格候选补显式 budget_exhausted skip 留痕)。
- 终端边(commit/downgrade/stop)与 3 类血缘并入本模块终端阶段(原 plan_walk 节点已删)。
- """
- from __future__ import annotations
- import hashlib
- from datetime import datetime, timezone
- from typing import Any
- from content_agent.business_modules import content_discovery, platform_access, rule_judgment
- from content_agent.business_modules import walk_strategy as walk_terminal
- from content_agent.business_modules.content_discovery import pattern_recall
- from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
- from content_agent.errors import ContentAgentError
- from content_agent.integrations.walk_graph_json import (
- WalkGraphStore,
- edge_permission,
- edge_supported,
- )
- from content_agent.integrations.walk_strategy_json import WalkStrategyStore
- from content_agent.interfaces import (
- GeminiVideoClient,
- PlatformSearchClient,
- RuntimeFileStore,
- )
- from content_agent.record_payload import with_raw_payload
# Tag fragments that disqualify a hashtag from seeding a follow-up tag query
# (matched as lowercase substrings by _blocked_tag).  Every term is assembled
# from two pieces instead of being written as one literal.
# NOTE(review): presumably the split keeps the raw words out of source-level
# keyword scans -- confirm before collapsing the pairs into plain literals.
BLOCKED_TAG_TERMS = {
    head + tail
    for head, tail in (
        ("h", "ot"),
        ("trend", "ing"),
        ("热", "点"),
        ("热", "榜"),
        ("养", "号"),
        ("无", "限"),
    )
}
def run_bounded_walk(
    *,
    run_id: str,
    policy_run_id: str,
    pattern_seed_pack: dict[str, Any],
    source_context: dict[str, Any],
    search_queries: list[dict[str, Any]],
    discovered_content_items: list[dict[str, Any]],
    content_media_records: list[dict[str, Any]],
    evidence_bundles: list[dict[str, Any]],
    rule_decisions: list[dict[str, Any]],
    policy_bundle: dict[str, Any],
    platform_client: PlatformSearchClient,
    runtime: RuntimeFileStore,
    gemini_video_client: GeminiVideoClient,
) -> dict[str, list[dict[str, Any]]]:
    """Run one bounded expansion pass on top of the first-round results.

    Stages, in order:

    1. ``_expand_queries`` derives next-page and hashtag follow-up queries.
       When any are produced they are appended to ``search_queries.jsonl``,
       executed through ``_execute_query_batch`` and merged into the result.
    2. ``_expand_authors`` fetches works of the authors seen so far; the
       synthetic author query rows it returns are persisted as well.
    3. ``_terminal_stage`` produces the terminal walk actions and the
       ``source_path_record_basis`` lineage rows.

    The five input record lists are shallow-copied; the caller's list
    objects are never extended.

    Returns:
        A dict with the keys ``search_queries``, ``discovered_content_items``,
        ``content_media_records``, ``evidence_bundles``, ``rule_decisions``,
        ``walk_actions`` and ``source_path_record_basis``.
    """
    created_at = datetime.now(timezone.utc).isoformat()
    walk_strategy = WalkStrategyStore().load_walk_strategy()
    graph_store = WalkGraphStore()
    policy = graph_store.load_policy()

    # The platform profile is chosen from the first item that names a
    # platform; "douyin" is the fallback when none does.
    platform = "douyin"
    for candidate in discovered_content_items:
        if candidate.get("platform"):
            platform = candidate["platform"]
            break
    profile = graph_store.load_profile(platform)

    content_pack = {
        "rule_pack_id": policy_bundle["rule_pack_id"],
        "rule_pack_version": policy_bundle["rule_pack_version"],
    }
    state: dict[str, Any] = {
        "search_queries": list(search_queries),
        "discovered_content_items": list(discovered_content_items),
        "content_media_records": list(content_media_records),
        "evidence_bundles": list(evidence_bundles),
        "rule_decisions": list(rule_decisions),
        "walk_actions": [],
    }
    # Indices for newly produced decision / recall records continue after the
    # first-round records.
    decision_cursor = len(rule_decisions) + 1
    recall_cursor = len(discovered_content_items) + 1

    # --- stage 1: follow-up queries (next page + hashtag) -----------------
    follow_up_rows, follow_up_skips = _expand_queries(
        run_id,
        policy_run_id,
        state["search_queries"],
        discovered_content_items,
        rule_decisions,
        created_at,
        policy=policy,
        profile=profile,
        walk_strategy=walk_strategy,
        content_pack=content_pack,
    )
    state["walk_actions"].extend(follow_up_skips)
    if follow_up_rows:
        runtime.append_jsonl(run_id, "search_queries.jsonl", follow_up_rows)
        state["search_queries"].extend(follow_up_rows)
        known_content_ids = {
            item["platform_content_id"] for item in state["discovered_content_items"]
        }
        query_batch = _execute_query_batch(
            run_id=run_id,
            policy_run_id=policy_run_id,
            pattern_seed_pack=pattern_seed_pack,
            source_context=source_context,
            search_queries=follow_up_rows,
            policy_bundle=policy_bundle,
            platform_client=platform_client,
            runtime=runtime,
            gemini_video_client=gemini_video_client,
            start_recall_index=recall_cursor,
            start_decision_index=decision_cursor,
            existing_content_ids=known_content_ids,
        )
        _merge_batch(state, query_batch)
        decision_cursor += len(query_batch["rule_decisions"])
        recall_cursor += len(query_batch["discovered_content_items"])
        executed_query_actions = _query_actions(
            follow_up_rows,
            query_batch.get("query_failures", []),
            created_at,
            walk_strategy=walk_strategy,
            content_pack=content_pack,
        )
        state["walk_actions"].extend(executed_query_actions)

    # --- stage 2: author works -------------------------------------------
    author_batch = _expand_authors(
        run_id=run_id,
        policy_run_id=policy_run_id,
        source_context=source_context,
        discovered_content_items=state["discovered_content_items"],
        rule_decisions=state["rule_decisions"],
        policy=policy,
        profile=profile,
        walk_strategy=walk_strategy,
        policy_bundle=policy_bundle,
        content_pack=content_pack,
        platform_client=platform_client,
        runtime=runtime,
        gemini_video_client=gemini_video_client,
        start_recall_index=recall_cursor,
        start_decision_index=decision_cursor,
        created_at=created_at,
    )
    _merge_batch(state, author_batch)
    state["walk_actions"].extend(author_batch.get("walk_actions", []))
    # Lineage query rows for author works: once they are persisted, the
    # terminal stage (pattern_to_search_query path) and the recorder
    # (search_clue aggregation) cover author content through the existing
    # mechanisms, so validate_run no longer reports a broken chain.
    author_query_rows = author_batch.get("search_queries", [])
    if author_query_rows:
        runtime.append_jsonl(run_id, "search_queries.jsonl", author_query_rows)
        state["search_queries"].extend(author_query_rows)

    # --- stage 3: terminal edges + lineage --------------------------------
    terminal = _terminal_stage(
        pattern_seed_pack,
        state["search_queries"],
        state["discovered_content_items"],
        state["rule_decisions"],
        walk_strategy,
        created_at,
    )
    state["walk_actions"].extend(terminal["walk_actions"])
    state["source_path_record_basis"] = terminal["source_path_record_basis"]
    return state
def _execute_query_batch(
    *,
    run_id: str,
    policy_run_id: str,
    pattern_seed_pack: dict[str, Any],
    source_context: dict[str, Any],
    search_queries: list[dict[str, Any]],
    policy_bundle: dict[str, Any],
    platform_client: PlatformSearchClient,
    runtime: RuntimeFileStore,
    gemini_video_client: GeminiVideoClient,
    start_recall_index: int,
    start_decision_index: int,
    existing_content_ids: set[str],
) -> dict[str, list[dict[str, Any]]]:
    """Execute follow-up queries and run the new results through the pipeline.

    ``platform_access.run`` is called with ``search_queries``.  Results that
    ``_new_platform_results`` filters out against ``existing_content_ids``
    are dropped; the remainder goes through ``content_discovery.run`` ->
    ``pattern_recall.run`` -> ``rule_judgment.run``.

    A ``ContentAgentError`` raised by platform access is not propagated: the
    batch comes back empty with ``exc.detail["query_failures"]`` (or ``[]``)
    as its failures.  ``pattern_seed_pack`` is accepted but not read here.

    Returns:
        A dict with the keys ``discovered_content_items``,
        ``content_media_records``, ``evidence_bundles``, ``rule_decisions``
        and ``query_failures``.
    """

    def _nothing_new(query_failures: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
        # Shape shared by the "platform failed" and "nothing new" exits.
        return {
            "discovered_content_items": [],
            "content_media_records": [],
            "evidence_bundles": [],
            "rule_decisions": [],
            "query_failures": query_failures,
        }

    try:
        result = platform_access.run(search_queries, platform_client)
    except ContentAgentError as exc:
        return _nothing_new(exc.detail.get("query_failures", []))

    fresh_results = _new_platform_results(result["platform_results"], existing_content_ids)
    if not fresh_results:
        return _nothing_new(result.get("query_failures", []))

    discovered = content_discovery.run(
        run_id, policy_run_id, fresh_results, source_context, runtime
    )
    recalled = pattern_recall.run(
        run_id,
        policy_run_id,
        discovered["discovered_content_items"],
        discovered["content_media_records"],
        discovered["evidence_bundles"],
        source_context,
        runtime,
        gemini_video_client,
        start_index=start_recall_index,
    )
    decisions = rule_judgment.run(
        run_id,
        policy_run_id,
        recalled["evidence_bundles"],
        policy_bundle,
        runtime,
        start_index=start_decision_index,
    )
    # Items and evidence come from the recall step, media records from the
    # discovery step.
    return {
        "discovered_content_items": recalled["discovered_content_items"],
        "content_media_records": discovered["content_media_records"],
        "evidence_bundles": recalled["evidence_bundles"],
        "rule_decisions": decisions,
        "query_failures": result.get("query_failures", []),
    }
def _expand_queries(
    run_id: str,
    policy_run_id: str,
    search_queries: list[dict[str, Any]],
    discovered_content_items: list[dict[str, Any]],
    rule_decisions: list[dict[str, Any]],
    created_at: str,
    *,
    policy: dict[str, Any],
    profile: dict[str, Any],
    walk_strategy: dict[str, Any],
    content_pack: dict[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Derive follow-up search queries (next page, hashtag) and their skips.

    Two edges are evaluated, each only when ``edge_supported`` accepts it for
    ``profile`` and each capped by its ``max_total_actions`` budget:

    * ``query_next_page`` -- one follow-up row per source query whose items
      report ``has_more`` plus a ``next_cursor`` and whose aggregated query
      effect is ``"success"``.
    * ``video_to_hashtag`` -> ``hashtag_to_query`` -- one row per distinct,
      non-blocked tag of content whose decision is not denied for
      ``video_to_hashtag``.

    Returns:
        ``(query_rows, skipped_actions)``: page rows followed by tag rows,
        and the ``skipped`` walk actions recorded along the way (rule-denied
        tag expansion and budget exhaustion).
    """
    budgets = policy["edge_budgets_by_id"]
    decision_by_content_id = _decision_by_content_id(rule_decisions)
    skipped_actions: list[dict[str, Any]] = []

    def _page_skip(item: dict[str, Any], query_id: Any, binding: dict[str, Any]) -> dict[str, Any]:
        # Skip record for an eligible next-page candidate that lost to budget.
        return _walk_action(
            run_id,
            policy_run_id,
            _walk_action_id(run_id, policy_run_id, "query_next_page", query_id, "page_budget"),
            "query_next_page",
            "query",
            "SearchQuery",
            query_id,
            "SearchQuery",
            "next_page_skipped",
            "fetch_next_page",
            "skipped",
            created_at,
            reason_code="budget_exhausted",
            budget_tier="blocked",
            rule_pack_binding=binding,
            rule_pack_execution=_execution_record(
                decision_by_content_id.get(item.get("platform_content_id")),
                content_pack_id=content_pack["rule_pack_id"],
            ),
            fallback_rule_pack=content_pack,
            raw_extra={"parent_search_query_id": query_id},
        )

    def _tag_skip(
        item: dict[str, Any],
        decision: dict[str, Any],
        binding: dict[str, Any],
        *,
        id_suffix: str,
        reason_code: str,
        raw_extra: dict[str, Any],
    ) -> dict[str, Any]:
        # Skip record for tag expansion that was denied or lost to budget.
        content_id = item["platform_content_id"]
        return _walk_action(
            run_id,
            policy_run_id,
            _walk_action_id(run_id, policy_run_id, "hashtag_to_query", content_id, id_suffix),
            "hashtag_to_query",
            "tag_query",
            "Content",
            content_id,
            "SearchQuery",
            "tag_query_skipped",
            "create_tag_query",
            "skipped",
            created_at,
            reason_code=reason_code,
            budget_tier="blocked",
            rule_pack_binding=binding,
            rule_pack_execution=_execution_record(
                decision, content_pack_id=content_pack["rule_pack_id"]
            ),
            fallback_rule_pack=content_pack,
            raw_extra=raw_extra,
        )

    # Edge query_next_page: paginate only when has_more + cursor are present
    # and the query's effect is "success".  Budget exhaustion is no longer
    # silent (R8, decided 2026-06-12): an eligible candidate that cannot be
    # queued gets an explicit skip record.
    page_rows: list[dict[str, Any]] = []
    if edge_supported(profile, "query_next_page"):
        page_budget = budgets["query_next_page"]["max_total_actions"]
        page_binding, _ = _resolve_edge_binding("query_next_page", walk_strategy)
        query_by_id = {row["search_query_id"]: row for row in search_queries}
        query_effect_by_id = _query_effect_by_search_query_id(
            discovered_content_items, rule_decisions
        )
        handled_queries: set[str] = set()
        for item in discovered_content_items:
            search_query_id = item.get("search_query_id")
            cursor = item.get("next_cursor")
            pageable = (
                item.get("has_more") and cursor and search_query_id not in handled_queries
            )
            if not pageable:
                continue
            if not _can_fetch_next_page(search_query_id, query_effect_by_id):
                continue
            parent_query = query_by_id.get(search_query_id)
            if not parent_query:
                continue
            # Each source query is considered once, whether it is queued or
            # skipped for budget.
            handled_queries.add(search_query_id)
            if len(page_rows) >= page_budget:
                skipped_actions.append(_page_skip(item, search_query_id, page_binding))
                continue
            page_rows.append(
                _search_query_row(
                    run_id,
                    policy_run_id,
                    f"{search_query_id}_page_002",
                    parent_query["search_query"],
                    "query_next_page",
                    parent_query.get("discovery_start_source", "pattern_itemset"),
                    "query_next_page",
                    created_at,
                    page_cursor=str(cursor),
                    raw_extra={"parent_search_query_id": search_query_id},
                )
            )

    # Feedback chain video_to_hashtag -> hashtag_to_query: the gate is looked
    # up in edge_permissions via _edge_permission_for.
    tag_rows: list[dict[str, Any]] = []
    tag_chain_supported = edge_supported(profile, "video_to_hashtag") and edge_supported(
        profile, "hashtag_to_query"
    )
    if tag_chain_supported:
        tag_budget = budgets["hashtag_to_query"]["max_total_actions"]
        tag_binding, _ = _resolve_edge_binding("hashtag_to_query", walk_strategy)
        queued_tags: set[str] = set()
        for item in discovered_content_items:
            decision = decision_by_content_id.get(item.get("platform_content_id"))
            if not decision:
                continue

            if _edge_permission_for(decision, "video_to_hashtag", policy) == "deny":
                # A denial is only worth recording when there were tags to
                # expand in the first place.
                if item.get("tags"):
                    if decision.get("decision_action") == "KEEP_CONTENT_FOR_REVIEW":
                        deny_reason = "review_tag_expansion_disabled"
                    else:
                        deny_reason = "blocked_by_rule_decision"
                    skipped_actions.append(
                        _tag_skip(
                            item,
                            decision,
                            tag_binding,
                            id_suffix="tags",
                            reason_code=deny_reason,
                            raw_extra=_decision_context(decision),
                        )
                    )
                continue

            for raw_tag in item.get("tags") or []:
                normalized = str(raw_tag).lstrip("#").strip()
                if not normalized or normalized in queued_tags or _blocked_tag(normalized):
                    continue
                if len(tag_rows) >= tag_budget:
                    # R8: this content's first eligible tag cannot be queued,
                    # so record one budget_exhausted skip (at most one per
                    # content; the tag is not added to the queued set, so the
                    # already executed set is unaffected).
                    skipped_actions.append(
                        _tag_skip(
                            item,
                            decision,
                            tag_binding,
                            id_suffix="tag_budget",
                            reason_code="budget_exhausted",
                            raw_extra={**_decision_context(decision), "hashtag": normalized},
                        )
                    )
                    break
                queued_tags.add(normalized)
                tag_rows.append(
                    _search_query_row(
                        run_id,
                        policy_run_id,
                        f"tag_{_short_hash(normalized)}",
                        normalized,
                        "tag_query",
                        item.get("discovery_start_source", "pattern_itemset"),
                        "hashtag_to_query",
                        created_at,
                        raw_extra={
                            "hashtag": normalized,
                            "source_content_id": item.get("platform_content_id"),
                        },
                    )
                )

    return [*page_rows, *tag_rows], skipped_actions
def _expand_authors(
    *,
    run_id: str,
    policy_run_id: str,
    source_context: dict[str, Any],
    discovered_content_items: list[dict[str, Any]],
    rule_decisions: list[dict[str, Any]],
    policy: dict[str, Any],
    profile: dict[str, Any],
    walk_strategy: dict[str, Any],
    policy_bundle: dict[str, Any],
    content_pack: dict[str, Any],
    platform_client: PlatformSearchClient,
    runtime: RuntimeFileStore,
    gemini_video_client: GeminiVideoClient,
    start_recall_index: int,
    start_decision_index: int,
    created_at: str,
) -> dict[str, list[dict[str, Any]]]:
    """Expand the ``author_to_works`` edge for the authors seen so far.

    Authors are de-duplicated with ``_unique_authors``; the first
    ``max_total_actions`` of them are processed, the rest receive a
    ``budget_exhausted`` skip action.  For every processed author the edge
    permission derived from the decision of the author's content decides
    between skip (``deny``), a low-budget fetch (``allow_low_budget``) and a
    normal fetch.  New works (at most ``max_works_per_author`` per author)
    go through ``content_discovery.run`` -> ``pattern_recall.run`` ->
    ``rule_judgment.run``.

    Works are de-duplicated both against content that is already known and
    against repeats inside the same author response, and a ``None`` response
    from the platform call is treated as "no works".

    Returns:
        A batch dict shaped like ``_empty_batch()``.  When the platform
        profile does not support the edge, only ``walk_actions`` is filled.
        When the client offers neither ``fetch_author_works`` nor
        ``author_works``, the plain empty batch is returned.
    """
    budgets = policy["edge_budgets_by_id"]["author_to_works"]
    decision_by_content_id = _decision_by_content_id(rule_decisions)
    binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
    unique_author_items = _unique_authors(discovered_content_items)
    author_items = unique_author_items[: budgets["max_total_actions"]]
    # R8 (decided 2026-06-12): authors beyond the budget no longer vanish
    # silently; they get a budget_exhausted skip record further down.
    overflow_author_items = unique_author_items[budgets["max_total_actions"]:]
    walk_actions: list[dict[str, Any]] = []

    def _record(
        author_id: Any,
        walk_status: str,
        decision: dict[str, Any] | None,
        *,
        budget_tier: str,
        reason_code: str | None = None,
    ) -> None:
        # Append one author_to_works walk action for ``author_id``.
        walk_actions.append(
            _author_walk_action(
                run_id,
                policy_run_id,
                author_id,
                walk_status,
                created_at,
                reason_code=reason_code,
                budget_tier=budget_tier,
                binding=binding,
                decision=decision,
                content_pack=content_pack,
            )
        )

    # The platform does not support the author edge (e.g. a profile where it
    # is blocked): leave an explicit skip record, do not call the platform,
    # and let the walk degrade naturally.
    if not edge_supported(profile, "author_to_works"):
        for item in author_items:
            author_id = item.get("platform_author_id")
            if not author_id:
                continue
            _record(
                author_id,
                "skipped",
                decision_by_content_id.get(item.get("platform_content_id")),
                budget_tier="blocked",
                reason_code="edge_blocked_by_platform_profile",
            )
        return {**_empty_batch(), "walk_actions": walk_actions}

    fetch_author_works = getattr(platform_client, "fetch_author_works", None) or getattr(
        platform_client, "author_works", None
    )
    if not callable(fetch_author_works):
        return _empty_batch()

    # An author's recent works can naturally contain a video that was already
    # discovered in the first round; without de-duplication this hits the
    # uk_ca_items_run_policy_content unique index (observed in real E2E run
    # v1_run_e6ba21f7543b).
    seen_content_ids = {
        str(item.get("platform_content_id"))
        for item in discovered_content_items
        if item.get("platform_content_id")
    }
    platform_results: list[dict[str, Any]] = []
    author_search_queries: list[dict[str, Any]] = []

    for item in author_items:
        author_id = item.get("platform_author_id")
        if not author_id:
            continue
        decision = decision_by_content_id.get(item.get("platform_content_id"))
        permission = _edge_permission_for(decision, "author_to_works", policy)
        if permission == "deny":
            _record(
                author_id,
                "skipped",
                decision,
                budget_tier="blocked",
                reason_code="blocked_by_rule_decision",
            )
            continue

        budget_tier = "low_budget" if permission == "allow_low_budget" else "normal"
        start_source = item.get("discovery_start_source", "pattern_itemset")
        try:
            # The synthetic query id is derived inside the try on purpose: a
            # failure while building the request is recorded as a failed
            # action, exactly like a failing platform call.
            author_query_id = f"author_{_short_hash(author_id)}"
            works = fetch_author_works(
                {
                    "platform_author_id": author_id,
                    "search_query_id": author_query_id,
                    "discovery_start_source": start_source,
                }
            )
        except Exception as exc:
            # Deliberate best-effort: one failing author must not abort the
            # walk; the exception type is kept as the reason code.
            _record(
                author_id,
                "failed",
                decision,
                budget_tier=budget_tier,
                reason_code=type(exc).__name__,
            )
            continue
        _record(author_id, "success", decision, budget_tier=budget_tier)

        # Keep works that carry an id, are not known yet and are not repeated
        # within this response.  A repeat inside one response would otherwise
        # reach the pipeline twice and collide on the same unique index.
        fresh_works: list[dict[str, Any]] = []
        response_ids: set[str] = set()
        for work in works or []:
            content_id = str(work.get("platform_content_id") or "")
            if not content_id or content_id in seen_content_ids or content_id in response_ids:
                continue
            response_ids.add(content_id)
            fresh_works.append(work)

        if fresh_works:
            # Lineage completion: the synthetic query id referenced by author
            # works must really exist in search_queries, otherwise
            # validate_run reports a broken chain (real E2E run
            # v1_run_3a3bc9f0d72d: missing_search_query_ref and others,
            # 8 failures).
            author_search_queries.append(
                _search_query_row(
                    run_id,
                    policy_run_id,
                    author_query_id,
                    f"author:{author_id}",
                    "author_works",
                    start_source,
                    "author_to_works",
                    created_at,
                    raw_extra={"platform_author_id": author_id},
                )
            )
        accepted_works = fresh_works[: budgets["max_works_per_author"]]
        for index, work in enumerate(accepted_works, start=1):
            seen_content_ids.add(str(work["platform_content_id"]))
            platform_results.append(
                {
                    **work,
                    "search_query_id": work.get("search_query_id") or author_query_id,
                    "content_discovery_id": work.get("content_discovery_id")
                    or f"{author_query_id}_content_{index:03d}",
                    "discovery_start_source": work.get("discovery_start_source")
                    or start_source,
                    "previous_discovery_step": "author_works",
                }
            )

    for item in overflow_author_items:
        author_id = item.get("platform_author_id")
        if not author_id:
            continue
        _record(
            author_id,
            "skipped",
            decision_by_content_id.get(item.get("platform_content_id")),
            budget_tier="blocked",
            reason_code="budget_exhausted",
        )

    if not platform_results:
        return {
            **_empty_batch(),
            "walk_actions": walk_actions,
            "search_queries": author_search_queries,
        }

    discovered = content_discovery.run(
        run_id, policy_run_id, platform_results, source_context, runtime
    )
    recalled = pattern_recall.run(
        run_id,
        policy_run_id,
        discovered["discovered_content_items"],
        discovered["content_media_records"],
        discovered["evidence_bundles"],
        source_context,
        runtime,
        gemini_video_client,
        start_index=start_recall_index,
    )
    decisions = rule_judgment.run(
        run_id,
        policy_run_id,
        recalled["evidence_bundles"],
        policy_bundle,
        runtime,
        start_index=start_decision_index,
    )
    return {
        "discovered_content_items": recalled["discovered_content_items"],
        "content_media_records": discovered["content_media_records"],
        "evidence_bundles": recalled["evidence_bundles"],
        "rule_decisions": decisions,
        "search_queries": author_search_queries,
        "query_failures": [],
        "walk_actions": walk_actions,
    }
def _author_walk_action(
    run_id: str,
    policy_run_id: str,
    author_id: str,
    walk_status: str,
    created_at: str,
    *,
    budget_tier: str,
    binding: dict[str, Any],
    decision: dict[str, Any] | None,
    content_pack: dict[str, Any],
    reason_code: str | None = None,
) -> dict[str, Any]:
    """Build the walk-action row for one ``author_to_works`` attempt.

    The row goes from node ``Author`` to node ``AuthorWorksPage``, both
    identified by ``author_id``, with walk action ``fetch_author_works``.
    ``decision`` (possibly ``None``) supplies the execution record and the
    decision context stored in the raw payload.
    """
    action_id = _walk_action_id(run_id, policy_run_id, "author_to_works", author_id, "works")
    execution = _execution_record(decision, content_pack_id=content_pack["rule_pack_id"])
    return _walk_action(
        run_id,
        policy_run_id,
        walk_action_id=action_id,
        edge_id="author_to_works",
        edge_type="author",
        from_node_type="Author",
        from_node_id=author_id,
        to_node_type="AuthorWorksPage",
        to_node_id=author_id,
        walk_action="fetch_author_works",
        walk_status=walk_status,
        created_at=created_at,
        reason_code=reason_code,
        budget_tier=budget_tier,
        rule_pack_binding=binding,
        rule_pack_execution=execution,
        fallback_rule_pack=content_pack,
        raw_extra=_decision_context(decision),
    )
def _terminal_stage(
    pattern_seed_pack: dict[str, Any],
    search_queries: list[dict[str, Any]],
    discovered_content_items: list[dict[str, Any]],
    decisions: list[dict[str, Any]],
    walk_strategy: dict[str, Any],
    created_at: str,
) -> dict[str, list[dict[str, Any]]]:
    """Emit terminal walk actions and the three kinds of lineage records.

    Lineage records written to ``source_path_record_basis``:

    * ``pattern_to_search_query`` -- one per search query;
    * ``search_query_to_content`` -- one per (query source, content) pair of
      every item that has a decision;
    * ``decision_to_asset`` -- one per item whose decision action is
      ``ADD_TO_CONTENT_POOL``.

    Every item with a decision also yields exactly one terminal walk action
    built by ``walk_terminal._walk_action_row``.

    Returns:
        ``{"walk_actions": [...], "source_path_record_basis": [...]}``.
    """

    def _decision_lineage(
        decision: dict[str, Any],
        item: dict[str, Any],
        *,
        from_node_type: str,
        from_node_id: Any,
        to_node_type: str,
        source_path_type: str,
        previous_discovery_step: Any,
        origin_path_id: str,
        walk_action_id: str,
        binding: dict[str, Any],
        execution: dict[str, Any],
    ) -> dict[str, Any]:
        # Record shape shared by the two decision-backed lineage kinds.
        return {
            "policy_run_id": decision["policy_run_id"],
            "record_schema_version": decision["record_schema_version"],
            "from_node_type": from_node_type,
            "from_node_id": from_node_id,
            "to_node_type": to_node_type,
            "to_node_id": item["platform_content_id"],
            "source_path_type": source_path_type,
            "rule_pack_id": decision["rule_pack_id"],
            "decision_id": decision["decision_id"],
            "discovery_start_source": item["discovery_start_source"],
            "previous_discovery_step": previous_discovery_step,
            "origin_path_id": origin_path_id,
            "source_evidence_ref": decision["decision_input_snapshot_id"],
            "walk_action_id": walk_action_id,
            "rule_pack_binding": binding,
            "rule_pack_execution": execution,
        }

    binding_by_edge = _binding_by_edge_id(walk_strategy)
    decision_by_target_id: dict[Any, dict[str, Any]] = {}
    for row in decisions:
        decision_by_target_id[row["decision_target_id"]] = row

    walk_actions: list[dict[str, Any]] = []
    lineage: list[dict[str, Any]] = []

    # Lineage 1: pattern seed -> every search query.
    for search_query in search_queries:
        lineage.append(
            {
                "policy_run_id": search_query["policy_run_id"],
                "record_schema_version": search_query["record_schema_version"],
                "from_node_type": "PatternSeed",
                "from_node_id": pattern_seed_pack["pattern_execution_id"],
                "to_node_type": "SearchQuery",
                "to_node_id": search_query["search_query_id"],
                "source_path_type": "pattern_to_search_query",
                "rule_pack_id": None,
                "decision_id": None,
                "discovery_start_source": search_query["discovery_start_source"],
                "previous_discovery_step": search_query["previous_discovery_step"],
                "origin_path_id": f"pattern_to_search_query:{search_query['search_query_id']}",
                "source_evidence_ref": "source_context.json#ext_data.evidence_pack",
            }
        )

    for item in discovered_content_items:
        # Content without a decision produces no terminal action; decision
        # coverage is enforced by validate_run and is not papered over here.
        decision = decision_by_target_id.get(item["platform_content_id"])
        if not decision:
            continue

        decision_action = walk_terminal._action_for_decision(decision["decision_action"])
        binding = binding_by_edge.get(decision_action["edge_id"]) or {}
        execution = {
            "executed": True,
            "executed_rule_pack_id": decision["rule_pack_id"],
            "reason": "content_decision_reused_for_walk_gate",
        }
        walk_action_id = walk_terminal._walk_action_id(
            decision["run_id"],
            decision["policy_run_id"],
            decision_action["edge_id"],
            item["platform_content_id"],
            decision["decision_id"],
        )

        # Lineage 2: every query that surfaced this content -> the content.
        # Items without ``query_sources`` fall back to their own query fields.
        query_sources = item.get("query_sources")
        if not query_sources:
            query_sources = [
                {
                    "search_query_id": item["search_query_id"],
                    "search_query": item.get("search_query"),
                    "search_query_generation_method": item.get(
                        "search_query_generation_method"
                    ),
                }
            ]
        for query_source in query_sources:
            source_query_id = query_source["search_query_id"]
            lineage.append(
                _decision_lineage(
                    decision,
                    item,
                    from_node_type="SearchQuery",
                    from_node_id=source_query_id,
                    to_node_type="Content",
                    source_path_type="search_query_to_content",
                    previous_discovery_step=item["previous_discovery_step"],
                    origin_path_id=(
                        f"search_query_to_content:{source_query_id}:"
                        f"{item['platform_content_id']}"
                    ),
                    walk_action_id=walk_action_id,
                    binding=binding,
                    execution=execution,
                )
            )

        walk_actions.append(
            walk_terminal._walk_action_row(
                decision, item, decision_action, walk_action_id, created_at, binding, execution
            )
        )

        # Lineage 3: decision -> asset, only for content added to the pool.
        if decision["decision_action"] == "ADD_TO_CONTENT_POOL":
            lineage.append(
                _decision_lineage(
                    decision,
                    item,
                    from_node_type="RuleDecision",
                    from_node_id=decision["decision_id"],
                    to_node_type="ContentAsset",
                    source_path_type="decision_to_asset",
                    previous_discovery_step="asset_commit",
                    origin_path_id=(
                        f"decision_to_asset:{decision['decision_id']}:"
                        f"{item['platform_content_id']}"
                    ),
                    walk_action_id=walk_action_id,
                    binding=binding,
                    execution=execution,
                )
            )

    return {"walk_actions": walk_actions, "source_path_record_basis": lineage}
def _query_actions(
    query_rows: list[dict[str, Any]],
    query_failures: list[dict[str, Any]],
    created_at: str,
    *,
    walk_strategy: dict[str, Any],
    content_pack: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build one walk action per executed follow-up query row.

    Rows generated by ``tag_query`` belong to edge ``hashtag_to_query``; all
    other rows are treated as ``query_next_page``.  A row whose
    ``search_query_id`` appears in ``query_failures`` is marked ``failed``,
    every other row ``success``.
    """
    failed_query_ids = {failure["search_query_id"] for failure in query_failures}
    actions: list[dict[str, Any]] = []
    for row in query_rows:
        is_tag_query = row.get("search_query_generation_method") == "tag_query"
        edge_id = "hashtag_to_query" if is_tag_query else "query_next_page"
        binding, _ = _resolve_edge_binding(edge_id, walk_strategy)

        query_id = row["search_query_id"]
        action_id = _walk_action_id(
            row["run_id"], row["policy_run_id"], edge_id, query_id, "query"
        )
        # Origin node: the parent query (paging) or the hashtag (tag query)
        # recorded in the raw payload, falling back to the row's own id.
        payload = row.get("raw_payload", {})
        origin_id = (
            payload.get("parent_search_query_id") or payload.get("hashtag") or query_id
        )
        walk_verb = "create_tag_query" if edge_id == "hashtag_to_query" else "fetch_next_page"
        outcome = "failed" if query_id in failed_query_ids else "success"

        actions.append(
            _walk_action(
                row["run_id"],
                row["policy_run_id"],
                action_id,
                edge_id,
                "query",
                "SearchQuery",
                origin_id,
                "SearchQuery",
                query_id,
                walk_verb,
                outcome,
                created_at,
                page_cursor=row.get("page_cursor"),
                rule_pack_binding=binding,
                rule_pack_execution={
                    "executed": True,
                    "executed_rule_pack_id": content_pack["rule_pack_id"],
                    "reason": "content_decision_reused_for_walk_gate",
                },
                fallback_rule_pack=content_pack,
            )
        )
    return actions
def _search_query_row(
    run_id: str,
    policy_run_id: str,
    search_query_id: str,
    search_query: str,
    method: str,
    discovery_start_source: str,
    previous_discovery_step: str,
    created_at: str,
    *,
    page_cursor: str | None = None,
    raw_extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build a search-query record and pass it through ``with_raw_payload``.

    ``raw_extra`` entries are merged into the top level of the record after
    the standard fields, so an extra key with a standard name overrides it.
    """
    record: dict[str, Any] = {
        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
        "run_id": run_id,
        "policy_run_id": policy_run_id,
        "search_query_id": search_query_id,
        "search_query": search_query,
        "search_query_generation_method": method,
        "discovery_start_source": discovery_start_source,
        "previous_discovery_step": previous_discovery_step,
        "pattern_seed_ref": {},
        "page_cursor": page_cursor,
        "created_at": created_at,
    }
    if raw_extra:
        record.update(raw_extra)
    return with_raw_payload(record)
def _walk_action(
    run_id: str,
    policy_run_id: str,
    walk_action_id: str,
    edge_id: str,
    edge_type: str,
    from_node_type: str,
    from_node_id: str,
    to_node_type: str,
    to_node_id: str,
    walk_action: str,
    walk_status: str,
    created_at: str,
    *,
    page_cursor: str | None = None,
    reason_code: str | None = None,
    budget_tier: str | None = None,
    rule_pack_binding: dict[str, Any] | None = None,
    rule_pack_execution: dict[str, Any] | None = None,
    fallback_rule_pack: dict[str, Any] | None = None,
    raw_extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build one walk-action record.

    Defaults applied here:

    * ``budget_tier`` falls back to ``"normal"`` for a ``success`` status and
      to ``"low_budget"`` for any other status;
    * ``rule_pack_id`` / ``rule_pack_version`` come from ``rule_pack_binding``
      and fall back to ``fallback_rule_pack``;
    * ``depth`` is always ``1``.

    After ``with_raw_payload`` has wrapped the record, the binding, the
    execution record and any ``raw_extra`` entries are written into its
    ``raw_payload``.
    """
    binding = rule_pack_binding or {}
    fallback = fallback_rule_pack or {}
    if budget_tier:
        effective_tier = budget_tier
    elif walk_status == "success":
        effective_tier = "normal"
    else:
        effective_tier = "low_budget"

    record = with_raw_payload(
        {
            "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
            "run_id": run_id,
            "policy_run_id": policy_run_id,
            "walk_action_id": walk_action_id,
            "edge_id": edge_id,
            "edge_type": edge_type,
            "from_node_type": from_node_type,
            "from_node_id": from_node_id,
            "to_node_type": to_node_type,
            "to_node_id": to_node_id,
            "walk_action": walk_action,
            "walk_status": walk_status,
            "budget_tier": effective_tier,
            "depth": 1,
            "page_cursor": page_cursor,
            "reason_code": reason_code,
            "rule_pack_id": binding.get("rule_pack_id") or fallback.get("rule_pack_id"),
            "rule_pack_version": binding.get("rule_pack_version")
            or fallback.get("rule_pack_version"),
            "created_at": created_at,
        }
    )
    payload = record["raw_payload"]
    payload["rule_pack_binding"] = binding
    payload["rule_pack_execution"] = rule_pack_execution or {}
    if raw_extra:
        payload.update(raw_extra)
    return record
def _decision_by_content_id(rule_decisions: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Index decisions by ``decision_target_id``; a later duplicate wins."""
    indexed: dict[str, dict[str, Any]] = {}
    for decision in rule_decisions:
        indexed[decision["decision_target_id"]] = decision
    return indexed
def _query_effect_by_search_query_id(
    discovered_content_items: list[dict[str, Any]],
    rule_decisions: list[dict[str, Any]],
) -> dict[str, str]:
    """Aggregate the per-content effect status into one status per query.

    Each item with a decision contributes that decision's
    ``search_query_effect_status`` to every query listed in its
    ``query_sources`` (or to its own ``search_query_id`` when that list is
    missing or empty).  Per query the best status wins, in the order
    ``success`` > ``pending`` > ``rule_blocked`` > ``failed``; a query that
    only saw other values gets no entry.
    """
    decision_by_content_id = _decision_by_content_id(rule_decisions)
    observed_by_query: dict[str, set[str]] = {}
    for item in discovered_content_items:
        decision = decision_by_content_id.get(item.get("platform_content_id"))
        if not decision:
            continue
        sources = item.get("query_sources")
        if not sources:
            sources = [{"search_query_id": item.get("search_query_id")}]
        for source in sources:
            query_id = source.get("search_query_id")
            if not query_id:
                continue
            bucket = observed_by_query.setdefault(query_id, set())
            bucket.add(decision.get("search_query_effect_status"))

    ranked_statuses = ("success", "pending", "rule_blocked", "failed")
    effects: dict[str, str] = {}
    for query_id, observed in observed_by_query.items():
        best = next((status for status in ranked_statuses if status in observed), None)
        if best is not None:
            effects[query_id] = best
    return effects
def _can_fetch_next_page(search_query_id: str, query_effect_by_id: dict[str, str]) -> bool:
    """Return True only when the query's aggregated effect is ``"success"``."""
    effect = query_effect_by_id.get(search_query_id)
    return effect == "success"
def _edge_permission_for(
    decision: dict[str, Any] | None, edge_id: str, policy: dict[str, Any]
) -> str:
    """Translate a rule decision into the permission for one edge.

    A missing decision, or a decision whose query effect is ``rule_blocked``,
    is always ``"deny"``; everything else is looked up with
    ``edge_permission`` using the decision's ``decision_action``.
    """
    if not decision:
        return "deny"
    if decision.get("search_query_effect_status") == "rule_blocked":
        return "deny"
    return edge_permission(policy, decision.get("decision_action"), edge_id)
def _binding_by_edge_id(walk_strategy: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Index the ``walk_rule_pack_binding`` rows of a strategy by ``edge_id``.

    A strategy without that list yields an empty mapping; for a repeated
    ``edge_id`` the later row wins.
    """
    indexed: dict[str, dict[str, Any]] = {}
    for binding in walk_strategy.get("walk_rule_pack_binding", []):
        indexed[binding["edge_id"]] = binding
    return indexed
def _resolve_edge_binding(
    edge_id: str, walk_strategy: dict[str, Any]
) -> tuple[dict[str, Any], str | None]:
    """Look up the rule-pack binding configured for ``edge_id``.

    Returns ``(binding, None)`` when a non-empty binding exists, otherwise
    ``({}, "edge_binding_missing")``.
    """
    found = _binding_by_edge_id(walk_strategy).get(edge_id)
    if found:
        return found, None
    return {}, "edge_binding_missing"
def _execution_record(decision: dict[str, Any] | None, *, content_pack_id: str) -> dict[str, Any]:
    """Describe which rule pack (if any) backed a walk-gate decision.

    Without a decision the record is marked as not executed.  With one, the
    decision's ``rule_pack_id`` is reported, falling back to
    ``content_pack_id`` when the decision carries none.
    """
    if not decision:
        return {
            "executed": False,
            "executed_rule_pack_id": None,
            "reason": "future_pack_not_enabled",
        }
    executed_pack_id = decision.get("rule_pack_id") or content_pack_id
    return {
        "executed": True,
        "executed_rule_pack_id": executed_pack_id,
        "reason": "content_decision_reused_for_walk_gate",
    }
def _decision_context(decision: dict[str, Any] | None) -> dict[str, Any]:
    """Extract the two decision fields that are copied into raw payloads.

    Both values are ``None`` when there is no decision.
    """
    source = decision if decision else {}
    return {
        "decision_action": source.get("decision_action"),
        "search_query_effect_status": source.get("search_query_effect_status"),
    }
def _merge_batch(context: dict[str, list[dict[str, Any]]], batch: dict[str, list[dict[str, Any]]]) -> None:
    """Append a batch's four record lists to the matching ``context`` lists.

    Only ``discovered_content_items``, ``content_media_records``,
    ``evidence_bundles`` and ``rule_decisions`` are merged; other batch keys
    are ignored and a key missing from ``batch`` contributes nothing.
    ``context`` is modified in place.
    """
    merged_keys = (
        "discovered_content_items",
        "content_media_records",
        "evidence_bundles",
        "rule_decisions",
    )
    for name in merged_keys:
        additions = batch.get(name, [])
        context[name].extend(additions)
def _new_platform_results(
    platform_results: list[dict[str, Any]],
    existing_content_ids: set[str],
) -> list[dict[str, Any]]:
    """Drop results whose content id is already known or repeats in the list.

    Rows without a truthy ``platform_content_id`` are always kept.  The
    ``existing_content_ids`` argument is copied and never modified.
    """
    known_ids = set(existing_content_ids)
    fresh: list[dict[str, Any]] = []
    for result in platform_results:
        content_id = result.get("platform_content_id")
        if not content_id:
            # No id to compare on: keep the row as is.
            fresh.append(result)
            continue
        if content_id in known_ids:
            continue
        known_ids.add(content_id)
        fresh.append(result)
    return fresh
def _empty_batch() -> dict[str, list[dict[str, Any]]]:
    """Return a fresh batch dict in which every record list is empty."""
    batch_keys = (
        "discovered_content_items",
        "content_media_records",
        "evidence_bundles",
        "rule_decisions",
        "search_queries",
        "query_failures",
        "walk_actions",
    )
    return {key: [] for key in batch_keys}
def _unique_authors(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the first item seen for every distinct ``platform_author_id``.

    Items without a truthy author id are dropped; the original order of the
    kept items is preserved.
    """
    first_item_by_author: dict[str, dict[str, Any]] = {}
    for item in items:
        author_id = item.get("platform_author_id")
        if author_id and author_id not in first_item_by_author:
            first_item_by_author[author_id] = item
    # dicts keep insertion order, so this is first-seen order.
    return list(first_item_by_author.values())
def _blocked_tag(tag: str) -> bool:
    """Return True when the lower-cased tag contains any blocked term.

    Matching is by substring, so a term anywhere inside the tag blocks it.
    """
    lowered = tag.lower()
    for term in BLOCKED_TAG_TERMS:
        if term in lowered:
            return True
    return False
def _walk_action_id(
    run_id: str,
    policy_run_id: str,
    edge_id: str,
    target_id: str,
    suffix: str,
) -> str:
    """Derive a deterministic walk-action id from its identifying parts.

    The five parts are joined with ``:``, hashed with SHA-1, and the first
    16 hex characters are returned behind a ``wa_`` prefix.
    """
    seed = f"{run_id}:{policy_run_id}:{edge_id}:{target_id}:{suffix}"
    digest = hashlib.sha1(seed.encode("utf-8")).hexdigest()
    return "wa_" + digest[:16]
def _short_hash(value: str) -> str:
    """Return the first 10 hex characters of the SHA-1 of ``value`` (UTF-8)."""
    digest = hashlib.sha1(value.encode("utf-8")).hexdigest()
    return digest[:10]
|