"""Substantive-element demand retrieval: runs Hive-dialect SQL on ODPS (MaxCompute)."""

import json
import math
import re
from datetime import date, datetime, timedelta
from decimal import Decimal
from zoneinfo import ZoneInfo

from app.odps.client import get_odps_client

# Both patterns are applied with ``fullmatch`` so a trailing newline (which a
# bare ``$`` tolerates) is rejected; ``[0-9]`` avoids ``\d`` also accepting
# non-ASCII Unicode digits in a value that is inlined into SQL.
_DATE_PARTITION_RE = re.compile(r"^[0-9]{8}$")
_VID_QUERY_RE = re.compile(r"^[0-9a-zA-Z_-]{1,128}$")
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")


def _today_shanghai() -> date:
    """Return today's calendar date in the Asia/Shanghai time zone."""
    return datetime.now(SHANGHAI_TZ).date()


def _same_day_last_year(value: date) -> date:
    """Return the same month/day one year earlier; Feb 29 maps to Feb 28."""
    try:
        return value.replace(year=value.year - 1)
    except ValueError:
        # Only Feb 29 can fail: the previous year is not a leap year.
        return date(value.year - 1, 2, 28)


def _partition_yyyymmdd(value: date) -> str:
    """Format a date as a ``YYYYMMDD`` partition string."""
    return value.strftime("%Y%m%d")


def _validate_partition_dt(label: str, value: str) -> str:
    """Return ``value`` unchanged if it is exactly eight ASCII digits.

    Raises:
        ValueError: if ``value`` is not a ``YYYYMMDD``-shaped partition string.
    """
    if not _DATE_PARTITION_RE.fullmatch(value):
        raise ValueError(f"{label} 须为 YYYYMMDD 分区格式,当前为 {value!r}")
    return value


def _sql_float(label: str, value: float | int) -> float:
    """Coerce ``value`` to a finite float that is safe to inline into SQL.

    ``float('nan')`` / ``float('inf')`` would otherwise render as ``nan`` /
    ``inf`` in the generated SQL text, which is not a valid numeric literal.

    Raises:
        ValueError: if the value is NaN or infinite (or not parseable).
        TypeError: if the value cannot be converted by ``float()``.
    """
    number = float(value)
    if not math.isfinite(number):
        raise ValueError(f"{label} 须为有限数值,当前为 {value!r}")
    return number


def _serialize_video_list(value: object) -> str | None:
    """Serialise a list column as JSON text; other non-None values via ``str``."""
    if value is None:
        return None
    if isinstance(value, list):
        return json.dumps(value, ensure_ascii=False)
    return str(value)


def _normalize_scalar(value: object) -> object:
    """Convert ``Decimal`` to ``float``; return any other value unchanged."""
    if isinstance(value, Decimal):
        return float(value)
    return value


def _demand_row(record, *, include_monthly_fields: bool = False) -> dict[str, object]:
    """Build one output dict from a result record (key order is part of the output)."""
    row: dict[str, object] = {
        "strategy": record["strategy"],
        "demand_id": record["demand_id"],
        "demand_name": record["demand_name"],
        "weight": _normalize_scalar(record["weight"]),
        "video_count": record["video_count"],
        "video_list": _serialize_video_list(record["video_list"]),
    }
    if include_monthly_fields:
        # month_list arrives as TO_JSON text; the list serialiser passes
        # non-list values through ``str``.
        row["month_list"] = _serialize_video_list(record["month_list"])
        row["frequency"] = _normalize_scalar(record["frequency"])
    row["ext_info"] = record["ext_info"]
    return row


def _run_demand_query(
    sql: str,
    *,
    hints: dict[str, str] | None = None,
    include_monthly_fields: bool = False,
) -> list[dict[str, object]]:
    """Execute ``sql`` on ODPS and materialise every result record as a dict.

    ``hints`` is forwarded to ``execute_sql`` only when given, so queries that
    need no hints are submitted exactly as ``execute_sql(sql)``.
    """
    odps_client = get_odps_client()
    if hints is None:
        instance = odps_client.execute_sql(sql)
    else:
        instance = odps_client.execute_sql(sql, hints=hints)
    with instance.open_reader(tunnel=True) as reader:
        return [
            _demand_row(record, include_monthly_fields=include_monthly_fields)
            for record in reader
        ]


