| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464 |
- """实质元素需求检索:通过 ODPS(MaxCompute)执行 Hive SQL。"""
- import json
- import re
- from datetime import date, datetime, timedelta
- from decimal import Decimal
- from zoneinfo import ZoneInfo
- from app.odps.client import get_odps_client
# Partition values are spliced into SQL text, so accept exactly eight ASCII
# digits. ``\Z`` (unlike ``$``) rejects a trailing newline and ``[0-9]``
# (unlike ``\d``) rejects non-ASCII digits such as full-width numerals.
_DATE_PARTITION_RE = re.compile(r"\A[0-9]{8}\Z")
# Video ids are spliced into SQL text as well: 1-128 characters drawn from
# ASCII letters, digits, underscore and hyphen, with no trailing newline.
_VID_QUERY_RE = re.compile(r"\A[0-9a-zA-Z_-]{1,128}\Z")
# Time zone used by this module whenever "today" has to be determined.
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
def _today_shanghai() -> date:
    """Return the current calendar date as seen in ``SHANGHAI_TZ``."""
    shanghai_now = datetime.now(tz=SHANGHAI_TZ)
    return shanghai_now.date()
def _same_day_last_year(value: date) -> date:
    """Return the same month and day one year earlier.

    When the previous year has no such day (Feb 29), Feb 28 of the previous
    year is returned instead.
    """
    previous_year = value.year - 1
    try:
        shifted = value.replace(year=previous_year)
    except ValueError:
        # ``replace`` refuses Feb 29 in a non-leap year; fall back to Feb 28.
        shifted = date(previous_year, 2, 28)
    return shifted
def _partition_yyyymmdd(value: date) -> str:
    """Format *value* as a ``YYYYMMDD`` partition string."""
    return f"{value:%Y%m%d}"
def _validate_partition_dt(label: str, value: str) -> str:
    """Check that *value* is a ``YYYYMMDD`` partition string and return it.

    The value is later spliced into SQL text, so the whole string must match
    the partition pattern. ``fullmatch`` is used because ``match`` with a
    ``$``-anchored pattern would also accept a trailing newline, and the
    ASCII check keeps non-ASCII digits (which ``\\d`` accepts) out.

    Args:
        label: Name of the value, used only in the error message.
        value: Candidate partition string.

    Returns:
        *value* unchanged when it is valid.

    Raises:
        ValueError: If *value* is not exactly eight ASCII digits.
    """
    if not (value.isascii() and _DATE_PARTITION_RE.fullmatch(value)):
        raise ValueError(f"{label} 须为 YYYYMMDD 分区格式,当前为 {value!r}")
    return value
- def _serialize_video_list(value: object) -> str | None:
- if value is None:
- return None
- if isinstance(value, list):
- return json.dumps(value, ensure_ascii=False)
- return str(value)
def _normalize_scalar(value: object) -> object:
    """Convert ``Decimal`` values to ``float``; return anything else unchanged."""
    return float(value) if isinstance(value, Decimal) else value
