demand_hive_export.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. """新热事件需求写入 Hive 的行构建逻辑。"""
  2. from __future__ import annotations
  3. import hashlib
  4. from typing import Any
  5. from app.hot_content.demand_export import (
  6. ITEM_TYPE_ELEMENT,
  7. ITEM_TYPE_PHRASE,
  8. attach_wxindex_metadata,
  9. build_demand_export_rows,
  10. )
  11. from app.hot_content.demand_quality import (
  12. attach_quality_scores_to_export_rows,
  13. build_feature_combo_text,
  14. quality_passed,
  15. )
  # Point categories that, together with a matched demand, let a record keep
  # its title (see _has_inspiration_or_goal_demand_match).
  TITLE_RETAIN_POINT_CATEGORIES = frozenset(("灵感点", "目的点"))

  # Values written to the "type" field of a Hive row.
  TYPE_FEATURE_POINT = "特征点"
  TYPE_PHRASE = "短语"

  # A record's wxindex score is divided by this to obtain the row weight.
  WEIGHT_DIVISOR = 1e6
  def _normalize_demand_key(value: str) -> str:
      """Return *value* with every run of whitespace removed.

      Used as a comparison key so that texts differing only in whitespace
      are treated as the same demand.
      """
      fragments = value.split()
      return "".join(fragments)
  def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
      """Return the hex MD5 digest identifying a demand within a partition.

      The digest is taken over the UTF-8 encoding of ``strategy``,
      ``demand_name`` and ``partition_dt`` concatenated in that order with no
      separator.
      """
      digest = hashlib.md5()
      digest.update("{}{}{}".format(strategy, demand_name, partition_dt).encode("utf-8"))
      return digest.hexdigest()
  def _dedupe_texts(texts: list[str]) -> list[str]:
      """Return the non-blank texts in first-seen order, without duplicates.

      Each entry is stringified and stripped. An entry is dropped when either
      its stripped form or its whitespace-free form (via
      ``_normalize_demand_key``) has already been recorded for an earlier
      kept entry.
      """
      kept: list[str] = []
      seen_keys: set[str] = set()
      for candidate in (str(item).strip() for item in texts):
          if candidate:
              variants = {candidate, _normalize_demand_key(candidate)}
              if seen_keys.isdisjoint(variants):
                  seen_keys |= variants
                  kept.append(candidate)
      return kept
  def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
      """Return the highest usable ``wxindex_latest_score`` among the rows.

      A missing or falsy score counts as ``0``. Values that cannot be
      converted to ``float`` are skipped, and so are NaN values: NaN makes
      ``max()`` depend on row order and compares false against any threshold,
      which would let a record slip past a ``score < threshold`` gate.

      Returns ``0.0`` when no row yields a usable score.
      """
      import math  # local import keeps this block self-contained

      scores: list[float] = []
      for row in export_rows:
          try:
              score = float(row.get("wxindex_latest_score") or 0)
          except (TypeError, ValueError, OverflowError):
              # Unparsable (or too large to represent) score: ignore the row.
              continue
          if math.isnan(score):
              continue
          scores.append(score)
      return max(scores, default=0.0)
  def _should_retain_title(
      export_rows: list[dict[str, Any]],
      *,
      wxindex_threshold: float,
  ) -> bool:
      """Decide whether a record's title is kept for export.

      The title is kept when the record's wxindex score is not below
      ``wxindex_threshold`` and at least one row satisfies
      ``_has_inspiration_or_goal_demand_match``.
      """
      below_threshold = _record_wxindex_score(export_rows) < wxindex_threshold
      return (not below_threshold) and any(
          map(_has_inspiration_or_goal_demand_match, export_rows)
      )
  def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
      """Return True when the row is in a title-retaining category and has a matched demand.

      The row's stripped ``point_category`` must be one of
      ``TITLE_RETAIN_POINT_CATEGORIES`` and its ``matched_demand`` must be
      non-blank after stripping.
      """
      category = str(row.get("point_category") or "").strip()
      return category in TITLE_RETAIN_POINT_CATEGORIES and bool(
          str(row.get("matched_demand") or "").strip()
      )
  def _has_matched_demand(row: dict[str, Any]) -> bool:
      """Return True when the row's ``matched_demand`` is non-blank after stripping."""
      demand = row.get("matched_demand") or ""
      return str(demand).strip() != ""
  def _export_row_passes_quality(
      row: dict[str, Any],
      *,
      export_rows: list[dict[str, Any]],
      event_sense_json: dict[str, Any] | None,
      senior_fit_json: dict[str, Any] | None,
      event_threshold: float,
      senior_threshold: float,
  ) -> bool:
      """Return whether a single export row passes the quality check.

      * Phrase rows are checked with their own stripped ``item_text``.
      * Element rows with a matched demand are checked with the record-level
        feature combo text built from ``export_rows``; an empty combo fails.
      * Every other row fails.
      """
      kind = str(row.get("item_type") or "")
      if kind == ITEM_TYPE_PHRASE:
          demand_type = TYPE_PHRASE
          demand_text = str(row.get("item_text") or "").strip()
      elif kind == ITEM_TYPE_ELEMENT and _has_matched_demand(row):
          demand_type = TYPE_FEATURE_POINT
          demand_text = build_feature_combo_text(export_rows)
          if not demand_text:
              return False
      else:
          return False

      return quality_passed(
          demand_type=demand_type,
          demand_text=demand_text,
          event_sense_json=event_sense_json,
          senior_fit_json=senior_fit_json,
          event_threshold=event_threshold,
          senior_threshold=senior_threshold,
      )
  def is_export_row_as_demand(
      row: dict[str, Any],
      export_rows: list[dict[str, Any]],
      *,
      wxindex_threshold: float,
      event_sense_json: dict[str, Any] | None = None,
      senior_fit_json: dict[str, Any] | None = None,
      event_threshold: float = 0.0,
      senior_threshold: float = 0.0,
  ) -> int:
      """Tell whether the row is consistent with the ODPS demand sync rules.

      A row counts as a demand when the record's title is retained, the row
      has a matched demand, and the row passes the quality check. The quality
      check only runs when at least one of ``event_sense_json`` /
      ``senior_fit_json`` is provided.

      Returns:
          ``1`` when the row counts as a demand, otherwise ``0``.
      """
      title_retained = _should_retain_title(
          export_rows, wxindex_threshold=wxindex_threshold
      )
      if not title_retained or not _has_matched_demand(row):
          return 0

      quality_data_missing = event_sense_json is None and senior_fit_json is None
      if quality_data_missing:
          return 1

      passed = _export_row_passes_quality(
          row,
          export_rows=export_rows,
          event_sense_json=event_sense_json,
          senior_fit_json=senior_fit_json,
          event_threshold=event_threshold,
          senior_threshold=senior_threshold,
      )
      return 1 if passed else 0
  119. def _build_export_rows_for_record(
  120. record: dict[str, Any],
  121. *,
  122. wxindex_threshold: float,
  123. event_sense_json: dict[str, Any] | None,
  124. senior_fit_json: dict[str, Any] | None,
  125. event_threshold: float,
  126. senior_threshold: float,
  127. ) -> list[dict[str, Any]]:
  128. match_result = record.get("contribution_demand_match_json")
  129. if not isinstance(match_result, dict):
  130. return []
  131. contribution_points = record.get("contribution_points_json")
  132. trend_json = record.get("wxindex_trend_json")
  133. export_rows = attach_wxindex_metadata(
  134. build_demand_export_rows(
  135. match_result,
  136. contribution_points=(
  137. contribution_points if isinstance(contribution_points, dict) else None
  138. ),
  139. trend_json=trend_json if isinstance(trend_json, dict) else None,
  140. ),
  141. trend_json if isinstance(trend_json, dict) else None,
  142. wxindex_threshold=wxindex_threshold,
  143. event_sense_json=event_sense_json,
  144. senior_fit_json=senior_fit_json,
  145. event_threshold=event_threshold,
  146. senior_threshold=senior_threshold,
  147. )
  148. return attach_quality_scores_to_export_rows(
  149. export_rows,
  150. event_sense_json=event_sense_json,
  151. senior_fit_json=senior_fit_json,
  152. )
  def build_hive_rows_for_record(
      export_rows: list[dict[str, Any]],
      *,
      record_id: int,
      strategy: str,
      partition_dt: str,
      wxindex_threshold: float,
      event_sense_json: dict[str, Any] | None = None,
      senior_fit_json: dict[str, Any] | None = None,
      event_threshold: float = 0.0,
      senior_threshold: float = 0.0,
  ) -> list[dict[str, Any]]:
      """Turn one record's export rows into Hive rows.

      Nothing is produced when the record's title is not retained. Otherwise
      the result holds, in this order:

      * one feature-point row for the record's feature combo text, if that
        text is non-empty and passes the quality check;
      * one phrase row per distinct phrase text whose export row has a matched
        demand and passes the quality check.

      Every row carries the same weight: the record's wxindex score divided by
      ``WEIGHT_DIVISOR``.
      """
      if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
          return []

      def _passes(demand_type: str, demand_text: str) -> bool:
          return quality_passed(
              demand_type=demand_type,
              demand_text=demand_text,
              event_sense_json=event_sense_json,
              senior_fit_json=senior_fit_json,
              event_threshold=event_threshold,
              senior_threshold=senior_threshold,
          )

      def _row(demand_type: str, demand_name: str) -> dict[str, Any]:
          return _build_hive_row(
              record_id=record_id,
              strategy=strategy,
              demand_name=demand_name,
              weight=weight,
              demand_type=demand_type,
              partition_dt=partition_dt,
          )

      weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
      feature_combo = build_feature_combo_text(export_rows)

      # Phrases are collected (and quality-checked) before the feature combo
      # is checked, matching the original evaluation order.
      candidate_phrases: list[str] = []
      for row in export_rows:
          if str(row.get("item_type") or "") != ITEM_TYPE_PHRASE:
              continue
          if not _has_matched_demand(row):
              continue
          phrase = str(row.get("item_text") or "").strip()
          if _passes(TYPE_PHRASE, phrase):
              candidate_phrases.append(phrase)
      phrase_texts = _dedupe_texts(candidate_phrases)

      hive_rows: list[dict[str, Any]] = []
      if feature_combo and _passes(TYPE_FEATURE_POINT, feature_combo):
          hive_rows.append(_row(TYPE_FEATURE_POINT, feature_combo))
      hive_rows.extend(_row(TYPE_PHRASE, phrase_text) for phrase_text in phrase_texts)
      return hive_rows
  def build_hive_rows_for_odps_record(
      record: dict[str, Any],
      *,
      strategy: str,
      partition_dt: str,
      wxindex_threshold: float,
      event_threshold: float,
      senior_threshold: float,
  ) -> list[dict[str, Any]]:
      """Build the Hive rows for a single ODPS record.

      ``demand_event_sense_json`` and ``demand_senior_fit_json`` are read from
      the record; a value that is not a dict is replaced by an empty dict.
      The record's ``id`` (``0`` when missing or falsy) becomes the rows'
      ``record_id``.
      """
      raw_event_sense = record.get("demand_event_sense_json")
      raw_senior_fit = record.get("demand_senior_fit_json")
      quality_kwargs: dict[str, Any] = {
          "event_sense_json": raw_event_sense if isinstance(raw_event_sense, dict) else {},
          "senior_fit_json": raw_senior_fit if isinstance(raw_senior_fit, dict) else {},
          "event_threshold": event_threshold,
          "senior_threshold": senior_threshold,
      }

      export_rows = _build_export_rows_for_record(
          record,
          wxindex_threshold=wxindex_threshold,
          **quality_kwargs,
      )
      return build_hive_rows_for_record(
          export_rows,
          record_id=int(record.get("id") or 0),
          strategy=strategy,
          partition_dt=partition_dt,
          wxindex_threshold=wxindex_threshold,
          **quality_kwargs,
      )
  def build_hive_rows_from_export_groups(
      export_groups: list[dict[str, Any]],
      *,
      strategy: str,
      partition_dt: str,
      wxindex_threshold: float,
      event_threshold: float = 0.0,
      senior_threshold: float = 0.0,
  ) -> list[dict[str, Any]]:
      """Build Hive rows for a batch of export groups, unique by ``demand_id``.

      Each group is a dict read for ``export_rows``, ``record_id``,
      ``demand_event_sense_json`` and ``demand_senior_fit_json``. A group is
      skipped when its ``export_rows`` is not a list or when its ``record_id``
      is missing, not convertible to ``int``, or not positive. Quality JSON
      values that are not dicts are passed on as ``None``.

      When several rows share a ``demand_id``, only the first one encountered
      is kept.
      """
      rows: list[dict[str, Any]] = []
      seen_demand_ids: set[str] = set()
      for group in export_groups:
          export_rows = group.get("export_rows") or []
          if not isinstance(export_rows, list):
              continue
          try:
              record_id = int(group.get("record_id") or 0)
          except (TypeError, ValueError, OverflowError):
              # An unparsable id is treated like a missing one: skip this
              # group instead of letting one bad group abort the whole batch.
              continue
          if record_id <= 0:
              continue
          event_sense_json = group.get("demand_event_sense_json")
          senior_fit_json = group.get("demand_senior_fit_json")
          for hive_row in build_hive_rows_for_record(
              export_rows,
              record_id=record_id,
              strategy=strategy,
              partition_dt=partition_dt,
              wxindex_threshold=wxindex_threshold,
              event_sense_json=event_sense_json if isinstance(event_sense_json, dict) else None,
              senior_fit_json=senior_fit_json if isinstance(senior_fit_json, dict) else None,
              event_threshold=event_threshold,
              senior_threshold=senior_threshold,
          ):
              demand_id = str(hive_row["demand_id"])
              if demand_id in seen_demand_ids:
                  continue
              seen_demand_ids.add(demand_id)
              rows.append(hive_row)
      return rows
  def build_hive_rows_from_odps_records(
      records: list[dict[str, Any]],
      *,
      strategy: str,
      partition_dt: str,
      wxindex_threshold: float,
      event_threshold: float,
      senior_threshold: float,
  ) -> list[dict[str, Any]]:
      """Build Hive rows for a batch of ODPS records, unique by ``demand_id``.

      A record is skipped when its ``id`` is missing, not convertible to
      ``int``, or not positive. When several rows share a ``demand_id``, only
      the first one encountered is kept.
      """
      rows: list[dict[str, Any]] = []
      seen_demand_ids: set[str] = set()
      for record in records:
          try:
              record_id = int(record.get("id") or 0)
          except (TypeError, ValueError, OverflowError):
              # An unparsable id is treated like a missing one: skip this
              # record instead of letting one bad record abort the whole batch.
              continue
          if record_id <= 0:
              continue
          for hive_row in build_hive_rows_for_odps_record(
              record,
              strategy=strategy,
              partition_dt=partition_dt,
              wxindex_threshold=wxindex_threshold,
              event_threshold=event_threshold,
              senior_threshold=senior_threshold,
          ):
              demand_id = str(hive_row["demand_id"])
              if demand_id in seen_demand_ids:
                  continue
              seen_demand_ids.add(demand_id)
              rows.append(hive_row)
      return rows
  def _build_hive_row(
      *,
      record_id: int,
      strategy: str,
      demand_name: str,
      weight: float,
      demand_type: str,
      partition_dt: str,
  ) -> dict[str, Any]:
      """Assemble one Hive row dict for a demand.

      ``demand_name`` is stripped before it is stored and before the
      ``demand_id`` is derived from it. ``video_count``, ``video_list`` and
      ``extend`` are filled with fixed placeholder values (``None``, an empty
      list and ``"{}"``).
      """
      name = demand_name.strip()
      demand_id = build_demand_id(
          strategy=strategy,
          demand_name=name,
          partition_dt=partition_dt,
      )
      hive_row: dict[str, Any] = {
          "record_id": record_id,
          "strategy": strategy,
          "demand_id": demand_id,
          "demand_name": name,
          "weight": weight,
          "type": demand_type,
          "video_count": None,
          "video_list": [],
          "extend": "{}",
          "dt": partition_dt,
      }
      return hive_row