def query_same_period_last_year_element_demands(
    *,
    period_days: int,
    view_pv_count: int,
    min_contribution_score: float | int,
    rov_avg: float | int,
) -> list[dict[str, object]]:
    """Aggregate substantive-element demands for the "same solar period last year" strategy.

    The ``dt`` partition range (YYYYMMDD) is derived from "today last year" in
    the Asia/Shanghai time zone:

    - start_date: the same month/day one year ago (Feb 29 falls back to Feb 28).
    - end_date: the last day of ``period_days`` consecutive calendar days
      starting at start_date (inclusive). ``period_days`` of 0 or 1 both yield
      only start_date.

    The ``bizdate`` embedded in ``demand_id`` is today's Shanghai date.

    Args:
        period_days: Window length in days (inclusive of start_date); >= 0.
        view_pv_count: Lower bound on the daily distribution-exposure PV column.
        min_contribution_score: Lower bound on the contribution-score column.
        rov_avg: Lower bound on the per-element average ROV after grouping.

    Raises:
        ValueError: on negative ``period_days`` / ``view_pv_count`` or
            non-finite float thresholds.
    """
    if period_days < 0:
        raise ValueError("period_days 不能为负")
    if view_pv_count < 0:
        raise ValueError("view_pv_count 不能为负")

    # Read "today" once so the partition window and bizdate cannot disagree
    # when the call straddles midnight.
    today = _today_shanghai()
    start_dt = _same_day_last_year(today)
    end_dt = start_dt + timedelta(days=max(0, period_days - 1))
    start_date = _validate_partition_dt("start_date", _partition_yyyymmdd(start_dt))
    end_date = _validate_partition_dt("end_date", _partition_yyyymmdd(end_dt))
    bizdate = _partition_yyyymmdd(today)
    min_score = _sql_float("min_contribution_score", min_contribution_score)
    rov_floor = _sql_float("rov_avg", rov_avg)

    sql = f"""
WITH cleaned_video_metrics AS (
    SELECT  CAST(视频id AS STRING) AS vid
            ,rov_t0
    FROM    loghubods.video_dimension_detail_add_column
    WHERE   dt >= '{start_date}'
    AND     dt <= '{end_date}'
    AND     COALESCE(`当日分发曝光pv`,0) >= {int(view_pv_count)}
)
,video_avg_metrics AS (
    SELECT  vid
            ,AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
    FROM    cleaned_video_metrics
    GROUP BY vid
)
,tag_vid_dedup AS (
    SELECT  DISTINCT CAST(vid AS STRING) AS vid
            ,原始元素
            ,原始元素描述
            ,点类型
            ,元素维度
            ,短语
            ,选题
            ,`extend`
    FROM    loghubods.dwd_topic_decode_result_detail_di
    WHERE   dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
    AND     TRIM(原始元素) <> ''
    AND     原始元素 IS NOT NULL
    AND     元素维度 = '实质'
    AND     贡献分 >= {min_score}
)
SELECT  '去年同期阳历' AS strategy
        ,md5(CONCAT('去年同期阳历',t1.原始元素,'{bizdate}')) AS demand_id
        ,t1.原始元素 AS demand_name
        ,COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) AS weight
        ,COUNT(DISTINCT t1.vid) AS video_count
        ,COLLECT_SET(t1.vid) AS video_list
        ,'{{}}' AS ext_info
FROM    tag_vid_dedup t1
LEFT JOIN video_avg_metrics t2
ON      t1.vid = t2.vid
GROUP BY t1.原始元素
HAVING  COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) >= {rov_floor}
ORDER BY weight DESC
"""
    return _run_demand_query(sql)


