element_search_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. """实质元素需求检索:通过 ODPS(MaxCompute)执行 Hive SQL。"""
import json
import math
import re
from datetime import date, datetime, timedelta
from decimal import Decimal
from zoneinfo import ZoneInfo

from app.odps.client import get_odps_client
  8. _DATE_PARTITION_RE = re.compile(r"^\d{8}$")
  9. _VID_QUERY_RE = re.compile(r"^[0-9a-zA-Z_-]{1,128}$")
  10. SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
  11. def _today_shanghai() -> date:
  12. return datetime.now(SHANGHAI_TZ).date()
  13. def _same_day_last_year(value: date) -> date:
  14. try:
  15. return value.replace(year=value.year - 1)
  16. except ValueError:
  17. return date(value.year - 1, 2, 28)
  18. def _partition_yyyymmdd(value: date) -> str:
  19. return value.strftime("%Y%m%d")
  20. def _validate_partition_dt(label: str, value: str) -> str:
  21. if not _DATE_PARTITION_RE.match(value):
  22. raise ValueError(f"{label} 须为 YYYYMMDD 分区格式,当前为 {value!r}")
  23. return value
  24. def _serialize_video_list(value: object) -> str | None:
  25. if value is None:
  26. return None
  27. if isinstance(value, list):
  28. return json.dumps(value, ensure_ascii=False)
  29. return str(value)
  30. def _normalize_scalar(value: object) -> object:
  31. if isinstance(value, Decimal):
  32. return float(value)
  33. return value
  34. def query_same_period_last_year_element_demands(
  35. *,
  36. period_days: int,
  37. view_pv_count: int,
  38. min_contribution_score: float | int,
  39. rov_avg: float | int,
  40. ) -> list[dict[str, object]]:
  41. """去年同期阳历策略下的实质元素需求聚合(ROV 等均值过滤)。
  42. 分区 dt 范围(YYYYMMDD)由「上海时区下的去年今日」与时间段推算:
  43. - start_date:去年的今天(与今年同日历月日;闰年 2 月 29 日则退成去年 2 月 28 日)
  44. - end_date:从 start_date 起连续 period_days 个日历日的最后一天(含起始日)
  45. demand_id 中的 bizdate 固定为上海时区当天日期(YYYYMMDD),不作为入参。
  46. 其余参数:
  47. - period_days:区间天数(含起始日),须 >= 0;为 0 仅含 start_date 当日;为 7 表示含 start_date 在内共 7 天分区
  48. - view_pv_count:`当日分发曝光pv` 下限
  49. - min_contribution_score:`贡献分` 下限
  50. - rov_avg:分组后平均 ROV 下限
  51. """
  52. if period_days < 0:
  53. raise ValueError("period_days 不能为负")
  54. anchor_last_year = _same_day_last_year(_today_shanghai())
  55. start_dt = anchor_last_year
  56. end_dt = anchor_last_year + timedelta(days=max(0, period_days - 1))
  57. start_date = _validate_partition_dt("start_date", _partition_yyyymmdd(start_dt))
  58. end_date = _validate_partition_dt("end_date", _partition_yyyymmdd(end_dt))
  59. bizdate = _partition_yyyymmdd(_today_shanghai())
  60. if view_pv_count < 0:
  61. raise ValueError("view_pv_count 不能为负")
  62. sql = f"""
  63. WITH cleaned_video_metrics AS
  64. (
  65. SELECT CAST(视频id AS STRING) AS vid
  66. ,rov_t0
  67. FROM loghubods.video_dimension_detail_add_column
  68. WHERE dt >= '{start_date}'
  69. AND dt <= '{end_date}'
  70. AND COALESCE(`当日分发曝光pv`,0) >= {int(view_pv_count)}
  71. )
  72. ,video_avg_metrics AS
  73. (
  74. SELECT vid
  75. ,AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
  76. FROM cleaned_video_metrics
  77. GROUP BY vid
  78. )
  79. ,tag_vid_dedup AS
  80. (
  81. SELECT DISTINCT CAST(vid AS STRING) AS vid
  82. ,原始元素
  83. ,原始元素描述
  84. ,点类型
  85. ,元素维度
  86. ,短语
  87. ,选题
  88. ,`extend`
  89. FROM loghubods.dwd_topic_decode_result_detail_di
  90. WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
  91. AND TRIM(原始元素) <> ''
  92. AND 原始元素 IS NOT NULL
  93. AND 元素维度 = '实质'
  94. AND 贡献分 >= {float(min_contribution_score)}
  95. )
  96. SELECT '去年同期阳历' AS strategy
  97. ,md5(CONCAT('去年同期阳历',t1.原始元素,'{bizdate}')) AS demand_id
  98. ,t1.原始元素 AS demand_name
  99. ,COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) AS weight
  100. ,COUNT(DISTINCT t1.vid) AS video_count
  101. ,COLLECT_SET(t1.vid) AS video_list
  102. ,'{{}}' AS ext_info
  103. FROM tag_vid_dedup t1
  104. LEFT JOIN video_avg_metrics t2
  105. ON t1.vid = t2.vid
  106. GROUP BY t1.原始元素
  107. HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) >= {float(rov_avg)}
  108. ORDER BY weight DESC
  109. """
  110. odps_client = get_odps_client()
  111. instance = odps_client.execute_sql(sql)
  112. rows: list[dict[str, object]] = []
  113. with instance.open_reader(tunnel=True) as reader:
  114. for record in reader:
  115. rows.append(
  116. {
  117. "strategy": record["strategy"],
  118. "demand_id": record["demand_id"],
  119. "demand_name": record["demand_name"],
  120. "weight": _normalize_scalar(record["weight"]),
  121. "video_count": record["video_count"],
  122. "video_list": _serialize_video_list(record["video_list"]),
  123. "ext_info": record["ext_info"],
  124. }
  125. )
  126. return rows
  127. def query_same_period_last_year_lunar_element_demands(
  128. *,
  129. period_days: int,
  130. view_pv_count: int,
  131. min_contribution_score: float | int,
  132. rov_avg: float | int,
  133. ) -> list[dict[str, object]]:
  134. """去年同期阴历策略下的实质元素需求聚合(ROV 等均值过滤)。
  135. 日期区间由 SQL 内 lunar_to_solar / lunar_add / solar_to_lunar 推算;
  136. end_date 为 start_date 起连续 period_days 天的最后一天(含首日),与阳历接口语义一致。
  137. bizdate 为上海时区当天 YYYYMMDD。
  138. """
  139. if period_days < 0:
  140. raise ValueError("period_days 不能为负")
  141. if view_pv_count < 0:
  142. raise ValueError("view_pv_count 不能为负")
  143. bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
  144. days_after = max(0, period_days - 1)
  145. sql = f"""
  146. SET odps.sql.type.system.odps2 = true
  147. ;
  148. SET odps.sql.decimal.odps2 = true
  149. ;
  150. WITH base_date AS
  151. (
  152. SELECT lunar_to_solar(lunar_add(loghubods.solar_to_lunar('{bizdate}','yyyyMMdd'),-1,'Y')) AS start_date
  153. )
  154. ,date_params AS
  155. (
  156. SELECT start_date
  157. ,DATE_FORMAT(CAST(DATE_ADD(TO_DATE(start_date,'yyyyMMdd'),{int(days_after)}) AS TIMESTAMP),'yyyyMMdd') AS end_date
  158. FROM base_date
  159. )
  160. ,cleaned_video_metrics AS
  161. (
  162. SELECT CAST(d.视频id AS STRING) AS vid
  163. ,d.rov_t0
  164. FROM loghubods.video_dimension_detail_add_column d
  165. WHERE d.dt >= (
  166. SELECT start_date
  167. FROM date_params
  168. )
  169. AND d.dt <= (
  170. SELECT end_date
  171. FROM date_params
  172. )
  173. AND COALESCE(d.`当日分发曝光pv`,0) >= {int(view_pv_count)}
  174. )
  175. ,video_avg_metrics AS
  176. (
  177. SELECT vid
  178. ,AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
  179. FROM cleaned_video_metrics
  180. GROUP BY vid
  181. )
  182. ,tag_vid_dedup AS
  183. (
  184. SELECT DISTINCT CAST(vid AS STRING) AS vid
  185. ,原始元素
  186. ,原始元素描述
  187. ,点类型
  188. ,元素维度
  189. ,短语
  190. ,选题
  191. ,`extend`
  192. FROM loghubods.dwd_topic_decode_result_detail_di
  193. WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
  194. AND TRIM(原始元素) <> ''
  195. AND 原始元素 IS NOT NULL
  196. AND 元素维度 = '实质'
  197. AND 贡献分 >= {float(min_contribution_score)}
  198. )
  199. SELECT '去年同期阴历' AS strategy
  200. ,md5(CONCAT('去年同期阴历',t1.原始元素,'{bizdate}')) AS demand_id
  201. ,t1.原始元素 AS demand_name
  202. ,CAST(COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS DECIMAL(20,6)) AS weight
  203. ,COUNT(DISTINCT t1.vid) AS video_count
  204. ,COLLECT_SET(t1.vid) AS video_list
  205. ,'{{}}' AS ext_info
  206. FROM tag_vid_dedup t1
  207. LEFT JOIN video_avg_metrics t2
  208. ON t1.vid = t2.vid
  209. GROUP BY t1.原始元素
  210. HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) >= {float(rov_avg)}
  211. ORDER BY weight DESC
  212. """
  213. odps_client = get_odps_client()
  214. # 脚本模式:SET 多条 + SELECT;DECIMAL(p,s) 需 odps.sql.decimal.odps2
  215. instance = odps_client.execute_sql(
  216. sql,
  217. hints={
  218. "odps.sql.submit.mode": "script",
  219. "odps.sql.decimal.odps2": "true",
  220. },
  221. )
  222. rows: list[dict[str, object]] = []
  223. with instance.open_reader(tunnel=True) as reader:
  224. for record in reader:
  225. rows.append(
  226. {
  227. "strategy": record["strategy"],
  228. "demand_id": record["demand_id"],
  229. "demand_name": record["demand_name"],
  230. "weight": _normalize_scalar(record["weight"]),
  231. "video_count": record["video_count"],
  232. "video_list": _serialize_video_list(record["video_list"]),
  233. "ext_info": record["ext_info"],
  234. }
  235. )
  236. return rows
  237. def query_monthly_element_demands(
  238. *,
  239. view_pv_count: int,
  240. month_total_pv_threshold: float | int,
  241. min_contribution_score: float | int,
  242. rov_avg: float | int,
  243. min_frequency: int,
  244. ) -> list[dict[str, object]]:
  245. """逐月策略:按 bizdate 所在自然月回溯 12 个月窗口,聚合实质元素需求。"""
  246. if view_pv_count < 0:
  247. raise ValueError("view_pv_count 不能为负")
  248. if float(month_total_pv_threshold) < 0:
  249. raise ValueError("month_total_pv_threshold 不能为负")
  250. if min_frequency < 0:
  251. raise ValueError("min_frequency 不能为负")
  252. bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
  253. sql = f"""
  254. WITH biz_month AS (
  255. SELECT
  256. TO_DATE(CONCAT(SUBSTR('{bizdate}', 1, 4), '-', SUBSTR('{bizdate}', 5, 2), '-01')) AS biz_m1
  257. ),
  258. month_window AS (
  259. SELECT
  260. CONCAT(
  261. SUBSTR(CAST(ADD_MONTHS(biz_m1, -12) AS STRING), 1, 4),
  262. SUBSTR(CAST(ADD_MONTHS(biz_m1, -12) AS STRING), 6, 2)
  263. ) AS start_ym,
  264. CONCAT(
  265. SUBSTR(CAST(ADD_MONTHS(biz_m1, -1) AS STRING), 1, 4),
  266. SUBSTR(CAST(ADD_MONTHS(biz_m1, -1) AS STRING), 6, 2)
  267. ) AS end_ym
  268. FROM biz_month
  269. ),
  270. cleaned_video_metrics AS (
  271. SELECT
  272. CAST(视频id AS STRING) AS vid,
  273. SUBSTR(CAST(dt AS STRING), 1, 6) AS ym,
  274. rov_t0,
  275. COALESCE(`当日分发曝光pv`, 0) AS day_dist_pv
  276. FROM loghubods.video_dimension_detail_add_column
  277. WHERE SUBSTR(CAST(dt AS STRING), 1, 6) >= (SELECT start_ym FROM month_window)
  278. AND SUBSTR(CAST(dt AS STRING), 1, 6) <= (SELECT end_ym FROM month_window)
  279. AND COALESCE(`当日分发曝光pv`, 0) >= {int(view_pv_count)}
  280. ),
  281. video_monthly_avg_metrics AS (
  282. SELECT
  283. ym,
  284. vid,
  285. AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov,
  286. SUM(day_dist_pv) AS month_total_pv
  287. FROM cleaned_video_metrics
  288. GROUP BY ym, vid
  289. HAVING SUM(day_dist_pv) > {float(month_total_pv_threshold)}
  290. ),
  291. tag_vid_dedup AS (
  292. SELECT DISTINCT
  293. CAST(vid AS STRING) AS vid,
  294. 原始元素
  295. FROM loghubods.dwd_topic_decode_result_detail_di
  296. WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
  297. AND 元素维度 = '实质'
  298. AND 贡献分 >= {float(min_contribution_score)}
  299. ),
  300. element_monthly_metrics AS (
  301. SELECT
  302. t1.原始元素,
  303. t2.ym,
  304. COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS month_avg_rov
  305. FROM tag_vid_dedup t1
  306. JOIN video_monthly_avg_metrics t2
  307. ON t1.vid = t2.vid
  308. GROUP BY t1.原始元素, t2.ym
  309. HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) >= {float(rov_avg)}
  310. ),
  311. element_total_rov AS (
  312. SELECT
  313. 原始元素,
  314. ROUND(SUM(month_avg_rov), 6) AS avg_rov
  315. FROM element_monthly_metrics
  316. GROUP BY 原始元素
  317. ),
  318. element_vid_dedup AS (
  319. SELECT DISTINCT
  320. em.原始元素,
  321. vm.vid
  322. FROM element_monthly_metrics em
  323. JOIN tag_vid_dedup tv
  324. ON em.原始元素 = tv.原始元素
  325. JOIN video_monthly_avg_metrics vm
  326. ON tv.vid = vm.vid
  327. AND em.ym = vm.ym
  328. ),
  329. element_vid_stats AS (
  330. SELECT
  331. 原始元素,
  332. COUNT(DISTINCT vid) AS vid_count,
  333. COLLECT_SET(vid) AS vid_list
  334. FROM element_vid_dedup
  335. GROUP BY 原始元素
  336. ),
  337. element_freq AS (
  338. SELECT
  339. 原始元素,
  340. COUNT(1) AS 频次
  341. FROM element_monthly_metrics
  342. GROUP BY 原始元素
  343. )
  344. ,element_month_list AS (
  345. SELECT
  346. 原始元素,
  347. TO_JSON(SORT_ARRAY(COLLECT_SET(ym))) AS month_list
  348. FROM element_monthly_metrics
  349. GROUP BY 原始元素
  350. )
  351. SELECT
  352. '逐月' AS strategy,
  353. md5(CONCAT('逐月', r.原始元素, '{bizdate}')) AS demand_id,
  354. r.原始元素 AS demand_name,
  355. r.avg_rov AS weight,
  356. COALESCE(v.vid_count, 0) AS video_count,
  357. v.vid_list AS video_list,
  358. ml.month_list AS month_list,
  359. COALESCE(f.频次, 0) AS frequency,
  360. '{{}}' AS ext_info
  361. FROM element_total_rov r
  362. LEFT JOIN element_vid_stats v
  363. ON r.原始元素 = v.原始元素
  364. LEFT JOIN element_freq f
  365. ON r.原始元素 = f.原始元素
  366. LEFT JOIN element_month_list ml -- 新增 JOIN
  367. ON r.原始元素 = ml.原始元素
  368. WHERE r.原始元素 NOT IN (
  369. '元旦','腊八节','小年','除夕','春节','正月初一','正月初二','正月初三','正月初四','正月初五',
  370. '情人节','元宵节','龙抬头','妇女节','植树节','劳动节','母亲节','儿童节','端午节','父亲节',
  371. '建党节','建军节','七夕节','中元节','中秋节','国庆节','重阳节','感恩节','公祭日','平安夜',
  372. '圣诞节','小寒','大寒','立春','雨水','惊蛰','春分','清明','谷雨','立夏','小满','芒种',
  373. '夏至','小暑','大暑','立秋','处暑','白露','秋分','寒露','霜降','立冬','小雪','大雪','冬至',
  374. '早上好','中午好','下午好','晚上好','晚安',
  375. '祝福','祝愿','祝你','祝贺','祝大家','祝您','祝好运','祝群主','祝朋友'
  376. )
  377. AND COALESCE(f.频次, 0) >= {int(min_frequency)}
  378. ORDER BY r.avg_rov DESC
  379. """
  380. odps_client = get_odps_client()
  381. instance = odps_client.execute_sql(sql)
  382. rows: list[dict[str, object]] = []
  383. with instance.open_reader(tunnel=True) as reader:
  384. for record in reader:
  385. rows.append(
  386. {
  387. "strategy": record["strategy"],
  388. "demand_id": record["demand_id"],
  389. "demand_name": record["demand_name"],
  390. "weight": _normalize_scalar(record["weight"]),
  391. "video_count": record["video_count"],
  392. "video_list": _serialize_video_list(record["video_list"]),
  393. "month_list": _serialize_video_list(record["month_list"]),
  394. "frequency": _normalize_scalar(record["frequency"]),
  395. "ext_info": record["ext_info"],
  396. }
  397. )
  398. return rows
  399. def query_video_decode_url2_for_today(vid: str) -> str | None:
  400. """按上海时区当天分区,从 dwd_topic_decode_result_di 取视频解构页 url2。"""
  401. vid_clean = vid.strip()
  402. if not _VID_QUERY_RE.match(vid_clean):
  403. raise ValueError("vid 须为 1~128 位字母、数字、下划线或连字符")
  404. bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
  405. sql = f"""
  406. SELECT url2
  407. FROM loghubods.dwd_topic_decode_result_di
  408. WHERE dt = '{bizdate}'
  409. AND CAST(vid AS STRING) = '{vid_clean}'
  410. LIMIT 1
  411. """
  412. odps_client = get_odps_client()
  413. instance = odps_client.execute_sql(sql)
  414. with instance.open_reader(tunnel=True) as reader:
  415. for record in reader:
  416. raw = record["url2"]
  417. if raw is None:
  418. return None
  419. text_val = str(raw).strip()
  420. return text_val if text_val else None
  421. return None