hot_content_source_service.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. """新热事件需求词来源:按 sync_log 定位热点标题并返回单条展示数据。"""
  2. from __future__ import annotations
  3. import hashlib
  4. import json
  5. import math
  6. from datetime import datetime
  7. from typing import Any
  8. from sqlalchemy import text
  9. from app.core.config import settings
  10. from app.db.hot_content_mysql import HotContentSessionLocal
# Demand types; compared against a sync-log row's demand_type when deciding
# whether an export row contributes to that demand.
TYPE_FEATURE_POINT = "特征点"
TYPE_PHRASE = "短语"
# Export-row item_type values that pair with the demand types above
# (element rows with feature-point demands, phrase rows with phrase demands).
ITEM_TYPE_ELEMENT = "元素"
ITEM_TYPE_PHRASE = "短语"
# Point categories that let an export row satisfy the "point gate", provided
# the row also carries a non-blank matched_demand.
TITLE_RETAIN_POINT_CATEGORIES = frozenset({"灵感点", "目的点"})
# A record's maximum wxindex score is divided by this to obtain its hive weight.
WEIGHT_DIVISOR = 1_000_000.0
# Fallback wxindex threshold when settings.hot_content_wxindex_threshold is
# missing or cannot be converted to float.
DEFAULT_WXINDEX_THRESHOLD = 1_000_000.0
# Fallback strategy name when settings.hot_demand_pool_strategy is unset or blank.
DEFAULT_HOT_STRATEGY = "新热事件"
  19. def _hot_strategy_name() -> str:
  20. return str(
  21. getattr(settings, "hot_demand_pool_strategy", None) or DEFAULT_HOT_STRATEGY
  22. ).strip() or DEFAULT_HOT_STRATEGY
  23. def _wxindex_threshold() -> float:
  24. raw = getattr(settings, "hot_content_wxindex_threshold", None)
  25. if raw is None:
  26. return DEFAULT_WXINDEX_THRESHOLD
  27. try:
  28. return float(raw)
  29. except (TypeError, ValueError):
  30. return DEFAULT_WXINDEX_THRESHOLD
  31. def _normalize_date(date_value: str | None) -> str:
  32. if not date_value:
  33. return ""
  34. normalized = str(date_value).replace("-", "").strip()
  35. return normalized if len(normalized) == 8 and normalized.isdigit() else ""
  36. def _normalize_text(value: str) -> str:
  37. return "".join(str(value or "").split())
  38. def _load_json(value: Any) -> dict[str, Any]:
  39. if value is None:
  40. return {}
  41. if isinstance(value, dict):
  42. return value
  43. if isinstance(value, (bytes, bytearray)):
  44. value = value.decode("utf-8")
  45. if isinstance(value, str):
  46. text_value = value.strip()
  47. if not text_value:
  48. return {}
  49. try:
  50. parsed = json.loads(text_value)
  51. except json.JSONDecodeError:
  52. return {"__parse_error__": text_value}
  53. return parsed if isinstance(parsed, dict) else {"value": parsed}
  54. return {"value": value}
  55. def _fmt_datetime(value: Any) -> str:
  56. if value is None:
  57. return "-"
  58. if isinstance(value, datetime):
  59. return value.strftime("%Y-%m-%d %H:%M:%S")
  60. return str(value)
  61. def build_demand_id(*, strategy: str, demand_name: str, partition_dt: str) -> str:
  62. raw = f"{strategy}{demand_name.strip()}{partition_dt}"
  63. return hashlib.md5(raw.encode("utf-8")).hexdigest()
  64. def _record_wxindex_score(export_rows: list[dict[str, Any]]) -> float:
  65. scores: list[float] = []
  66. for row in export_rows:
  67. try:
  68. scores.append(float(row.get("wxindex_latest_score") or 0))
  69. except (TypeError, ValueError):
  70. continue
  71. return max(scores) if scores else 0.0
  72. def _has_inspiration_or_goal_demand_match(row: dict[str, Any]) -> bool:
  73. point_category = str(row.get("point_category") or "").strip()
  74. if point_category not in TITLE_RETAIN_POINT_CATEGORIES:
  75. return False
  76. return bool(str(row.get("matched_demand") or "").strip())
  77. def _export_row_contributes_to_demand(
  78. row: dict[str, Any],
  79. *,
  80. demand_name: str,
  81. demand_type: str,
  82. ) -> bool:
  83. normalized_name = _normalize_text(demand_name)
  84. item_type = str(row.get("item_type") or "")
  85. item_text = str(row.get("item_text") or "")
  86. matched = str(row.get("matched_demand") or "").strip()
  87. if not matched:
  88. return False
  89. if demand_type == TYPE_PHRASE:
  90. return item_type == ITEM_TYPE_PHRASE and _normalize_text(item_text) == normalized_name
  91. if demand_type == TYPE_FEATURE_POINT:
  92. return item_type == ITEM_TYPE_ELEMENT and _normalize_text(item_text) in normalized_name
  93. return False
  94. def _hive_weight_for_record(
  95. export_rows: list[dict[str, Any]],
  96. *,
  97. wxindex_threshold: float,
  98. ) -> float | None:
  99. if _record_wxindex_score(export_rows) < wxindex_threshold:
  100. return None
  101. if not any(_has_inspiration_or_goal_demand_match(row) for row in export_rows):
  102. return None
  103. return _record_wxindex_score(export_rows) / WEIGHT_DIVISOR
  104. def fetch_sync_log_row(
  105. *,
  106. demand_name: str,
  107. demand_type: str,
  108. partition_dt: str,
  109. strategy: str | None = None,
  110. ) -> dict[str, Any] | None:
  111. """按需求名称、需求类型、分区 dt 查询 hot_content_odps_sync_log 唯一记录。"""
  112. name = (demand_name or "").strip()
  113. dtype = (demand_type or "").strip()
  114. dt = _normalize_date(partition_dt)
  115. if not name or not dtype or not dt:
  116. raise ValueError("demand_name、demand_type、dt 均为必填")
  117. strategy_value = (strategy or _hot_strategy_name()).strip()
  118. sql = text(
  119. """
  120. SELECT
  121. id,
  122. partition_dt,
  123. strategy,
  124. demand_id,
  125. demand_name,
  126. demand_type,
  127. record_id,
  128. synced_at
  129. FROM hot_content_odps_sync_log
  130. WHERE demand_name = :demand_name
  131. AND demand_type = :demand_type
  132. AND partition_dt = :partition_dt
  133. AND strategy = :strategy
  134. LIMIT 2
  135. """
  136. )
  137. params = {
  138. "demand_name": name,
  139. "demand_type": dtype,
  140. "partition_dt": dt,
  141. "strategy": strategy_value,
  142. }
  143. with HotContentSessionLocal() as session:
  144. rows = session.execute(sql, params).mappings().all()
  145. if not rows:
  146. return None
  147. if len(rows) > 1:
  148. raise ValueError("匹配到多条同步记录,请检查查询条件")
  149. return dict(rows[0])
  150. def fetch_hot_content_source_detail(
  151. *,
  152. demand_name: str,
  153. demand_type: str,
  154. partition_dt: str,
  155. strategy: str | None = None,
  156. ) -> dict[str, Any]:
  157. sync_row = fetch_sync_log_row(
  158. demand_name=demand_name,
  159. demand_type=demand_type,
  160. partition_dt=partition_dt,
  161. strategy=strategy,
  162. )
  163. if sync_row is None:
  164. raise LookupError("未找到对应的同步记录")
  165. record_id = int(sync_row.get("record_id") or 0)
  166. if record_id <= 0:
  167. raise LookupError("同步记录未关联热点内容")
  168. wxindex_threshold = _wxindex_threshold()
  169. strategy_value = str(sync_row.get("strategy") or _hot_strategy_name())
  170. partition_dt = str(sync_row.get("partition_dt") or "")
  171. sync_demand_name = str(sync_row.get("demand_name") or "")
  172. sync_demand_type = str(sync_row.get("demand_type") or "")
  173. demand_id = str(sync_row.get("demand_id") or "")
  174. record_sql = text(
  175. """
  176. SELECT
  177. id,
  178. source,
  179. title,
  180. article_title,
  181. article_body,
  182. hot_rank,
  183. execution_status,
  184. postprocess_status,
  185. created_at,
  186. contribution_demand_match_json,
  187. wxindex_trend_json
  188. FROM hot_content_records
  189. WHERE id = :record_id
  190. LIMIT 1
  191. """
  192. )
  193. export_sql = text(
  194. """
  195. SELECT
  196. id,
  197. item_type,
  198. item_text,
  199. point_category,
  200. matched_demand,
  201. contribution_score,
  202. wxindex_keyword,
  203. all_hot_keywords,
  204. wxindex_latest_score,
  205. wxindex_trend
  206. FROM hot_content_demand_exports
  207. WHERE record_id = :record_id
  208. ORDER BY id ASC
  209. """
  210. )
  211. with HotContentSessionLocal() as session:
  212. record_row = session.execute(record_sql, {"record_id": record_id}).mappings().first()
  213. export_rows_raw = session.execute(export_sql, {"record_id": record_id}).mappings().all()
  214. if record_row is None:
  215. raise LookupError("未找到关联热点内容")
  216. export_dicts: list[dict[str, Any]] = []
  217. export_items: list[dict[str, Any]] = []
  218. for row in export_rows_raw:
  219. item = dict(row)
  220. contributes = _export_row_contributes_to_demand(
  221. item,
  222. demand_name=sync_demand_name,
  223. demand_type=sync_demand_type,
  224. )
  225. export_dicts.append(
  226. {
  227. "item_type": item.get("item_type"),
  228. "item_text": item.get("item_text"),
  229. "point_category": item.get("point_category"),
  230. "matched_demand": item.get("matched_demand"),
  231. "wxindex_latest_score": item.get("wxindex_latest_score"),
  232. }
  233. )
  234. export_items.append(
  235. {
  236. "id": int(item["id"]),
  237. "item_type": str(item.get("item_type") or ""),
  238. "item_text": str(item.get("item_text") or ""),
  239. "point_category": str(item.get("point_category") or ""),
  240. "matched_demand": str(item.get("matched_demand") or ""),
  241. "contribution_score": float(item["contribution_score"])
  242. if item.get("contribution_score") is not None
  243. else None,
  244. "wxindex_keyword": str(item.get("wxindex_keyword") or ""),
  245. "all_hot_keywords": str(item.get("all_hot_keywords") or ""),
  246. "wxindex_latest_score": float(item.get("wxindex_latest_score") or 0),
  247. "wxindex_trend": str(item.get("wxindex_trend") or ""),
  248. "contributes_to_sync": contributes,
  249. }
  250. )
  251. max_wxindex = _record_wxindex_score(export_dicts)
  252. hive_weight = _hive_weight_for_record(
  253. export_dicts,
  254. wxindex_threshold=wxindex_threshold,
  255. )
  256. expected_id = build_demand_id(
  257. strategy=strategy_value,
  258. demand_name=sync_demand_name,
  259. partition_dt=partition_dt,
  260. )
  261. contribution = _load_json(record_row.get("contribution_demand_match_json"))
  262. wxindex = _load_json(record_row.get("wxindex_trend_json"))
  263. return {
  264. "wxindex_threshold": wxindex_threshold,
  265. "sync_log": {
  266. "id": int(sync_row["id"]),
  267. "partition_dt": partition_dt,
  268. "strategy": strategy_value,
  269. "demand_id": demand_id,
  270. "demand_name": sync_demand_name,
  271. "demand_type": sync_demand_type,
  272. "record_id": record_id,
  273. "synced_at": _fmt_datetime(sync_row.get("synced_at")),
  274. "demand_id_verified": expected_id == demand_id,
  275. "hive_weight": hive_weight,
  276. },
  277. "record": {
  278. "id": int(record_row["id"]),
  279. "source": str(record_row.get("source") or ""),
  280. "title": str(record_row.get("title") or ""),
  281. "article_title": str(record_row.get("article_title") or ""),
  282. "article_body": str(record_row.get("article_body") or ""),
  283. "hot_rank": int(record_row["hot_rank"])
  284. if record_row.get("hot_rank") is not None
  285. else None,
  286. "created_at": _fmt_datetime(record_row.get("created_at")),
  287. "contribution": contribution,
  288. "wxindex": wxindex,
  289. "max_wxindex_score": max_wxindex,
  290. "passes_wxindex_gate": max_wxindex >= wxindex_threshold,
  291. "passes_point_gate": any(
  292. _has_inspiration_or_goal_demand_match(row) for row in export_dicts
  293. ),
  294. },
  295. "export_rows": export_items,
  296. }
  297. def format_number(value: Any) -> str:
  298. try:
  299. number = float(value)
  300. except (TypeError, ValueError):
  301. return "-"
  302. if math.isnan(number):
  303. return "-"
  304. if abs(number) >= 10000:
  305. return f"{number / 10000:.1f}万"
  306. return f"{number:.0f}"