import os
import time
import json
import traceback
import ast
import pandas as pd

from datetime import date, timedelta, datetime
from region_rule_rank_h import region_code
from utils import filter_video_status, send_msg_to_feishu, filter_video_status_app
from db_helper import RedisHelper, MysqlHelper
from config import set_config
from log import Log

config_, env = set_config()
log_ = Log()


def filter_position_videos():
    """Filter the position-pinned recall video lists.

    For each position key, reads the stored id list (a Python list literal)
    from redis, keeps only the ids returned by ``filter_video_status`` and
    writes the result back with a 30-hour expiry.
    """
    log_.info("position videos filter start...")
    position_key_list = [config_.RECALL_POSITION1_KEY_NAME, config_.RECALL_POSITION2_KEY_NAME]
    redis_helper = RedisHelper()
    for key_name in position_key_list:
        position = key_name.split('.')[-1]
        log_.info("position = {}".format(position))
        position_videos = redis_helper.get_data_from_redis(key_name=key_name)
        if position_videos is None:
            log_.info('position {} videos is None!'.format(position))
            continue
        # literal_eval (not eval) parses the stored list literal without executing code
        position_video_ids = [int(video_id) for video_id in ast.literal_eval(position_videos)]
        filter_video_ids = filter_video_status(video_ids=position_video_ids)
        # Write the filtered list back to redis
        redis_helper.set_data_to_redis(key_name=key_name, value=str(filter_video_ids), expire_time=30 * 3600)
        log_.info('position {} videos filter end!'.format(position))
    log_.info("position videos filter end!")


def filter_relevant_videos():
    """Filter operator-pinned relevant-recommendation videos.

    For every head video id in ``RELEVANT_TOP_VIDEOS_KEY_NAME``, keeps only the
    relevant videos that pass ``filter_video_status`` and whose ``finish_time``
    (unix seconds) is still in the future. The filtered list is written back
    with an expiry of (latest finish_time - now + 5s). Head ids whose key is
    missing or whose list ends up empty are removed from the head-video set.
    """
    log_.info("relevant videos with op filter filter start...")
    redis_helper = RedisHelper()
    # Head video ids that have operator-configured relevant videos
    head_videos = redis_helper.get_data_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME)
    if head_videos is None or len(head_videos) == 0:
        log_.info("relevant videos with op filter end! head_videos = {}".format(head_videos))
        return

    remove_head_vids = []
    for head_vid in head_videos:
        key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
        # The relevant-videos key for this head video is gone: drop head_vid from the set
        if not redis_helper.key_exists(key_name=key_name):
            remove_head_vids.append(head_vid)
            log_.info('head_vid = {} relevant redis key not exist!'.format(head_vid))
            continue
        relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
        # No relevant videos configured for this head video: drop head_vid from the set
        if relevant_videos is None:
            remove_head_vids.append(head_vid)
            log_.info('head_vid = {} not have relevant videos!'.format(head_vid))
            continue

        relevant_videos = json.loads(relevant_videos)
        relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos]
        filtered_videos = set(filter_video_status(video_ids=relevant_video_ids))
        # Single clock read so the active-window check and the expiry agree
        now_ts = int(time.time())
        # Keep videos that are recommendable and whose operator window is still active
        relevant_videos_new = [
            item for item in relevant_videos
            if int(item['recommend_vid']) in filtered_videos and int(item['finish_time']) > now_ts
        ]
        # Nothing left: drop head_vid from the set and delete its relevant-videos key
        if not relevant_videos_new:
            remove_head_vids.append(head_vid)
            redis_helper.del_keys(key_name=key_name)
            log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
                head_vid, len(relevant_videos_new)))
            continue

        # Expire the key 5s after the latest finish_time. int() matches the comparison above,
        # so a finish_time stored as a string no longer breaks the subtraction.
        expire_time = max(int(item['finish_time']) for item in relevant_videos_new) - now_ts + 5
        if expire_time <= 0:
            log_.info('head_vid = {} expire_time <= 0!'.format(head_vid))
            continue
        redis_helper.set_data_to_redis(key_name=key_name, value=json.dumps(relevant_videos_new),
                                       expire_time=expire_time)
        log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
            head_vid, len(relevant_videos_new)))

    # Remove the head video ids that no longer have valid relevant videos
    if remove_head_vids:
        redis_helper.remove_value_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME,
                                           values=tuple(remove_head_vids))
    log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
    log_.info("relevant videos with op filter end!")


