result_source_lookup.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. from __future__ import annotations
  2. import hashlib
  3. from collections import Counter
  4. from typing import Any
  5. from content_agent.constants import RUNTIME_SCHEMA_VERSION
  6. from content_agent.business_modules.run_record.validation import compute_final_output_completeness
  7. from content_agent.interfaces import RuntimeFileStore
  8. def run(
  9. run_id: str,
  10. policy_run_id: str,
  11. policy_bundle: dict[str, Any],
  12. discovered_content_items: list[dict[str, Any]],
  13. content_media_records: list[dict[str, Any]],
  14. decisions: list[dict[str, Any]],
  15. source_path_records: list[dict[str, Any]],
  16. search_clues: list[dict[str, Any]],
  17. runtime: RuntimeFileStore,
  18. ) -> dict[str, Any]:
  19. decision_by_target_id = {decision["decision_target_id"]: decision for decision in decisions}
  20. media_by_platform_content_id = {
  21. media["platform_content_id"]: media for media in content_media_records
  22. }
  23. paths_by_content_id = _paths_by_content_id(source_path_records)
  24. content_assets = _build_content_assets(
  25. policy_run_id,
  26. discovered_content_items,
  27. decision_by_target_id,
  28. media_by_platform_content_id,
  29. paths_by_content_id,
  30. )
  31. review_records = _build_review_records(
  32. policy_run_id,
  33. discovered_content_items,
  34. decision_by_target_id,
  35. media_by_platform_content_id,
  36. paths_by_content_id,
  37. )
  38. reject_records = _build_reject_records(
  39. policy_run_id,
  40. discovered_content_items,
  41. decision_by_target_id,
  42. )
  43. author_assets, author_asset_rows, author_role_rows = _build_author_assets(
  44. run_id,
  45. policy_run_id,
  46. discovered_content_items,
  47. decision_by_target_id,
  48. paths_by_content_id,
  49. )
  50. action_counts = Counter(decision["decision_action"] for decision in decisions)
  51. effect_status_counts = Counter(
  52. decision.get("search_query_effect_status", "failed") for decision in decisions
  53. )
  54. final_output = {
  55. "schema_version": RUNTIME_SCHEMA_VERSION,
  56. "run_id": run_id,
  57. "policy_run_id": policy_run_id,
  58. "policy": {
  59. "policy_bundle_id": policy_bundle["policy_bundle_id"],
  60. "strategy_id": policy_bundle["strategy_id"],
  61. "strategy_version": policy_bundle["strategy_version"],
  62. "rule_pack_id": policy_bundle["rule_pack_id"],
  63. "rule_pack_version": policy_bundle["rule_pack_version"],
  64. "policy_bundle_hash": policy_bundle["policy_bundle_hash"],
  65. "strategy_source_ref": policy_bundle["strategy_source_ref"],
  66. "rule_pack_source_ref": policy_bundle["rule_pack_source_ref"],
  67. },
  68. "walk_strategy": {
  69. "walk_strategy_id": policy_bundle.get("walk_strategy_id"),
  70. "walk_strategy_version": policy_bundle.get("walk_strategy_version"),
  71. "walk_strategy_source_ref": policy_bundle.get("walk_strategy_source_ref"),
  72. },
  73. "dispatch": policy_bundle.get("dispatch"),
  74. "dispatch_id": policy_bundle.get("dispatch_id"),
  75. "runtime_status_contract": policy_bundle.get("runtime_status_contract", {}),
  76. "content_assets": content_assets,
  77. "author_assets": author_assets,
  78. "review_records": review_records,
  79. "decision_records": [
  80. {
  81. "decision_id": decision["decision_id"],
  82. "policy_run_id": policy_run_id,
  83. "rule_pack_id": decision["rule_pack_id"],
  84. "rule_pack_version": decision["rule_pack_version"],
  85. "strategy_version": decision["strategy_version"],
  86. "decision_target_id": decision["decision_target_id"],
  87. "decision_action": decision["decision_action"],
  88. "decision_reason_code": decision["decision_reason_code"],
  89. "search_query_effect_status": decision["search_query_effect_status"],
  90. "decision_replay_data": decision.get("decision_replay_data", {}),
  91. "source_evidence": decision["source_evidence"],
  92. }
  93. for decision in decisions
  94. ],
  95. "search_clues": [
  96. {
  97. "search_query_id": clue["search_query_id"],
  98. "policy_run_id": policy_run_id,
  99. "final_asset_status": "clue_only",
  100. "search_query_effect_status": clue["search_query_effect_status"],
  101. }
  102. for clue in search_clues
  103. ],
  104. "reject_records": reject_records,
  105. "summary": {
  106. "search_query_count": len(search_clues),
  107. "pooled_content_count": action_counts["ADD_TO_CONTENT_POOL"],
  108. "review_content_count": action_counts["KEEP_CONTENT_FOR_REVIEW"],
  109. "pending_content_count": 0,
  110. "rejected_content_count": action_counts["REJECT_CONTENT"],
  111. "effect_status_counts": {
  112. "success": effect_status_counts["success"],
  113. "pending": effect_status_counts["pending"],
  114. "failed": effect_status_counts["failed"],
  115. "rule_blocked": effect_status_counts["rule_blocked"],
  116. },
  117. "policy_bundle_hash": policy_bundle["policy_bundle_hash"],
  118. },
  119. }
  120. completeness = compute_final_output_completeness(
  121. final_output,
  122. decisions,
  123. source_path_records,
  124. )
  125. final_output["validation_status"] = completeness["validation_status"]
  126. final_output["summary"]["run_path_complete"] = completeness["run_path_complete"]
  127. final_output["summary"]["trace_complete"] = completeness["trace_complete"]
  128. final_output["summary"]["validation_findings_summary"] = completeness["findings_summary"]
  129. runtime.write_json(run_id, "final_output.json", final_output)
  130. runtime.write_publish_jobs(
  131. run_id,
  132. policy_run_id,
  133. _build_publish_jobs(run_id, policy_run_id, content_assets),
  134. )
  135. runtime.write_author_assets(author_asset_rows)
  136. runtime.write_author_asset_roles(author_role_rows)
  137. return final_output
  138. def _build_content_assets(
  139. policy_run_id: str,
  140. discovered_content_items: list[dict[str, Any]],
  141. decision_by_target_id: dict[str, dict[str, Any]],
  142. media_by_platform_content_id: dict[str, dict[str, Any]],
  143. paths_by_content_id: dict[str, list[str]],
  144. ) -> list[dict[str, Any]]:
  145. content_assets: list[dict[str, Any]] = []
  146. for item in discovered_content_items:
  147. platform_content_id = item["platform_content_id"]
  148. decision = decision_by_target_id[platform_content_id]
  149. if decision["decision_action"] != "ADD_TO_CONTENT_POOL":
  150. continue
  151. path_ids = paths_by_content_id[platform_content_id]
  152. content_assets.append(
  153. {
  154. "platform": item["platform"],
  155. "platform_content_id": platform_content_id,
  156. "policy_run_id": policy_run_id,
  157. "content_discovery_id": item["content_discovery_id"],
  158. "final_asset_status": "pooled",
  159. "decision_id": decision["decision_id"],
  160. "rule_pack_id": decision["rule_pack_id"],
  161. "rule_pack_version": decision["rule_pack_version"],
  162. "strategy_version": decision["strategy_version"],
  163. "source_path_record_ids": path_ids,
  164. "source_evidence": {
  165. **decision["source_evidence"],
  166. "source_path_record_ids": path_ids,
  167. },
  168. "content_media_status": media_by_platform_content_id[
  169. platform_content_id
  170. ]["content_media_status"],
  171. }
  172. )
  173. return content_assets
  174. def _build_review_records(
  175. policy_run_id: str,
  176. discovered_content_items: list[dict[str, Any]],
  177. decision_by_target_id: dict[str, dict[str, Any]],
  178. media_by_platform_content_id: dict[str, dict[str, Any]],
  179. paths_by_content_id: dict[str, list[str]],
  180. ) -> list[dict[str, Any]]:
  181. review_records: list[dict[str, Any]] = []
  182. for item in discovered_content_items:
  183. platform_content_id = item["platform_content_id"]
  184. decision = decision_by_target_id[platform_content_id]
  185. if decision["decision_action"] != "KEEP_CONTENT_FOR_REVIEW":
  186. continue
  187. path_ids = paths_by_content_id[platform_content_id]
  188. review_records.append(
  189. {
  190. "platform": item["platform"],
  191. "platform_content_id": platform_content_id,
  192. "policy_run_id": policy_run_id,
  193. "content_discovery_id": item["content_discovery_id"],
  194. "review_status": "pending_review",
  195. "final_asset_status": "review_only",
  196. "decision_id": decision["decision_id"],
  197. "rule_pack_id": decision["rule_pack_id"],
  198. "rule_pack_version": decision["rule_pack_version"],
  199. "strategy_version": decision["strategy_version"],
  200. "decision_reason_code": decision["decision_reason_code"],
  201. "source_path_record_ids": path_ids,
  202. "source_evidence": {
  203. **decision["source_evidence"],
  204. "source_path_record_ids": path_ids,
  205. },
  206. "content_media_status": media_by_platform_content_id[
  207. platform_content_id
  208. ]["content_media_status"],
  209. }
  210. )
  211. return review_records
  212. def _build_reject_records(
  213. policy_run_id: str,
  214. discovered_content_items: list[dict[str, Any]],
  215. decision_by_target_id: dict[str, dict[str, Any]],
  216. ) -> list[dict[str, Any]]:
  217. reject_records: list[dict[str, Any]] = []
  218. for item in discovered_content_items:
  219. platform_content_id = item["platform_content_id"]
  220. decision = decision_by_target_id[platform_content_id]
  221. if decision["decision_action"] != "REJECT_CONTENT":
  222. continue
  223. reject_records.append(
  224. {
  225. "decision_target_id": platform_content_id,
  226. "policy_run_id": policy_run_id,
  227. "main_decision_reason_code": decision["decision_reason_code"],
  228. "decision_id": decision["decision_id"],
  229. "source_evidence": decision["source_evidence"],
  230. }
  231. )
  232. return reject_records
  233. def _build_publish_jobs(
  234. run_id: str,
  235. policy_run_id: str,
  236. content_assets: list[dict[str, Any]],
  237. ) -> list[dict[str, Any]]:
  238. jobs: list[dict[str, Any]] = []
  239. for asset in content_assets:
  240. publish_job_id = _stable_id(
  241. "publish_job",
  242. run_id,
  243. policy_run_id,
  244. asset["platform_content_id"],
  245. asset["decision_id"],
  246. )
  247. jobs.append(
  248. {
  249. "publish_job_id": publish_job_id,
  250. "platform_content_id": asset["platform_content_id"],
  251. "job_status": "created",
  252. "trigger_mode": "manual_review",
  253. "request_payload": {
  254. "run_id": run_id,
  255. "policy_run_id": policy_run_id,
  256. "content_asset": asset,
  257. "decision_id": asset["decision_id"],
  258. "source_path_record_ids": asset["source_path_record_ids"],
  259. },
  260. "response_payload": {},
  261. }
  262. )
  263. return jobs
  264. def _build_author_assets(
  265. run_id: str,
  266. policy_run_id: str,
  267. discovered_content_items: list[dict[str, Any]],
  268. decision_by_target_id: dict[str, dict[str, Any]],
  269. paths_by_content_id: dict[str, list[str]],
  270. ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
  271. by_author: dict[tuple[str, str], list[dict[str, Any]]] = {}
  272. for item in discovered_content_items:
  273. author_id = item.get("platform_author_id")
  274. if not author_id:
  275. continue
  276. platform = item.get("platform", "douyin")
  277. by_author.setdefault((platform, author_id), []).append(item)
  278. summaries: list[dict[str, Any]] = []
  279. asset_rows: list[dict[str, Any]] = []
  280. role_rows: list[dict[str, Any]] = []
  281. for (platform, author_id), items in sorted(by_author.items()):
  282. decisions = [
  283. decision_by_target_id[item["platform_content_id"]]
  284. for item in items
  285. if item["platform_content_id"] in decision_by_target_id
  286. ]
  287. qualified_decisions = [
  288. decision for decision in decisions if decision["decision_action"] == "ADD_TO_CONTENT_POOL"
  289. ]
  290. sample_count = len(items)
  291. qualified_count = len(qualified_decisions)
  292. qualified_ratio = qualified_count / sample_count if sample_count else 0
  293. age_level = _best_age_level(decisions)
  294. if not _author_asset_eligible(sample_count, qualified_count, qualified_ratio, age_level):
  295. continue
  296. author_asset_id = _stable_id("author_asset", platform, author_id)
  297. source_path_record_ids = sorted(
  298. {
  299. path_id
  300. for item in items
  301. for path_id in paths_by_content_id.get(item["platform_content_id"], [])
  302. }
  303. )
  304. decision_ids = [decision["decision_id"] for decision in decisions]
  305. tags = sorted({tag for item in items for tag in item.get("tags", [])})
  306. display_name = next((item.get("author_display_name") for item in items if item.get("author_display_name")), "")
  307. source_type = (
  308. "new_discovery"
  309. if any(item.get("previous_discovery_step") == "author_work" for item in items)
  310. else "new_discovery"
  311. )
  312. evidence_refs = {
  313. "decision_ids": decision_ids,
  314. "content_discovery_ids": [item["content_discovery_id"] for item in items],
  315. "source_path_record_ids": source_path_record_ids,
  316. }
  317. profile_snapshot = {
  318. "sample_count": sample_count,
  319. "qualified_content_count": qualified_count,
  320. "qualified_content_ratio": qualified_ratio,
  321. "age_50_plus_level": age_level,
  322. }
  323. summary = {
  324. "author_asset_id": author_asset_id,
  325. "platform": platform,
  326. "platform_author_id": author_id,
  327. "author_display_name": display_name,
  328. "asset_status": "active",
  329. "roles": ["author_asset", "source_seed", "high_50plus_profile"],
  330. "eligible_as_source": True,
  331. "source_path_record_ids": source_path_record_ids,
  332. "decision_ids": decision_ids,
  333. "evidence_refs": evidence_refs,
  334. }
  335. summaries.append(summary)
  336. asset_rows.append(
  337. {
  338. "author_asset_id": author_asset_id,
  339. "platform": platform,
  340. "platform_author_id": author_id,
  341. "author_display_name": display_name,
  342. "author_profile_url": None,
  343. "asset_status": "active",
  344. "source_type": source_type,
  345. "validation_status": "rule_validated",
  346. "eligible_as_source": 1,
  347. "elderly_ratio": None,
  348. "elderly_tgi": None,
  349. "content_tags": tags,
  350. "source_run_id": run_id,
  351. "source_policy_run_id": policy_run_id,
  352. "profile_snapshot": profile_snapshot,
  353. "evidence_refs": evidence_refs,
  354. "raw_payload": {
  355. "final_output_summary": summary,
  356. "profile_snapshot": profile_snapshot,
  357. },
  358. }
  359. )
  360. role_rows.extend(
  361. {
  362. "author_asset_id": author_asset_id,
  363. "role": role,
  364. "role_status": "active",
  365. "role_reason_code": "p7_author_asset_eligible",
  366. "assigned_by": "system",
  367. "source_run_id": run_id,
  368. "raw_payload": {"evidence_refs": evidence_refs},
  369. }
  370. for role in summary["roles"]
  371. )
  372. return summaries, asset_rows, role_rows
  373. def _author_asset_eligible(
  374. sample_count: int,
  375. qualified_count: int,
  376. qualified_ratio: float,
  377. age_level: str,
  378. ) -> bool:
  379. return (
  380. sample_count >= 9
  381. and qualified_count >= 3
  382. and qualified_ratio >= 1 / 3
  383. and age_level in {"medium", "strong"}
  384. )
  385. def _best_age_level(decisions: list[dict[str, Any]]) -> str:
  386. priority = {"strong": 3, "medium": 2, "weak": 1, "missing": 0}
  387. levels = [decision.get("age_50_plus_level", "missing") for decision in decisions]
  388. return max(levels or ["missing"], key=lambda level: priority.get(level, 0))
  389. def _paths_by_content_id(source_path_records: list[dict[str, Any]]) -> dict[str, list[str]]:
  390. pattern_path_records = [
  391. path for path in source_path_records if path["source_path_type"] == "pattern_to_search_query"
  392. ]
  393. by_search_query_id = {
  394. path["to_node_id"]: path["source_path_record_id"] for path in pattern_path_records
  395. }
  396. result: dict[str, list[str]] = {}
  397. for path in source_path_records:
  398. if path["source_path_type"] == "search_query_to_content":
  399. result.setdefault(path["to_node_id"], [])
  400. pattern_source_path_record_id = by_search_query_id.get(path["from_node_id"])
  401. if pattern_source_path_record_id:
  402. result[path["to_node_id"]].append(pattern_source_path_record_id)
  403. result[path["to_node_id"]].append(path["source_path_record_id"])
  404. elif path["source_path_type"] == "decision_to_asset":
  405. result.setdefault(path["to_node_id"], []).append(path["source_path_record_id"])
  406. return result
  407. def _stable_id(prefix: str, *parts: Any) -> str:
  408. raw = ":".join(str(part) for part in parts)
  409. return f"{prefix}_{hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16]}"