def query_same_period_last_year_lunar_element_demands(
    *,
    period_days: int,
    view_pv_count: int,
    min_contribution_score: float | int,
    rov_avg: float | int,
) -> list[dict[str, object]]:
    """Aggregate substantive-element demands for the "same lunar period last year" strategy.

    The date window is computed inside the SQL via ``solar_to_lunar`` /
    ``lunar_add`` / ``lunar_to_solar``. end_date is the last day of
    ``period_days`` consecutive days starting at start_date (inclusive),
    matching the solar variant. ``bizdate`` is today's Shanghai date.

    Raises:
        ValueError: on negative ``period_days`` / ``view_pv_count`` or
            non-finite float thresholds.
    """
    if period_days < 0:
        raise ValueError("period_days 不能为负")
    if view_pv_count < 0:
        raise ValueError("view_pv_count 不能为负")

    bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
    days_after = max(0, period_days - 1)
    min_score = _sql_float("min_contribution_score", min_contribution_score)
    rov_floor = _sql_float("rov_avg", rov_avg)

    # NOTE(review): only solar_to_lunar is project-qualified (loghubods.);
    # lunar_add / lunar_to_solar presumably resolve in the default project — confirm.
    sql = f"""
SET odps.sql.type.system.odps2 = true ;
SET odps.sql.decimal.odps2 = true ;
WITH base_date AS (
    SELECT  lunar_to_solar(lunar_add(loghubods.solar_to_lunar('{bizdate}','yyyyMMdd'),-1,'Y')) AS start_date
)
,date_params AS (
    SELECT  start_date
            ,DATE_FORMAT(CAST(DATE_ADD(TO_DATE(start_date,'yyyyMMdd'),{int(days_after)}) AS TIMESTAMP),'yyyyMMdd') AS end_date
    FROM    base_date
)
,cleaned_video_metrics AS (
    SELECT  CAST(d.视频id AS STRING) AS vid
            ,d.rov_t0
    FROM    loghubods.video_dimension_detail_add_column d
    WHERE   d.dt >= ( SELECT start_date FROM date_params )
    AND     d.dt <= ( SELECT end_date FROM date_params )
    AND     COALESCE(d.`当日分发曝光pv`,0) >= {int(view_pv_count)}
)
,video_avg_metrics AS (
    SELECT  vid
            ,AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
    FROM    cleaned_video_metrics
    GROUP BY vid
)
,tag_vid_dedup AS (
    SELECT  DISTINCT CAST(vid AS STRING) AS vid
            ,原始元素
            ,原始元素描述
            ,点类型
            ,元素维度
            ,短语
            ,选题
            ,`extend`
    FROM    loghubods.dwd_topic_decode_result_detail_di
    WHERE   dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
    AND     TRIM(原始元素) <> ''
    AND     原始元素 IS NOT NULL
    AND     元素维度 = '实质'
    AND     贡献分 >= {min_score}
)
SELECT  '去年同期阴历' AS strategy
        ,md5(CONCAT('去年同期阴历',t1.原始元素,'{bizdate}')) AS demand_id
        ,t1.原始元素 AS demand_name
        ,CAST(COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS DECIMAL(20,6)) AS weight
        ,COUNT(DISTINCT t1.vid) AS video_count
        ,COLLECT_SET(t1.vid) AS video_list
        ,'{{}}' AS ext_info
FROM    tag_vid_dedup t1
LEFT JOIN video_avg_metrics t2
ON      t1.vid = t2.vid
GROUP BY t1.原始元素
HAVING  COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) >= {rov_floor}
ORDER BY weight DESC
"""
    # Script mode: several SET statements followed by a SELECT; DECIMAL(p,s)
    # requires odps.sql.decimal.odps2.
    return _run_demand_query(
        sql,
        hints={
            "odps.sql.submit.mode": "script",
            "odps.sql.decimal.odps2": "true",
        },
    )


