demand_hive_export.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
"""Row-building logic for writing new hot-event demands to Hive."""
  2. from __future__ import annotations
  3. import hashlib
  4. from typing import Any
  5. from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
# Point categories ("inspiration point", "purpose point") that a row must carry,
# together with a matched demand, for its record's title to be retained.
TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
# Value of the "type" field for the row built from a record's element texts
# ("feature point").
TYPE_FEATURE_POINT = "特征点"
# Value of the "type" field for rows built from a record's phrase texts ("phrase").
TYPE_PHRASE = "短语"
# A record's highest wxindex score is divided by this to obtain the row weight.
WEIGHT_DIVISOR = 1_000_000.0
  10. def _normalize_demand_key(value: str) -> str:
  11. return "".join(value.split())
  12. def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
  13. raw = f"{strategy}{demand_name}{partition_dt}"
  14. return hashlib.md5(raw.encode("utf-8")).hexdigest()
  15. def _dedupe_texts(texts: list[str]) -> list[str]:
  16. deduped: list[str] = []
  17. seen: set[str] = set()
  18. for raw in texts:
  19. text = str(raw).strip()
  20. if not text:
  21. continue
  22. keys = {text, _normalize_demand_key(text)}
  23. if keys & seen:
  24. continue
  25. seen.update(keys)
  26. deduped.append(text)
  27. return deduped
  28. def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
  29. scores: list[float] = []
  30. for row in export_rows:
  31. try:
  32. scores.append(float(row.get("wxindex_latest_score") or 0))
  33. except (TypeError, ValueError):
  34. continue
  35. return max(scores) if scores else 0.0
  36. def _should_retain_title(
  37. export_rows: list[dict[str, Any]],
  38. *,
  39. wxindex_threshold: float,
  40. ) -> bool:
  41. if _record_wxindex_score(export_rows) < wxindex_threshold:
  42. return False
  43. return any(_has_inspiration_or_goal_demand_match(row) for row in export_rows)
  44. def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
  45. point_category = str(row.get("point_category") or "").strip()
  46. if point_category not in TITLE_RETAIN_POINT_CATEGORIES:
  47. return False
  48. return bool(str(row.get("matched_demand") or "").strip())
  49. def _has_matched_demand(row: dict[str, Any]) -> bool:
  50. return bool(str(row.get("matched_demand") or "").strip())
  51. def is_export_row_as_demand(
  52. row: dict[str, Any],
  53. export_rows: list[dict[str, Any]],
  54. *,
  55. wxindex_threshold: float,
  56. ) -> int:
  57. """是否与 ODPS 需求同步规则一致:标题保留且该行有匹配需求。返回 0/1。"""
  58. if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
  59. return 0
  60. return 1 if _has_matched_demand(row) else 0
  61. def build_hive_rows_for_record(
  62. export_rows: list[dict[str, Any]],
  63. *,
  64. record_id: int,
  65. strategy: str,
  66. partition_dt: str,
  67. wxindex_threshold: float,
  68. ) -> list[dict[str, Any]]:
  69. if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
  70. return []
  71. weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
  72. element_texts = _dedupe_texts(
  73. [
  74. str(row.get("item_text") or "").strip()
  75. for row in export_rows
  76. if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT and _has_matched_demand(row)
  77. ]
  78. )
  79. phrase_texts = _dedupe_texts(
  80. [
  81. str(row.get("item_text") or "").strip()
  82. for row in export_rows
  83. if str(row.get("item_type") or "") == ITEM_TYPE_PHRASE
  84. and _has_matched_demand(row)
  85. ]
  86. )
  87. hive_rows: list[dict[str, Any]] = []
  88. if element_texts:
  89. demand_name = " ".join(element_texts)
  90. hive_rows.append(
  91. _build_hive_row(
  92. record_id=record_id,
  93. strategy=strategy,
  94. demand_name=demand_name,
  95. weight=weight,
  96. demand_type=TYPE_FEATURE_POINT,
  97. partition_dt=partition_dt,
  98. )
  99. )
  100. for phrase_text in phrase_texts:
  101. hive_rows.append(
  102. _build_hive_row(
  103. record_id=record_id,
  104. strategy=strategy,
  105. demand_name=phrase_text,
  106. weight=weight,
  107. demand_type=TYPE_PHRASE,
  108. partition_dt=partition_dt,
  109. )
  110. )
  111. return hive_rows
  112. def build_hive_rows_from_export_groups(
  113. export_groups: list[dict[str, Any]],
  114. *,
  115. strategy: str,
  116. partition_dt: str,
  117. wxindex_threshold: float,
  118. ) -> list[dict[str, Any]]:
  119. rows: list[dict[str, Any]] = []
  120. seen_demand_ids: set[str] = set()
  121. for group in export_groups:
  122. export_rows = group.get("export_rows") or []
  123. if not isinstance(export_rows, list):
  124. continue
  125. record_id = int(group.get("record_id") or 0)
  126. if record_id <= 0:
  127. continue
  128. for hive_row in build_hive_rows_for_record(
  129. export_rows,
  130. record_id=record_id,
  131. strategy=strategy,
  132. partition_dt=partition_dt,
  133. wxindex_threshold=wxindex_threshold,
  134. ):
  135. demand_id = str(hive_row["demand_id"])
  136. if demand_id in seen_demand_ids:
  137. continue
  138. seen_demand_ids.add(demand_id)
  139. rows.append(hive_row)
  140. return rows
  141. def _build_hive_row(
  142. *,
  143. record_id: int,
  144. strategy: str,
  145. demand_name: str,
  146. weight: float,
  147. demand_type: str,
  148. partition_dt: str,
  149. ) -> dict[str, Any]:
  150. normalized_name = demand_name.strip()
  151. return {
  152. "record_id": record_id,
  153. "strategy": strategy,
  154. "demand_id": build_demand_id(
  155. strategy=strategy,
  156. demand_name=normalized_name,
  157. partition_dt=partition_dt,
  158. ),
  159. "demand_name": normalized_name,
  160. "weight": weight,
  161. "type": demand_type,
  162. "video_count": None,
  163. "video_list": [],
  164. "extend": "{}",
  165. "dt": partition_dt,
  166. }