import json
import os
from datetime import date, timedelta
from pathlib import Path

from odps import ODPS
from odps.errors import ODPSError

from agent import tool
from examples.demand.mysql import mysql_db
def get_odps_data(sql):
    """Execute *sql* on MaxCompute (ODPS) and return the result rows.

    Args:
        sql: SQL statement to run.

    Returns:
        A list of result records on success, or ``None`` when the query
        fails with an ``ODPSError`` (the error is printed, not raised).
    """
    # SECURITY: credentials used to be hard-coded here and are now exposed
    # in version control — they should be rotated.  Read from the
    # environment first and fall back to the legacy values so existing
    # deployments keep working unchanged.
    access_id = os.getenv('ODPS_ACCESS_ID', 'LTAI9EBa0bd5PrDa')
    access_key = os.getenv('ODPS_ACCESS_KEY', 'vAalxds7YxhfOA2yVv8GziCg3Y87v5')
    project = os.getenv('ODPS_PROJECT', 'loghubods')
    endpoint = os.getenv('ODPS_ENDPOINT', 'http://service.odps.aliyun.com/api')

    # Initialize the ODPS entry point.
    o = ODPS(access_id, access_key, project, endpoint=endpoint)
    try:
        # execute_sql blocks until the job completes; open_reader streams
        # the result records back (similar to a List<Record> in Java).
        with o.execute_sql(sql).open_reader() as reader:
            return list(reader)
    except ODPSError as e:
        print(f"ODPS 错误: {e}")
        return None
