# element_search_service.py
  1. """实质元素需求检索:通过 ODPS(MaxCompute)执行 Hive SQL。"""
  2. import json
  3. import re
  4. from datetime import date, datetime, timedelta
  5. from decimal import Decimal
  6. from zoneinfo import ZoneInfo
  7. from app.odps.client import get_odps_client
# Shape check for partition values: eight digits (YYYYMMDD). Calendar validity is not checked.
_DATE_PARTITION_RE = re.compile(r"^\d{8}$")
# Time zone used to decide what "today" is for date anchors and bizdate.
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
  10. def _today_shanghai() -> date:
  11. return datetime.now(SHANGHAI_TZ).date()
  12. def _same_day_last_year(value: date) -> date:
  13. try:
  14. return value.replace(year=value.year - 1)
  15. except ValueError:
  16. return date(value.year - 1, 2, 28)
  17. def _partition_yyyymmdd(value: date) -> str:
  18. return value.strftime("%Y%m%d")
  19. def _validate_partition_dt(label: str, value: str) -> str:
  20. if not _DATE_PARTITION_RE.match(value):
  21. raise ValueError(f"{label} 须为 YYYYMMDD 分区格式,当前为 {value!r}")
  22. return value
  23. def _serialize_video_list(value: object) -> str | None:
  24. if value is None:
  25. return None
  26. if isinstance(value, list):
  27. return json.dumps(value, ensure_ascii=False)
  28. return str(value)
  29. def _normalize_scalar(value: object) -> object:
  30. if isinstance(value, Decimal):
  31. return float(value)
  32. return value
  33. def query_same_period_last_year_element_demands(
  34. *,
  35. period_days: int,
  36. view_pv_count: int,
  37. min_contribution_score: float | int,
  38. rov_avg: float | int,
  39. ) -> list[dict[str, object]]:
  40. """去年同期阳历策略下的实质元素需求聚合(ROV 等均值过滤)。
  41. 分区 dt 范围(YYYYMMDD)由「上海时区下的去年今日」与时间段推算:
  42. - start_date:去年的今天(与今年同日历月日;闰年 2 月 29 日则退成去年 2 月 28 日)
  43. - end_date:从 start_date 起连续 period_days 个日历日的最后一天(含起始日)
  44. demand_id 中的 bizdate 固定为上海时区当天日期(YYYYMMDD),不作为入参。
  45. 其余参数:
  46. - period_days:区间天数(含起始日),须 >= 0;为 0 仅含 start_date 当日;为 7 表示含 start_date 在内共 7 天分区
  47. - view_pv_count:`当日分发曝光pv` 下限
  48. - min_contribution_score:`贡献分` 下限
  49. - rov_avg:分组后平均 ROV 下限
  50. """
  51. if period_days < 0:
  52. raise ValueError("period_days 不能为负")
  53. anchor_last_year = _same_day_last_year(_today_shanghai())
  54. start_dt = anchor_last_year
  55. end_dt = anchor_last_year + timedelta(days=max(0, period_days - 1))
  56. start_date = _validate_partition_dt("start_date", _partition_yyyymmdd(start_dt))
  57. end_date = _validate_partition_dt("end_date", _partition_yyyymmdd(end_dt))
  58. bizdate = _partition_yyyymmdd(_today_shanghai())
  59. if view_pv_count < 0:
  60. raise ValueError("view_pv_count 不能为负")
  61. sql = f"""
  62. WITH cleaned_video_metrics AS
  63. (
  64. SELECT CAST(视频id AS STRING) AS vid
  65. ,rov_t0
  66. FROM loghubods.video_dimension_detail_add_column
  67. WHERE dt >= '{start_date}'
  68. AND dt <= '{end_date}'
  69. AND COALESCE(`当日分发曝光pv`,0) >= {int(view_pv_count)}
  70. )
  71. ,video_avg_metrics AS
  72. (
  73. SELECT vid
  74. ,AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
  75. FROM cleaned_video_metrics
  76. GROUP BY vid
  77. )
  78. ,tag_vid_dedup AS
  79. (
  80. SELECT DISTINCT CAST(vid AS STRING) AS vid
  81. ,原始元素
  82. ,原始元素描述
  83. ,点类型
  84. ,元素维度
  85. ,短语
  86. ,选题
  87. ,`extend`
  88. FROM loghubods.dwd_topic_decode_result_detail_di
  89. WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
  90. AND TRIM(原始元素) <> ''
  91. AND 原始元素 IS NOT NULL
  92. AND 元素维度 = '实质'
  93. AND 贡献分 >= {float(min_contribution_score)}
  94. )
  95. SELECT '去年同期阳历' AS strategy
  96. ,md5(CONCAT('去年同期阳历',t1.原始元素,'{bizdate}')) AS demand_id
  97. ,t1.原始元素 AS demand_name
  98. ,COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) AS weight
  99. ,COUNT(DISTINCT t1.vid) AS video_count
  100. ,COLLECT_SET(t1.vid) AS video_list
  101. ,'{{}}' AS ext_info
  102. FROM tag_vid_dedup t1
  103. LEFT JOIN video_avg_metrics t2
  104. ON t1.vid = t2.vid
  105. GROUP BY t1.原始元素
  106. HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) >= {float(rov_avg)}
  107. ORDER BY weight DESC
  108. """
  109. odps_client = get_odps_client()
  110. instance = odps_client.execute_sql(sql)
  111. rows: list[dict[str, object]] = []
  112. with instance.open_reader(tunnel=True) as reader:
  113. for record in reader:
  114. rows.append(
  115. {
  116. "strategy": record["strategy"],
  117. "demand_id": record["demand_id"],
  118. "demand_name": record["demand_name"],
  119. "weight": _normalize_scalar(record["weight"]),
  120. "video_count": record["video_count"],
  121. "video_list": _serialize_video_list(record["video_list"]),
  122. "ext_info": record["ext_info"],
  123. }
  124. )
  125. return rows
  126. def query_same_period_last_year_lunar_element_demands(
  127. *,
  128. period_days: int,
  129. view_pv_count: int,
  130. min_contribution_score: float | int,
  131. rov_avg: float | int,
  132. ) -> list[dict[str, object]]:
  133. """去年同期阴历策略下的实质元素需求聚合(ROV 等均值过滤)。
  134. 日期区间由 SQL 内 lunar_to_solar / lunar_add / solar_to_lunar 推算;
  135. end_date 为 start_date 起连续 period_days 天的最后一天(含首日),与阳历接口语义一致。
  136. bizdate 为上海时区当天 YYYYMMDD。
  137. """
  138. if period_days < 0:
  139. raise ValueError("period_days 不能为负")
  140. if view_pv_count < 0:
  141. raise ValueError("view_pv_count 不能为负")
  142. bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
  143. days_after = max(0, period_days - 1)
  144. sql = f"""
  145. SET odps.sql.type.system.odps2 = true
  146. ;
  147. SET odps.sql.decimal.odps2 = true
  148. ;
  149. WITH base_date AS
  150. (
  151. SELECT lunar_to_solar(lunar_add(loghubods.solar_to_lunar('{bizdate}','yyyyMMdd'),-1,'Y')) AS start_date
  152. )
  153. ,date_params AS
  154. (
  155. SELECT start_date
  156. ,DATE_FORMAT(CAST(DATE_ADD(TO_DATE(start_date,'yyyyMMdd'),{int(days_after)}) AS TIMESTAMP),'yyyyMMdd') AS end_date
  157. FROM base_date
  158. )
  159. ,cleaned_video_metrics AS
  160. (
  161. SELECT CAST(d.视频id AS STRING) AS vid
  162. ,d.rov_t0
  163. FROM loghubods.video_dimension_detail_add_column d
  164. WHERE d.dt >= (
  165. SELECT start_date
  166. FROM date_params
  167. )
  168. AND d.dt <= (
  169. SELECT end_date
  170. FROM date_params
  171. )
  172. AND COALESCE(d.`当日分发曝光pv`,0) >= {int(view_pv_count)}
  173. )
  174. ,video_avg_metrics AS
  175. (
  176. SELECT vid
  177. ,AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
  178. FROM cleaned_video_metrics
  179. GROUP BY vid
  180. )
  181. ,tag_vid_dedup AS
  182. (
  183. SELECT DISTINCT CAST(vid AS STRING) AS vid
  184. ,原始元素
  185. ,原始元素描述
  186. ,点类型
  187. ,元素维度
  188. ,短语
  189. ,选题
  190. ,`extend`
  191. FROM loghubods.dwd_topic_decode_result_detail_di
  192. WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
  193. AND TRIM(原始元素) <> ''
  194. AND 原始元素 IS NOT NULL
  195. AND 元素维度 = '实质'
  196. AND 贡献分 >= {float(min_contribution_score)}
  197. )
  198. SELECT '去年同期阴历' AS strategy
  199. ,md5(CONCAT('去年同期阴历',t1.原始元素,'{bizdate}')) AS demand_id
  200. ,t1.原始元素 AS demand_name
  201. ,CAST(COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS DECIMAL(20,6)) AS weight
  202. ,COUNT(DISTINCT t1.vid) AS video_count
  203. ,COLLECT_SET(t1.vid) AS video_list
  204. ,'{{}}' AS ext_info
  205. FROM tag_vid_dedup t1
  206. LEFT JOIN video_avg_metrics t2
  207. ON t1.vid = t2.vid
  208. GROUP BY t1.原始元素
  209. HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) >= {float(rov_avg)}
  210. ORDER BY weight DESC
  211. """
  212. odps_client = get_odps_client()
  213. # 脚本模式:SET 多条 + SELECT;DECIMAL(p,s) 需 odps.sql.decimal.odps2
  214. instance = odps_client.execute_sql(
  215. sql,
  216. hints={
  217. "odps.sql.submit.mode": "script",
  218. "odps.sql.decimal.odps2": "true",
  219. },
  220. )
  221. rows: list[dict[str, object]] = []
  222. with instance.open_reader(tunnel=True) as reader:
  223. for record in reader:
  224. rows.append(
  225. {
  226. "strategy": record["strategy"],
  227. "demand_id": record["demand_id"],
  228. "demand_name": record["demand_name"],
  229. "weight": _normalize_scalar(record["weight"]),
  230. "video_count": record["video_count"],
  231. "video_list": _serialize_video_list(record["video_list"]),
  232. "ext_info": record["ext_info"],
  233. }
  234. )
  235. return rows
  236. def query_monthly_element_demands(
  237. *,
  238. view_pv_count: int,
  239. month_total_pv_threshold: float | int,
  240. min_contribution_score: float | int,
  241. rov_avg: float | int,
  242. min_frequency: int,
  243. ) -> list[dict[str, object]]:
  244. """逐月策略:按 bizdate 所在自然月回溯 12 个月窗口,聚合实质元素需求。"""
  245. if view_pv_count < 0:
  246. raise ValueError("view_pv_count 不能为负")
  247. if float(month_total_pv_threshold) < 0:
  248. raise ValueError("month_total_pv_threshold 不能为负")
  249. if min_frequency < 0:
  250. raise ValueError("min_frequency 不能为负")
  251. bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
  252. sql = f"""
  253. WITH biz_month AS (
  254. SELECT
  255. TO_DATE(CONCAT(SUBSTR('{bizdate}', 1, 4), '-', SUBSTR('{bizdate}', 5, 2), '-01')) AS biz_m1
  256. ),
  257. month_window AS (
  258. SELECT
  259. CONCAT(
  260. SUBSTR(CAST(ADD_MONTHS(biz_m1, -12) AS STRING), 1, 4),
  261. SUBSTR(CAST(ADD_MONTHS(biz_m1, -12) AS STRING), 6, 2)
  262. ) AS start_ym,
  263. CONCAT(
  264. SUBSTR(CAST(ADD_MONTHS(biz_m1, -1) AS STRING), 1, 4),
  265. SUBSTR(CAST(ADD_MONTHS(biz_m1, -1) AS STRING), 6, 2)
  266. ) AS end_ym
  267. FROM biz_month
  268. ),
  269. cleaned_video_metrics AS (
  270. SELECT
  271. CAST(视频id AS STRING) AS vid,
  272. SUBSTR(CAST(dt AS STRING), 1, 6) AS ym,
  273. rov_t0,
  274. COALESCE(`当日分发曝光pv`, 0) AS day_dist_pv
  275. FROM loghubods.video_dimension_detail_add_column
  276. WHERE SUBSTR(CAST(dt AS STRING), 1, 6) >= (SELECT start_ym FROM month_window)
  277. AND SUBSTR(CAST(dt AS STRING), 1, 6) <= (SELECT end_ym FROM month_window)
  278. AND COALESCE(`当日分发曝光pv`, 0) >= {int(view_pv_count)}
  279. ),
  280. video_monthly_avg_metrics AS (
  281. SELECT
  282. ym,
  283. vid,
  284. AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov,
  285. SUM(day_dist_pv) AS month_total_pv
  286. FROM cleaned_video_metrics
  287. GROUP BY ym, vid
  288. HAVING SUM(day_dist_pv) > {float(month_total_pv_threshold)}
  289. ),
  290. tag_vid_dedup AS (
  291. SELECT DISTINCT
  292. CAST(vid AS STRING) AS vid,
  293. 原始元素
  294. FROM loghubods.dwd_topic_decode_result_detail_di
  295. WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
  296. AND 元素维度 = '实质'
  297. AND 贡献分 >= {float(min_contribution_score)}
  298. ),
  299. element_monthly_metrics AS (
  300. SELECT
  301. t1.原始元素,
  302. t2.ym,
  303. COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS month_avg_rov
  304. FROM tag_vid_dedup t1
  305. JOIN video_monthly_avg_metrics t2
  306. ON t1.vid = t2.vid
  307. GROUP BY t1.原始元素, t2.ym
  308. HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) >= {float(rov_avg)}
  309. ),
  310. element_total_rov AS (
  311. SELECT
  312. 原始元素,
  313. ROUND(SUM(month_avg_rov), 6) AS avg_rov
  314. FROM element_monthly_metrics
  315. GROUP BY 原始元素
  316. ),
  317. element_vid_dedup AS (
  318. SELECT DISTINCT
  319. em.原始元素,
  320. vm.vid
  321. FROM element_monthly_metrics em
  322. JOIN tag_vid_dedup tv
  323. ON em.原始元素 = tv.原始元素
  324. JOIN video_monthly_avg_metrics vm
  325. ON tv.vid = vm.vid
  326. AND em.ym = vm.ym
  327. ),
  328. element_vid_stats AS (
  329. SELECT
  330. 原始元素,
  331. COUNT(DISTINCT vid) AS vid_count,
  332. COLLECT_SET(vid) AS vid_list
  333. FROM element_vid_dedup
  334. GROUP BY 原始元素
  335. ),
  336. element_freq AS (
  337. SELECT
  338. 原始元素,
  339. COUNT(1) AS 频次
  340. FROM element_monthly_metrics
  341. GROUP BY 原始元素
  342. )
  343. SELECT
  344. '逐月' AS strategy,
  345. md5(CONCAT('逐月', r.原始元素, '{bizdate}')) AS demand_id,
  346. r.原始元素 AS demand_name,
  347. r.avg_rov AS weight,
  348. COALESCE(v.vid_count, 0) AS video_count,
  349. v.vid_list AS video_list,
  350. '{{}}' AS ext_info
  351. FROM element_total_rov r
  352. LEFT JOIN element_vid_stats v
  353. ON r.原始元素 = v.原始元素
  354. LEFT JOIN element_freq f
  355. ON r.原始元素 = f.原始元素
  356. WHERE r.原始元素 NOT IN (
  357. '元旦','腊八节','小年','除夕','春节','正月初一','正月初二','正月初三','正月初四','正月初五',
  358. '情人节','元宵节','龙抬头','妇女节','植树节','劳动节','母亲节','儿童节','端午节','父亲节',
  359. '建党节','建军节','七夕节','中元节','中秋节','国庆节','重阳节','感恩节','公祭日','平安夜',
  360. '圣诞节','小寒','大寒','立春','雨水','惊蛰','春分','清明','谷雨','立夏','小满','芒种',
  361. '夏至','小暑','大暑','立秋','处暑','白露','秋分','寒露','霜降','立冬','小雪','大雪','冬至',
  362. '早上好','中午好','下午好','晚上好','晚安',
  363. '祝福','祝愿','祝你','祝贺','祝大家','祝您','祝好运','祝群主','祝朋友'
  364. )
  365. AND COALESCE(f.频次, 0) >= {int(min_frequency)}
  366. ORDER BY r.avg_rov DESC
  367. """
  368. odps_client = get_odps_client()
  369. instance = odps_client.execute_sql(sql)
  370. rows: list[dict[str, object]] = []
  371. with instance.open_reader(tunnel=True) as reader:
  372. for record in reader:
  373. rows.append(
  374. {
  375. "strategy": record["strategy"],
  376. "demand_id": record["demand_id"],
  377. "demand_name": record["demand_name"],
  378. "weight": _normalize_scalar(record["weight"]),
  379. "video_count": record["video_count"],
  380. "video_list": _serialize_video_list(record["video_list"]),
  381. "ext_info": record["ext_info"],
  382. }
  383. )
  384. return rows