def query_same_period_last_year_element_demands(
    *,
    period_days: int,
    view_pv_count: int,
    min_contribution_score: float | int,
    rov_avg: float | int,
) -> list[dict[str, object]]:
    """Aggregate "实质" element demands for the solar "same period last year" strategy.

    The ``dt`` partition range (YYYYMMDD) is derived from today's date in the
    Shanghai time zone:

    - start_date: today's month/day one year ago (Feb 29 falls back to Feb 28
      of the previous year).
    - end_date: the last of ``period_days`` consecutive calendar days counted
      from start_date, start day included.

    The ``bizdate`` embedded in ``demand_id`` is today's Shanghai date
    (YYYYMMDD); it is not a parameter. "Today" is read exactly once so that the
    partition window and ``bizdate`` always refer to the same day, even when
    the call happens around midnight.

    Args:
        period_days: Window length in days, start day included; must be >= 0.
            Both 0 and 1 select only start_date; 7 selects seven partitions.
        view_pv_count: Lower bound on ``当日分发曝光pv``; must be >= 0.
        min_contribution_score: Lower bound on ``贡献分``.
        rov_avg: Lower bound on the per-element average ROV after grouping.

    Returns:
        One dict per element with keys ``strategy``, ``demand_id``,
        ``demand_name``, ``weight``, ``video_count``, ``video_list`` and
        ``ext_info``, in the order produced by the query (weight descending).

    Raises:
        ValueError: If ``period_days`` or ``view_pv_count`` is negative, or a
            derived partition string is not in YYYYMMDD form.
    """
    # Reject bad input before touching the clock or the warehouse.
    if period_days < 0:
        raise ValueError("period_days 不能为负")
    if view_pv_count < 0:
        raise ValueError("view_pv_count 不能为负")

    # Single snapshot of "today": every derived date below comes from it.
    today = _today_shanghai()
    start_day = _same_day_last_year(today)
    end_day = start_day + timedelta(days=max(0, period_days - 1))
    start_date = _validate_partition_dt("start_date", _partition_yyyymmdd(start_day))
    end_date = _validate_partition_dt("end_date", _partition_yyyymmdd(end_day))
    bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(today))

    # Numeric inputs are coerced before being spliced into the SQL text.
    sql = f"""
    WITH cleaned_video_metrics AS
    (
        SELECT CAST(视频id AS STRING) AS vid
        ,rov_t0
        FROM loghubods.video_dimension_detail_add_column
        WHERE dt >= '{start_date}'
        AND dt <= '{end_date}'
        AND COALESCE(`当日分发曝光pv`,0) >= {int(view_pv_count)}
    )
    ,video_avg_metrics AS
    (
        SELECT vid
        ,AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
        FROM cleaned_video_metrics
        GROUP BY vid
    )
    ,tag_vid_dedup AS
    (
        SELECT DISTINCT CAST(vid AS STRING) AS vid
        ,原始元素
        ,原始元素描述
        ,点类型
        ,元素维度
        ,短语
        ,选题
        ,`extend`
        FROM loghubods.dwd_topic_decode_result_detail_di
        WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
        AND TRIM(原始元素) <> ''
        AND 原始元素 IS NOT NULL
        AND 元素维度 = '实质'
        AND 贡献分 >= {float(min_contribution_score)}
    )
    SELECT '去年同期阳历' AS strategy
    ,md5(CONCAT('去年同期阳历',t1.原始元素,'{bizdate}')) AS demand_id
    ,t1.原始元素 AS demand_name
    ,COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) AS weight
    ,COUNT(DISTINCT t1.vid) AS video_count
    ,COLLECT_SET(t1.vid) AS video_list
    ,'{{}}' AS ext_info
    FROM tag_vid_dedup t1
    LEFT JOIN video_avg_metrics t2
    ON t1.vid = t2.vid
    GROUP BY t1.原始元素
    HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) >= {float(rov_avg)}
    ORDER BY weight DESC
    """
    odps_client = get_odps_client()
    instance = odps_client.execute_sql(sql)
    with instance.open_reader(tunnel=True) as reader:
        # Decimal weights become floats; list columns become JSON strings.
        return [
            {
                "strategy": record["strategy"],
                "demand_id": record["demand_id"],
                "demand_name": record["demand_name"],
                "weight": _normalize_scalar(record["weight"]),
                "video_count": record["video_count"],
                "video_list": _serialize_video_list(record["video_list"]),
                "ext_info": record["ext_info"],
            }
            for record in reader
        ]
