demand_hive_export.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. """近期热点需求写入 Hive 的行构建逻辑。"""
  2. from __future__ import annotations
  3. import hashlib
  4. from typing import Any
  5. from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
  6. TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
  7. TYPE_FEATURE_POINT = "特征点"
  8. TYPE_PHRASE = "短语"
  9. WEIGHT_DIVISOR = 1_000_000.0
  10. def _normalize_demand_key(value: str) -> str:
  11. return "".join(value.split())
  12. def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
  13. raw = f"{strategy}{demand_name}{partition_dt}"
  14. return hashlib.md5(raw.encode("utf-8")).hexdigest()
  15. def _dedupe_texts(texts: list[str]) -> list[str]:
  16. deduped: list[str] = []
  17. seen: set[str] = set()
  18. for raw in texts:
  19. text = str(raw).strip()
  20. if not text:
  21. continue
  22. keys = {text, _normalize_demand_key(text)}
  23. if keys & seen:
  24. continue
  25. seen.update(keys)
  26. deduped.append(text)
  27. return deduped
  28. def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
  29. scores: list[float] = []
  30. for row in export_rows:
  31. try:
  32. scores.append(float(row.get("wxindex_latest_score") or 0))
  33. except (TypeError, ValueError):
  34. continue
  35. return max(scores) if scores else 0.0
  36. def _should_retain_title(
  37. export_rows: list[dict[str, Any]],
  38. *,
  39. wxindex_threshold: float,
  40. ) -> bool:
  41. if _record_wxindex_score(export_rows) < wxindex_threshold:
  42. return False
  43. return any(_has_inspiration_or_goal_demand_match(row) for row in export_rows)
  44. def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
  45. point_category = str(row.get("point_category") or "").strip()
  46. if point_category not in TITLE_RETAIN_POINT_CATEGORIES:
  47. return False
  48. return bool(str(row.get("matched_demand") or "").strip())
  49. def _has_matched_demand(row: dict[str, Any]) -> bool:
  50. return bool(str(row.get("matched_demand") or "").strip())
  51. def build_hive_rows_for_record(
  52. export_rows: list[dict[str, Any]],
  53. *,
  54. record_id: int,
  55. strategy: str,
  56. partition_dt: str,
  57. wxindex_threshold: float,
  58. ) -> list[dict[str, Any]]:
  59. if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
  60. return []
  61. weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
  62. element_texts = _dedupe_texts(
  63. [
  64. str(row.get("item_text") or "").strip()
  65. for row in export_rows
  66. if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT and _has_matched_demand(row)
  67. ]
  68. )
  69. phrase_texts = _dedupe_texts(
  70. [
  71. str(row.get("item_text") or "").strip()
  72. for row in export_rows
  73. if str(row.get("item_type") or "") == ITEM_TYPE_PHRASE
  74. and _has_matched_demand(row)
  75. ]
  76. )
  77. hive_rows: list[dict[str, Any]] = []
  78. if element_texts:
  79. demand_name = " ".join(element_texts)
  80. hive_rows.append(
  81. _build_hive_row(
  82. record_id=record_id,
  83. strategy=strategy,
  84. demand_name=demand_name,
  85. weight=weight,
  86. demand_type=TYPE_FEATURE_POINT,
  87. partition_dt=partition_dt,
  88. )
  89. )
  90. for phrase_text in phrase_texts:
  91. hive_rows.append(
  92. _build_hive_row(
  93. record_id=record_id,
  94. strategy=strategy,
  95. demand_name=phrase_text,
  96. weight=weight,
  97. demand_type=TYPE_PHRASE,
  98. partition_dt=partition_dt,
  99. )
  100. )
  101. return hive_rows
  102. def build_hive_rows_from_export_groups(
  103. export_groups: list[dict[str, Any]],
  104. *,
  105. strategy: str,
  106. partition_dt: str,
  107. wxindex_threshold: float,
  108. ) -> list[dict[str, Any]]:
  109. rows: list[dict[str, Any]] = []
  110. seen_demand_ids: set[str] = set()
  111. for group in export_groups:
  112. export_rows = group.get("export_rows") or []
  113. if not isinstance(export_rows, list):
  114. continue
  115. record_id = int(group.get("record_id") or 0)
  116. if record_id <= 0:
  117. continue
  118. for hive_row in build_hive_rows_for_record(
  119. export_rows,
  120. record_id=record_id,
  121. strategy=strategy,
  122. partition_dt=partition_dt,
  123. wxindex_threshold=wxindex_threshold,
  124. ):
  125. demand_id = str(hive_row["demand_id"])
  126. if demand_id in seen_demand_ids:
  127. continue
  128. seen_demand_ids.add(demand_id)
  129. rows.append(hive_row)
  130. return rows
  131. def _build_hive_row(
  132. *,
  133. record_id: int,
  134. strategy: str,
  135. demand_name: str,
  136. weight: float,
  137. demand_type: str,
  138. partition_dt: str,
  139. ) -> dict[str, Any]:
  140. normalized_name = demand_name.strip()
  141. return {
  142. "record_id": record_id,
  143. "strategy": strategy,
  144. "demand_id": build_demand_id(
  145. strategy=strategy,
  146. demand_name=normalized_name,
  147. partition_dt=partition_dt,
  148. ),
  149. "demand_name": normalized_name,
  150. "weight": weight,
  151. "type": demand_type,
  152. "video_count": None,
  153. "video_list": [],
  154. "extend": "{}",
  155. "dt": partition_dt,
  156. }