def filter_rov_pool(app_type=None):
    """Filter the ROV recall pool zset.

    :param app_type: product identifier; ``None`` selects the default pool key.
        The APP product is filtered with ``filter_video_status_app``, every
        other product with ``filter_video_status``.
    """
    log_.info("rov recall pool filter start ...")
    if app_type is not None:
        log_.info("appType = {}".format(app_type))
    key_name, _ = get_pool_redis_key(pool_type='rov', app_type=app_type)

    redis_helper = RedisHelper()
    data = redis_helper.get_all_data_from_zset(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("rov recall pool filter end!")
        return

    video_ids = [int(video_id) for video_id in data]
    if app_type == config_.APP_TYPE['APP']:
        filtered_result = filter_video_status_app(video_ids=video_ids)
    else:
        filtered_result = filter_video_status(video_ids=video_ids)
    # Set difference = videos that must be removed from the zset
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    log_.info({'key_name': key_name, 'filter_videos': filter_videos})
    if len(filter_videos) == 0:
        log_.info("rov recall pool filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    log_.info("rov recall pool filter end!")


def filter_flow_pool():
    """Filter the flow-pool zset of every configured app type.

    Members are ``"<videoId>-<flowPool>"`` strings; every member of a removed
    video is deleted. LAO_HAO_KAN_VIDEO and ZUI_JING_QI are delegated to
    ``filter_flow_pool_18_19``, which parses members as plain video ids.
    """
    log_.info("flow pool filter start ...")
    app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
    redis_helper = RedisHelper()
    for app_type in config_.APP_TYPE.values():
        log_.info('app_type {} videos filter start...'.format(app_type))
        if app_type in app_type_list:
            filter_flow_pool_18_19(app_type=app_type)
            continue

        key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
        data = redis_helper.get_all_data_from_zset(key_name=key_name)
        if data is None:
            log_.info("data is None")
            log_.info("app_type {} videos filter end!".format(app_type))
            continue

        # videoId -> [flowPool, ...]; dict keys keep first-seen order and dedupe in O(1)
        mapping = {}
        for video in data:
            video_id, flow_pool = video.split('-')
            mapping.setdefault(int(video_id), []).append(flow_pool)
        video_ids = list(mapping)

        if len(video_ids) == 0:
            log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
            log_.info("app_type {} videos filter end!".format(app_type))
            continue
        if app_type == config_.APP_TYPE['APP']:
            filtered_result = filter_video_status_app(video_ids=video_ids)
        else:
            filtered_result = filter_video_status(video_ids=video_ids)
        # Set difference = videos that must be removed from the zset
        filter_videos = set(video_ids) - set(filtered_result)
        log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
            len(data), len(video_ids), len(filtered_result), len(filter_videos)))

        if len(filter_videos) == 0:
            log_.info("app_type {} videos filter end!".format(app_type))
            continue
        # Rebuild every "<videoId>-<flowPool>" member of the removed videos
        remove_videos = ['{}-{}'.format(video_id, flow_pool)
                         for video_id in filter_videos
                         for flow_pool in mapping[video_id]]
        redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
        log_.info("app_type {} videos filter end!".format(app_type))
    log_.info("flow pool filter end!")


def filter_flow_pool_18_19(app_type):
    """Filter the flow-pool zset of appType 18/19, whose members are parsed as plain video ids.

    :param app_type: product identifier (LAO_HAO_KAN_VIDEO or ZUI_JING_QI)
    """
    log_.info('app_type {} videos filter start...'.format(app_type))
    key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)

    redis_helper = RedisHelper()
    data = redis_helper.get_all_data_from_zset(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("app_type {} videos filter end!".format(app_type))
        return

    video_ids = [int(video_id) for video_id in data]
    if len(video_ids) == 0:
        log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
        log_.info("app_type {} videos filter end!".format(app_type))
        return
    filtered_result = filter_video_status(video_ids=video_ids)
    # Set difference = videos that must be removed from the zset
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(data), len(video_ids), len(filtered_result), len(filter_videos)))
    log_.info({'key_name': key_name, 'filter_videos': filter_videos})

    if len(filter_videos) == 0:
        log_.info("app_type {} videos filter end!".format(app_type))
        return
    # Pass a list, like every other caller of remove_value_from_zset
    redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    # No "flow pool filter end!" here: the caller's loop is still running and logs it once at the end
    log_.info("app_type {} videos filter end!".format(app_type))


