demand_hive_export.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. """近期热点需求写入 Hive 的行构建逻辑。"""
  2. from __future__ import annotations
  3. import hashlib
  4. from typing import Any
  5. from app.hot_content.demand_export import ITEM_TYPE_ELEMENT, ITEM_TYPE_PHRASE
  6. TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
  7. TYPE_FEATURE_POINT = "特征点"
  8. TYPE_PHRASE = "短语"
  9. WEIGHT_DIVISOR = 1_000_000.0
  10. def _normalize_demand_key(value: str) -> str:
  11. return "".join(value.split())
  12. def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
  13. raw = f"{strategy}{demand_name}{partition_dt}"
  14. return hashlib.md5(raw.encode("utf-8")).hexdigest()
  15. def _dedupe_texts(texts: list[str]) -> list[str]:
  16. deduped: list[str] = []
  17. seen: set[str] = set()
  18. for raw in texts:
  19. text = str(raw).strip()
  20. if not text:
  21. continue
  22. keys = {text, _normalize_demand_key(text)}
  23. if keys & seen:
  24. continue
  25. seen.update(keys)
  26. deduped.append(text)
  27. return deduped
  28. def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
  29. scores: list[float] = []
  30. for row in export_rows:
  31. try:
  32. scores.append(float(row.get("wxindex_latest_score") or 0))
  33. except (TypeError, ValueError):
  34. continue
  35. return max(scores) if scores else 0.0
  36. def _should_retain_title(
  37. export_rows: list[dict[str, Any]],
  38. *,
  39. wxindex_threshold: float,
  40. ) -> bool:
  41. if _record_wxindex_score(export_rows) < wxindex_threshold:
  42. return False
  43. return any(_has_inspiration_or_goal_demand_match(row) for row in export_rows)
  44. def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
  45. point_category = str(row.get("point_category") or "").strip()
  46. if point_category not in TITLE_RETAIN_POINT_CATEGORIES:
  47. return False
  48. return bool(str(row.get("matched_demand") or "").strip())
  49. def _has_matched_demand(row: dict[str, Any]) -> bool:
  50. return bool(str(row.get("matched_demand") or "").strip())
  51. def build_hive_rows_for_record(
  52. export_rows: list[dict[str, Any]],
  53. *,
  54. strategy: str,
  55. partition_dt: str,
  56. wxindex_threshold: float,
  57. ) -> list[dict[str, Any]]:
  58. if not _should_retain_title(export_rows, wxindex_threshold=wxindex_threshold):
  59. return []
  60. weight = _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
  61. element_texts = _dedupe_texts(
  62. [
  63. str(row.get("item_text") or "").strip()
  64. for row in export_rows
  65. if str(row.get("item_type") or "") == ITEM_TYPE_ELEMENT and _has_matched_demand(row)
  66. ]
  67. )
  68. phrase_texts = _dedupe_texts(
  69. [
  70. str(row.get("item_text") or "").strip()
  71. for row in export_rows
  72. if str(row.get("item_type") or "") == ITEM_TYPE_PHRASE
  73. and str(row.get("item_text") or "").strip()
  74. ]
  75. )
  76. hive_rows: list[dict[str, Any]] = []
  77. if element_texts:
  78. demand_name = " ".join(element_texts)
  79. hive_rows.append(
  80. _build_hive_row(
  81. strategy=strategy,
  82. demand_name=demand_name,
  83. weight=weight,
  84. demand_type=TYPE_FEATURE_POINT,
  85. partition_dt=partition_dt,
  86. )
  87. )
  88. for phrase_text in phrase_texts:
  89. hive_rows.append(
  90. _build_hive_row(
  91. strategy=strategy,
  92. demand_name=phrase_text,
  93. weight=weight,
  94. demand_type=TYPE_PHRASE,
  95. partition_dt=partition_dt,
  96. )
  97. )
  98. return hive_rows
  99. def build_hive_rows_from_export_groups(
  100. export_groups: list[dict[str, Any]],
  101. *,
  102. strategy: str,
  103. partition_dt: str,
  104. wxindex_threshold: float,
  105. ) -> list[dict[str, Any]]:
  106. rows: list[dict[str, Any]] = []
  107. seen_demand_ids: set[str] = set()
  108. for group in export_groups:
  109. export_rows = group.get("export_rows") or []
  110. if not isinstance(export_rows, list):
  111. continue
  112. for hive_row in build_hive_rows_for_record(
  113. export_rows,
  114. strategy=strategy,
  115. partition_dt=partition_dt,
  116. wxindex_threshold=wxindex_threshold,
  117. ):
  118. demand_id = str(hive_row["demand_id"])
  119. if demand_id in seen_demand_ids:
  120. continue
  121. seen_demand_ids.add(demand_id)
  122. rows.append(hive_row)
  123. return rows
  124. def _build_hive_row(
  125. *,
  126. strategy: str,
  127. demand_name: str,
  128. weight: float,
  129. demand_type: str,
  130. partition_dt: str,
  131. ) -> dict[str, Any]:
  132. normalized_name = demand_name.strip()
  133. return {
  134. "strategy": strategy,
  135. "demand_id": build_demand_id(
  136. strategy=strategy,
  137. demand_name=normalized_name,
  138. partition_dt=partition_dt,
  139. ),
  140. "demand_name": normalized_name,
  141. "weight": weight,
  142. "type": demand_type,
  143. "video_count": None,
  144. "video_list": [],
  145. "extend": "{}",
  146. "dt": partition_dt,
  147. }