import datetime

import gevent
import numpy as np

from config import set_config
from log import Log
from utils import RedisHelper

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()


def _load_limit_videos():
    """Fetch and parse the current rate-limited video list from Redis.

    :return: the parsed value stored under ``config_.KEY_NAME_PREFIX_LIMIT_VIDEOS``
        (callers iterate it and read a video id at index 0 and a maximum
        distribution count at index 1), or ``None`` when the key holds no data.
    """
    data = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_LIMIT_VIDEOS)
    if data is None:
        return None
    # SECURITY(review): eval() executes whatever expression is stored in Redis.
    # If the stored value is always a plain literal, ast.literal_eval (or JSON)
    # would be a safer parser -- confirm the writer's format before switching.
    return eval(data)


def _previous_hour(now_date, now_h):
    """Return the ``(date, hour)`` pair one hour before ``(now_date, now_h)``."""
    if now_h == 0:
        # Hour 0 rolls back to 23:00 of the previous day.
        return now_date - datetime.timedelta(days=1), 23
    return now_date, now_h - 1


def _hourly_key(key_prefix, region, data_key, rule_key, date, hour):
    """Build the Redis key of an hourly recall list."""
    return f"{key_prefix}{region}:{data_key}:{rule_key}:{date.strftime('%Y%m%d')}:{hour}"


def update_limit_video_score(initial_videos, key_name):
    """
    Adjust the scores of rate-limited videos: move them to just below the
    median score of the list they belong to. When several videos are moved,
    they keep their original relative order.

    :param initial_videos: videos and their scores, type-dict, {videoId: score, ...}
    :param key_name: Redis key of the zset the adjusted scores are written to
    :return: None
    """
    if not initial_videos:
        return

    # Current rate-limited videos.
    limit_videos = _load_limit_videos()
    if limit_videos is None:
        return

    # Scores of the rate-limited videos that are present in this list.
    limit_video_initial_score = []
    for video in limit_videos:
        video_id = int(video[0])
        initial_score = initial_videos.get(video_id)
        if initial_score is not None:
            limit_video_initial_score.append((video_id, initial_score))
    log_.info(f"limit_video_initial_score = {limit_video_initial_score}")
    if not limit_video_initial_score:
        return

    # Median of the original list's scores.
    initial_video_score_list = sorted(initial_videos.values())
    score_count = len(initial_video_score_list)
    media_score = np.median(initial_video_score_list)

    # Score of the element right after the median position (ascending order).
    if score_count % 2 == 0:
        temp_index = score_count // 2
    else:
        temp_index = score_count // 2 + 1
    # A single-element list has no "next" element; fall back to 0.
    temp_score = initial_video_score_list[temp_index] if score_count > 1 else 0

    # Only videos currently ranked above the median are moved; the descending
    # sort keeps their original ranking order.
    limit_video_initial_score.sort(key=lambda x: x[1], reverse=True)
    limit_video_id_list = [
        video_id
        for video_id, initial_score in limit_video_initial_score
        if initial_score > media_score
    ]

    limit_video_final_score = {}
    if limit_video_id_list:
        # Each moved video sits one step further below the median than the
        # previous one.
        limit_score_step = (temp_score - media_score) / (len(limit_video_id_list) + 1)
        for i, video_id in enumerate(limit_video_id_list):
            final_score = media_score - limit_score_step * (i + 1)
            limit_video_final_score[int(video_id)] = final_score
    log_.info(f"media_score = {media_score}, temp_score = {temp_score}, "
              f"limit_video_final_score = {limit_video_final_score}")

    # Write the adjusted scores back.
    if not limit_video_final_score:
        return
    redis_helper.add_data_with_zset(key_name=key_name, data=limit_video_final_score, expire_time=23 * 3600)


def check_videos_distribute():
    """
    Check the distribution count of the current rate-limited videos.

    :return: stop_distribute_video_id_list, ids of the videos whose recorded
        distribution count has reached their maximum distribution count
    """
    # Current rate-limited videos and their maximum distribution counts.
    limit_videos = _load_limit_videos()
    if limit_videos is None:
        return []

    # Collect the videos that have reached their limit.
    stop_distribute_video_id_list = []
    for video_id, max_distribute_count in limit_videos:
        distributed_count = redis_helper.get_data_from_redis(
            key_name=f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_DISTRIBUTE_COUNT}{video_id}"
        )
        # No counter stored for this video: skip it.
        if distributed_count is None:
            continue
        if int(distributed_count) >= int(max_distribute_count):
            stop_distribute_video_id_list.append(int(video_id))
    return stop_distribute_video_id_list


