"""Utilities for selecting high-traffic ("top head") videos from ODPS and
resolving a video id to its playable OSS path."""

import json
from datetime import datetime, timedelta
from typing import List

from odps.types import Record

from app.infra.internal.piaoquan import fetch_piaoquan_video_list_detail
from app.infra.shared.tools import fetch_from_odps


class VideoDecodeUtils:
    """Static helpers: fetch the account/video list to inspect, normalize the
    ODPS records, and map a video id to its real OSS playback path."""

    # Fetch the list of accounts/videos to inspect.
    @staticmethod
    def get_top_head_videos(execute_dt):
        """Query ODPS for videos whose distinct-visitor count (UV) reached
        the threshold on the given partition date.

        :param execute_dt: partition date string, ``YYYYMMDD`` (see
            :meth:`get_yesterday_dt`).
        :return: the result of ``fetch_from_odps`` — per
            :meth:`process_odps_data`, a list of :class:`odps.types.Record`.
        """
        uv_threshold = 500
        # NOTE(review): the GROUP BY list includes `推荐状态`, which the CTE
        # does not select, so UV is counted per recommendation status before
        # the outer DISTINCT collapses it — confirm this is intentional.
        # Also: the outer `访问uv >= {uv_threshold}` duplicates the HAVING
        # clause, and '群/企微合作-稳定' appears twice in the IN list; both are
        # harmless to results but look like leftovers.
        query = f"""
            WITH top_video AS (
                SELECT dt, channel, 合作方名,包名,公众号名,hotsencetype, videoid,
                       rootsourceid,title,`merge二级品类`,
                       COUNT(DISTINCT mid) AS 访问uv
                FROM loghubods.opengid_base_data
                WHERE dt = '{execute_dt}'
                  AND hotsencetype != 1167
                  AND videoid IS NOT NULL
                  AND usersharedepth = 0
                  AND channel IN ('公众号合作-即转-稳定','小程序投流-稳定','服务号合作-Daily-自选','群/企微合作-稳定','公众号买号','服务号买号','公众号投流-稳定','公众号代运营-Daily-系统','公众号合作-Daily-自选','服务号投流','公众号完全代投放','群/企微合作-稳定')
                GROUP BY dt
                    ,channel
                    ,合作方名
                    ,包名
                    ,公众号名
                    ,hotsencetype
                    ,videoid
                    ,rootsourceid
                    ,title
                    ,`merge二级品类`
                    ,推荐状态
                HAVING 访问uv >= {uv_threshold}
            )
            SELECT DISTINCT channel
                ,公众号名
                ,hotsencetype
                ,videoid
                ,rootsourceid
                ,title
            FROM top_video
            WHERE 访问uv >= {uv_threshold}
            ;
        """
        result = fetch_from_odps(query)
        return result

    @staticmethod
    def process_odps_data(odps_list: List[Record]):
        """Normalize raw ODPS records into plain dicts with snake_case keys.

        :param odps_list: records as returned by :meth:`get_top_head_videos`.
        :return: list of dicts with keys ``channel``, ``account_name``,
            ``hot_scene_type``, ``video_id``, ``root_source_id``, ``title``.
        """
        return [
            {
                "channel": i.channel,
                "account_name": i.公众号名,
                # The upstream column name misspells "scene" as "sencetype";
                # we keep the source attribute but expose a corrected key.
                "hot_scene_type": i.hotsencetype,
                "video_id": i.videoid,
                "root_source_id": i.rootsourceid,
                "title": i.title,
            }
            for i in odps_list
        ]

    @staticmethod
    def get_match_video_real_path(raw_match_videos: List, video_id: int):
        """Find the OSS path of ``video_id`` inside a matched-video payload.

        Only the first element's ``"response"`` field (a JSON-encoded list of
        ``{"videoId": ..., "videoOss": ...}`` objects) is inspected.

        :param raw_match_videos: list of dicts carrying a JSON ``"response"``.
        :param video_id: id to look up.
        :return: the matching ``videoOss`` value, or ``None`` when the input
            list is empty or no entry matches.
        """
        # Guard: the original indexed [0] unconditionally and raised
        # IndexError on an empty list; "not found" is reported as None.
        if not raw_match_videos:
            return None
        _response = raw_match_videos[0]["response"]
        match_video = json.loads(_response)
        for i in match_video:
            if i["videoId"] == video_id:
                return i["videoOss"]
        return None

    @staticmethod
    async def get_pq_video_real_path(video_id):
        """Resolve a video id to its OSS path via the Piaoquan detail API.

        :param video_id: single video id to look up.
        :return: the ``ossVideoPath`` of the first entry in ``response["data"]``.
        :raises KeyError | IndexError: if the API response lacks the expected
            ``data[0]["ossVideoPath"]`` shape.
        """
        response = await fetch_piaoquan_video_list_detail(video_list=[video_id])
        video_path = response["data"][0]["ossVideoPath"]
        return video_path

    @staticmethod
    def get_yesterday_dt():
        """Return yesterday's date as a ``YYYYMMDD`` string (local time),
        matching the ODPS partition format expected by
        :meth:`get_top_head_videos`."""
        return (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")