import json
import os
from datetime import date, timedelta
from pathlib import Path

from odps import ODPS
from odps.errors import ODPSError

from agent import tool


def _sql_literal(value):
    """Return *value* rendered as a single-quoted SQL string literal.

    Backslashes and single quotes are backslash-escaped so a value cannot
    terminate the literal early and change the surrounding statement.
    Values without those characters are rendered exactly as ``'value'``.
    """
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{text}'"


def get_odps_data(sql):
    """Run *sql* on ODPS and return all result records as a list.

    Connection settings are read from the ``ODPS_ACCESS_ID``,
    ``ODPS_ACCESS_KEY``, ``ODPS_PROJECT`` and ``ODPS_ENDPOINT`` environment
    variables, falling back to the values that were previously hard-coded.

    Args:
        sql: The SQL statement to execute.

    Returns:
        A list of the records yielded by the result reader, or ``None`` when
        an ``ODPSError`` is raised (the error is printed).
    """
    # NOTE(review): these fallback credentials are committed to source. They
    # are kept only so existing deployments keep working; they should be
    # rotated and supplied exclusively through the environment.
    access_id = os.environ.get('ODPS_ACCESS_ID', 'LTAI9EBa0bd5PrDa')
    access_key = os.environ.get(
        'ODPS_ACCESS_KEY', 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
    )
    project = os.environ.get('ODPS_PROJECT', 'loghubods')
    endpoint = os.environ.get(
        'ODPS_ENDPOINT', 'http://service.odps.aliyun.com/api'
    )

    # 1. Create the ODPS entry object.
    o = ODPS(access_id, access_key, project, endpoint=endpoint)

    try:
        # 2. Execute the SQL and fetch the result.
        # execute_sql waits for the task to finish; open_reader reads the data.
        with o.execute_sql(sql).open_reader() as reader:
            # Materialize the reader into a plain Python list.
            return list(reader)
    except ODPSError as e:
        print(f"ODPS 错误: {e}")
        return None


def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
    """Return ``{videoid: avg_rov_t0}`` for the given videos in one category.

    The query covers partitions from 14 days ago through yesterday. Per the
    SQL, ``avg_rov_t0`` is the average of the non-zero ``rov_t0`` values, or
    0 when the summed ``当日分发曝光pv`` over the window is below 1000.

    Args:
        merge_leve2: Value matched against ``video_merge_tag.merge_leve2``.
        video_ids: Iterable of video ids to restrict the result to.

    Returns:
        A dict built from the first two columns of each result row; empty
        when *video_ids* is empty, the query fails, or no rows come back.
    """
    video_ids_in_clause = ", ".join(
        _sql_literal(video_id) for video_id in video_ids
    )
    if not video_ids_in_clause:
        # "IN ()" is not valid SQL; with no ids there is nothing to look up.
        return {}
    merge_level_in_clause = _sql_literal(merge_leve2)

    # Read the clock once so both bounds derive from the same day.
    today = date.today()
    end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
    start_date = (today - timedelta(days=14)).strftime("%Y%m%d")

    sql_query = f'''
    SELECT
        v.videoid,
        CASE
            WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
            ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
        END AS avg_rov_t0
    FROM (
        SELECT t2.videoid, t2.merge_leve2
        FROM videoods.content_profile t1
        JOIN loghubods.video_merge_tag t2 ON t1.content_id = t2.videoid
        WHERE t1.status = 3
          AND t1.is_deleted = 0
          AND t2.merge_leve2 IN ({merge_level_in_clause})
    ) v
    LEFT JOIN loghubods.video_dimension_detail_add_column t3
        ON v.videoid = t3.视频id
        AND t3.dt >= '{start_date}'
        AND t3.dt <= '{end_date}'
    WHERE v.videoid in ({video_ids_in_clause})
    GROUP BY v.videoid
    ;
    '''
    data = get_odps_data(sql_query)
    if not data:
        return {}
    return {r[0]: r[1] for r in data}


