# coding:utf-8
import datetime
import json
import traceback
import os

from utils import execute_sql_from_odps, request_post, update_video_w_h_rate
from db_helper import RedisHelper
from config import set_config
from log import Log

config_, env = set_config()
log_ = Log()


def _collect_play_counts(records, id_field, count_field):
    """Read a query result into an ordered id list and an id -> play count map.

    :param records: result object returned by execute_sql_from_odps
    :param id_field: name of the column holding the video id
    :param count_field: name of the column holding the play count
    :return: (video_id_list, videos) where video_id_list keeps the row order
             of the result and videos maps int video id -> play count
    """
    video_id_list = []
    videos = {}
    with records.open_reader() as reader:
        for record in reader:
            video_id = int(record[id_field])
            video_id_list.append(video_id)
            videos[video_id] = record[count_field]
    return video_id_list, videos


def get_bottom_videos_test():
    """Fetch the bottom (fallback) videos for the test environment.

    Queries videoods.wx_video_test for up to 2000 videos ordered by total
    play count, descending.

    :return: (video_id_list, videos) on success; None if anything fails
             (the traceback is logged).
    """
    try:
        # Up to 2000 videos with the highest total play count.
        sql = (
            "SELECT  id "
            ",play_count_total "
            "FROM    videoods.wx_video_test "
            "WHERE   transcode_status = 3 "
            "AND     STATUS = 1 "
            "AND     recommend_status IN ( - 6, 1) "
            "ORDER BY play_count_total DESC "
            "LIMIT   2000"
            ";"
        )
        records = execute_sql_from_odps(project='videoods', sql=sql)
        # Keyed by video id, valued by total play count.
        return _collect_play_counts(records, 'id', 'play_count_total')
    except Exception:
        log_.error(traceback.format_exc())
        return None


def get_bottom_videos_pro(now_date):
    """Fetch the bottom (fallback) videos for the production environment.

    Counts 'videoPlay' rows per video in loghubods.video_action_log_applet
    for the day before ``now_date``, joins them with the videos in
    videoods.dim_video that pass the status filters, and keeps up to 2000
    videos ordered by that play count, descending.

    :param now_date: datetime used as "today"; the query targets the
                     partition of the previous day (formatted as %Y%m%d)
    :return: (video_id_list, videos) on success; None if anything fails
             (the traceback is logged).
    """
    try:
        # Up to 2000 videos with the highest play count of the previous day.
        delta_date = now_date - datetime.timedelta(days=1)

        sql = (
            "SELECT  video_playcount.videoid "
            ",video_playcount.play_count "
            "FROM    ( "
            "SELECT  videoid "
            ",COUNT(*) play_count "
            "FROM    loghubods.video_action_log_applet "
            "WHERE   dt = {} "
            "AND     business = 'videoPlay' "
            "GROUP BY videoid "
            ") video_playcount INNER "
            "JOIN    ( "
            "SELECT  DISTINCT videoid "
            "FROM    videoods.dim_video "
            "WHERE   video_edit = '通过' "
            "AND     video_data_stat = '有效' "
            "AND     video_recommend IN ( '待推荐', '普通推荐') "
            "AND     charge = '免费' "
            "AND     is_pwd = '未加密' "
            ") video_status "
            "ON      video_playcount.videoid = video_status.videoid "
            "ORDER BY video_playcount.play_count DESC "
            "LIMIT   2000 "
            ";"
        ).format(delta_date.strftime('%Y%m%d'))

        records = execute_sql_from_odps(project='loghubods', sql=sql)
        # Keyed by video id, valued by the previous day's play count.
        return _collect_play_counts(records, 'videoid', 'play_count')
    except Exception:
        log_.error(traceback.format_exc())
        return None


