hot_content_demand_export_service.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. """新热事件需求导出明细:按热点记录创建时间筛选 hot_content_demand_exports。"""
  2. from __future__ import annotations
  3. import re
  4. from datetime import date, datetime, timedelta
  5. from typing import Any
  6. from zoneinfo import ZoneInfo
  7. from sqlalchemy import text
  8. from app.db.hot_content_mysql import HotContentSessionLocal
# Time zone used to compute "today" and to anchor the day boundaries of the
# date filters.
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
# Matches a compact date made of exactly eight digits (yyyymmdd). It checks
# the shape only; calendar validity is checked later by strptime.
DATE_RE = re.compile(r"^\d{8}$")
# Raw values compared against the e.item_type column.
ITEM_TYPE_WORD = "词"
ITEM_TYPE_POINT = "点"
# Upper bound on the number of rows a single export call may return.
MAX_EXPORT_ROWS = 50_000
# Shared SELECT ... FROM ... JOIN prefix. Callers append their own WHERE,
# ORDER BY and LIMIT clauses. r.created_at is exposed as record_created_at.
_EXPORT_SELECT = """
SELECT
e.id,
e.source,
e.hot_title,
e.item_text,
e.point_category,
e.matched_demand,
e.is_as_demand,
e.contribution_score,
e.wxindex_keyword,
e.all_hot_keywords,
e.wxindex_latest_score,
e.wxindex_trend,
e.event_sense_score,
e.senior_fit_score,
e.item_type,
r.created_at AS record_created_at
FROM hot_content_demand_exports e
INNER JOIN hot_content_records r ON r.id = e.record_id
"""
  35. def _normalize_date(date_value: str | None) -> str | None:
  36. if not date_value:
  37. return None
  38. normalized = str(date_value).replace("-", "").strip()
  39. if not normalized:
  40. return None
  41. if not DATE_RE.match(normalized):
  42. raise ValueError("date must be yyyymmdd or yyyy-mm-dd")
  43. return normalized
  44. def _parse_yyyymmdd(value: str) -> date:
  45. return datetime.strptime(value, "%Y%m%d").date()
  46. def _resolve_date_range(
  47. start_dt: str | None,
  48. end_dt: str | None,
  49. ) -> tuple[datetime, datetime]:
  50. """返回 [start_at, end_at_exclusive) 上海时区半开区间。"""
  51. today = datetime.now(SHANGHAI_TZ).date()
  52. normalized_start = _normalize_date(start_dt)
  53. normalized_end = _normalize_date(end_dt)
  54. start_day = _parse_yyyymmdd(normalized_start) if normalized_start else today
  55. end_day = _parse_yyyymmdd(normalized_end) if normalized_end else today
  56. if start_day > end_day:
  57. raise ValueError("开始日期不能晚于结束日期")
  58. start_at = datetime.combine(start_day, datetime.min.time(), tzinfo=SHANGHAI_TZ)
  59. end_at_exclusive = datetime.combine(
  60. end_day + timedelta(days=1),
  61. datetime.min.time(),
  62. tzinfo=SHANGHAI_TZ,
  63. )
  64. return start_at, end_at_exclusive
  65. def _item_type_label(item_type: str | None) -> str:
  66. value = str(item_type or "").strip()
  67. if value == ITEM_TYPE_WORD:
  68. return "特征点"
  69. if value == ITEM_TYPE_POINT:
  70. return "短语"
  71. return value or "-"
  72. def _normalize_item_type(item_type: str | None) -> str | None:
  73. if item_type is None:
  74. return None
  75. value = str(item_type).strip()
  76. if not value or value in {"全部", "all"}:
  77. return None
  78. if value in {ITEM_TYPE_WORD, "特征点"}:
  79. return ITEM_TYPE_WORD
  80. if value in {ITEM_TYPE_POINT, "短语"}:
  81. return ITEM_TYPE_POINT
  82. raise ValueError("item_type 须为 词、点、特征点、短语,或留空表示全部")
  83. def _build_filters(
  84. *,
  85. start_dt: str | None,
  86. end_dt: str | None,
  87. is_as_demand: int | None,
  88. has_matched_demand: int | None,
  89. item_type: str | None,
  90. min_wxindex_latest_score: float | None,
  91. min_event_sense_score: float | None,
  92. min_senior_fit_score: float | None,
  93. ) -> tuple[str, dict[str, object], datetime, datetime]:
  94. start_at, end_at_exclusive = _resolve_date_range(start_dt, end_dt)
  95. where_parts = [
  96. "r.created_at >= :start_at",
  97. "r.created_at < :end_at_exclusive",
  98. ]
  99. params: dict[str, object] = {
  100. "start_at": start_at.replace(tzinfo=None),
  101. "end_at_exclusive": end_at_exclusive.replace(tzinfo=None),
  102. }
  103. if is_as_demand is not None:
  104. if is_as_demand not in (0, 1):
  105. raise ValueError("is_as_demand 须为 0 或 1")
  106. where_parts.append("e.is_as_demand = :is_as_demand")
  107. params["is_as_demand"] = is_as_demand
  108. if has_matched_demand is not None:
  109. if has_matched_demand not in (0, 1):
  110. raise ValueError("has_matched_demand 须为 0 或 1")
  111. if has_matched_demand == 1:
  112. where_parts.append(
  113. "e.matched_demand IS NOT NULL AND TRIM(e.matched_demand) <> ''"
  114. )
  115. else:
  116. where_parts.append(
  117. "(e.matched_demand IS NULL OR TRIM(e.matched_demand) = '')"
  118. )
  119. normalized_item_type = _normalize_item_type(item_type)
  120. if normalized_item_type:
  121. where_parts.append("e.item_type = :item_type")
  122. params["item_type"] = normalized_item_type
  123. if min_wxindex_latest_score is not None:
  124. if min_wxindex_latest_score < 0:
  125. raise ValueError("min_wxindex_latest_score 不能为负数")
  126. where_parts.append("e.wxindex_latest_score >= :min_wxindex_latest_score")
  127. params["min_wxindex_latest_score"] = min_wxindex_latest_score
  128. if min_event_sense_score is not None:
  129. if min_event_sense_score < 0:
  130. raise ValueError("min_event_sense_score 不能为负数")
  131. where_parts.append("e.event_sense_score >= :min_event_sense_score")
  132. params["min_event_sense_score"] = min_event_sense_score
  133. if min_senior_fit_score is not None:
  134. if min_senior_fit_score < 0:
  135. raise ValueError("min_senior_fit_score 不能为负数")
  136. where_parts.append("e.senior_fit_score >= :min_senior_fit_score")
  137. params["min_senior_fit_score"] = min_senior_fit_score
  138. where_sql = f"WHERE {' AND '.join(where_parts)}"
  139. return where_sql, params, start_at, end_at_exclusive
  140. def _row_to_dict(row: dict[str, Any]) -> dict[str, object]:
  141. created_at = row.get("record_created_at")
  142. if isinstance(created_at, datetime):
  143. record_created_at = created_at.strftime("%Y-%m-%d %H:%M:%S")
  144. else:
  145. record_created_at = str(created_at) if created_at is not None else ""
  146. is_as_demand_raw = row.get("is_as_demand")
  147. is_as_demand_int = int(is_as_demand_raw) if is_as_demand_raw is not None else 0
  148. contribution = row.get("contribution_score")
  149. return {
  150. "id": int(row["id"]),
  151. "source": str(row.get("source") or ""),
  152. "hot_title": str(row.get("hot_title") or ""),
  153. "item_text": str(row.get("item_text") or ""),
  154. "point_category": str(row.get("point_category") or ""),
  155. "item_type": str(row.get("item_type") or ""),
  156. "item_type_label": _item_type_label(row.get("item_type")),
  157. "matched_demand": str(row.get("matched_demand") or ""),
  158. "is_as_demand": is_as_demand_int,
  159. "is_as_demand_label": "是" if is_as_demand_int == 1 else "否",
  160. "contribution_score": float(contribution) if contribution is not None else None,
  161. "wxindex_keyword": str(row.get("wxindex_keyword") or ""),
  162. "all_hot_keywords": str(row.get("all_hot_keywords") or ""),
  163. "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
  164. "wxindex_trend": str(row.get("wxindex_trend") or ""),
  165. "event_sense_score": float(row["event_sense_score"])
  166. if row.get("event_sense_score") is not None
  167. else None,
  168. "senior_fit_score": float(row["senior_fit_score"])
  169. if row.get("senior_fit_score") is not None
  170. else None,
  171. "item_type": str(row.get("item_type") or ""),
  172. "record_created_at": record_created_at,
  173. }
  174. def query_hot_content_demand_exports(
  175. *,
  176. start_dt: str | None = None,
  177. end_dt: str | None = None,
  178. is_as_demand: int | None = None,
  179. has_matched_demand: int | None = None,
  180. item_type: str | None = None,
  181. min_wxindex_latest_score: float | None = None,
  182. min_event_sense_score: float | None = None,
  183. min_senior_fit_score: float | None = None,
  184. page: int = 1,
  185. page_size: int = 20,
  186. ) -> dict[str, object]:
  187. where_sql, params, _, _ = _build_filters(
  188. start_dt=start_dt,
  189. end_dt=end_dt,
  190. is_as_demand=is_as_demand,
  191. has_matched_demand=has_matched_demand,
  192. item_type=item_type,
  193. min_wxindex_latest_score=min_wxindex_latest_score,
  194. min_event_sense_score=min_event_sense_score,
  195. min_senior_fit_score=min_senior_fit_score,
  196. )
  197. offset = (page - 1) * page_size
  198. count_sql = text(
  199. f"""
  200. SELECT COUNT(*) AS cnt
  201. FROM hot_content_demand_exports e
  202. INNER JOIN hot_content_records r ON r.id = e.record_id
  203. {where_sql}
  204. """
  205. )
  206. list_sql = text(
  207. f"""
  208. {_EXPORT_SELECT}
  209. {where_sql}
  210. ORDER BY r.created_at ASC, e.id ASC
  211. LIMIT :limit OFFSET :offset
  212. """
  213. )
  214. list_params = {**params, "limit": page_size, "offset": offset}
  215. with HotContentSessionLocal() as session:
  216. total = int(session.execute(count_sql, params).scalar_one())
  217. rows = session.execute(list_sql, list_params).mappings().all()
  218. return {
  219. "total": total,
  220. "page": page,
  221. "page_size": page_size,
  222. "items": [_row_to_dict(dict(row)) for row in rows],
  223. }
  224. def export_hot_content_demand_exports(
  225. *,
  226. start_dt: str | None = None,
  227. end_dt: str | None = None,
  228. is_as_demand: int | None = None,
  229. has_matched_demand: int | None = None,
  230. item_type: str | None = None,
  231. min_wxindex_latest_score: float | None = None,
  232. min_event_sense_score: float | None = None,
  233. min_senior_fit_score: float | None = None,
  234. ) -> list[dict[str, object]]:
  235. where_sql, params, _, _ = _build_filters(
  236. start_dt=start_dt,
  237. end_dt=end_dt,
  238. is_as_demand=is_as_demand,
  239. has_matched_demand=has_matched_demand,
  240. item_type=item_type,
  241. min_wxindex_latest_score=min_wxindex_latest_score,
  242. min_event_sense_score=min_event_sense_score,
  243. min_senior_fit_score=min_senior_fit_score,
  244. )
  245. export_sql = text(
  246. f"""
  247. {_EXPORT_SELECT}
  248. {where_sql}
  249. ORDER BY r.created_at ASC, e.id ASC
  250. LIMIT :limit
  251. """
  252. )
  253. export_params = {**params, "limit": MAX_EXPORT_ROWS + 1}
  254. with HotContentSessionLocal() as session:
  255. rows = session.execute(export_sql, export_params).mappings().all()
  256. if len(rows) > MAX_EXPORT_ROWS:
  257. raise ValueError(f"导出条数超过上限 {MAX_EXPORT_ROWS},请缩小日期或筛选范围")
  258. return [_row_to_dict(dict(row)) for row in rows]