def query_same_period_last_year_lunar_element_demands(
    *,
    period_days: int,
    view_pv_count: int,
    min_contribution_score: float | int,
    rov_avg: float | int,
) -> list[dict[str, object]]:
    """Aggregate "实质" element demands for the lunar "same period last year" strategy.

    The date window is computed inside the SQL with ``lunar_to_solar`` /
    ``lunar_add`` / ``solar_to_lunar``. end_date is the last of
    ``period_days`` consecutive days counted from start_date (start day
    included), matching the solar variant. ``bizdate`` is today's date in the
    Shanghai time zone as YYYYMMDD.

    Args:
        period_days: Window length in days, start day included; must be >= 0.
        view_pv_count: Lower bound on ``当日分发曝光pv``; must be >= 0.
        min_contribution_score: Lower bound on ``贡献分``.
        rov_avg: Lower bound on the per-element average ROV after grouping.

    Returns:
        One dict per element with keys ``strategy``, ``demand_id``,
        ``demand_name``, ``weight``, ``video_count``, ``video_list`` and
        ``ext_info``.

    Raises:
        ValueError: If ``period_days`` or ``view_pv_count`` is negative.
    """
    if period_days < 0:
        raise ValueError("period_days 不能为负")
    if view_pv_count < 0:
        raise ValueError("view_pv_count 不能为负")

    bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
    # Offset from start_date to end_date; a zero-length window still covers one day.
    extra_days = int(max(0, period_days - 1))
    pv_floor = int(view_pv_count)
    score_floor = float(min_contribution_score)
    rov_floor = float(rov_avg)

    sql = f"""
    SET odps.sql.type.system.odps2 = true
    ;
    SET odps.sql.decimal.odps2 = true
    ;
    WITH base_date AS
    (
        SELECT lunar_to_solar(lunar_add(loghubods.solar_to_lunar('{bizdate}','yyyyMMdd'),-1,'Y')) AS start_date
    )
    ,date_params AS
    (
        SELECT start_date
        ,DATE_FORMAT(CAST(DATE_ADD(TO_DATE(start_date,'yyyyMMdd'),{extra_days}) AS TIMESTAMP),'yyyyMMdd') AS end_date
        FROM base_date
    )
    ,cleaned_video_metrics AS
    (
        SELECT CAST(d.视频id AS STRING) AS vid
        ,d.rov_t0
        FROM loghubods.video_dimension_detail_add_column d
        WHERE d.dt >= (
            SELECT start_date
            FROM date_params
        )
        AND d.dt <= (
            SELECT end_date
            FROM date_params
        )
        AND COALESCE(d.`当日分发曝光pv`,0) >= {pv_floor}
    )
    ,video_avg_metrics AS
    (
        SELECT vid
        ,AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov
        FROM cleaned_video_metrics
        GROUP BY vid
    )
    ,tag_vid_dedup AS
    (
        SELECT DISTINCT CAST(vid AS STRING) AS vid
        ,原始元素
        ,原始元素描述
        ,点类型
        ,元素维度
        ,短语
        ,选题
        ,`extend`
        FROM loghubods.dwd_topic_decode_result_detail_di
        WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
        AND TRIM(原始元素) <> ''
        AND 原始元素 IS NOT NULL
        AND 元素维度 = '实质'
        AND 贡献分 >= {score_floor}
    )
    SELECT '去年同期阴历' AS strategy
    ,md5(CONCAT('去年同期阴历',t1.原始元素,'{bizdate}')) AS demand_id
    ,t1.原始元素 AS demand_name
    ,CAST(COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS DECIMAL(20,6)) AS weight
    ,COUNT(DISTINCT t1.vid) AS video_count
    ,COLLECT_SET(t1.vid) AS video_list
    ,'{{}}' AS ext_info
    FROM tag_vid_dedup t1
    LEFT JOIN video_avg_metrics t2
    ON t1.vid = t2.vid
    GROUP BY t1.原始元素
    HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov),6),0) >= {rov_floor}
    ORDER BY weight DESC
    """
    # Script mode: several SET statements followed by a SELECT;
    # DECIMAL(p,s) requires odps.sql.decimal.odps2.
    script_hints = {
        "odps.sql.submit.mode": "script",
        "odps.sql.decimal.odps2": "true",
    }
    instance = get_odps_client().execute_sql(sql, hints=script_hints)
    with instance.open_reader(tunnel=True) as reader:
        return [
            {
                "strategy": record["strategy"],
                "demand_id": record["demand_id"],
                "demand_name": record["demand_name"],
                "weight": _normalize_scalar(record["weight"]),
                "video_count": record["video_count"],
                "video_list": _serialize_video_list(record["video_list"]),
                "ext_info": record["ext_info"],
            }
            for record in reader
        ]