def filter_bottom():
    """Filter the fallback ("bottom") video zset."""
    log_.info("bottom videos filter start ...")
    redis_helper = RedisHelper()
    data = redis_helper.get_all_data_from_zset(key_name=config_.BOTTOM_KEY_NAME)
    if data is None:
        log_.info("data is None")
        log_.info("bottom videos filter end!")
        return
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status(video_ids=video_ids)
    # Set difference = videos that must be removed from the zset
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    if len(filter_videos) == 0:
        log_.info("bottom videos filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
    log_.info("bottom videos filter end!")


def filter_rov_updated():
    """Filter the zset of videos whose ROV was manually updated (applet status check)."""
    log_.info("update rov videos filter start ...")
    redis_helper = RedisHelper()
    data = redis_helper.get_all_data_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME)
    if data is None:
        log_.info("data is None")
        log_.info("update rov videos filter end!")
        return
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status(video_ids=video_ids)
    # Set difference = videos that must be removed from the zset
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    log_.info({'key_name': config_.UPDATE_ROV_KEY_NAME, 'filter_videos': filter_videos})
    if len(filter_videos) == 0:
        log_.info("update rov videos filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
    log_.info("update rov videos filter end!")


def filter_rov_updated_app():
    """Filter the zset of videos whose ROV was manually updated, using the app recommend-status check."""
    log_.info("update rov videos app filter start ...")
    redis_helper = RedisHelper()
    data = redis_helper.get_all_data_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME_APP)
    if data is None:
        log_.info("data is None")
        log_.info("update rov videos app filter end!")
        return
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status_app(video_ids=video_ids)
    # Set difference = videos that must be removed from the zset
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    if len(filter_videos) == 0:
        log_.info("update rov videos app filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME_APP, value=list(filter_videos))
    log_.info("update rov videos app filter end!")


def get_pool_redis_key(pool_type, app_type=None):
    """
    Build the redis key of a recall pool.
    :param pool_type: type-string {'rov': ROV recall pool, 'flow': flow pool}
    :param app_type: product identifier
    :return: for 'rov', a tuple (key_name, redis_date_or_hour) falling back to the
             previous period when the current key does not exist yet;
             for 'flow', the key_name string only;
             for any other pool_type, (None, None) after logging an error.
    """
    if pool_type == 'rov':
        redis_helper = RedisHelper()
        # appType = 6: hourly key "<prefix><appType>.<hour>"
        if app_type == config_.APP_TYPE['SHORT_VIDEO']:
            redis_h = datetime.now().hour
            key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_h)
            if redis_helper.key_exists(key_name):
                return key_name, redis_h
            # Not updated yet this hour: use the previous hour (23 right after midnight)
            redis_h = (redis_h - 1) % 24
            key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_h)
            return key_name, redis_h

        # appType 18/19: hourly key "<prefix><appType>.<YYYYmmdd>.<hour>"
        if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
            key_name_prefix = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.'
            # One clock snapshot so date and hour cannot straddle midnight
            now = datetime.now()
            now_date = now.strftime('%Y%m%d')
            key_name = f"{key_name_prefix}{now_date}.{now.hour}"
            if redis_helper.key_exists(key_name):
                return key_name, now.hour
            # Not updated yet: previous hour, rolling back to yesterday right after midnight
            prev = now - timedelta(hours=1)
            redis_date = prev.strftime('%Y%m%d')
            key_name = f"{key_name_prefix}{redis_date}.{prev.hour}"
            return key_name, prev.hour

        # Daily key "<prefix><YYYYmmdd>"; appType = 13 (app) has its own prefix
        if app_type == config_.APP_TYPE['APP']:
            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_APP
        else:
            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX
        today = date.today()
        redis_date = today.strftime('%Y%m%d')
        key_name = key_name_prefix + redis_date
        if not redis_helper.key_exists(key_name):
            # Today's list is not updated yet: use yesterday's
            redis_date = (today - timedelta(days=1)).strftime('%Y%m%d')
            key_name = key_name_prefix + redis_date
        return key_name, redis_date

    elif pool_type == 'flow':
        # Flow pool
        return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)

    else:
        log_.error('pool type error')
        return None, None