def get_changwen_weight(account_name):
    """Query per-video metrics for one account and dump them to JSON.

    The query covers partitions from 30 days before yesterday through
    yesterday. The rows are converted to dicts and written to
    ``<account_name>.json`` next to this module; the file is written even
    when the query returns nothing (it then contains an empty list).

    Args:
        account_name: Account name matched against ``account_name`` in the
            SQL; also used as the output file stem.

    Returns:
        A list with one dict per result row, holding ``account_name``,
        ``videoid``, ``一级品类``, ``二级品类`` and an ``ext_data`` dict of the
        remaining nine columns.
    """
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    # Quoted and escaped once, then reused by every account filter below.
    account_literal = _sql_literal(account_name)

    sql_query = f'''
    SELECT  公众号名
            ,videoid
            ,一级品类
            ,二级品类
            ,头部曝光
            ,头部realplay
            ,头部分享
            ,头部回流人数 AS 头部回流数
            ,推荐曝光数
            ,推荐realplay
            ,推荐分享数
            ,推荐回流数
            ,当日回流进入分发曝光次数 AS vov分子
    FROM    (
                SELECT  DISTINCT a.公众号名
                        ,a.videoid
                        ,e.merge_leve1 AS 一级品类
                        ,e.merge_leve2 AS 二级品类
                        ,a.title
                        ,a.进入分发人数
                        ,头部曝光pv AS 头部曝光
                        ,头部realplay_pv AS 头部realplay
                        ,头部分享pv AS 头部分享
                        ,a.当日分发曝光pv AS 推荐曝光数
                        ,a.当日分发播放pv
                        ,分发realplay_pv AS 推荐realplay
                        ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
                        ,当日分发播放uv
                        ,c.realplay_uv AS 分发真实播uv
                        ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
                        ,a.当日分发分享pv AS 推荐分享数
                        ,a.当日分发分享pv / a.当日分发曝光pv AS str
                        ,NVL(b.当日分发回流人数,0) AS 推荐回流数
                        ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
                        ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
                        ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
                        ,d.头部回流人数
                FROM    (
                            SELECT  account_name AS 公众号名
                                    ,videoid
                                    ,title
                                    ,COUNT(DISTINCT mid) AS 进入分发人数
                                    ,COUNT(
                                        CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid
                                        END
                                    ) AS 当日分发曝光pv
                                    ,COUNT(
                                        CASE    WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid
                                        END
                                    ) AS 头部曝光pv
                                    ,COUNT(
                                        CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid
                                        END
                                    ) AS 当日分发播放pv
                                    ,COUNT(DISTINCT
                                        CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid
                                        END
                                    ) AS 当日分发播放uv
                                    ,COUNT(
                                        CASE    WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid
                                        END
                                    ) AS 当日分发分享pv
                                    ,COUNT(
                                        CASE    WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid
                                        END
                                    ) AS 头部分享pv
                            FROM    (
                                        SELECT  DISTINCT a.mid
                                                ,a.videoid
                                                ,a.businesstype
                                                ,a.pagesource
                                                ,a.subsessionid
                                                ,account_name
                                                ,e.title
                                        FROM    loghubods.video_action_log_rp a
                                        LEFT JOIN loghubods.user_wechat_identity_info_ha b
                                        ON      a.mid = CONCAT('weixin_openid_',b.open_id)
                                        AND     b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                                        LEFT JOIN loghubods.gzh_fans_info d
                                        ON      b.union_id = d.union_id
                                        AND     d.dt = MAX_PT("loghubods.gzh_fans_info")
                                        LEFT JOIN videoods.wx_video e
                                        ON      a.videoid = e.id
                                        WHERE   a.dt >= '{bizdatemin}'
                                        AND     a.dt <= '{bizdatemax}'
                                        AND     businesstype IN ('videoView','videoPlay','videoShareFriend')
                                        AND     d.user_create_time IS NOT NULL
                                        AND     account_name = {account_literal}
                                        AND     a.videoid IN (
                                                    SELECT  DISTINCT content_id AS videoid
                                                    FROM    videoods.content_profile
                                                    WHERE   status=3
                                                    AND     is_deleted = 0
                                                )
                                    ) t
                            GROUP BY 公众号名
                                     ,videoid
                                     ,title
                        ) a
                LEFT JOIN   (
                                SELECT  t.account_name AS 公众号名
                                        ,t.videoid
                                        ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
                                        ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
                                        ,COUNT(v.mid) AS 当日回流进入分发曝光次数
                                FROM    (
                                            SELECT  DISTINCT a.subsessionid
                                                    ,a.videoid
                                                    ,a.mid
                                                    ,d.account_name
                                                    ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
                                            FROM    loghubods.video_action_log_rp a
                                            LEFT JOIN loghubods.user_wechat_identity_info_ha b
                                            ON      a.mid = CONCAT('weixin_openid_',b.open_id)
                                            AND     b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                                            LEFT JOIN loghubods.gzh_fans_info d
                                            ON      b.union_id = d.union_id
                                            AND     d.dt = MAX_PT("loghubods.gzh_fans_info")
                                            WHERE   a.dt >= '{bizdatemin}'
                                            AND     a.dt <= '{bizdatemax}'
                                            AND     a.businesstype = 'videoShareFriend'
                                            AND     a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                                            AND     d.user_create_time IS NOT NULL
                                            AND     d.account_name = {account_literal}
                                        ) t
                                LEFT JOIN   (
                                                SELECT  DISTINCT subsessionid
                                                        ,machinecode
                                                        ,recomtraceid
                                                        ,clickobjectid
                                                FROM    loghubods.user_share_log
                                                WHERE   dt >= '{bizdatemin}'
                                                AND     dt <= '{bizdatemax}'
                                                AND     topic = 'click'
                                            ) s
                                ON      t.recomtraceid = s.recomtraceid
                                AND     t.videoid = s.clickobjectid
                                LEFT JOIN   (
                                                SELECT  subsessionid
                                                        ,mid
                                                        ,videoid
                                                FROM    loghubods.video_action_log_rp
                                                WHERE   dt >= '{bizdatemin}'
                                                AND     dt <= '{bizdatemax}'
                                                AND     pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                                                AND     businesstype = 'videoView'
                                            ) v
                                ON      s.subsessionid = v.subsessionid
                                AND     s.machinecode = v.mid
                                GROUP BY account_name
                                         ,t.videoid
                            ) b
                ON      a.公众号名 = b.公众号名
                AND     a.videoid = b.videoid
                LEFT JOIN   (
                                SELECT  d.account_name AS 公众号名
                                        ,a.videoid
                                        ,COUNT(DISTINCT a.mid) AS realplay_uv
                                        ,COUNT(
                                            CASE    WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid
                                            END
                                        ) AS 分发realplay_pv
                                        ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
                                FROM    loghubods.ods_video_play_log_day a
                                LEFT JOIN   (
                                                SELECT  DISTINCT open_id
                                                        ,union_id
                                                FROM    loghubods.user_wechat_identity_info_ha
                                                WHERE   dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                                            ) b
                                ON      a.mid = CONCAT('weixin_openid_',b.open_id)
                                LEFT JOIN loghubods.gzh_fans_info d
                                ON      b.union_id = d.union_id
                                AND     d.dt = MAX_PT("loghubods.gzh_fans_info")
                                WHERE   a.dt >= '{bizdatemin}'
                                AND     a.dt <= '{bizdatemax}'
                                AND     a.businesstype = 'videoRealPlay'
                                AND     d.user_create_time IS NOT NULL
                                AND     d.account_name = {account_literal}
                                GROUP BY d.account_name
                                         ,a.videoid
                                ORDER BY 分发realplay_pv DESC
                            ) c
                ON      a.公众号名 = c.公众号名
                AND     a.videoid = c.videoid
                LEFT JOIN   (
                                SELECT  t.account_name AS 公众号名
                                        ,t.videoid
                                        ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
                                FROM    (
                                            SELECT  DISTINCT a.shareobjectid AS videoid
                                                    ,a.shareid
                                                    ,a.machinecode
                                                    ,d.account_name
                                            FROM    loghubods.user_share_log a
                                            LEFT JOIN loghubods.user_wechat_identity_info_ha b
                                            ON      a.machinecode = CONCAT('weixin_openid_',b.open_id)
                                            AND     b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                                            LEFT JOIN loghubods.gzh_fans_info d
                                            ON      b.union_id = d.union_id
                                            AND     d.dt = MAX_PT("loghubods.gzh_fans_info")
                                            WHERE   a.dt >= '{bizdatemin}'
                                            AND     a.dt <= '{bizdatemax}'
                                            AND     a.topic = 'share'
                                            AND     a.pagesource REGEXP 'pages/user-videos-share$'
                                            AND     d.user_create_time IS NOT NULL
                                            AND     d.account_name = {account_literal}
                                        ) t
                                LEFT JOIN   (
                                                SELECT  DISTINCT shareid
                                                        ,machinecode
                                                        ,clickobjectid
                                                FROM    loghubods.user_share_log
                                                WHERE   dt >= '{bizdatemin}'
                                                AND     dt <= '{bizdatemax}'
                                                AND     topic = 'click'
                                            ) s
                                ON      t.shareid = s.shareid
                                GROUP BY account_name
                                         ,t.videoid
                            ) d
                ON      a.公众号名 = d.公众号名
                AND     a.videoid = d.videoid
                LEFT JOIN loghubods.video_merge_tag e
                ON      a.videoid = e.videoid
            )
    ORDER BY 推荐曝光数 DESC
    '''

    data = get_odps_data(sql_query)
    # get_odps_data returns None on failure; treat that like an empty result.
    result_list = [
        {
            "account_name": r[0],
            "videoid": r[1],
            "一级品类": r[2],
            "二级品类": r[3],
            "ext_data": {
                "头部曝光": r[4],
                "头部realplay": r[5],
                "头部分享": r[6],
                "头部回流数": r[7],
                "推荐曝光数": r[8],
                "推荐realplay": r[9],
                "推荐分享数": r[10],
                "推荐回流数": r[11],
                "vov分子": r[12],
            },
        }
        for r in data or []
    ]

    # Replace path separators so the dump always lands next to this module.
    safe_name = str(account_name).replace("/", "_").replace("\\", "_")
    output_file = Path(__file__).parent / f"{safe_name}.json"
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(result_list, f, ensure_ascii=False, indent=2)
    return result_list


if __name__ == '__main__':
    result_list = get_changwen_weight('青史铁事漫谈')
    print(result_list)