"""Helpers that run MaxCompute (ODPS) SQL and reshape the rows for callers."""
import os
from datetime import date, timedelta

from odps import ODPS
from odps.errors import ODPSError

from agent import tool


def _sql_string_literal(value):
    """Return *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are backslash-escaped so that a value
    containing them can neither break the statement nor inject SQL.
    Values without those characters render exactly as ``'value'``.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def get_odps_data(sql):
    """Execute *sql* on ODPS and return all result records as a list.

    Connection settings are read from the environment variables
    ``ODPS_ACCESS_ID``, ``ODPS_ACCESS_KEY``, ``ODPS_PROJECT`` and
    ``ODPS_ENDPOINT``; each falls back to the value that used to be
    hard-coded here so existing deployments keep working.

    Args:
        sql: The SQL statement to run.

    Returns:
        A list of result records, or ``None`` when an ``ODPSError`` is
        raised (the error is printed, not re-raised).
    """
    # NOTE(security): the fallback AccessKey pair below is committed in source
    # control. Rotate it and supply credentials via the environment instead,
    # then delete these defaults.
    access_id = os.environ.get('ODPS_ACCESS_ID', 'LTAI9EBa0bd5PrDa')
    access_key = os.environ.get(
        'ODPS_ACCESS_KEY', 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
    )
    project = os.environ.get('ODPS_PROJECT', 'loghubods')
    endpoint = os.environ.get(
        'ODPS_ENDPOINT', 'http://service.odps.aliyun.com/api'
    )

    # 1. Create the ODPS entry object.
    o = ODPS(access_id, access_key, project, endpoint=endpoint)

    try:
        # 2. Run the SQL and fetch the result.
        # execute_sql waits for the task to finish; open_reader reads the rows.
        with o.execute_sql(sql).open_reader() as reader:
            # Materialize the rows before the reader is closed.
            return list(reader)
    except ODPSError as e:
        print(f"ODPS 错误: {e}")
        return None


def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
    """Return the average ``rov_t0`` per video for one ``merge_leve2`` value.

    The query covers partitions from 14 days ago through yesterday
    (inclusive). Per the SQL, a video whose summed ``当日分发曝光pv`` over
    that window is below 1000 gets 0; otherwise the average of its non-zero
    ``rov_t0`` values is returned (0 when there are none).

    Args:
        merge_leve2: Value matched against ``video_merge_tag.merge_leve2``.
        video_ids: Iterable of video ids to restrict the result to.

    Returns:
        A dict mapping the first result column (``videoid``) to the second
        (``avg_rov_t0``). Empty when *video_ids* is empty, when the query
        fails, or when it returns no rows.
    """
    id_literals = [_sql_string_literal(video_id) for video_id in video_ids or ()]
    if not id_literals:
        # An empty IN () list is invalid SQL; skip the round trip entirely.
        return {}

    merge_level_in_clause = _sql_string_literal(merge_leve2)
    video_ids_in_clause = ", ".join(id_literals)

    # Read the clock once so both bounds refer to the same day.
    today = date.today()
    end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
    start_date = (today - timedelta(days=14)).strftime("%Y%m%d")

    sql_query = f'''
        SELECT
            v.videoid,
            CASE
                WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
                ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
            END AS avg_rov_t0
        FROM (
            SELECT t2.videoid, t2.merge_leve2
            FROM videoods.content_profile t1
            JOIN loghubods.video_merge_tag t2
                ON t1.content_id = t2.videoid
            WHERE t1.status = 3
                AND t1.is_deleted = 0
                AND t2.merge_leve2 IN ({merge_level_in_clause})
        ) v
        LEFT JOIN loghubods.video_dimension_detail_add_column t3
            ON v.videoid = t3.视频id
            AND t3.dt >= '{start_date}'
            AND t3.dt <= '{end_date}'
        WHERE v.videoid in ({video_ids_in_clause})
        GROUP BY v.videoid
        ;
    '''
    data = get_odps_data(sql_query)
    if not data:
        return {}
    return {r[0]: r[1] for r in data}


def get_changwen_weight(account_name):
    """Return per-video aggregated metrics for one official account.

    The query covers partitions from 31 days ago through yesterday
    (``bizdatemax`` is yesterday, ``bizdatemin`` is 30 days before it, both
    bounds inclusive) and is filtered to rows whose ``account_name`` equals
    *account_name*.

    Args:
        account_name: Account name to filter on.

    Returns:
        A list of dicts, one per result row, shaped as
        ``{"account_name": ..., "videoid": ..., "ext_data": {...}}`` where
        ``ext_data`` maps the nine metric names to result columns 2..10 in
        SELECT order. Empty when the query fails or returns no rows.
    """
    bizdatemax_date = date.today() - timedelta(days=1)
    bizdatemin_date = bizdatemax_date - timedelta(days=30)
    bizdatemax = bizdatemax_date.strftime("%Y%m%d")
    bizdatemin = bizdatemin_date.strftime("%Y%m%d")
    # Quoted and escaped once; reused by every sub-query that filters on it.
    account_literal = _sql_string_literal(account_name)

    sql_query = f'''
        SELECT  公众号名
                ,videoid
                ,sum(头部曝光) as 头部曝光
                ,sum(头部realplay) as 头部realplay
                ,sum(头部分享) as 头部分享
                ,sum(头部回流人数) AS 头部回流数
                ,sum(推荐曝光数) as 推荐曝光数
                ,sum(推荐realplay) as 推荐realplay
                ,sum(推荐分享数) as 推荐分享数
                ,sum(推荐回流数) as 推荐回流数
                ,sum(当日回流进入分发曝光次数) AS vov分子
        FROM (
            SELECT DISTINCT a.公众号名
                    ,a.videoid
                    ,e.merge_leve1 AS 一级品类
                    ,e.merge_leve2 AS 二级品类
                    ,a.title
                    ,a.进入分发人数
                    ,头部曝光pv AS 头部曝光
                    ,头部realplay_pv AS 头部realplay
                    ,头部分享pv AS 头部分享
                    ,a.当日分发曝光pv AS 推荐曝光数
                    ,a.当日分发播放pv
                    ,分发realplay_pv AS 推荐realplay
                    ,分发realplay_pv / a.当日分发播放pv AS 真实播放率pv
                    ,当日分发播放uv
                    ,c.realplay_uv AS 分发真实播uv
                    ,c.realplay_uv / a.当日分发播放uv AS 真实播放率uv
                    ,a.当日分发分享pv AS 推荐分享数
                    ,a.当日分发分享pv / a.当日分发曝光pv AS str
                    ,NVL(b.当日分发回流人数,0) AS 推荐回流数
                    ,NVL(b.当日回流进入分发人数,0) AS 当日回流进入分发人数
                    ,NVL(b.当日回流进入分发曝光次数,0) AS 当日回流进入分发曝光次数
                    ,NVL(b.当日回流进入分发曝光次数,0) / a.当日分发曝光pv AS vov分子
                    ,d.头部回流人数
            FROM (
                SELECT  account_name AS 公众号名
                        ,videoid
                        ,title
                        ,COUNT(DISTINCT mid) AS 进入分发人数
                        ,COUNT(
                            CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoView' THEN mid END
                        ) AS 当日分发曝光pv
                        ,COUNT(
                            CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoView' THEN mid END
                        ) AS 头部曝光pv
                        ,COUNT(
                            CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                        ) AS 当日分发播放pv
                        ,COUNT(DISTINCT
                            CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoPlay' THEN mid END
                        ) AS 当日分发播放uv
                        ,COUNT(
                            CASE WHEN pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' AND businesstype = 'videoShareFriend' THEN mid END
                        ) AS 当日分发分享pv
                        ,COUNT(
                            CASE WHEN pagesource REGEXP 'pages/user-videos-share$' AND businesstype = 'videoShareFriend' THEN mid END
                        ) AS 头部分享pv
                FROM (
                    SELECT DISTINCT a.mid
                            ,a.videoid
                            ,a.businesstype
                            ,a.pagesource
                            ,a.subsessionid
                            ,account_name
                            ,e.title
                    FROM loghubods.video_action_log_rp a
                    LEFT JOIN loghubods.user_wechat_identity_info_ha b
                        ON a.mid = CONCAT('weixin_openid_',b.open_id)
                        AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                    LEFT JOIN loghubods.gzh_fans_info d
                        ON b.union_id = d.union_id
                        AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                    LEFT JOIN videoods.wx_video e
                        ON a.videoid = e.id
                    WHERE a.dt >= '{bizdatemin}'
                        AND a.dt <= '{bizdatemax}'
                        AND businesstype IN ('videoView','videoPlay','videoShareFriend')
                        AND d.user_create_time IS NOT NULL
                        AND account_name = {account_literal}
                ) t
                GROUP BY 公众号名
                        ,videoid
                        ,title
            ) a
            LEFT JOIN (
                SELECT  t.account_name AS 公众号名
                        ,t.videoid
                        ,COUNT(DISTINCT s.machinecode) AS 当日分发回流人数
                        ,COUNT(DISTINCT v.mid) AS 当日回流进入分发人数
                        ,COUNT(v.mid) AS 当日回流进入分发曝光次数
                FROM (
                    SELECT DISTINCT a.subsessionid
                            ,a.videoid
                            ,a.mid
                            ,d.account_name
                            ,GET_JSON_OBJECT(extparams,'$.recomTraceId') AS recomtraceid
                    FROM loghubods.video_action_log_rp a
                    LEFT JOIN loghubods.user_wechat_identity_info_ha b
                        ON a.mid = CONCAT('weixin_openid_',b.open_id)
                        AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                    LEFT JOIN loghubods.gzh_fans_info d
                        ON b.union_id = d.union_id
                        AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                    WHERE a.dt >= '{bizdatemin}'
                        AND a.dt <= '{bizdatemax}'
                        AND a.businesstype = 'videoShareFriend'
                        AND a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                        AND d.user_create_time IS NOT NULL
                        AND d.account_name = {account_literal}
                ) t
                LEFT JOIN (
                    SELECT DISTINCT subsessionid
                            ,machinecode
                            ,recomtraceid
                            ,clickobjectid
                    FROM loghubods.user_share_log
                    WHERE dt >= '{bizdatemin}'
                        AND dt <= '{bizdatemax}'
                        AND topic = 'click'
                ) s
                    ON t.recomtraceid = s.recomtraceid
                    AND t.videoid = s.clickobjectid
                LEFT JOIN (
                    SELECT  subsessionid
                            ,mid
                            ,videoid
                    FROM loghubods.video_action_log_rp
                    WHERE dt >= '{bizdatemin}'
                        AND dt <= '{bizdatemax}'
                        AND pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$'
                        AND businesstype = 'videoView'
                ) v
                    ON s.subsessionid = v.subsessionid
                    AND s.machinecode = v.mid
                GROUP BY account_name
                        ,t.videoid
            ) b
                ON a.公众号名 = b.公众号名
                AND a.videoid = b.videoid
            LEFT JOIN (
                SELECT  d.account_name AS 公众号名
                        ,a.videoid
                        ,COUNT(DISTINCT a.mid) AS realplay_uv
                        ,COUNT(
                            CASE WHEN a.pagesource REGEXP 'category$|recommend$|-pages/user-videos-detail$' THEN a.mid END
                        ) AS 分发realplay_pv
                        ,COUNT(CASE WHEN a.pagesource REGEXP 'pages/user-videos-share$' THEN a.mid END) AS 头部realplay_pv
                FROM loghubods.ods_video_play_log_day a
                LEFT JOIN (
                    SELECT DISTINCT open_id
                            ,union_id
                    FROM loghubods.user_wechat_identity_info_ha
                    WHERE dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                ) b
                    ON a.mid = CONCAT('weixin_openid_',b.open_id)
                LEFT JOIN loghubods.gzh_fans_info d
                    ON b.union_id = d.union_id
                    AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                WHERE a.dt >= '{bizdatemin}'
                    AND a.dt <= '{bizdatemax}'
                    AND a.businesstype = 'videoRealPlay'
                    AND d.user_create_time IS NOT NULL
                    AND d.account_name = {account_literal}
                GROUP BY d.account_name
                        ,a.videoid
                ORDER BY 分发realplay_pv DESC
            ) c
                ON a.公众号名 = c.公众号名
                AND a.videoid = c.videoid
            LEFT JOIN (
                SELECT  t.account_name AS 公众号名
                        ,t.videoid
                        ,COUNT(DISTINCT s.machinecode) AS 头部回流人数
                FROM (
                    SELECT DISTINCT a.shareobjectid AS videoid
                            ,a.shareid
                            ,a.machinecode
                            ,d.account_name
                    FROM loghubods.user_share_log a
                    LEFT JOIN loghubods.user_wechat_identity_info_ha b
                        ON a.machinecode = CONCAT('weixin_openid_',b.open_id)
                        AND b.dt = MAX_PT("loghubods.user_wechat_identity_info_ha")
                    LEFT JOIN loghubods.gzh_fans_info d
                        ON b.union_id = d.union_id
                        AND d.dt = MAX_PT("loghubods.gzh_fans_info")
                    WHERE a.dt >= '{bizdatemin}'
                        AND a.dt <= '{bizdatemax}'
                        AND a.topic = 'share'
                        AND a.pagesource REGEXP 'pages/user-videos-share$'
                        AND d.user_create_time IS NOT NULL
                        AND d.account_name = {account_literal}
                ) t
                LEFT JOIN (
                    SELECT DISTINCT shareid
                            ,machinecode
                            ,clickobjectid
                    FROM loghubods.user_share_log
                    WHERE dt >= '{bizdatemin}'
                        AND dt <= '{bizdatemax}'
                        AND topic = 'click'
                ) s
                    ON t.shareid = s.shareid
                GROUP BY account_name
                        ,t.videoid
            ) d
                ON a.公众号名 = d.公众号名
                AND a.videoid = d.videoid
            LEFT JOIN loghubods.video_merge_tag e
                ON a.videoid = e.videoid
        )
        GROUP BY 公众号名, videoid
        ORDER BY 推荐曝光数 DESC
    '''

    data = get_odps_data(sql_query)
    if not data:
        return []

    # Names for result columns 2..10, in the order the outer SELECT emits them.
    ext_keys = [
        "头部曝光",
        "头部realplay",
        "头部分享",
        "头部回流数",
        "推荐曝光数",
        "推荐realplay",
        "推荐分享数",
        "推荐回流数",
        "vov分子",
    ]
    return [
        {
            "account_name": r[0],
            "videoid": r[1],
            # Columns 0 and 1 are the account and video id; metrics start at 2.
            "ext_data": {k: r[i] for i, k in enumerate(ext_keys, start=2)},
        }
        for r in data
    ]


if __name__ == '__main__':
    result_list = get_changwen_weight('史趣探秘')
    print(result_list)