def filter_app_pool():
    """Filter the Piaoquan video app hourly recall zset.

    Picks the key hour from the current time: before 07:00 -> yesterday 21,
    after 21:00 -> today 21, even hours -> the preceding hour, odd hours ->
    the current hour.
    """
    log_.info("app pool filter start ...")
    redis_helper = RedisHelper()
    # One clock snapshot so date and hour cannot straddle midnight
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    if now_h < 7:
        redis_date = (now - timedelta(days=1)).strftime('%Y%m%d')
        redis_h = 21
    elif now_h > 21:
        redis_date = now_date
        redis_h = 21
    elif now_h % 2 == 0:
        redis_date = now_date
        redis_h = now_h - 1
    else:
        redis_date = now_date
        redis_h = now_h
    log_.info(f'redis_date = {redis_date}, redis_h = {redis_h}.')

    key_name = f'{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{redis_date}.{redis_h}'
    data = redis_helper.get_all_data_from_zset(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("app pool filter end!")
        return
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status_app(video_ids=video_ids)
    # Set difference = videos that must be removed from the zset
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    log_.info({'key_name': key_name, 'filter_videos': filter_videos})
    if len(filter_videos) == 0:
        log_.info("app pool filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    log_.info("app pool filter end!")


def filter_rov_h():
    """Filter the applet hourly recall zsets for every rule in ``config_.RULE_PARAMS``.

    Videos removed from the first zset (``RECALL_KEY_NAME_PREFIX_BY_H``) are also
    added to the online filter set ``H_VIDEO_FILER<rule>`` with a 2-hour expiry.
    """
    rule_params = config_.RULE_PARAMS
    log_.info("rov_h pool filter start ...")
    redis_helper = RedisHelper()
    # One clock snapshot so date and hour cannot straddle midnight
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    for key, value in rule_params.items():
        log_.info(f"rule = {key}, param = {value}")
        # Three video lists to filter
        key_prefix_list = [
            config_.RECALL_KEY_NAME_PREFIX_BY_H,
            config_.RECALL_KEY_NAME_PREFIX_DUP_24H_H,
            config_.RECALL_KEY_NAME_PREFIX_DUP_H
        ]
        for i, key_prefix in enumerate(key_prefix_list):
            key_name = f"{key_prefix}{key}.{now_date}.{now_h}"
            log_.info(f"key_name: {key_name}")
            data = redis_helper.get_all_data_from_zset(key_name=key_name)
            if data is None:
                log_.info("data is None")
                log_.info("filter end!")
                continue
            video_ids = [int(video_id) for video_id in data]
            filtered_result = filter_video_status(video_ids=video_ids)
            # Set difference = videos that must be removed from the zset
            filter_videos = set(video_ids) - set(filtered_result)
            log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                          len(filtered_result),
                                                                                          len(filter_videos)))
            log_.info({'key_name': key_name, 'filter_videos': filter_videos})
            if len(filter_videos) == 0:
                log_.info("filter end!")
                continue
            redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
            if i == 0:
                # Also push the hourly removals into the online filter list
                redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER}{key}", values=filter_videos,
                                               expire_time=2*3600)
    log_.info("rov_h pool filter end!")


def filter_rov_day():
    """Filter the applet daily recall zsets for every rule in ``config_.RULE_PARAMS_DAY``."""
    rule_params = config_.RULE_PARAMS_DAY
    log_.info("rov_day pool filter start ...")
    redis_helper = RedisHelper()
    now_date = date.today().strftime('%Y%m%d')
    log_.info(f'now_date = {now_date}.')
    for key, value in rule_params.items():
        log_.info(f"rule = {key}, param = {value}")
        # Three video lists to filter
        key_prefix_list = [
            config_.RECALL_KEY_NAME_PREFIX_BY_DAY,
            config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE,
            config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW
        ]
        for key_prefix in key_prefix_list:
            key_name = f"{key_prefix}{key}.{now_date}"
            log_.info(f"key_name: {key_name}")
            data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
            if data is None:
                log_.info("data is None")
                log_.info("filter end!")
                continue
            video_ids = [int(video_id) for video_id in data]
            filtered_result = filter_video_status(video_ids=video_ids)
            # Set difference = videos that must be removed from the zset
            filter_videos = set(video_ids) - set(filtered_result)
            log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                          len(filtered_result),
                                                                                          len(filter_videos)))
            log_.info({'key_name': key_name, 'filter_videos': filter_videos})
            if len(filter_videos) == 0:
                log_.info("filter end!")
                continue
            redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    log_.info("rov_day pool filter end!")