def query_monthly_element_demands(
    *,
    view_pv_count: int,
    month_total_pv_threshold: float | int,
    min_contribution_score: float | int,
    rov_avg: float | int,
    min_frequency: int,
) -> list[dict[str, object]]:
    """Aggregate substantive-element demands for the "monthly" strategy.

    Despite the name, the SQL does not use calendar months. It takes the 360
    days ending *yesterday* (bizdate - 1, Shanghai time) and splits them into
    twelve 30-day buckets. Bucket 0 covers the most recent 30 days, and each
    bucket is labelled ``start~end`` (YYYYMMDD).

    An element qualifies for a bucket when the average ROV of its videos in
    that bucket reaches ``rov_avg``. A video is counted in a bucket only if its
    summed PV there exceeds ``month_total_pv_threshold``. For each element:

    - weight is the sum of its qualifying bucket averages;
    - frequency is the number of qualifying buckets, which must be at least
      ``min_frequency``;
    - elements on the fixed holiday/greeting blocklist are dropped.

    Raises:
        ValueError: on negative thresholds or non-finite float thresholds.
    """
    if view_pv_count < 0:
        raise ValueError("view_pv_count 不能为负")
    month_pv_floor = _sql_float("month_total_pv_threshold", month_total_pv_threshold)
    if month_pv_floor < 0:
        raise ValueError("month_total_pv_threshold 不能为负")
    if min_frequency < 0:
        raise ValueError("min_frequency 不能为负")

    bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
    min_score = _sql_float("min_contribution_score", min_contribution_score)
    rov_floor = _sql_float("rov_avg", rov_avg)

    sql = f"""
WITH biz_day AS (
    SELECT TO_DATE(CONCAT(SUBSTR('{bizdate}', 1, 4), '-', SUBSTR('{bizdate}', 5, 2), '-', SUBSTR('{bizdate}', 7, 2))) AS biz_dt
),
yesterday AS (
    SELECT DATE_SUB((SELECT biz_dt FROM biz_day), 1) AS yest
),
window_bounds AS (
    SELECT
        CAST((SELECT yest FROM yesterday) AS DATETIME) AS end_dt,
        CAST(DATE_SUB((SELECT yest FROM yesterday), 359) AS DATETIME) AS start_dt
),
cleaned_video_metrics AS (
    SELECT
        CAST(视频id AS STRING) AS vid,
        CAST(FLOOR(DATEDIFF(
            (SELECT yest FROM yesterday),
            TO_DATE(REGEXP_REPLACE(CAST(dt AS STRING), '-', ''), 'yyyyMMdd')
        ) / 30) AS STRING) AS ym,
        CONCAT(
            REGEXP_REPLACE(CAST(DATE_SUB(
                (SELECT yest FROM yesterday),
                CAST(FLOOR(DATEDIFF(
                    (SELECT yest FROM yesterday),
                    TO_DATE(REGEXP_REPLACE(CAST(dt AS STRING), '-', ''), 'yyyyMMdd')
                ) / 30) * 30 + 29 AS INT)
            ) AS STRING), '-', ''),
            '~',
            REGEXP_REPLACE(CAST(DATE_SUB(
                (SELECT yest FROM yesterday),
                CAST(FLOOR(DATEDIFF(
                    (SELECT yest FROM yesterday),
                    TO_DATE(REGEXP_REPLACE(CAST(dt AS STRING), '-', ''), 'yyyyMMdd')
                ) / 30) * 30 AS INT)
            ) AS STRING), '-', '')
        ) AS ym_range,
        rov_t0,
        COALESCE(`当日分发曝光pv`, 0) AS day_dist_pv
    FROM loghubods.video_dimension_detail_add_column
    WHERE TO_DATE(REGEXP_REPLACE(CAST(dt AS STRING), '-', ''), 'yyyyMMdd')
            BETWEEN (SELECT start_dt FROM window_bounds) AND (SELECT end_dt FROM window_bounds)
      AND COALESCE(`当日分发曝光pv`, 0) >= {int(view_pv_count)}
),
video_monthly_avg_metrics AS (
    SELECT
        ym,
        MAX(ym_range) AS ym_range,
        vid,
        AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov,
        SUM(day_dist_pv) AS month_total_pv
    FROM cleaned_video_metrics
    GROUP BY ym, vid
    HAVING SUM(day_dist_pv) > {month_pv_floor}
),
tag_vid_dedup AS (
    SELECT DISTINCT
        CAST(vid AS STRING) AS vid,
        原始元素
    FROM loghubods.dwd_topic_decode_result_detail_di
    WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
      AND 元素维度 = '实质'
      AND 贡献分 >= {min_score}
),
element_monthly_metrics AS (
    SELECT
        t1.原始元素,
        t2.ym,
        COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS month_avg_rov
    FROM tag_vid_dedup t1
    JOIN video_monthly_avg_metrics t2
      ON t1.vid = t2.vid
    GROUP BY t1.原始元素, t2.ym
    HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) >= {rov_floor}
),
element_total_rov AS (
    SELECT
        原始元素,
        ROUND(SUM(month_avg_rov), 6) AS avg_rov
    FROM element_monthly_metrics
    GROUP BY 原始元素
),
element_vid_dedup AS (
    SELECT DISTINCT
        em.原始元素,
        vm.vid
    FROM element_monthly_metrics em
    JOIN tag_vid_dedup tv
      ON em.原始元素 = tv.原始元素
    JOIN video_monthly_avg_metrics vm
      ON tv.vid = vm.vid
     AND em.ym = vm.ym
),
element_vid_stats AS (
    SELECT
        原始元素,
        COUNT(DISTINCT vid) AS vid_count,
        COLLECT_SET(vid) AS vid_list
    FROM element_vid_dedup
    GROUP BY 原始元素
),
element_freq AS (
    SELECT
        原始元素,
        COUNT(1) AS 频次
    FROM element_monthly_metrics
    GROUP BY 原始元素
),
element_month_list AS (
    SELECT
        em.原始元素,
        TO_JSON(SORT_ARRAY(COLLECT_SET(vm.ym_range))) AS month_list
    FROM element_monthly_metrics em
    JOIN video_monthly_avg_metrics vm
      ON em.ym = vm.ym
    GROUP BY em.原始元素
)
SELECT
    '逐月' AS strategy,
    md5(CONCAT('逐月', r.原始元素, '{bizdate}')) AS demand_id,
    r.原始元素 AS demand_name,
    r.avg_rov AS weight,
    COALESCE(v.vid_count, 0) AS video_count,
    v.vid_list AS video_list,
    ml.month_list AS month_list,
    COALESCE(f.频次, 0) AS frequency,
    '{{}}' AS ext_info
FROM element_total_rov r
LEFT JOIN element_vid_stats v
  ON r.原始元素 = v.原始元素
LEFT JOIN element_freq f
  ON r.原始元素 = f.原始元素
LEFT JOIN element_month_list ml
  ON r.原始元素 = ml.原始元素
WHERE r.原始元素 NOT IN (
    '元旦','腊八节','小年','除夕','春节','正月初一','正月初二','正月初三','正月初四','正月初五',
    '情人节','元宵节','龙抬头','妇女节','植树节','劳动节','母亲节','儿童节','端午节','父亲节',
    '建党节','建军节','七夕节','中元节','中秋节','国庆节','重阳节','感恩节','公祭日','平安夜',
    '圣诞节','小寒','大寒','立春','雨水','惊蛰','春分','清明','谷雨','立夏','小满','芒种',
    '夏至','小暑','大暑','立秋','处暑','白露','秋分','寒露','霜降','立冬','小雪','大雪','冬至',
    '早上好','中午好','下午好','晚上好','晚安',
    '祝福','祝愿','祝你','祝贺','祝大家','祝您','祝好运','祝群主','祝朋友'
)
  AND COALESCE(f.频次, 0) >= {int(min_frequency)}
ORDER BY r.avg_rov DESC
"""
    return _run_demand_query(sql, include_monthly_fields=True)


