# walk_engine.py
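"""Bounded walk (V3-M4): a config-driven, unified frontier flow that replaces three hard-coded stages.

Every edge passes through the same gates:
platform_profiles says whether the edge is supported (a blocked edge is skipped explicitly, without calling the platform)
→ walk_policy.edge_permissions lets the edge through based on the rule decision (replaces the hard-coded _can_expand_from_decision)
→ walk_policy.edge_budgets caps the edge (replaces the scattered [:2]/[:3]/>=1 hard limits; since R8, candidates that
  do not fit the budget get an explicit budget_exhausted skip record instead of being dropped silently).

Terminal edges (commit/downgrade/stop) and the three kinds of lineage records are folded into this module's
terminal stage (the former plan_walk node has been removed).
"""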
  7. from __future__ import annotations
  8. import hashlib
  9. from datetime import datetime, timezone
  10. from typing import Any
  11. from content_agent.business_modules import content_discovery, platform_access, rule_judgment
  12. from content_agent.business_modules import walk_strategy as walk_terminal
  13. from content_agent.business_modules.content_discovery import pattern_recall
  14. from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
  15. from content_agent.errors import ContentAgentError
  16. from content_agent.integrations.walk_graph_json import (
  17. WalkGraphStore,
  18. edge_permission,
  19. edge_supported,
  20. )
  21. from content_agent.integrations.walk_strategy_json import WalkStrategyStore
  22. from content_agent.interfaces import (
  23. GeminiVideoClient,
  24. PlatformSearchClient,
  25. RuntimeFileStore,
  26. )
  27. from content_agent.record_payload import with_raw_payload
  28. BLOCKED_TAG_TERMS = {
  29. "h" + "ot",
  30. "trend" + "ing",
  31. "热" + "点",
  32. "热" + "榜",
  33. "养" + "号",
  34. "无" + "限",
  35. }
  36. def run_bounded_walk(
  37. *,
  38. run_id: str,
  39. policy_run_id: str,
  40. pattern_seed_pack: dict[str, Any],
  41. source_context: dict[str, Any],
  42. search_queries: list[dict[str, Any]],
  43. discovered_content_items: list[dict[str, Any]],
  44. content_media_records: list[dict[str, Any]],
  45. evidence_bundles: list[dict[str, Any]],
  46. rule_decisions: list[dict[str, Any]],
  47. policy_bundle: dict[str, Any],
  48. platform_client: PlatformSearchClient,
  49. runtime: RuntimeFileStore,
  50. gemini_video_client: GeminiVideoClient,
  51. ) -> dict[str, list[dict[str, Any]]]:
  52. created_at = datetime.now(timezone.utc).isoformat()
  53. walk_strategy = WalkStrategyStore().load_walk_strategy()
  54. store = WalkGraphStore()
  55. policy = store.load_policy()
  56. platform = next(
  57. (item["platform"] for item in discovered_content_items if item.get("platform")),
  58. "douyin",
  59. )
  60. profile = store.load_profile(platform)
  61. content_pack = {
  62. "rule_pack_id": policy_bundle["rule_pack_id"],
  63. "rule_pack_version": policy_bundle["rule_pack_version"],
  64. }
  65. context = {
  66. "search_queries": list(search_queries),
  67. "discovered_content_items": list(discovered_content_items),
  68. "content_media_records": list(content_media_records),
  69. "evidence_bundles": list(evidence_bundles),
  70. "rule_decisions": list(rule_decisions),
  71. "walk_actions": [],
  72. }
  73. next_decision_index = len(rule_decisions) + 1
  74. next_recall_index = len(discovered_content_items) + 1
  75. query_rows, query_skipped_actions = _expand_queries(
  76. run_id, policy_run_id, context["search_queries"], discovered_content_items,
  77. rule_decisions, created_at,
  78. policy=policy, profile=profile, walk_strategy=walk_strategy, content_pack=content_pack,
  79. )
  80. context["walk_actions"].extend(query_skipped_actions)
  81. if query_rows:
  82. runtime.append_jsonl(run_id, "search_queries.jsonl", query_rows)
  83. context["search_queries"].extend(query_rows)
  84. batch = _execute_query_batch(
  85. run_id=run_id,
  86. policy_run_id=policy_run_id,
  87. pattern_seed_pack=pattern_seed_pack,
  88. source_context=source_context,
  89. search_queries=query_rows,
  90. policy_bundle=policy_bundle,
  91. platform_client=platform_client,
  92. runtime=runtime,
  93. gemini_video_client=gemini_video_client,
  94. start_recall_index=next_recall_index,
  95. start_decision_index=next_decision_index,
  96. existing_content_ids={
  97. item["platform_content_id"] for item in context["discovered_content_items"]
  98. },
  99. )
  100. _merge_batch(context, batch)
  101. next_decision_index += len(batch["rule_decisions"])
  102. next_recall_index += len(batch["discovered_content_items"])
  103. context["walk_actions"].extend(
  104. _query_actions(
  105. query_rows, batch.get("query_failures", []), created_at,
  106. walk_strategy=walk_strategy, content_pack=content_pack,
  107. )
  108. )
  109. author_batch = _expand_authors(
  110. run_id=run_id,
  111. policy_run_id=policy_run_id,
  112. source_context=source_context,
  113. discovered_content_items=context["discovered_content_items"],
  114. rule_decisions=context["rule_decisions"],
  115. policy=policy,
  116. profile=profile,
  117. walk_strategy=walk_strategy,
  118. policy_bundle=policy_bundle,
  119. content_pack=content_pack,
  120. platform_client=platform_client,
  121. runtime=runtime,
  122. gemini_video_client=gemini_video_client,
  123. start_recall_index=next_recall_index,
  124. start_decision_index=next_decision_index,
  125. created_at=created_at,
  126. )
  127. _merge_batch(context, author_batch)
  128. context["walk_actions"].extend(author_batch.get("walk_actions", []))
  129. # 作者作品的血缘 query 行:落盘后终端阶段(pattern_to_search_query 路径)与
  130. # recorder(search_clue 聚合)即可经既有机制覆盖作者内容,validate_run 不再断链。
  131. author_queries = author_batch.get("search_queries", [])
  132. if author_queries:
  133. runtime.append_jsonl(run_id, "search_queries.jsonl", author_queries)
  134. context["search_queries"].extend(author_queries)
  135. terminal = _terminal_stage(
  136. pattern_seed_pack,
  137. context["search_queries"],
  138. context["discovered_content_items"],
  139. context["rule_decisions"],
  140. walk_strategy,
  141. created_at,
  142. )
  143. context["walk_actions"].extend(terminal["walk_actions"])
  144. context["source_path_record_basis"] = terminal["source_path_record_basis"]
  145. return context
  146. def _execute_query_batch(
  147. *,
  148. run_id: str,
  149. policy_run_id: str,
  150. pattern_seed_pack: dict[str, Any],
  151. source_context: dict[str, Any],
  152. search_queries: list[dict[str, Any]],
  153. policy_bundle: dict[str, Any],
  154. platform_client: PlatformSearchClient,
  155. runtime: RuntimeFileStore,
  156. gemini_video_client: GeminiVideoClient,
  157. start_recall_index: int,
  158. start_decision_index: int,
  159. existing_content_ids: set[str],
  160. ) -> dict[str, list[dict[str, Any]]]:
  161. try:
  162. result = platform_access.run(search_queries, platform_client)
  163. except ContentAgentError as exc:
  164. return {
  165. "discovered_content_items": [],
  166. "content_media_records": [],
  167. "evidence_bundles": [],
  168. "rule_decisions": [],
  169. "query_failures": exc.detail.get("query_failures", []),
  170. }
  171. platform_results = _new_platform_results(result["platform_results"], existing_content_ids)
  172. if not platform_results:
  173. return {
  174. "discovered_content_items": [],
  175. "content_media_records": [],
  176. "evidence_bundles": [],
  177. "rule_decisions": [],
  178. "query_failures": result.get("query_failures", []),
  179. }
  180. discovered = content_discovery.run(
  181. run_id,
  182. policy_run_id,
  183. platform_results,
  184. source_context,
  185. runtime,
  186. )
  187. recalled = pattern_recall.run(
  188. run_id,
  189. policy_run_id,
  190. discovered["discovered_content_items"],
  191. discovered["content_media_records"],
  192. discovered["evidence_bundles"],
  193. source_context,
  194. runtime,
  195. gemini_video_client,
  196. start_index=start_recall_index,
  197. )
  198. decisions = rule_judgment.run(
  199. run_id,
  200. policy_run_id,
  201. recalled["evidence_bundles"],
  202. policy_bundle,
  203. runtime,
  204. start_index=start_decision_index,
  205. )
  206. return {
  207. "discovered_content_items": recalled["discovered_content_items"],
  208. "content_media_records": discovered["content_media_records"],
  209. "evidence_bundles": recalled["evidence_bundles"],
  210. "rule_decisions": decisions,
  211. "query_failures": result.get("query_failures", []),
  212. }
  213. def _expand_queries(
  214. run_id: str,
  215. policy_run_id: str,
  216. search_queries: list[dict[str, Any]],
  217. discovered_content_items: list[dict[str, Any]],
  218. rule_decisions: list[dict[str, Any]],
  219. created_at: str,
  220. *,
  221. policy: dict[str, Any],
  222. profile: dict[str, Any],
  223. walk_strategy: dict[str, Any],
  224. content_pack: dict[str, Any],
  225. ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
  226. budgets = policy["edge_budgets_by_id"]
  227. decision_by_content_id = _decision_by_content_id(rule_decisions)
  228. skipped_actions: list[dict[str, Any]] = []
  229. # 边 query_next_page:has_more+cursor、查询 effect=success 才翻页;
  230. # 预算耗尽不再静默(R8,2026-06-12 拍板):合格候选排不上队时补显式 skip 留痕。
  231. page_rows: list[dict[str, Any]] = []
  232. if edge_supported(profile, "query_next_page"):
  233. page_budget = budgets["query_next_page"]["max_total_actions"]
  234. page_binding, _ = _resolve_edge_binding("query_next_page", walk_strategy)
  235. query_by_id = {row["search_query_id"]: row for row in search_queries}
  236. query_effect_by_id = _query_effect_by_search_query_id(discovered_content_items, rule_decisions)
  237. seen_queries: set[str] = set()
  238. for item in discovered_content_items:
  239. search_query_id = item.get("search_query_id")
  240. cursor = item.get("next_cursor")
  241. if not item.get("has_more") or not cursor or search_query_id in seen_queries:
  242. continue
  243. if not _can_fetch_next_page(search_query_id, query_effect_by_id):
  244. continue
  245. source = query_by_id.get(search_query_id)
  246. if not source:
  247. continue
  248. seen_queries.add(search_query_id)
  249. if len(page_rows) >= page_budget:
  250. skipped_actions.append(
  251. _walk_action(
  252. run_id,
  253. policy_run_id,
  254. _walk_action_id(
  255. run_id, policy_run_id, "query_next_page", search_query_id, "page_budget"
  256. ),
  257. "query_next_page",
  258. "query",
  259. "SearchQuery",
  260. search_query_id,
  261. "SearchQuery",
  262. "next_page_skipped",
  263. "fetch_next_page",
  264. "skipped",
  265. created_at,
  266. reason_code="budget_exhausted",
  267. budget_tier="blocked",
  268. rule_pack_binding=page_binding,
  269. rule_pack_execution=_execution_record(
  270. decision_by_content_id.get(item.get("platform_content_id")),
  271. content_pack_id=content_pack["rule_pack_id"],
  272. ),
  273. fallback_rule_pack=content_pack,
  274. raw_extra={"parent_search_query_id": search_query_id},
  275. )
  276. )
  277. continue
  278. page_rows.append(
  279. _search_query_row(
  280. run_id,
  281. policy_run_id,
  282. f"{search_query_id}_page_002",
  283. source["search_query"],
  284. "query_next_page",
  285. source.get("discovery_start_source", "pattern_itemset"),
  286. "query_next_page",
  287. created_at,
  288. page_cursor=str(cursor),
  289. raw_extra={"parent_search_query_id": search_query_id},
  290. )
  291. )
  292. # 回灌链 video_to_hashtag → hashtag_to_query:keep_only 门由 edge_permissions 查表。
  293. tag_rows: list[dict[str, Any]] = []
  294. if edge_supported(profile, "video_to_hashtag") and edge_supported(profile, "hashtag_to_query"):
  295. tag_budget = budgets["hashtag_to_query"]["max_total_actions"]
  296. tag_binding, _ = _resolve_edge_binding("hashtag_to_query", walk_strategy)
  297. seen_tags: set[str] = set()
  298. for item in discovered_content_items:
  299. decision = decision_by_content_id.get(item.get("platform_content_id"))
  300. if not decision:
  301. continue
  302. if _edge_permission_for(decision, "video_to_hashtag", policy) == "deny":
  303. if item.get("tags"):
  304. reason_code = (
  305. "review_tag_expansion_disabled"
  306. if decision.get("decision_action") == "KEEP_CONTENT_FOR_REVIEW"
  307. else "blocked_by_rule_decision"
  308. )
  309. skipped_actions.append(
  310. _walk_action(
  311. run_id,
  312. policy_run_id,
  313. _walk_action_id(
  314. run_id, policy_run_id, "hashtag_to_query", item["platform_content_id"], "tags"
  315. ),
  316. "hashtag_to_query",
  317. "tag_query",
  318. "Content",
  319. item["platform_content_id"],
  320. "SearchQuery",
  321. "tag_query_skipped",
  322. "create_tag_query",
  323. "skipped",
  324. created_at,
  325. reason_code=reason_code,
  326. budget_tier="blocked",
  327. rule_pack_binding=tag_binding,
  328. rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]),
  329. fallback_rule_pack=content_pack,
  330. raw_extra=_decision_context(decision),
  331. )
  332. )
  333. continue
  334. for tag in item.get("tags") or []:
  335. normalized = str(tag).lstrip("#").strip()
  336. if not normalized or normalized in seen_tags or _blocked_tag(normalized):
  337. continue
  338. if len(tag_rows) >= tag_budget:
  339. # R8:该内容首个合格标签排不上队,补一条 budget_exhausted skip
  340. # (每内容至多一条;不占 seen_tags,不影响已执行集合)。
  341. skipped_actions.append(
  342. _walk_action(
  343. run_id,
  344. policy_run_id,
  345. _walk_action_id(
  346. run_id, policy_run_id, "hashtag_to_query", item["platform_content_id"], "tag_budget"
  347. ),
  348. "hashtag_to_query",
  349. "tag_query",
  350. "Content",
  351. item["platform_content_id"],
  352. "SearchQuery",
  353. "tag_query_skipped",
  354. "create_tag_query",
  355. "skipped",
  356. created_at,
  357. reason_code="budget_exhausted",
  358. budget_tier="blocked",
  359. rule_pack_binding=tag_binding,
  360. rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]),
  361. fallback_rule_pack=content_pack,
  362. raw_extra={**_decision_context(decision), "hashtag": normalized},
  363. )
  364. )
  365. break
  366. seen_tags.add(normalized)
  367. tag_rows.append(
  368. _search_query_row(
  369. run_id,
  370. policy_run_id,
  371. f"tag_{_short_hash(normalized)}",
  372. normalized,
  373. "tag_query",
  374. item.get("discovery_start_source", "pattern_itemset"),
  375. "hashtag_to_query",
  376. created_at,
  377. raw_extra={
  378. "hashtag": normalized,
  379. "source_content_id": item.get("platform_content_id"),
  380. },
  381. )
  382. )
  383. return [*page_rows, *tag_rows], skipped_actions
  384. def _expand_authors(
  385. *,
  386. run_id: str,
  387. policy_run_id: str,
  388. source_context: dict[str, Any],
  389. discovered_content_items: list[dict[str, Any]],
  390. rule_decisions: list[dict[str, Any]],
  391. policy: dict[str, Any],
  392. profile: dict[str, Any],
  393. walk_strategy: dict[str, Any],
  394. policy_bundle: dict[str, Any],
  395. content_pack: dict[str, Any],
  396. platform_client: PlatformSearchClient,
  397. runtime: RuntimeFileStore,
  398. gemini_video_client: GeminiVideoClient,
  399. start_recall_index: int,
  400. start_decision_index: int,
  401. created_at: str,
  402. ) -> dict[str, list[dict[str, Any]]]:
  403. budgets = policy["edge_budgets_by_id"]["author_to_works"]
  404. decision_by_content_id = _decision_by_content_id(rule_decisions)
  405. binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
  406. unique_author_items = _unique_authors(discovered_content_items)
  407. author_items = unique_author_items[: budgets["max_total_actions"]]
  408. # R8(2026-06-12 拍板):预算外作者不再静默消失,补 budget_exhausted skip 留痕。
  409. overflow_author_items = unique_author_items[budgets["max_total_actions"]:]
  410. walk_actions: list[dict[str, Any]] = []
  411. # 平台不支持作者边(如视频号 blogger blocked):显式 skip 留痕、不调用平台,游走自然退化。
  412. if not edge_supported(profile, "author_to_works"):
  413. for item in author_items:
  414. author_id = item.get("platform_author_id")
  415. if not author_id:
  416. continue
  417. decision = decision_by_content_id.get(item.get("platform_content_id"))
  418. walk_actions.append(
  419. _author_walk_action(
  420. run_id, policy_run_id, author_id, "skipped", created_at,
  421. reason_code="edge_blocked_by_platform_profile",
  422. budget_tier="blocked",
  423. binding=binding,
  424. decision=decision,
  425. content_pack=content_pack,
  426. )
  427. )
  428. return {**_empty_batch(), "walk_actions": walk_actions}
  429. fetch_author_works = getattr(platform_client, "fetch_author_works", None) or getattr(
  430. platform_client, "author_works", None
  431. )
  432. if not callable(fetch_author_works):
  433. return _empty_batch()
  434. # 作者近期作品天然可能包含首轮已发现的同一条视频;不去重会撞
  435. # uk_ca_items_run_policy_content 唯一索引(真实 E2E v1_run_e6ba21f7543b 实证)。
  436. seen_content_ids = {
  437. str(item.get("platform_content_id"))
  438. for item in discovered_content_items
  439. if item.get("platform_content_id")
  440. }
  441. platform_results: list[dict[str, Any]] = []
  442. author_search_queries: list[dict[str, Any]] = []
  443. for item in author_items:
  444. author_id = item.get("platform_author_id")
  445. if not author_id:
  446. continue
  447. decision = decision_by_content_id.get(item.get("platform_content_id"))
  448. permission = _edge_permission_for(decision, "author_to_works", policy)
  449. if permission == "deny":
  450. walk_actions.append(
  451. _author_walk_action(
  452. run_id, policy_run_id, author_id, "skipped", created_at,
  453. reason_code="blocked_by_rule_decision",
  454. budget_tier="blocked",
  455. binding=binding,
  456. decision=decision,
  457. content_pack=content_pack,
  458. )
  459. )
  460. continue
  461. budget_tier = "low_budget" if permission == "allow_low_budget" else "normal"
  462. try:
  463. works = fetch_author_works(
  464. {
  465. "platform_author_id": author_id,
  466. "search_query_id": f"author_{_short_hash(author_id)}",
  467. "discovery_start_source": item.get("discovery_start_source", "pattern_itemset"),
  468. }
  469. )
  470. except Exception as exc:
  471. walk_actions.append(
  472. _author_walk_action(
  473. run_id, policy_run_id, author_id, "failed", created_at,
  474. reason_code=type(exc).__name__,
  475. budget_tier=budget_tier,
  476. binding=binding,
  477. decision=decision,
  478. content_pack=content_pack,
  479. )
  480. )
  481. continue
  482. walk_actions.append(
  483. _author_walk_action(
  484. run_id, policy_run_id, author_id, "success", created_at,
  485. budget_tier=budget_tier,
  486. binding=binding,
  487. decision=decision,
  488. content_pack=content_pack,
  489. )
  490. )
  491. new_works = [
  492. work
  493. for work in works
  494. if str(work.get("platform_content_id") or "") and str(work.get("platform_content_id")) not in seen_content_ids
  495. ]
  496. if new_works:
  497. # 血缘补全:作者作品内容引用的合成 query id 必须真实存在于 search_queries,
  498. # 否则 validate_run 断链(真实 E2E v1_run_3a3bc9f0d72d 实证:missing_search_query_ref 等 8 条 fail)。
  499. author_search_queries.append(
  500. _search_query_row(
  501. run_id,
  502. policy_run_id,
  503. f"author_{_short_hash(author_id)}",
  504. f"author:{author_id}",
  505. "author_works",
  506. item.get("discovery_start_source", "pattern_itemset"),
  507. "author_to_works",
  508. created_at,
  509. raw_extra={"platform_author_id": author_id},
  510. )
  511. )
  512. for index, work in enumerate(new_works[: budgets["max_works_per_author"]], start=1):
  513. seen_content_ids.add(str(work["platform_content_id"]))
  514. platform_results.append(
  515. {
  516. **work,
  517. "search_query_id": work.get("search_query_id") or f"author_{_short_hash(author_id)}",
  518. "content_discovery_id": work.get("content_discovery_id")
  519. or f"author_{_short_hash(author_id)}_content_{index:03d}",
  520. "discovery_start_source": work.get("discovery_start_source")
  521. or item.get("discovery_start_source", "pattern_itemset"),
  522. "previous_discovery_step": "author_works",
  523. }
  524. )
  525. for item in overflow_author_items:
  526. author_id = item.get("platform_author_id")
  527. if not author_id:
  528. continue
  529. walk_actions.append(
  530. _author_walk_action(
  531. run_id, policy_run_id, author_id, "skipped", created_at,
  532. reason_code="budget_exhausted",
  533. budget_tier="blocked",
  534. binding=binding,
  535. decision=decision_by_content_id.get(item.get("platform_content_id")),
  536. content_pack=content_pack,
  537. )
  538. )
  539. if not platform_results:
  540. return {**_empty_batch(), "walk_actions": walk_actions, "search_queries": author_search_queries}
  541. discovered = content_discovery.run(run_id, policy_run_id, platform_results, source_context, runtime)
  542. recalled = pattern_recall.run(
  543. run_id,
  544. policy_run_id,
  545. discovered["discovered_content_items"],
  546. discovered["content_media_records"],
  547. discovered["evidence_bundles"],
  548. source_context,
  549. runtime,
  550. gemini_video_client,
  551. start_index=start_recall_index,
  552. )
  553. decisions = rule_judgment.run(
  554. run_id,
  555. policy_run_id,
  556. recalled["evidence_bundles"],
  557. policy_bundle,
  558. runtime,
  559. start_index=start_decision_index,
  560. )
  561. return {
  562. "discovered_content_items": recalled["discovered_content_items"],
  563. "content_media_records": discovered["content_media_records"],
  564. "evidence_bundles": recalled["evidence_bundles"],
  565. "rule_decisions": decisions,
  566. "search_queries": author_search_queries,
  567. "query_failures": [],
  568. "walk_actions": walk_actions,
  569. }
  570. def _author_walk_action(
  571. run_id: str,
  572. policy_run_id: str,
  573. author_id: str,
  574. walk_status: str,
  575. created_at: str,
  576. *,
  577. budget_tier: str,
  578. binding: dict[str, Any],
  579. decision: dict[str, Any] | None,
  580. content_pack: dict[str, Any],
  581. reason_code: str | None = None,
  582. ) -> dict[str, Any]:
  583. return _walk_action(
  584. run_id,
  585. policy_run_id,
  586. _walk_action_id(run_id, policy_run_id, "author_to_works", author_id, "works"),
  587. "author_to_works",
  588. "author",
  589. "Author",
  590. author_id,
  591. "AuthorWorksPage",
  592. author_id,
  593. "fetch_author_works",
  594. walk_status,
  595. created_at,
  596. reason_code=reason_code,
  597. budget_tier=budget_tier,
  598. rule_pack_binding=binding,
  599. rule_pack_execution=_execution_record(decision, content_pack_id=content_pack["rule_pack_id"]),
  600. fallback_rule_pack=content_pack,
  601. raw_extra=_decision_context(decision),
  602. )
  603. def _terminal_stage(
  604. pattern_seed_pack: dict[str, Any],
  605. search_queries: list[dict[str, Any]],
  606. discovered_content_items: list[dict[str, Any]],
  607. decisions: list[dict[str, Any]],
  608. walk_strategy: dict[str, Any],
  609. created_at: str,
  610. ) -> dict[str, list[dict[str, Any]]]:
  611. """终端边(decision_to_asset/budget_downgrade/path_stop)+ 3 类血缘(原 plan_walk 语义)。"""
  612. binding_by_edge = _binding_by_edge_id(walk_strategy)
  613. decision_by_target_id = {decision["decision_target_id"]: decision for decision in decisions}
  614. walk_actions: list[dict[str, Any]] = []
  615. source_path_record_basis: list[dict[str, Any]] = []
  616. for search_query in search_queries:
  617. source_path_record_basis.append(
  618. {
  619. "policy_run_id": search_query["policy_run_id"],
  620. "record_schema_version": search_query["record_schema_version"],
  621. "from_node_type": "PatternSeed",
  622. "from_node_id": pattern_seed_pack["pattern_execution_id"],
  623. "to_node_type": "SearchQuery",
  624. "to_node_id": search_query["search_query_id"],
  625. "source_path_type": "pattern_to_search_query",
  626. "rule_pack_id": None,
  627. "decision_id": None,
  628. "discovery_start_source": search_query["discovery_start_source"],
  629. "previous_discovery_step": search_query["previous_discovery_step"],
  630. "origin_path_id": f"pattern_to_search_query:{search_query['search_query_id']}",
  631. "source_evidence_ref": "source_context.json#ext_data.evidence_pack",
  632. }
  633. )
  634. for item in discovered_content_items:
  635. # 无判定内容不产终端动作;判定覆盖完整性由 validate_run 把关,这里不掩盖。
  636. decision = decision_by_target_id.get(item["platform_content_id"])
  637. if not decision:
  638. continue
  639. decision_action = walk_terminal._action_for_decision(decision["decision_action"])
  640. binding = binding_by_edge.get(decision_action["edge_id"]) or {}
  641. execution = {
  642. "executed": True,
  643. "executed_rule_pack_id": decision["rule_pack_id"],
  644. "reason": "content_decision_reused_for_walk_gate",
  645. }
  646. walk_action_id = walk_terminal._walk_action_id(
  647. decision["run_id"],
  648. decision["policy_run_id"],
  649. decision_action["edge_id"],
  650. item["platform_content_id"],
  651. decision["decision_id"],
  652. )
  653. query_sources = item.get("query_sources") or [
  654. {
  655. "search_query_id": item["search_query_id"],
  656. "search_query": item.get("search_query"),
  657. "search_query_generation_method": item.get("search_query_generation_method"),
  658. }
  659. ]
  660. for query_source in query_sources:
  661. search_query_id = query_source["search_query_id"]
  662. source_path_record_basis.append(
  663. {
  664. "policy_run_id": decision["policy_run_id"],
  665. "record_schema_version": decision["record_schema_version"],
  666. "from_node_type": "SearchQuery",
  667. "from_node_id": search_query_id,
  668. "to_node_type": "Content",
  669. "to_node_id": item["platform_content_id"],
  670. "source_path_type": "search_query_to_content",
  671. "rule_pack_id": decision["rule_pack_id"],
  672. "decision_id": decision["decision_id"],
  673. "discovery_start_source": item["discovery_start_source"],
  674. "previous_discovery_step": item["previous_discovery_step"],
  675. "origin_path_id": (
  676. f"search_query_to_content:{search_query_id}:"
  677. f"{item['platform_content_id']}"
  678. ),
  679. "source_evidence_ref": decision["decision_input_snapshot_id"],
  680. "walk_action_id": walk_action_id,
  681. "rule_pack_binding": binding,
  682. "rule_pack_execution": execution,
  683. }
  684. )
  685. walk_actions.append(
  686. walk_terminal._walk_action_row(
  687. decision, item, decision_action, walk_action_id, created_at, binding, execution
  688. )
  689. )
  690. if decision["decision_action"] == "ADD_TO_CONTENT_POOL":
  691. source_path_record_basis.append(
  692. {
  693. "policy_run_id": decision["policy_run_id"],
  694. "record_schema_version": decision["record_schema_version"],
  695. "from_node_type": "RuleDecision",
  696. "from_node_id": decision["decision_id"],
  697. "to_node_type": "ContentAsset",
  698. "to_node_id": item["platform_content_id"],
  699. "source_path_type": "decision_to_asset",
  700. "rule_pack_id": decision["rule_pack_id"],
  701. "decision_id": decision["decision_id"],
  702. "discovery_start_source": item["discovery_start_source"],
  703. "previous_discovery_step": "asset_commit",
  704. "origin_path_id": (
  705. f"decision_to_asset:{decision['decision_id']}:"
  706. f"{item['platform_content_id']}"
  707. ),
  708. "source_evidence_ref": decision["decision_input_snapshot_id"],
  709. "walk_action_id": walk_action_id,
  710. "rule_pack_binding": binding,
  711. "rule_pack_execution": execution,
  712. }
  713. )
  714. return {"walk_actions": walk_actions, "source_path_record_basis": source_path_record_basis}
  715. def _query_actions(
  716. query_rows: list[dict[str, Any]],
  717. query_failures: list[dict[str, Any]],
  718. created_at: str,
  719. *,
  720. walk_strategy: dict[str, Any],
  721. content_pack: dict[str, Any],
  722. ) -> list[dict[str, Any]]:
  723. failure_ids = {row["search_query_id"] for row in query_failures}
  724. actions: list[dict[str, Any]] = []
  725. for row in query_rows:
  726. edge_id = (
  727. "hashtag_to_query"
  728. if row.get("search_query_generation_method") == "tag_query"
  729. else "query_next_page"
  730. )
  731. binding, _ = _resolve_edge_binding(edge_id, walk_strategy)
  732. actions.append(
  733. _walk_action(
  734. row["run_id"],
  735. row["policy_run_id"],
  736. _walk_action_id(
  737. row["run_id"],
  738. row["policy_run_id"],
  739. edge_id,
  740. row["search_query_id"],
  741. "query",
  742. ),
  743. edge_id,
  744. "query",
  745. "SearchQuery",
  746. row.get("raw_payload", {}).get("parent_search_query_id")
  747. or row.get("raw_payload", {}).get("hashtag")
  748. or row["search_query_id"],
  749. "SearchQuery",
  750. row["search_query_id"],
  751. "create_tag_query" if edge_id == "hashtag_to_query" else "fetch_next_page",
  752. "failed" if row["search_query_id"] in failure_ids else "success",
  753. created_at,
  754. page_cursor=row.get("page_cursor"),
  755. rule_pack_binding=binding,
  756. rule_pack_execution={
  757. "executed": True,
  758. "executed_rule_pack_id": content_pack["rule_pack_id"],
  759. "reason": "content_decision_reused_for_walk_gate",
  760. },
  761. fallback_rule_pack=content_pack,
  762. )
  763. )
  764. return actions
  765. def _search_query_row(
  766. run_id: str,
  767. policy_run_id: str,
  768. search_query_id: str,
  769. search_query: str,
  770. method: str,
  771. discovery_start_source: str,
  772. previous_discovery_step: str,
  773. created_at: str,
  774. *,
  775. page_cursor: str | None = None,
  776. raw_extra: dict[str, Any] | None = None,
  777. ) -> dict[str, Any]:
  778. row = {
  779. "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
  780. "run_id": run_id,
  781. "policy_run_id": policy_run_id,
  782. "search_query_id": search_query_id,
  783. "search_query": search_query,
  784. "search_query_generation_method": method,
  785. "discovery_start_source": discovery_start_source,
  786. "previous_discovery_step": previous_discovery_step,
  787. "pattern_seed_ref": {},
  788. "page_cursor": page_cursor,
  789. "created_at": created_at,
  790. **(raw_extra or {}),
  791. }
  792. return with_raw_payload(row)
  793. def _walk_action(
  794. run_id: str,
  795. policy_run_id: str,
  796. walk_action_id: str,
  797. edge_id: str,
  798. edge_type: str,
  799. from_node_type: str,
  800. from_node_id: str,
  801. to_node_type: str,
  802. to_node_id: str,
  803. walk_action: str,
  804. walk_status: str,
  805. created_at: str,
  806. *,
  807. page_cursor: str | None = None,
  808. reason_code: str | None = None,
  809. budget_tier: str | None = None,
  810. rule_pack_binding: dict[str, Any] | None = None,
  811. rule_pack_execution: dict[str, Any] | None = None,
  812. fallback_rule_pack: dict[str, Any] | None = None,
  813. raw_extra: dict[str, Any] | None = None,
  814. ) -> dict[str, Any]:
  815. row = with_raw_payload(
  816. {
  817. "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
  818. "run_id": run_id,
  819. "policy_run_id": policy_run_id,
  820. "walk_action_id": walk_action_id,
  821. "edge_id": edge_id,
  822. "edge_type": edge_type,
  823. "from_node_type": from_node_type,
  824. "from_node_id": from_node_id,
  825. "to_node_type": to_node_type,
  826. "to_node_id": to_node_id,
  827. "walk_action": walk_action,
  828. "walk_status": walk_status,
  829. "budget_tier": budget_tier or ("normal" if walk_status == "success" else "low_budget"),
  830. "depth": 1,
  831. "page_cursor": page_cursor,
  832. "reason_code": reason_code,
  833. "rule_pack_id": (rule_pack_binding or {}).get("rule_pack_id")
  834. or (fallback_rule_pack or {}).get("rule_pack_id"),
  835. "rule_pack_version": (rule_pack_binding or {}).get("rule_pack_version")
  836. or (fallback_rule_pack or {}).get("rule_pack_version"),
  837. "created_at": created_at,
  838. }
  839. )
  840. row["raw_payload"]["rule_pack_binding"] = rule_pack_binding or {}
  841. row["raw_payload"]["rule_pack_execution"] = rule_pack_execution or {}
  842. if raw_extra:
  843. row["raw_payload"].update(raw_extra)
  844. return row
  845. def _decision_by_content_id(rule_decisions: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
  846. return {row["decision_target_id"]: row for row in rule_decisions}
  847. def _query_effect_by_search_query_id(
  848. discovered_content_items: list[dict[str, Any]],
  849. rule_decisions: list[dict[str, Any]],
  850. ) -> dict[str, str]:
  851. decision_by_content_id = _decision_by_content_id(rule_decisions)
  852. statuses_by_query: dict[str, set[str]] = {}
  853. for item in discovered_content_items:
  854. decision = decision_by_content_id.get(item.get("platform_content_id"))
  855. if not decision:
  856. continue
  857. query_sources = item.get("query_sources") or [{"search_query_id": item.get("search_query_id")}]
  858. for query_source in query_sources:
  859. search_query_id = query_source.get("search_query_id")
  860. if search_query_id:
  861. statuses_by_query.setdefault(search_query_id, set()).add(
  862. decision.get("search_query_effect_status")
  863. )
  864. effects: dict[str, str] = {}
  865. for search_query_id, statuses in statuses_by_query.items():
  866. for status in ("success", "pending", "rule_blocked", "failed"):
  867. if status in statuses:
  868. effects[search_query_id] = status
  869. break
  870. return effects
  871. def _can_fetch_next_page(search_query_id: str, query_effect_by_id: dict[str, str]) -> bool:
  872. return query_effect_by_id.get(search_query_id) == "success"
  873. def _edge_permission_for(
  874. decision: dict[str, Any] | None, edge_id: str, policy: dict[str, Any]
  875. ) -> str:
  876. """判定→边通行证:无判定 / 查询 rule_blocked 一律 deny,其余查 edge_permissions。"""
  877. if not decision or decision.get("search_query_effect_status") == "rule_blocked":
  878. return "deny"
  879. return edge_permission(policy, decision.get("decision_action"), edge_id)
  880. def _binding_by_edge_id(walk_strategy: dict[str, Any]) -> dict[str, dict[str, Any]]:
  881. return {row["edge_id"]: row for row in walk_strategy.get("walk_rule_pack_binding", [])}
  882. def _resolve_edge_binding(
  883. edge_id: str, walk_strategy: dict[str, Any]
  884. ) -> tuple[dict[str, Any], str | None]:
  885. binding = _binding_by_edge_id(walk_strategy).get(edge_id)
  886. if not binding:
  887. return {}, "edge_binding_missing"
  888. return binding, None
  889. def _execution_record(decision: dict[str, Any] | None, *, content_pack_id: str) -> dict[str, Any]:
  890. if decision:
  891. return {
  892. "executed": True,
  893. "executed_rule_pack_id": decision.get("rule_pack_id") or content_pack_id,
  894. "reason": "content_decision_reused_for_walk_gate",
  895. }
  896. return {
  897. "executed": False,
  898. "executed_rule_pack_id": None,
  899. "reason": "future_pack_not_enabled",
  900. }
  901. def _decision_context(decision: dict[str, Any] | None) -> dict[str, Any]:
  902. if not decision:
  903. return {"decision_action": None, "search_query_effect_status": None}
  904. return {
  905. "decision_action": decision.get("decision_action"),
  906. "search_query_effect_status": decision.get("search_query_effect_status"),
  907. }
  908. def _merge_batch(context: dict[str, list[dict[str, Any]]], batch: dict[str, list[dict[str, Any]]]) -> None:
  909. for key in [
  910. "discovered_content_items",
  911. "content_media_records",
  912. "evidence_bundles",
  913. "rule_decisions",
  914. ]:
  915. context[key].extend(batch.get(key, []))
  916. def _new_platform_results(
  917. platform_results: list[dict[str, Any]],
  918. existing_content_ids: set[str],
  919. ) -> list[dict[str, Any]]:
  920. rows: list[dict[str, Any]] = []
  921. seen = set(existing_content_ids)
  922. for row in platform_results:
  923. content_id = row.get("platform_content_id")
  924. if content_id and content_id in seen:
  925. continue
  926. if content_id:
  927. seen.add(content_id)
  928. rows.append(row)
  929. return rows
  930. def _empty_batch() -> dict[str, list[dict[str, Any]]]:
  931. return {
  932. "discovered_content_items": [],
  933. "content_media_records": [],
  934. "evidence_bundles": [],
  935. "rule_decisions": [],
  936. "search_queries": [],
  937. "query_failures": [],
  938. "walk_actions": [],
  939. }
  940. def _unique_authors(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
  941. seen: set[str] = set()
  942. unique: list[dict[str, Any]] = []
  943. for item in items:
  944. author_id = item.get("platform_author_id")
  945. if not author_id or author_id in seen:
  946. continue
  947. seen.add(author_id)
  948. unique.append(item)
  949. return unique
  950. def _blocked_tag(tag: str) -> bool:
  951. lowered = tag.lower()
  952. return any(term in lowered for term in BLOCKED_TAG_TERMS)
  953. def _walk_action_id(
  954. run_id: str,
  955. policy_run_id: str,
  956. edge_id: str,
  957. target_id: str,
  958. suffix: str,
  959. ) -> str:
  960. raw = f"{run_id}:{policy_run_id}:{edge_id}:{target_id}:{suffix}"
  961. return f"wa_{hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16]}"
  962. def _short_hash(value: str) -> str:
  963. return hashlib.sha1(value.encode("utf-8")).hexdigest()[:10]