def filter_old_videos():
    """Filter today's old-videos redis set."""
    log_.info("old videos filter start ...")
    redis_helper = RedisHelper()
    now_date = date.today().strftime('%Y%m%d')
    log_.info(f'now_date = {now_date}.')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX_OLD_VIDEOS}{now_date}'
    data = redis_helper.get_data_from_set(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("old videos filter end!")
        return
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status(video_ids=video_ids)
    # Set difference = videos that must be removed from the set
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    if len(filter_videos) == 0:
        log_.info("old videos filter end!")
        return
    redis_helper.remove_value_from_set(key_name=key_name, values=filter_videos)
    log_.info("old videos filter end!")


def filter_region_videos():
    """Filter the hourly region-grouped rule recall zsets.

    For every region code and rule in ``config_.RULE_PARAMS_REGION``, removes
    filtered-out videos from each hourly zset. Removals from the zsets at index
    0, 1 and 2 are also written to their online filter sets (2-hour expiry).
    """
    region_code_list = list(region_code.values())
    rule_params = config_.RULE_PARAMS_REGION
    log_.info("region_h videos filter start ...")
    redis_helper = RedisHelper()
    # One clock snapshot so date and hour cannot straddle midnight
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    for region in region_code_list:
        log_.info(f"region = {region}")
        for key, value in rule_params.items():
            log_.info(f"rule = {key}, param = {value}")
            # Video lists to filter; the index selects the online filter set below
            key_prefix_list = [
                config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,
                config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,
                # config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H,
                # config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H,
                config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H,
                config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H
            ]
            for i, key_prefix in enumerate(key_prefix_list):
                key_name = f"{key_prefix}{region}.{key}.{now_date}.{now_h}"
                log_.info(f"key_name: {key_name}")
                data = redis_helper.get_all_data_from_zset(key_name=key_name)
                if data is None:
                    log_.info("data is None")
                    log_.info("filter end!")
                    continue
                video_ids = [int(video_id) for video_id in data]
                filtered_result = filter_video_status(video_ids=video_ids)
                # Set difference = videos that must be removed from the zset
                filter_videos = set(video_ids) - set(filtered_result)
                log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                              len(filtered_result),
                                                                                              len(filter_videos)))
                log_.info({'key_name': key_name, 'filter_videos': filter_videos})
                if len(filter_videos) == 0:
                    log_.info("filter end!")
                    continue
                redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
                if i == 0:
                    # Hourly region removals -> online filter list
                    redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{key}",
                                                   values=filter_videos, expire_time=2 * 3600)
                elif i == 1:
                    # Region 24h removals -> online filter list
                    redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{key}",
                                                   values=filter_videos, expire_time=2 * 3600)
                elif i == 2:
                    # Relative-24h removals -> online filter list
                    redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER_24H}{region}.{key}",
                                                   values=filter_videos, expire_time=2 * 3600)
        log_.info(f"region = {region} videos filter end!")
    log_.info("region_h videos filter end!")


def filter_region_videos_by_day():
    """Filter the daily region-grouped rule recall zsets (``config_.RULE_PARAMS_REGION_DAY``)."""
    region_code_list = list(region_code.values())
    rule_params = config_.RULE_PARAMS_REGION_DAY
    log_.info("region_day videos filter start ...")
    redis_helper = RedisHelper()
    now_date = date.today().strftime('%Y%m%d')
    log_.info(f'now_date = {now_date}.')
    for region in region_code_list:
        log_.info(f"region = {region}")
        for key, value in rule_params.items():
            log_.info(f"rule = {key}, param = {value}")
            # Video lists to filter
            key_prefix_list = [
                config_.RECALL_KEY_NAME_PREFIX_REGION_BY_DAY
            ]
            for key_prefix in key_prefix_list:
                key_name = f"{key_prefix}{region}.{key}.{now_date}"
                log_.info(f"key_name: {key_name}")
                data = redis_helper.get_all_data_from_zset(key_name=key_name)
                if data is None:
                    log_.info("data is None")
                    log_.info("filter end!")
                    continue
                video_ids = [int(video_id) for video_id in data]
                filtered_result = filter_video_status(video_ids=video_ids)
                # Set difference = videos that must be removed from the zset
                filter_videos = set(video_ids) - set(filtered_result)
                log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                              len(filtered_result),
                                                                                              len(filter_videos)))
                log_.info({'key_name': key_name, 'filter_videos': filter_videos})
                if len(filter_videos) == 0:
                    log_.info("filter end!")
                    continue
                redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
        log_.info(f"region = {region} videos filter end!")
    log_.info("region_day videos filter end!")