def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
    """Fetch the 14-day average rov_t0 per video for one merge_leve2 category.

    Videos whose total distribution exposure over the window is below 1000
    are reported as 0 (per the CASE expression in the query).

    Args:
        merge_leve2: Second-level category tag to filter videos by.
        video_ids: Iterable of video ids to restrict the result to.

    Returns:
        Dict mapping videoid -> avg_rov_t0; empty when the query fails or
        returns no rows.
    """
    # Escape single quotes so input cannot break out of the SQL string
    # literal (consistent with get_merge_leve2_by_video_ids).
    merge_level_in_clause = "'{}'".format(str(merge_leve2).replace("'", "''"))
    video_ids_in_clause = ", ".join(
        "'{}'".format(str(video_id).replace("'", "''")) for video_id in video_ids
    )
    # 14-day window ending yesterday, as yyyymmdd partition keys.
    end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
    start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        v.videoid,
        CASE
            WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
            ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
        END AS avg_rov_t0
    FROM
        (
            SELECT
                t2.videoid,
                t2.merge_leve2
            FROM videoods.content_profile t1
            JOIN loghubods.video_merge_tag t2
                ON t1.content_id = t2.videoid
            WHERE
                t1.status = 3
                AND t1.is_deleted = 0
                AND t2.merge_leve2 IN ({merge_level_in_clause})
        ) v
    LEFT JOIN loghubods.video_dimension_detail_add_column t3
        ON v.videoid = t3.视频id
        AND t3.dt >= '{start_date}'
        AND t3.dt <= '{end_date}'
    WHERE v.videoid in ({video_ids_in_clause})
    GROUP BY
        v.videoid
    ;
    '''
    data = get_odps_data(sql_query)
    result_dict = {}
    if data:
        # Row layout: (videoid, avg_rov_t0).
        result_dict = {r[0]: r[1] for r in data}
    return result_dict
def get_changwen_weight(account_name):
    """Pull per-video distribution/share/backflow metrics for one official
    account from ODPS and dump them to a JSON file.

    Aggregates the last 30 days (ending yesterday) of action logs and joins
    four sub-aggregations per video: distribution/head exposure+share counts
    (a), distribution backflow (b), real-play counts (c), and head backflow
    (d), plus category tags (e).

    Args:
        account_name: Official-account name used to filter the fan-info
            joins.  NOTE(review): interpolated into the SQL without
            escaping — a name containing a single quote would break the
            query; confirm callers control this value.

    Returns:
        list[dict]: one entry per video with account_name/videoid/category
        keys and an "ext_data" metrics dict.  The same list is also written
        to examples/demand/data/changwen_data/<account_name>.json.
    """
    # 30-day window ending yesterday, formatted as yyyymmdd partition keys.
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    sql_query = f'''
    SELECT
        公众号名
        ,videoid
        ,一级品类
        ,二级品类
        ,头部曝光
        ,头部曝光uv
        ,头部realplay
        ,头部realplay_uv
        ,头部分享
        ,头部分享uv
        ,头部回流人数 AS 头部回流数
        ,推荐曝光数
        ,当日分发曝光uv
        ,推荐realplay
        ,分发realplay_uv
        ,推荐分享数
        ,当日分发分享uv
        ,推荐回流数
        ,当日回流进入分发曝光次数 AS vov分子
    FROM (
        SELECT DISTINCT a.公众号名
            ,a.videoid
            ,e.merge_leve1 AS 一级品类
            ,e.merge_leve2 AS 二级品类
            ,a.title
            ,a.进入分发人数
            ,头部曝光pv AS 头部曝光
            ,头部realplay_pv AS 头部realplay
            ,头部分享pv AS 头部分享
            ,a.当日分发曝光pv AS 推荐曝光数
            ,a.当日分发播放pv
            ,分发realplay_pv AS 推荐realplay
            ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
            ,当日分发播放uv
            ,c.realplay_uv AS 分发真实播uv
            ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
            ,a.当日分发分享pv AS 推荐分享数
            ,a.当日分发分享pv / a.当日分发曝光pv AS str
            ,NVL(b.当日分发回流人数,0) AS 推荐回流数
            ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
            ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
            ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
            ,d.头部回流人数
            ,当日分发曝光uv
            ,头部曝光uv
            ,当日分发分享uv
            ,头部分享uv
            ,分发realplay_uv
            ,头部realplay_uv
        FROM (
            SELECT account_name AS 公众号名
                ,videoid
                ,title
                ,COUNT(DISTINCT mid) AS 进入分发人数
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                ) AS 当日分发曝光pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                ) AS 当日分发曝光uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                ) AS 头部曝光pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                ) AS 头部曝光uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                ) AS 当日分发播放pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                ) AS 当日分发播放uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 当日分发分享pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 当日分发分享uv
                ,COUNT(
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 头部分享pv
                ,COUNT(DISTINCT
                    CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                ) AS 头部分享uv
            FROM (
                SELECT DISTINCT a.mid
                    ,a.videoid
                    ,a.businesstype
                    ,a.pagesource
                    ,a.subsessionid
                    ,account_name
                    ,e.title
                FROM loghubods.video_action_log_rp a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.mid = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                LEFT JOIN videoods.wx_video e
                    ON a.videoid = e.id
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND businesstype IN ('videoView','videoPlay','videoShareFriend')
                    AND d.user_create_time IS NOT NULL
                    AND account_name = '{account_name}'
                    AND a.videoid IN (
                        SELECT
                            DISTINCT content_id AS videoid
                        FROM
                            videoods.content_profile
                        WHERE status=3
                            AND is_deleted = 0

                    )
            ) t
            GROUP BY 公众号名
                ,videoid
                ,title
        ) a
        LEFT JOIN (
            SELECT t.account_name AS 公众号名
                ,t.videoid
                ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
                ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
                ,COUNT(v.mid) AS 当日回流进入分发曝光次数
            FROM (
                SELECT DISTINCT a.subsessionid
                    ,a.videoid
                    ,a.mid
                    ,d.account_name
                    ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
                FROM loghubods.video_action_log_rp a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.mid = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.businesstype = 'videoShareFriend'
                    AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = '{account_name}'
            ) t
            LEFT JOIN (
                SELECT DISTINCT subsessionid
                    ,machinecode
                    ,recomtraceid
                    ,clickobjectid
                FROM loghubods.user_share_log
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND topic = 'click'
            ) s
                ON t.recomtraceid = s.recomtraceid
                AND t.videoid = s.clickobjectid
            LEFT JOIN (
                SELECT subsessionid
                    ,mid
                    ,videoid
                FROM loghubods.video_action_log_rp
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                    AND businesstype = 'videoView'
            ) v
                ON s.subsessionid = v.subsessionid
                AND s.machinecode = v.mid
            GROUP BY account_name
                ,t.videoid
        ) b
            ON a.公众号名 = b.公众号名
            AND a.videoid = b.videoid
        LEFT JOIN (
            SELECT d.account_name AS 公众号名
                ,a.videoid
                ,COUNT(DISTINCT a.mid) AS realplay_uv
                ,COUNT(
                    CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                ) AS 分发realplay_pv
                ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
                ,COUNT(DISTINCT
                    CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                ) AS 分发realplay_uv
                ,COUNT(DISTINCT CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_uv
            FROM loghubods.ods_video_play_log_day a
            LEFT JOIN (
                SELECT DISTINCT open_id
                    ,union_id
                FROM loghubods.user_wechat_identity_info_ha
                WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
            ) b
                ON a.mid = CONCAT('weixin_openid_',b.open_id)
            LEFT JOIN loghubods.gzh_fans_info d
                ON b.union_id = d.union_id
                AND d.dt = MAX_PT("loghubods.gzh_fans_info")
            WHERE a.dt >= '{bizdatemin}'
                AND a.dt <= '{bizdatemax}'
                AND a.businesstype = 'videoRealPlay'
                AND d.user_create_time IS NOT NULL
                AND d.account_name = '{account_name}'
            GROUP BY d.account_name
                ,a.videoid
            ORDER BY 分发realplay_pv DESC
        ) c
            ON a.公众号名 = c.公众号名
            AND a.videoid = c.videoid
        LEFT JOIN (
            SELECT t.account_name AS 公众号名
                ,t.videoid
                ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
            FROM (
                SELECT DISTINCT a.shareobjectid AS videoid
                    ,a.shareid
                    ,a.machinecode
                    ,d.account_name
                FROM loghubods.user_share_log a
                LEFT JOIN loghubods.user_wechat_identity_info_ha b
                    ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
                    AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.topic = 'share'
                    AND a.pagesource REGEXP 'pages/user-videos-share$'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = '{account_name}'
            ) t
            LEFT JOIN (
                SELECT DISTINCT shareid
                    ,machinecode
                    ,clickobjectid
                FROM loghubods.user_share_log
                WHERE dt >= '{bizdatemin}'
                    AND dt <= '{bizdatemax}'
                    AND topic = 'click'
            ) s
                ON t.shareid = s.shareid
            GROUP BY account_name
                ,t.videoid
        ) d
            ON a.公众号名 = d.公众号名
            AND a.videoid = d.videoid
        LEFT JOIN loghubods.video_merge_tag e
            ON a.videoid = e.videoid
    )
    ORDER BY 推荐曝光数 DESC
    '''
    result_list = []
    data = get_odps_data(sql_query)
    if data:
        for r in data:
            # Column positions r[0]..r[18] mirror the outer SELECT above.
            result_list.append(
                {
                    "account_name": r[0],
                    "videoid": r[1],
                    "一级品类": r[2],
                    "二级品类": r[3],
                    "ext_data": {
                        "头部曝光": r[4],
                        "头部曝光uv": r[5],
                        "头部realplay": r[6],
                        "头部realplay_uv": r[7],
                        "头部分享": r[8],
                        "头部分享uv": r[9],
                        "头部回流数": r[10],
                        "推荐曝光数": r[11],
                        "当日分发曝光uv": r[12],
                        "推荐realplay": r[13],
                        "分发realplay_uv": r[14],
                        "推荐分享数": r[15],
                        "当日分发分享uv": r[16],
                        "推荐回流数": r[17],
                        "vov分子": r[18],
                    },
                }
            )
    # Persist the result to examples/demand/data/changwen_data/<account_name>.json
    output_dir = Path(__file__).parent / "data" / "changwen_data"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{account_name}.json"
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(result_list, f, ensure_ascii=False, indent=2)
    return result_list
def get_merge_leve2_by_video_ids(video_ids, batch_size=2000):
    """Look up the merge_leve2 tag for each given video id.

    Queries loghubods.video_merge_tag in batches of *batch_size* ids.
    ``None`` ids are dropped; ids not present in the tag table are simply
    absent from the result.

    Args:
        video_ids: Iterable of video ids (any type; stringified).
        batch_size: Maximum number of ids per SQL IN-clause.

    Returns:
        Dict mapping str(videoid) -> merge_leve2.
    """
    mapping = {}
    if not video_ids:
        return mapping
    ids = [str(vid) for vid in video_ids if vid is not None]
    for start in range(0, len(ids), batch_size):
        chunk = ids[start:start + batch_size]
        # Escape embedded single quotes before quoting each id.
        in_clause = ", ".join(
            "'{}'".format(vid.replace("'", "''")) for vid in chunk
        )
        sql_query = f'''
        SELECT videoid, merge_leve2
        FROM loghubods.video_merge_tag
        WHERE videoid IN ({in_clause})
        '''
        rows = get_odps_data(sql_query)
        if rows:
            for row in rows:
                mapping[str(row[0])] = row[1]
    return mapping
def get_all_decode_task_result_rows():
    """Return every row of workflow_decode_task_result with the columns
    needed for the merge_leve2 backfill (id, channel_content_id, merge_leve2)."""
    wanted_columns = "id, channel_content_id, merge_leve2"
    return mysql_db.select("workflow_decode_task_result", columns=wanted_columns)
def update_decode_task_result_merge_leve2(channel_content_id, merge_leve2):
    """Persist *merge_leve2* for one channel_content_id.

    Returns whatever mysql_db.update returns (used by the caller as the
    affected-row count).
    """
    values = {"merge_leve2": str(merge_leve2)}
    where_clause = "channel_content_id = %s"
    where_params = (str(channel_content_id),)
    return mysql_db.update(
        "workflow_decode_task_result",
        values,
        where_clause,
        where_params,
    )
def backfill_merge_leve2_for_decode_task_result():
    """Backfill the merge_leve2 column of workflow_decode_task_result.

    Reads every row, skips rows whose channel_content_id is missing or
    longer than 8 characters, resolves tags via
    get_merge_leve2_by_video_ids, and writes each resolved tag back.

    Returns:
        Dict with "total" (rows read), "updated" (affected row count) and
        "skipped" (rows filtered out before lookup).
    """
    rows = get_all_decode_task_result_rows()
    updated = 0
    skipped = 0
    candidates = []
    for record in rows:
        content_id = record.get("channel_content_id")
        if content_id is None:
            skipped += 1
            continue
        content_id = str(content_id)
        # Ids longer than 8 characters are out of scope for this backfill.
        if len(content_id) > 8:
            skipped += 1
            continue
        candidates.append(content_id)
    tag_by_id = get_merge_leve2_by_video_ids(candidates, batch_size=2000)
    for content_id in candidates:
        tag = tag_by_id.get(content_id)
        if not tag:
            continue
        affected = update_decode_task_result_merge_leve2(content_id, tag)
        if affected > 0:
            updated += affected
    return {"total": len(rows), "updated": updated, "skipped": skipped}
if __name__ == '__main__':
    # Script entry point: run the one-off merge_leve2 backfill.
    backfill_merge_leve2_for_decode_task_result()