import time
import traceback

from datetime import date, timedelta
from utils import filter_video_status, send_msg_to_feishu
from db_helper import RedisHelper
from config import set_config
from log import Log

config_, env = set_config()
log_ = Log()


def _filter_plain_zset(key_name):
    """Remove videos that fail the status check from a zset of plain video ids.

    Every member of the zset is parsed as an integer video id. The ids that
    ``filter_video_status`` does not return are removed from the zset.

    :param key_name: redis key of the zset to clean up
    :return: None
    """
    redis_helper = RedisHelper()
    data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
    if data is None:
        log_.info("data is None")
        return

    video_ids = [int(video_id) for video_id in data]
    if not video_ids:
        # Same guard filter_flow_pool applies: do not run the status check
        # with an empty id list, there is nothing to remove anyway.
        log_.info("video_ids size = 0, nothing to filter")
        return

    filtered_result = filter_video_status(video_ids=video_ids)

    # Set difference: ids that were in redis but did not pass the status check.
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(video_ids), len(filtered_result), len(filter_videos)))
    if not filter_videos:
        return

    redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))


def filter_rov_pool():
    """Filter the videos stored in the ROV recall pool."""
    log_.info("rov recall pool filter start ...")
    # Build the redis key (the second element, the date string, is not needed here).
    key_name, _ = get_pool_redis_key(pool_type='rov')
    _filter_plain_zset(key_name=key_name)
    log_.info("rov recall pool filter end!")


def filter_flow_pool():
    """Filter the videos stored in the flow pool of every app type.

    Members of a flow-pool zset have the form ``<video_id>-<flow_pool>``. One
    video id may appear with several flow pools; when the video fails the
    status check, all of its members are removed.
    """
    log_.info("flow pool filter start ...")
    for app_type in config_.APP_TYPE.values():
        log_.info('app_type {} videos filter start...'.format(app_type))
        # Build the redis key for this app type.
        key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
        # Load every member of the zset.
        redis_helper = RedisHelper()
        data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
        if data is None:
            log_.info("data is None")
            log_.info("app_type {} videos filter end!".format(app_type))
            continue

        # Map video id -> the raw zset members that carry it. Keeping the raw
        # members means the values removed later are exactly the stored ones,
        # and the dict gives O(1) duplicate detection while preserving the
        # first-seen order of the ids.
        mapping = {}
        for video in data:
            # Only the part before the first '-' is the video id; the rest
            # belongs to the flow pool identifier and may itself contain '-'.
            video_id = int(video.partition('-')[0])
            mapping.setdefault(video_id, []).append(video)
        video_ids = list(mapping)

        if not video_ids:
            log_.info("data size = {}, video_ids size = {}, data = {}".format(
                len(data), len(video_ids), data))
            log_.info("app_type {} videos filter end!".format(app_type))
            continue

        filtered_result = filter_video_status(video_ids=video_ids)

        # Set difference: ids that were in redis but did not pass the status check.
        filter_videos = set(video_ids) - set(filtered_result)
        log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
            len(data), len(video_ids), len(filtered_result), len(filter_videos)))

        if not filter_videos:
            log_.info("app_type {} videos filter end!".format(app_type))
            continue

        # Remove every stored member that belongs to a rejected video.
        remove_videos = [member
                         for video_id in filter_videos
                         for member in mapping[video_id]]
        redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
        log_.info("app_type {} videos filter end!".format(app_type))
    log_.info("flow pool filter end!")


def filter_bottom():
    """Filter the fallback ("bottom") videos stored under config_.BOTTOM_KEY_NAME."""
    log_.info("bottom videos filter start ...")
    _filter_plain_zset(key_name=config_.BOTTOM_KEY_NAME)
    log_.info("bottom videos filter end!")


def filter_rov_updated():
    """Filter the videos whose ROV was modified (config_.UPDATE_ROV_KEY_NAME)."""
    log_.info("update rov videos filter start ...")
    _filter_plain_zset(key_name=config_.UPDATE_ROV_KEY_NAME)
    log_.info("update rov videos filter end!")


def get_pool_redis_key(pool_type, app_type=None):
    """Build the redis key of a video pool.

    NOTE: the return shape depends on ``pool_type`` and is kept as-is for the
    existing callers.

    :param pool_type: type-string {'rov': ROV recall pool, 'flow': flow pool}
    :param app_type: product identifier, only used for the flow pool
    :return: ``(key_name, redis_date)`` for 'rov', a bare ``key_name`` string
             for 'flow', ``(None, None)`` for any other pool type
    """
    if pool_type == 'rov':
        # Sample the date once so the key that is checked and the date that is
        # returned cannot disagree when the call happens around midnight.
        today = date.today()
        redis_date = today.strftime('%Y%m%d')
        # If today's list has not been written yet, fall back to yesterday's.
        if not RedisHelper().key_exists(config_.RECALL_KEY_NAME_PREFIX + redis_date):
            redis_date = (today - timedelta(days=1)).strftime('%Y%m%d')
        return config_.RECALL_KEY_NAME_PREFIX + redis_date, redis_date

    if pool_type == 'flow':
        # Flow pool: one key per app type.
        return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)

    log_.error('pool type error')
    return None, None


def main():
    """Run every filter in sequence; on any failure, log it and alert via Feishu.

    The first exception aborts the remaining filters.
    """
    try:
        # ROV recall pool
        filter_rov_pool()
        # Flow pool
        filter_flow_pool()
        # Fallback videos
        filter_bottom()
        # Videos whose ROV was modified
        filter_rov_updated()
    except Exception:
        # Format the traceback once and reuse it for the log and the alert.
        error_text = traceback.format_exc()
        log_.error(error_text)
        send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, error_text))


if __name__ == '__main__':
    main()