def filter_rov_h_24h():
    """Filter the applet hourly-updated 24h recall zsets (``config_.RULE_PARAMS_24H``).

    Removals from the first zset are also added to ``H_VIDEO_FILER_24H<rule>``
    with a 2-hour expiry.
    """
    rule_params = config_.RULE_PARAMS_24H
    log_.info("rov_h_by24h pool filter start ...")
    redis_helper = RedisHelper()
    # One clock snapshot so date and hour cannot straddle midnight
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    for key, value in rule_params.items():
        log_.info(f"rule = {key}, param = {value}")
        # Two video lists to filter
        key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_24H, config_.RECALL_KEY_NAME_PREFIX_DUP_24H]
        for i, key_prefix in enumerate(key_prefix_list):
            key_name = f"{key_prefix}{key}.{now_date}.{now_h}"
            log_.info(f"key_name: {key_name}")
            data = redis_helper.get_all_data_from_zset(key_name=key_name)
            if data is None:
                log_.info("data is None")
                log_.info("filter end!")
                continue
            video_ids = [int(video_id) for video_id in data]
            filtered_result = filter_video_status(video_ids=video_ids)
            # Set difference = videos that must be removed from the zset
            filter_videos = set(video_ids) - set(filtered_result)
            log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                          len(filtered_result),
                                                                                          len(filter_videos)))
            log_.info({'key_name': key_name, 'filter_videos': filter_videos})
            if len(filter_videos) == 0:
                log_.info("filter end!")
                continue
            redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
            if i == 0:
                # Also push the hourly removals into the online filter list
                redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER_24H}{key}", values=filter_videos,
                                               expire_time=2*3600)
    log_.info("rov_h_by24h pool filter end!")


def filter_region_videos_24h():
    """Filter the region-grouped 24h rule recall zsets (``config_.RULE_PARAMS_REGION_24H``).

    Removals from the first zset are also added to
    ``REGION_H_VIDEO_FILER_24H<region>.<rule>`` with a 2-hour expiry.
    """
    region_code_list = list(region_code.values())
    rule_params = config_.RULE_PARAMS_REGION_24H
    log_.info("region_24h videos filter start ...")
    redis_helper = RedisHelper()
    # One clock snapshot so date and hour cannot straddle midnight
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    for region in region_code_list:
        log_.info(f"region = {region}")
        for key, value in rule_params.items():
            log_.info(f"rule = {key}, param = {value}")
            # Video lists to filter
            key_prefix_list = [
                config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H,
                config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_DAY_24H,
                config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_24H
            ]
            for i, key_prefix in enumerate(key_prefix_list):
                key_name = f"{key_prefix}{region}.{key}.{now_date}.{now_h}"
                log_.info(f"key_name: {key_name}")
                data = redis_helper.get_all_data_from_zset(key_name=key_name)
                if data is None:
                    log_.info("data is None")
                    log_.info("filter end!")
                    continue
                video_ids = [int(video_id) for video_id in data]
                filtered_result = filter_video_status(video_ids=video_ids)
                # Set difference = videos that must be removed from the zset
                filter_videos = set(video_ids) - set(filtered_result)
                log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                              len(filtered_result),
                                                                                              len(filter_videos)))
                log_.info({'key_name': key_name, 'filter_videos': filter_videos})
                if len(filter_videos) == 0:
                    log_.info("filter end!")
                    continue
                redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
                if i == 0:
                    # Also push the hourly removals into the online filter list
                    redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{key}",
                                                   values=filter_videos, expire_time=2 * 3600)
                    # filter_data(filter_videos, region)
        log_.info(f"region = {region} videos filter end!")
    log_.info("region_24h videos filter end!")


