# demand_hive_export.py
  1. """新热事件需求写入 Hive 的行构建逻辑。"""
  2. from __future__ import annotations
  3. import hashlib
  4. from typing import Any
  5. from app.hot_content.demand_export import (
  6. ITEM_TYPE_ELEMENT,
  7. ITEM_TYPE_PHRASE,
  8. attach_wxindex_metadata,
  9. build_demand_export_rows,
  10. )
  11. from app.hot_content.demand_quality import (
  12. attach_quality_scores_to_export_rows,
  13. build_feature_combo_text,
  14. build_matched_element_texts,
  15. quality_passed,
  16. )
  17. TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
  18. TYPE_FEATURE_POINT = "特征点"
  19. TYPE_PHRASE = "短语"
  20. WEIGHT_DIVISOR = 1_000_000.0
  21. def _normalize_demand_key(value: str) -> str:
  22. return "".join(value.split())
  23. def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
  24. raw = f"{strategy}{demand_name}{partition_dt}"
  25. return hashlib.md5(raw.encode("utf-8")).hexdigest()
  26. def _dedupe_texts(texts: list[str]) -> list[str]:
  27. deduped: list[str] = []
  28. seen: set[str] = set()
  29. for raw in texts:
  30. text = str(raw).strip()
  31. if not text:
  32. continue
  33. keys = {text, _normalize_demand_key(text)}
  34. if keys & seen:
  35. continue
  36. seen.update(keys)
  37. deduped.append(text)
  38. return deduped
  39. def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
  40. scores: list[float] = []
  41. for row in export_rows:
  42. try:
  43. scores.append(float(row.get("wxindex_latest_score") or 0))
  44. except (TypeError, ValueError):
  45. continue
  46. return max(scores) if scores else 0.0
  47. def _should_retain_title(
  48. export_rows: list[dict[str, Any]],
  49. *,
  50. wxindex_threshold: float,
  51. ) -> bool:
  52. if _record_wxindex_score(export_rows) < wxindex_threshold:
  53. return False
  54. return any(_has_inspiration_or_goal_demand_match(row) for row in export_rows)
  55. def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
  56. point_category = str(row.get("point_category") or "").strip()
  57. if point_category not in TITLE_RETAIN_POINT_CATEGORIES:
  58. return False
  59. return bool(str(row.get("matched_demand") or "").strip())
  60. def _has_matched_demand(row: dict[str, Any]) -> bool:
  61. return bool(str(row.get("matched_demand") or "").strip())
  62. def _export_row_passes_quality(
  63. row: dict[str, Any],
  64. *,
  65. export_rows: list[dict[str, Any]],
  66. event_sense_json: dict[str, Any] | None,
  67. senior_fit_json: dict[str, Any] | None,
  68. event_threshold: float,
  69. senior_threshold: float,
  70. ) -> bool:
  71. item_type = str(row.get("item_type") or "")
  72. if item_type == ITEM_TYPE_PHRASE:
  73. return quality_passed(
  74. demand_type=TYPE_PHRASE,
  75. demand_text=str(row.get("item_text") or "").strip(),
  76. event_sense_json=event_sense_json,
  77. senior_fit_json=senior_fit_json,
  78. event_threshold=event_threshold,
  79. senior_threshold=senior_threshold,
  80. )
  81. if item_type == ITEM_TYPE_ELEMENT and _has_matched_demand(row):
  82. element_text = str(row.get("item_text") or "").strip()
  83. if not element_text:
  84. return False
  85. return quality_passed(
  86. demand_type=TYPE_FEATURE_POINT,
  87. demand_text=element_text,
  88. event_sense_json=event_sense_json,
  89. senior_fit_json=senior_fit_json,
  90. event_threshold=event_threshold,
  91. senior_threshold=senior_threshold,
  92. )
  93. return False
  94. def is_export_row_as_demand(
  95. row: dict[str, Any],
  96. export_rows: list[dict[str, Any]],
  97. *,
  98. wxindex_threshold: float,
  99. event_sense_json: dict[str, Any] | None = None,
  100. senior_fit_json: dict[str, Any] | None = None,
  101. event_threshold: float = 0.0,
  102. senior_threshold: float = 0.0,
  103. ) -> int:
  104. """是否与 ODPS 需求同步规则一致:标题保留、质量达标且该行有匹配需求。返回 0/1。"""
  105. if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
  106. return 0
  107. if not _has_matched_demand(row):
  108. return 0
  109. if event_sense_json is not None or senior_fit_json is not None:
  110. if not _export_row_passes_quality(
  111. row,
  112. export_rows=export_rows,
  113. event_sense_json=event_sense_json,
  114. senior_fit_json=senior_fit_json,
  115. event_threshold=event_threshold,
  116. senior_threshold=senior_threshold,
  117. ):
  118. return 0
  119. return 1
  120. def _build_export_rows_for_record(
  121. record: dict[str, Any],
  122. *,
  123. wxindex_threshold: float,
  124. event_sense_json: dict[str, Any] | None,
  125. senior_fit_json: dict[str, Any] | None,
  126. event_threshold: float,
  127. senior_threshold: float,
  128. ) -> list[dict[str, Any]]:
  129. match_result = record.get("contribution_demand_match_json")
  130. if not isinstance(match_result, dict):
  131. return []
  132. contribution_points = record.get("contribution_points_json")
  133. trend_json = record.get("wxindex_trend_json")
  134. export_rows = attach_wxindex_metadata(
  135. build_demand_export_rows(
  136. match_result,
  137. contribution_points=(
  138. contribution_points if isinstance(contribution_points, dict) else None
  139. ),
  140. trend_json=trend_json if isinstance(trend_json, dict) else None,
  141. ),
  142. trend_json if isinstance(trend_json, dict) else None,
  143. wxindex_threshold=wxindex_threshold,
  144. event_sense_json=event_sense_json,
  145. senior_fit_json=senior_fit_json,
  146. event_threshold=event_threshold,
  147. senior_threshold=senior_threshold,
  148. )
  149. return attach_quality_scores_to_export_rows(
  150. export_rows,
  151. event_sense_json=event_sense_json,
  152. senior_fit_json=senior_fit_json,
  153. )
  154. def build_hive_rows_for_record(
  155. export_rows: list[dict[str, Any]],
  156. *,
  157. record_id: int,
  158. strategy: str,
  159. partition_dt: str,
  160. wxindex_threshold: float,
  161. event_sense_json: dict[str, Any] | None = None,
  162. senior_fit_json: dict[str, Any] | None = None,
  163. event_threshold: float = 0.0,
  164. senior_threshold: float = 0.0,
  165. ) -> list[dict[str, Any]]:
  166. if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
  167. return []
  168. weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
  169. feature_combo = build_feature_combo_text(export_rows)
  170. element_texts = build_matched_element_texts(export_rows)
  171. phrase_texts = _dedupe_texts(
  172. [
  173. str(row.get("item_text") or "").strip()
  174. for row in export_rows
  175. if str(row.get("item_type") or "") == ITEM_TYPE_PHRASE
  176. and _has_matched_demand(row)
  177. and quality_passed(
  178. demand_type=TYPE_PHRASE,
  179. demand_text=str(row.get("item_text") or "").strip(),
  180. event_sense_json=event_sense_json,
  181. senior_fit_json=senior_fit_json,
  182. event_threshold=event_threshold,
  183. senior_threshold=senior_threshold,
  184. )
  185. ]
  186. )
  187. hive_rows: list[dict[str, Any]] = []
  188. if feature_combo and quality_passed(
  189. demand_type=TYPE_FEATURE_POINT,
  190. demand_text=feature_combo,
  191. event_sense_json=event_sense_json,
  192. senior_fit_json=senior_fit_json,
  193. event_threshold=event_threshold,
  194. senior_threshold=senior_threshold,
  195. ):
  196. hive_rows.append(
  197. _build_hive_row(
  198. record_id=record_id,
  199. strategy=strategy,
  200. demand_name=feature_combo,
  201. weight=weight,
  202. demand_type=TYPE_FEATURE_POINT,
  203. partition_dt=partition_dt,
  204. )
  205. )
  206. for element_text in element_texts:
  207. if quality_passed(
  208. demand_type=TYPE_FEATURE_POINT,
  209. demand_text=element_text,
  210. event_sense_json=event_sense_json,
  211. senior_fit_json=senior_fit_json,
  212. event_threshold=event_threshold,
  213. senior_threshold=senior_threshold,
  214. ):
  215. hive_rows.append(
  216. _build_hive_row(
  217. record_id=record_id,
  218. strategy=strategy,
  219. demand_name=element_text,
  220. weight=weight,
  221. demand_type=TYPE_FEATURE_POINT,
  222. partition_dt=partition_dt,
  223. )
  224. )
  225. for phrase_text in phrase_texts:
  226. hive_rows.append(
  227. _build_hive_row(
  228. record_id=record_id,
  229. strategy=strategy,
  230. demand_name=phrase_text,
  231. weight=weight,
  232. demand_type=TYPE_PHRASE,
  233. partition_dt=partition_dt,
  234. )
  235. )
  236. return hive_rows
  237. def build_hive_rows_for_odps_record(
  238. record: dict[str, Any],
  239. *,
  240. strategy: str,
  241. partition_dt: str,
  242. wxindex_threshold: float,
  243. event_threshold: float,
  244. senior_threshold: float,
  245. ) -> list[dict[str, Any]]:
  246. event_sense_json = record.get("demand_event_sense_json")
  247. senior_fit_json = record.get("demand_senior_fit_json")
  248. if not isinstance(event_sense_json, dict):
  249. event_sense_json = {}
  250. if not isinstance(senior_fit_json, dict):
  251. senior_fit_json = {}
  252. export_rows = _build_export_rows_for_record(
  253. record,
  254. wxindex_threshold=wxindex_threshold,
  255. event_sense_json=event_sense_json,
  256. senior_fit_json=senior_fit_json,
  257. event_threshold=event_threshold,
  258. senior_threshold=senior_threshold,
  259. )
  260. return build_hive_rows_for_record(
  261. export_rows,
  262. record_id=int(record.get("id") or 0),
  263. strategy=strategy,
  264. partition_dt=partition_dt,
  265. wxindex_threshold=wxindex_threshold,
  266. event_sense_json=event_sense_json,
  267. senior_fit_json=senior_fit_json,
  268. event_threshold=event_threshold,
  269. senior_threshold=senior_threshold,
  270. )
  271. def build_hive_rows_from_export_groups(
  272. export_groups: list[dict[str, Any]],
  273. *,
  274. strategy: str,
  275. partition_dt: str,
  276. wxindex_threshold: float,
  277. event_threshold: float = 0.0,
  278. senior_threshold: float = 0.0,
  279. ) -> list[dict[str, Any]]:
  280. rows: list[dict[str, Any]] = []
  281. seen_demand_ids: set[str] = set()
  282. for group in export_groups:
  283. export_rows = group.get("export_rows") or []
  284. if not isinstance(export_rows, list):
  285. continue
  286. record_id = int(group.get("record_id") or 0)
  287. if record_id <= 0:
  288. continue
  289. event_sense_json = group.get("demand_event_sense_json")
  290. senior_fit_json = group.get("demand_senior_fit_json")
  291. for hive_row in build_hive_rows_for_record(
  292. export_rows,
  293. record_id=record_id,
  294. strategy=strategy,
  295. partition_dt=partition_dt,
  296. wxindex_threshold=wxindex_threshold,
  297. event_sense_json=event_sense_json if isinstance(event_sense_json, dict) else None,
  298. senior_fit_json=senior_fit_json if isinstance(senior_fit_json, dict) else None,
  299. event_threshold=event_threshold,
  300. senior_threshold=senior_threshold,
  301. ):
  302. demand_id = str(hive_row["demand_id"])
  303. if demand_id in seen_demand_ids:
  304. continue
  305. seen_demand_ids.add(demand_id)
  306. rows.append(hive_row)
  307. return rows
  308. def build_hive_rows_from_odps_records(
  309. records: list[dict[str, Any]],
  310. *,
  311. strategy: str,
  312. partition_dt: str,
  313. wxindex_threshold: float,
  314. event_threshold: float,
  315. senior_threshold: float,
  316. ) -> list[dict[str, Any]]:
  317. rows: list[dict[str, Any]] = []
  318. seen_demand_ids: set[str] = set()
  319. for record in records:
  320. record_id = int(record.get("id") or 0)
  321. if record_id <= 0:
  322. continue
  323. for hive_row in build_hive_rows_for_odps_record(
  324. record,
  325. strategy=strategy,
  326. partition_dt=partition_dt,
  327. wxindex_threshold=wxindex_threshold,
  328. event_threshold=event_threshold,
  329. senior_threshold=senior_threshold,
  330. ):
  331. demand_id = str(hive_row["demand_id"])
  332. if demand_id in seen_demand_ids:
  333. continue
  334. seen_demand_ids.add(demand_id)
  335. rows.append(hive_row)
  336. return rows
  337. def _build_hive_row(
  338. *,
  339. record_id: int,
  340. strategy: str,
  341. demand_name: str,
  342. weight: float,
  343. demand_type: str,
  344. partition_dt: str,
  345. ) -> dict[str, Any]:
  346. normalized_name = demand_name.strip()
  347. return {
  348. "record_id": record_id,
  349. "strategy": strategy,
  350. "demand_id": build_demand_id(
  351. strategy=strategy,
  352. demand_name=normalized_name,
  353. partition_dt=partition_dt,
  354. ),
  355. "demand_name": normalized_name,
  356. "weight": weight,
  357. "type": demand_type,
  358. "video_count": None,
  359. "video_list": [],
  360. "extend": "{}",
  361. "dt": partition_dt,
  362. }