def query_video_decode_url2_for_today(vid: str) -> str | None:
    """Fetch the decode-page ``url2`` for ``vid`` from today's (Shanghai) partition.

    Reads ``loghubods.dwd_topic_decode_result_di`` at ``dt = today``.

    Returns:
        The stripped ``url2`` of the first matching row, or ``None`` when there
        is no row or the value is NULL/blank.

    Raises:
        ValueError: if ``vid`` (after stripping) is not 1-128 chars of
            ``[0-9a-zA-Z_-]``. The check also guards the SQL string
            interpolation below.
    """
    vid_clean = vid.strip()
    if not _VID_QUERY_RE.fullmatch(vid_clean):
        raise ValueError("vid 须为 1~128 位字母、数字、下划线或连字符")
    bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
    sql = f"""
SELECT url2
FROM loghubods.dwd_topic_decode_result_di
WHERE dt = '{bizdate}'
  AND CAST(vid AS STRING) = '{vid_clean}'
LIMIT 1
"""
    odps_client = get_odps_client()
    instance = odps_client.execute_sql(sql)
    with instance.open_reader(tunnel=True) as reader:
        for record in reader:
            raw = record["url2"]
            if raw is None:
                return None
            text_val = str(raw).strip()
            return text_val if text_val else None
    return None