def filter_data(videos, region):
    """Dump the status columns of ``videos`` to a timestamped CSV under ./data/filter_data.

    :param videos: iterable of video ids (converted with int() before being
        placed in the SQL text)
    :param region: region code, used only in the output file name
    """
    now_dt = datetime.now().strftime('%Y%m%d%H%M')
    filepath = './data/filter_data'
    os.makedirs(filepath, exist_ok=True)

    columns = ['video_id', 'audit_status', 'applet_rec_status', 'open_status',
               'payment_status', 'encryption_status', 'transcoding_status']
    res = []
    # int() keeps the string-built SQL below limited to numeric literals
    video_ids = [int(video_id) for video_id in videos]
    if video_ids:
        mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
        video_status_sql = "SELECT t1.id AS 'video_id', " \
                           "t1.transcode_status AS 'transcoding_status', " \
                           "t2.audit_status AS 'audit_status', " \
                           "t2.video_status AS 'open_status', " \
                           "t2.recommend_status AS 'applet_rec_status', " \
                           "t2.app_recommend_status AS 'app_rec_status', " \
                           "t3.charge AS 'payment_status', " \
                           "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                           "FROM longvideo.wx_video t1 " \
                           "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                           "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                           "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
        # One IN query instead of one query per video. MySQL requires an alias on
        # a derived table, which the per-video query was missing.
        sql = "SELECT video_id, audit_status, applet_rec_status, open_status, " \
              "payment_status, encryption_status, transcoding_status " \
              "FROM ({}) AS video_status_t " \
              "WHERE video_id IN ({});".format(video_status_sql, ', '.join(str(v) for v in video_ids))
        data = mysql_helper.get_data(sql=sql)
        # Tolerate a helper that returns None/empty for no rows
        if data:
            res.extend(data)

    filter_df = pd.DataFrame(data=res, columns=columns)
    filename = f"filter_data_region_{region}_{now_dt}.csv"
    file = os.path.join(filepath, filename)
    filter_df.to_csv(file, index=False)


def filter_whole_movies():
    """Filter the current hour's whole-movies recall zset."""
    log_.info("whole movies filter start ...")
    redis_helper = RedisHelper()
    # One clock snapshot so date and hour cannot straddle midnight
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}{now_date}.{now_h}'
    data = redis_helper.get_all_data_from_zset(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("whole movies filter end!")
        return
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status(video_ids=video_ids)
    # Set difference = videos that must be removed from the zset
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    log_.info({'key_name': key_name, 'filter_videos': filter_videos})
    if len(filter_videos) == 0:
        log_.info("whole movies filter end!")
        return
    redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    log_.info("whole movies filter end!")


def main():
    """Run every enabled filter job in order.

    Any exception aborts the remaining jobs. The traceback is logged and sent
    to the Feishu server robot.
    """
    try:
        # ROV recall pool
        filter_rov_pool()
        # appType = 6 ROV recall pool (disabled)
        # filter_rov_pool(app_type=config_.APP_TYPE['SHORT_VIDEO'])
        # appType = 13, Piaoquan video app
        filter_rov_pool(app_type=config_.APP_TYPE['APP'])
        # appType = 18, ROV recall pool
        filter_rov_pool(app_type=config_.APP_TYPE['LAO_HAO_KAN_VIDEO'])
        # appType = 19, ROV recall pool
        filter_rov_pool(app_type=config_.APP_TYPE['ZUI_JING_QI'])
        # Flow pools
        filter_flow_pool()
        # Fallback videos
        filter_bottom()
        # Videos whose ROV was updated
        filter_rov_updated()
        filter_rov_updated_app()
        # Operator-pinned relevant videos (disabled)
        # filter_relevant_videos()
        # Position-pinned videos (disabled)
        # filter_position_videos()
        # Piaoquan video app hourly data
        filter_app_pool()
        # Applet hourly data
        filter_rov_h()
        # Applet daily data (disabled)
        # filter_rov_day()
        # Old videos (disabled)
        # filter_old_videos()
        # Region-grouped hourly videos
        filter_region_videos()
        # Region-grouped daily videos
        filter_region_videos_by_day()
        # Hourly-updated 24h videos
        filter_rov_h_24h()
        # Region-grouped 24h rule videos
        filter_region_videos_24h()
        # Whole movies
        filter_whole_movies()
    except Exception:
        log_.error(traceback.format_exc())
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc())
        )


if __name__ == '__main__':
    main()