def query_monthly_element_demands(
    *,
    view_pv_count: int,
    month_total_pv_threshold: float | int,
    min_contribution_score: float | int,
    rov_avg: float | int,
    min_frequency: int,
) -> list[dict[str, object]]:
    """Month-by-month strategy: aggregate "实质" element demands.

    The window is the 12 calendar months preceding the month that contains
    ``bizdate`` (today's Shanghai date, YYYYMMDD). Elements on a fixed list of
    festival, solar-term and greeting names are excluded by the query.

    Args:
        view_pv_count: Per-row lower bound on ``当日分发曝光pv``; must be >= 0.
        month_total_pv_threshold: A (month, video) pair is kept only when its
            summed ``当日分发曝光pv`` is strictly greater than this; must be >= 0.
        min_contribution_score: Lower bound on ``贡献分``.
        rov_avg: Lower bound on an element's average ROV within one month.
        min_frequency: Minimum number of qualifying months per element; must
            be >= 0.

    Returns:
        One dict per element with keys ``strategy``, ``demand_id``,
        ``demand_name``, ``weight`` (sum of the qualifying monthly averages),
        ``video_count``, ``video_list``, ``month_list``, ``frequency`` and
        ``ext_info``.

    Raises:
        ValueError: If ``view_pv_count``, ``month_total_pv_threshold`` or
            ``min_frequency`` is negative.
    """
    if view_pv_count < 0:
        raise ValueError("view_pv_count 不能为负")
    if float(month_total_pv_threshold) < 0:
        raise ValueError("month_total_pv_threshold 不能为负")
    if min_frequency < 0:
        raise ValueError("min_frequency 不能为负")

    bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
    # Coerce numeric inputs once; the SQL below only sees plain number literals.
    pv_floor = int(view_pv_count)
    month_pv_floor = float(month_total_pv_threshold)
    score_floor = float(min_contribution_score)
    rov_floor = float(rov_avg)
    freq_floor = int(min_frequency)

    sql = f"""
    WITH biz_month AS (
        SELECT
            TO_DATE(CONCAT(SUBSTR('{bizdate}', 1, 4), '-', SUBSTR('{bizdate}', 5, 2), '-01')) AS biz_m1
    ),
    month_window AS (
        SELECT
            CONCAT(
                SUBSTR(CAST(ADD_MONTHS(biz_m1, -12) AS STRING), 1, 4),
                SUBSTR(CAST(ADD_MONTHS(biz_m1, -12) AS STRING), 6, 2)
            ) AS start_ym,
            CONCAT(
                SUBSTR(CAST(ADD_MONTHS(biz_m1, -1) AS STRING), 1, 4),
                SUBSTR(CAST(ADD_MONTHS(biz_m1, -1) AS STRING), 6, 2)
            ) AS end_ym
        FROM biz_month
    ),
    cleaned_video_metrics AS (
        SELECT
            CAST(视频id AS STRING) AS vid,
            SUBSTR(CAST(dt AS STRING), 1, 6) AS ym,
            rov_t0,
            COALESCE(`当日分发曝光pv`, 0) AS day_dist_pv
        FROM loghubods.video_dimension_detail_add_column
        WHERE SUBSTR(CAST(dt AS STRING), 1, 6) >= (SELECT start_ym FROM month_window)
        AND SUBSTR(CAST(dt AS STRING), 1, 6) <= (SELECT end_ym FROM month_window)
        AND COALESCE(`当日分发曝光pv`, 0) >= {pv_floor}
    ),
    video_monthly_avg_metrics AS (
        SELECT
            ym,
            vid,
            AVG(CASE WHEN rov_t0 = 0 THEN NULL ELSE rov_t0 END) AS vid_avg_rov,
            SUM(day_dist_pv) AS month_total_pv
        FROM cleaned_video_metrics
        GROUP BY ym, vid
        HAVING SUM(day_dist_pv) > {month_pv_floor}
    ),
    tag_vid_dedup AS (
        SELECT DISTINCT
            CAST(vid AS STRING) AS vid,
            原始元素
        FROM loghubods.dwd_topic_decode_result_detail_di
        WHERE dt = MAX_PT('loghubods.dwd_topic_decode_result_detail_di')
        AND 元素维度 = '实质'
        AND 贡献分 >= {score_floor}
    ),
    element_monthly_metrics AS (
        SELECT
            t1.原始元素,
            t2.ym,
            COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) AS month_avg_rov
        FROM tag_vid_dedup t1
        JOIN video_monthly_avg_metrics t2
        ON t1.vid = t2.vid
        GROUP BY t1.原始元素, t2.ym
        HAVING COALESCE(ROUND(AVG(t2.vid_avg_rov), 6), 0) >= {rov_floor}
    ),
    element_total_rov AS (
        SELECT
            原始元素,
            ROUND(SUM(month_avg_rov), 6) AS avg_rov
        FROM element_monthly_metrics
        GROUP BY 原始元素
    ),
    element_vid_dedup AS (
        SELECT DISTINCT
            em.原始元素,
            vm.vid
        FROM element_monthly_metrics em
        JOIN tag_vid_dedup tv
        ON em.原始元素 = tv.原始元素
        JOIN video_monthly_avg_metrics vm
        ON tv.vid = vm.vid
        AND em.ym = vm.ym
    ),
    element_vid_stats AS (
        SELECT
            原始元素,
            COUNT(DISTINCT vid) AS vid_count,
            COLLECT_SET(vid) AS vid_list
        FROM element_vid_dedup
        GROUP BY 原始元素
    ),
    element_freq AS (
        SELECT
            原始元素,
            COUNT(1) AS 频次
        FROM element_monthly_metrics
        GROUP BY 原始元素
    )
    ,element_month_list AS (
        SELECT
            原始元素,
            TO_JSON(SORT_ARRAY(COLLECT_SET(ym))) AS month_list
        FROM element_monthly_metrics
        GROUP BY 原始元素
    )
    SELECT
        '逐月' AS strategy,
        md5(CONCAT('逐月', r.原始元素, '{bizdate}')) AS demand_id,
        r.原始元素 AS demand_name,
        r.avg_rov AS weight,
        COALESCE(v.vid_count, 0) AS video_count,
        v.vid_list AS video_list,
        ml.month_list AS month_list,
        COALESCE(f.频次, 0) AS frequency,
        '{{}}' AS ext_info
    FROM element_total_rov r
    LEFT JOIN element_vid_stats v
    ON r.原始元素 = v.原始元素
    LEFT JOIN element_freq f
    ON r.原始元素 = f.原始元素
    LEFT JOIN element_month_list ml -- 新增 JOIN
    ON r.原始元素 = ml.原始元素
    WHERE r.原始元素 NOT IN (
        '元旦','腊八节','小年','除夕','春节','正月初一','正月初二','正月初三','正月初四','正月初五',
        '情人节','元宵节','龙抬头','妇女节','植树节','劳动节','母亲节','儿童节','端午节','父亲节',
        '建党节','建军节','七夕节','中元节','中秋节','国庆节','重阳节','感恩节','公祭日','平安夜',
        '圣诞节','小寒','大寒','立春','雨水','惊蛰','春分','清明','谷雨','立夏','小满','芒种',
        '夏至','小暑','大暑','立秋','处暑','白露','秋分','寒露','霜降','立冬','小雪','大雪','冬至',
        '早上好','中午好','下午好','晚上好','晚安',
        '祝福','祝愿','祝你','祝贺','祝大家','祝您','祝好运','祝群主','祝朋友'
    )
    AND COALESCE(f.频次, 0) >= {freq_floor}
    ORDER BY r.avg_rov DESC
    """
    instance = get_odps_client().execute_sql(sql)
    with instance.open_reader(tunnel=True) as reader:
        return [
            {
                "strategy": record["strategy"],
                "demand_id": record["demand_id"],
                "demand_name": record["demand_name"],
                "weight": _normalize_scalar(record["weight"]),
                "video_count": record["video_count"],
                "video_list": _serialize_video_list(record["video_list"]),
                "month_list": _serialize_video_list(record["month_list"]),
                "frequency": _normalize_scalar(record["frequency"]),
                "ext_info": record["ext_info"],
            }
            for record in reader
        ]