def update_bottom_videos():
    """Refresh the bottom (fallback) videos stored in redis.

    Picks the fetcher matching the current ``env``, then replaces the zset at
    config_.BOTTOM_KEY_NAME with the fetched videos and removes the key's
    expiry. If the fetch fails or yields no videos, the existing redis data
    is left untouched so the fallback list is never wiped.

    Any exception is logged and swallowed; the function returns None.
    """
    try:
        # Fetch the bottom videos for the current environment.
        now_date = datetime.datetime.today()
        if env in ['dev', 'test']:
            result = get_bottom_videos_test()
        elif env in ['pre', 'pro']:
            result = get_bottom_videos_pro(now_date=now_date)
        else:
            log_.error('env error')
            return

        # The fetchers return None on failure (they already logged the cause).
        if result is None:
            log_.error('get bottom videos failed, keep existing bottom videos')
            return
        video_id_list, videos = result

        # Never replace the current fallback list with an empty one.
        if not videos:
            log_.error('no bottom videos fetched, keep existing bottom videos')
            return

        # Replace the redis data: drop the key, then write the new zset.
        redis_helper = RedisHelper()
        redis_helper.del_keys(key_name=config_.BOTTOM_KEY_NAME)
        redis_helper.add_data_with_zset(key_name=config_.BOTTOM_KEY_NAME, data=videos)
        # Rank together with the previous bottom videos and keep the top 1000
        # redis_helper.remove_by_rank_from_zset(key_name=config_.BOTTOM_KEY_NAME, start=config_.BOTTOM_NUM, stop=-1)
        # Remove the expiry of the bottom key so that it becomes persistent.
        redis_helper.persist_key(key_name=config_.BOTTOM_KEY_NAME)
        log_.info('{} update bottom videos success!, count = {}'.format(now_date, len(videos)))

        # ###### Taken offline
        # # Update the width/height ratio data of the videos
        # video_ids = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
        # if video_ids:
        #     update_video_w_h_rate(video_ids=video_ids,
        #                           key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'])
        #     log_.info('update video w_h_rate to redis finished!')

        # # Fetch the json of today's bottom videos and store it in redis
        # video_json_list = []
        # for i in range(0, len(video_id_list)//10):
        #     video_json = get_video_info_json(video_ids=video_id_list[i*10:(i+1)*10])
        #     if video_json is not None:
        #         video_json_list.extend(video_json)
        #         if len(video_json_list) >= 1000:
        #             break
        # # Write to redis: delete the key first, then write again
        # redis_helper.del_keys(config_.BOTTOM_JSON_KEY_NAME)
        # redis_helper.add_data_with_set(key_name=config_.BOTTOM_JSON_KEY_NAME, values=video_json_list[:1000])
        # # Remove the expiry so that the key becomes persistent
        # redis_helper.persist_key(key_name=config_.BOTTOM_JSON_KEY_NAME)
        #
        # log_.info('{} update bottom videos info json success!, count = {}'.format(now_date,
        #                                                                           len(video_json_list[:1000])))

    except Exception:
        log_.error(traceback.format_exc())


def get_video_info_json(video_ids):
    """Fetch the json description of the given videos.

    Posts the ids to config_.BOTTOM_JSON_URL and serializes every item of the
    response's 'data' field with json.dumps.

    :param video_ids: type-list [int, int, ...]
    :return: json_data_list (list of json strings), or None when the request
             returns nothing or the response 'code' is not 0
    """
    request_data = {
        "appType": 4,
        "platform": "android",
        "versionCode": 295,
        "pageSize": 10,
        "machineCode": "weixin_openid_otjoB5WWWmkRjpMzkV5ltZ3osg3A",
        "uid": 6281917,
        "videoIds": video_ids
    }
    res = request_post(request_url=config_.BOTTOM_JSON_URL, request_data=request_data)
    if res is None:
        return None
    if res['code'] != 0:
        log_.info('获取视频json失败!')
        return None
    json_data_list = [json.dumps(item) for item in res['data']]
    return json_data_list


if __name__ == '__main__':
    update_bottom_videos()