def process_with_region(data_key, rule_key, region, stop_distribute_video_id_list, now_date, now_h):
    """
    Remove the over-distributed videos from the hourly recall lists of one region.

    For every list prefix the key of the current hour is used; when that key
    does not exist, the key of the previous hour is used instead.

    :param data_key: data key, part of the Redis key name
    :param rule_key: rule key, part of the Redis key name
    :param region: region code, part of the Redis key name
    :param stop_distribute_video_id_list: ids of the videos to remove
    :param now_date: current date (datetime)
    :param now_h: current hour, type-int
    :return: None
    """
    log_.info(f"data_key = {data_key}, rule_key = {rule_key}, region = {region}")

    # NOTE: adding the videos to the online filter sets (REGION_H_VIDEO_FILER,
    # REGION_H_VIDEO_FILER_24H, H_VIDEO_FILER_24H), the "big list" prefix
    # RECALL_KEY_NAME_PREFIX_DUP_REGION_H and the per-rule (rule4 / rule5)
    # prefix lists are currently disabled.
    key_prefix_list = [
        config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,  # hourly list grouped by region
        config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,  # relative-24h list grouped by region
        config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H,  # relative-24h list, not grouped by region
        config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H,  # relative-24h list 2, not grouped by region
    ]

    # The fallback hour is the same for every prefix, so compute it once.
    redis_date, redis_h = _previous_hour(now_date, now_h)

    for key_prefix in key_prefix_list:
        key_name = _hourly_key(key_prefix, region, data_key, rule_key, now_date, now_h)
        if not redis_helper.key_exists(key_name=key_name):
            key_name = _hourly_key(key_prefix, region, data_key, rule_key, redis_date, redis_h)
        redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)

    log_.info(f"data_key = {data_key}, rule_key = {rule_key}, region = {region} "
              f"videos check end!")


def check_region_videos(rule_params):
    """
    Check the distribution count of the rate-limited videos and remove the
    over-distributed ones from the regional hourly lists and the 30-day lists.

    :param rule_params: type-dict, its 'params_list' entry is a list of dicts
        with the keys 'data' and 'rule'
    :return: None
    """
    # Read the clock once so the date and the hour always belong to the same
    # instant (two separate reads could disagree across a midnight boundary).
    now_date = datetime.datetime.now()
    now_h = now_date.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')

    # Videos that have reached their distribution limit.
    stop_distribute_video_id_list = check_videos_distribute()
    log_.info(f"stop_distribute_video_id_list = {stop_distribute_video_id_list}, "
              f"count = {len(stop_distribute_video_id_list)}")
    if not stop_distribute_video_id_list:
        return

    # Remove the over-distributed videos, one greenlet per region / city code.
    region_code_list = list(config_.REGION_CODE.values()) + list(config_.CITY_CODE.values())
    for param in rule_params.get('params_list'):
        data_key = param.get('data')
        rule_key = param.get('rule')
        log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
        task_list = [
            gevent.spawn(process_with_region,
                         data_key, rule_key, region, stop_distribute_video_id_list, now_date, now_h)
            for region in region_code_list
        ]
        # NOTE(review): joinall() does not re-raise exceptions raised inside
        # the greenlets unless raise_error=True is passed.
        gevent.joinall(task_list)

    # Remove the over-distributed videos from the daily-updated 30-day lists.
    log_.info("day_by_30day check start...")
    day30_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
    today_str = now_date.strftime('%Y%m%d')
    yesterday_str = (now_date - datetime.timedelta(days=1)).strftime('%Y%m%d')
    for param in config_.RULE_PARAMS_30DAY_APP_TYPE.get('params_list'):
        data_key = param.get('data')
        rule_key = param.get('rule')
        log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
        key_name = f"{day30_key_prefix}:{data_key}:{rule_key}:{today_str}"
        if not redis_helper.key_exists(key_name=key_name):
            # Today's list is missing: fall back to yesterday's list.
            key_name = f"{day30_key_prefix}:{data_key}:{rule_key}:{yesterday_str}"
        redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)
    log_.info("day_by_30day check end!")

    # NOTE: removal from the original big list (config_.RECALL_KEY_NAME_PREFIX)
    # is currently disabled.


if __name__ == '__main__':
    log_.info("start...")
    check_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE)
    # NOTE: the 48h rule set (config_.RULE_PARAMS_REGION_APP_TYPE_48H) is
    # currently not checked.
    log_.info("end!")