def query_video_decode_url2_for_today(vid: str) -> str | None:
    """Fetch ``url2`` for *vid* from today's ``dwd_topic_decode_result_di`` partition.

    "Today" is the current date in the Shanghai time zone.

    Args:
        vid: Video id; surrounding whitespace is ignored. After stripping it
            must be 1-128 letters, digits, underscores or hyphens.

    Returns:
        The stripped ``url2`` of the first matching row, or ``None`` when no
        row matches or the value is NULL or blank.

    Raises:
        ValueError: If *vid* does not match the allowed pattern.
    """
    candidate = vid.strip()
    if _VID_QUERY_RE.match(candidate) is None:
        raise ValueError("vid 须为 1~128 位字母、数字、下划线或连字符")

    bizdate = _validate_partition_dt("bizdate", _partition_yyyymmdd(_today_shanghai()))
    sql = f"""
    SELECT url2
    FROM loghubods.dwd_topic_decode_result_di
    WHERE dt = '{bizdate}'
    AND CAST(vid AS STRING) = '{candidate}'
    LIMIT 1
    """
    instance = get_odps_client().execute_sql(sql)
    with instance.open_reader(tunnel=True) as reader:
        for record in reader:
            # Only the first row is consulted (the query is LIMIT 1).
            url_value = record["url2"]
            cleaned = "" if url_value is None else str(url_value).strip()
            return cleaned or None
    return None
|