import multiprocessing import gevent import os import time import json import traceback import ast import pandas as pd from datetime import date, timedelta, datetime from region_rule_rank_h import region_code from utils import filter_video_status, send_msg_to_feishu, filter_video_status_app from db_helper import RedisHelper, MysqlHelper from config import set_config from log import Log config_, env = set_config() log_ = Log() redis_helper = RedisHelper() def filter_position_videos(): """按位置排序视频过滤""" log_.info("position videos filter start...") position_key_list = [config_.RECALL_POSITION1_KEY_NAME, config_.RECALL_POSITION2_KEY_NAME] redis_helper = RedisHelper() for key_name in position_key_list: position = key_name.split('.')[-1] log_.info("position = {}".format(position)) # 获取数据 position_videos = redis_helper.get_data_from_redis(key_name=key_name) if position_videos is None: log_.info('position {} videos is None!'.format(position)) continue else: # 过滤 position_video_ids = [int(video_id) for video_id in ast.literal_eval(position_videos)] filter_video_ids = filter_video_status(video_ids=position_video_ids) # 重新写入redis redis_helper.set_data_to_redis(key_name=key_name, value=str(filter_video_ids), expire_time=30 * 3600) log_.info('position {} videos filter end!'.format(position)) log_.info("position videos filter end!") def filter_relevant_videos(): """运营强插相关推荐视频过滤""" log_.info("relevant videos with op filter filter start...") # 读取需要过滤的头部视频id redis_helper = RedisHelper() head_videos = redis_helper.get_data_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME) if head_videos is None or len(head_videos) == 0: log_.info("relevant videos with op filter end! head_videos = {}".format(head_videos)) return # 过滤 remove_head_vids = [] for head_vid in head_videos: key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid) # 头部视频 对应的key不存在时,将head_vid移除对应redis if not redis_helper.key_exists(key_name=key_name): remove_head_vids.append(head_vid) log_.info('head_vid = {} relevant redis key not exist!'.format(head_vid)) continue # 获取头部视频对应的相关视频 relevant_videos = redis_helper.get_data_from_redis(key_name=key_name) # 该视频没有指定的相关性视频,将head_vid移除对应redis if relevant_videos is None: remove_head_vids.append(head_vid) log_.info('head_vid = {} not have relevant videos!'.format(head_vid)) continue # 过滤 relevant_videos = json.loads(relevant_videos) relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos] filtered_videos = filter_video_status(video_ids=relevant_video_ids) # 保留可推荐 且生效中 的视频 relevant_videos_new = [ item for item in relevant_videos if int(item['recommend_vid']) in filtered_videos and int(item['finish_time']) > int(time.time()) ] # 过滤后没有符合的视频,将head_vid移除对应redis,删除对应的相关推荐的key if len(relevant_videos_new) == 0: remove_head_vids.append(head_vid) redis_helper.del_keys(key_name=key_name) log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format( head_vid, len(relevant_videos_new))) continue # 重新写入redis # 以最晚结束的视频的结束时间 - 当前时间 + 5s 作为key的过期时间 finish_time_list = [item['finish_time'] for item in relevant_videos_new] expire_time = max(finish_time_list) - int(time.time()) + 5 if expire_time <= 0: log_.info('head_vid = {} expire_time <= 0!'.format(head_vid)) continue # 存入redis redis_helper.set_data_to_redis(key_name=key_name, value=json.dumps(relevant_videos_new), expire_time=expire_time) log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format( head_vid, len(relevant_videos_new))) # 将需要移除的头部视频id进行移除 if len(remove_head_vids) == 0: log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids)) log_.info("relevant videos with op filter end!") return redis_helper.remove_value_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME, values=tuple(remove_head_vids)) log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids)) log_.info("relevant videos with op filter end!") def filter_rov_pool(app_type=None): """ROV召回池视频过滤""" log_.info("rov recall pool filter start ...") # 拼接redis-key if app_type is None: key_name, _ = get_pool_redis_key(pool_type='rov') else: log_.info("appType = {}".format(app_type)) key_name, _ = get_pool_redis_key(pool_type='rov', app_type=app_type) # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("rov recall pool filter end!") return # 过滤 video_ids = [int(video_id) for video_id in data] if app_type == config_.APP_TYPE['APP']: filtered_result = filter_video_status_app(video_ids=video_ids) else: filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("rov recall pool filter end!") return redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) log_.info("rov recall pool filter end!") def filter_flow_pool(): """流量池视频过滤""" log_.info("flow pool filter start ...") app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']] for _, app_type in config_.APP_TYPE.items(): log_.info('app_type {} videos filter start...'.format(app_type)) if app_type in app_type_list: filter_flow_pool_18_19(app_type=app_type) else: for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]: log_.info(f"flow_pool_id = {flow_pool_id}") # 拼接redis-key key_name = get_pool_redis_key(pool_type='flow', app_type=app_type, flow_pool_id=flow_pool_id) # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info(f"flow_pool_id = {flow_pool_id}, data is None") log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!") continue # videoId与flowPool做mapping video_ids = [] mapping = {} for video in data: video_id, flow_pool = video.split('-') video_id = int(video_id) if video_id not in video_ids: video_ids.append(video_id) mapping[video_id] = [flow_pool] else: mapping[video_id].append(flow_pool) # 过滤 if len(video_ids) == 0: log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}") log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!") continue if app_type == config_.APP_TYPE['APP']: filtered_result = filter_video_status_app(video_ids=video_ids) else: filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format( len(data), len(video_ids), len(filtered_result), len(filter_videos))) # 移除 if len(filter_videos) == 0: log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!") continue remove_videos = ['{}-{}'.format(video_id, flow_pool) for video_id in filter_videos for flow_pool in mapping[video_id]] redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos) log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!") log_.info(f"app_type = {app_type} videos filter end!") log_.info("flow pool filter end!") def filter_flow_pool_new(): """流量池视频过滤""" log_.info("flow pool filter start ...") app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']] for _, app_type in config_.APP_TYPE.items(): log_.info('app_type {} videos filter start...'.format(app_type)) if app_type in app_type_list: filter_flow_pool_18_19(app_type=app_type) else: for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]: log_.info(f"flow_pool_id = {flow_pool_id}") # 拼接redis-key key_name = get_pool_redis_key(pool_type='flow_set', app_type=app_type, flow_pool_id=flow_pool_id) # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_data_from_set(key_name=key_name) if data is None: log_.info(f"flow_pool_id = {flow_pool_id}, data is None") log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!") continue # videoId与flowPool做mapping video_ids = [] mapping = {} for video in data: video_id, flow_pool = video.split('-') video_id = int(video_id) if video_id not in video_ids: video_ids.append(video_id) mapping[video_id] = [flow_pool] else: mapping[video_id].append(flow_pool) # 过滤 if len(video_ids) == 0: log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}") log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!") continue if app_type == config_.APP_TYPE['APP']: filtered_result = filter_video_status_app(video_ids=video_ids) else: filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format( len(data), len(video_ids), len(filtered_result), len(filter_videos))) # 移除 if len(filter_videos) == 0: log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!") continue remove_videos = ['{}-{}'.format(video_id, flow_pool) for video_id in filter_videos for flow_pool in mapping[video_id]] redis_helper.remove_value_from_set(key_name=key_name, values=tuple(remove_videos)) log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!") log_.info(f"app_type = {app_type} videos filter end!") log_.info("flow pool filter end!") def filter_flow_pool_process(app_type, flow_pool_id, key_name): # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_data_from_set(key_name=key_name) if data is None: log_.info(f"flow_pool_id = {flow_pool_id}, data is None") log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!") return # videoId与flowPool做mapping video_ids = [] mapping = {} for video in data: video_id, flow_pool = video.split('-') video_id = int(video_id) if video_id not in video_ids: video_ids.append(video_id) mapping[video_id] = [flow_pool] else: mapping[video_id].append(flow_pool) # 过滤 if len(video_ids) == 0: log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}") log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!") return if app_type == config_.APP_TYPE['APP']: filtered_result = filter_video_status_app(video_ids=video_ids) else: filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format( len(data), len(video_ids), len(filtered_result), len(filter_videos))) # 移除 if len(filter_videos) == 0: log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!") return remove_videos = ['{}-{}'.format(video_id, flow_pool) for video_id in filter_videos for flow_pool in mapping[video_id]] redis_helper.remove_value_from_set(key_name=key_name, values=tuple(remove_videos)) log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!") def filter_flow_pool_new_with_level(): """流量池视频过滤""" log_.info("flow pool filter start ...") level_weight = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME) level_list = [level for level in json.loads(level_weight)] log_.info(f"level_list: {level_list}") app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']] for _, app_type in config_.APP_TYPE.items(): log_.info('app_type {} videos filter start...'.format(app_type)) if app_type in app_type_list: filter_flow_pool_18_19(app_type=app_type) else: for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]: log_.info(f"flow_pool_id = {flow_pool_id}") # 拼接redis-key if flow_pool_id == config_.QUICK_FLOW_POOL_ID: key_name = get_pool_redis_key(pool_type='flow_set', app_type=app_type, flow_pool_id=flow_pool_id) filter_flow_pool_process(app_type, flow_pool_id, key_name) else: for level in level_list: key_name = get_pool_redis_key(pool_type='flow_set', app_type=app_type, flow_pool_id=flow_pool_id, level=level) filter_flow_pool_process(app_type, flow_pool_id, key_name) log_.info(f"app_type = {app_type}, level = {level} videos filter end!") log_.info(f"app_type = {app_type} videos filter end!") log_.info("flow pool filter end!") def filter_flow_pool_level_score_process(app_type, flow_pool_id, key_name): # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info(f"flow_pool_id = {flow_pool_id}, data is None") log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!") return # videoId与flowPool做mapping video_ids = [] mapping = {} for video in data: video_id, flow_pool = video.split('-') video_id = int(video_id) if video_id not in video_ids: video_ids.append(video_id) mapping[video_id] = [flow_pool] else: mapping[video_id].append(flow_pool) # 过滤 if len(video_ids) == 0: log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}") log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!") return if app_type == config_.APP_TYPE['APP']: filtered_result = filter_video_status_app(video_ids=video_ids) else: filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format( len(data), len(video_ids), len(filtered_result), len(filter_videos))) # 移除 if len(filter_videos) == 0: log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!") return remove_videos = ['{}-{}'.format(video_id, flow_pool) for video_id in filter_videos for flow_pool in mapping[video_id]] redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos) log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!") def filter_flow_pool_new_with_level_score(): """流量池视频过滤""" log_.info("flow pool filter start ...") level_weight = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME) level_list = [level for level in json.loads(level_weight)] log_.info(f"level_list: {level_list}") app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']] for _, app_type in config_.APP_TYPE.items(): log_.info('app_type {} videos filter start...'.format(app_type)) if app_type in app_type_list: filter_flow_pool_18_19(app_type=app_type) else: for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]: log_.info(f"flow_pool_id = {flow_pool_id}") # 拼接redis-key if flow_pool_id == config_.QUICK_FLOW_POOL_ID: key_name = get_pool_redis_key(pool_type='flow_level_score', app_type=app_type, flow_pool_id=flow_pool_id) filter_flow_pool_level_score_process(app_type, flow_pool_id, key_name) else: for level in level_list: key_name = get_pool_redis_key(pool_type='flow_level_score', app_type=app_type, flow_pool_id=flow_pool_id, level=level) filter_flow_pool_level_score_process(app_type, flow_pool_id, key_name) log_.info(f"app_type = {app_type}, level = {level} videos filter end!") log_.info(f"app_type = {app_type} videos filter end!") log_.info("flow pool filter end!") def filter_flow_pool_18_19(app_type): """流量池视频过滤""" log_.info('app_type {} videos filter start...'.format(app_type)) # 拼接redis-key key_name = get_pool_redis_key(pool_type='flow', app_type=app_type) # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("app_type {} videos filter end!".format(app_type)) return video_ids = [int(video_id) for video_id in data] # 过滤 if len(video_ids) == 0: log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data)) log_.info("app_type {} videos filter end!".format(app_type)) return filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format( len(data), len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) # 移除 if len(filter_videos) == 0: log_.info("app_type {} videos filter end!".format(app_type)) return redis_helper.remove_value_from_zset(key_name=key_name, value=filter_videos) log_.info("app_type {} videos filter end!".format(app_type)) log_.info("flow pool filter end!") def filter_bottom(): """兜底视频过滤""" log_.info("bottom videos filter start ...") # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_all_data_from_zset(key_name=config_.BOTTOM_KEY_NAME) if data is None: log_.info("data is None") log_.info("bottom videos filter end!") return # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) if len(filter_videos) == 0: log_.info("bottom videos filter end!") return redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos)) log_.info("bottom videos filter end!") def filter_rov_updated(): """修改过ROV的视频过滤""" log_.info("update rov videos filter start ...") # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_all_data_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME) if data is None: log_.info("data is None") log_.info("update rov videos filter end!") return # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': config_.UPDATE_ROV_KEY_NAME, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("update rov videos filter end!") return redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos)) log_.info("update rov videos filter end!") def filter_rov_updated_app(): """修改过ROV的视频过滤-app推荐状态过滤""" log_.info("update rov videos app filter start ...") # 获取视频 redis_helper = RedisHelper() data = redis_helper.get_all_data_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME_APP) if data is None: log_.info("data is None") log_.info("update rov videos app filter end!") return # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status_app(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) if len(filter_videos) == 0: log_.info("update rov videos app filter end!") return redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME_APP, value=list(filter_videos)) log_.info("update rov videos app filter end!") def get_pool_redis_key(pool_type, app_type=None, flow_pool_id=None, level=None): """ 拼接key :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池} :param app_type: 产品标识 :param flow_pool_id: 流量池ID :return: key_name """ redis_helper = RedisHelper() if pool_type == 'rov': # appType = 6 if app_type == config_.APP_TYPE['SHORT_VIDEO']: # 获取当前所在小时 redis_date = datetime.now().hour # 判断热度列表是否更新,未更新则使用前一小时的热度列表 key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}:{redis_date}' if redis_helper.key_exists(key_name): return key_name, redis_date else: if redis_date == 0: redis_date = 23 else: redis_date = redis_date - 1 key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}:{redis_date}' return key_name, redis_date # appType: [18, 19] elif app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: key_name_prefix = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}:' now_date = datetime.today().strftime('%Y%m%d') now_h = datetime.now().hour key_name = f"{key_name_prefix}{now_date}:{now_h}" if redis_helper.key_exists(key_name): return key_name, now_h else: if now_h == 0: redis_h = 23 redis_date = (datetime.today() - timedelta(days=1)).strftime('%Y%m%d') else: redis_h = now_h - 1 redis_date = now_date key_name = f"{key_name_prefix}{redis_date}:{redis_h}" return key_name, redis_h else: # appType = 13 票圈视频app if app_type == config_.APP_TYPE['APP']: key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_APP # # appType: [18, 19] # elif app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: # key_name_prefix = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.' # 其他 else: key_name_prefix = config_.RECALL_KEY_NAME_PREFIX # 判断热度列表是否更新,未更新则使用前一天的热度列表 key_name = f"{key_name_prefix}{time.strftime('%Y%m%d')}" if redis_helper.key_exists(key_name): redis_date = date.today().strftime('%Y%m%d') else: redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d') key_name = f"{key_name_prefix}{redis_date}" return key_name, redis_date elif pool_type == 'flow': # 流量池 if flow_pool_id == config_.QUICK_FLOW_POOL_ID: return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{flow_pool_id}" else: return f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}" elif pool_type == 'flow_set': if flow_pool_id == config_.QUICK_FLOW_POOL_ID: return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}:{flow_pool_id}" else: if level is None: return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}" else: return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL}{app_type}:{level}" elif pool_type == 'flow_level_score': if flow_pool_id == config_.QUICK_FLOW_POOL_ID: return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{flow_pool_id}" else: return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{app_type}:{level}" else: log_.error('pool type error') return None, None def filter_app_pool(): """过滤票圈视频APP小时级数据""" log_.info("app pool filter start ...") redis_helper = RedisHelper() # 获取当前日期 now_date = date.today().strftime('%Y%m%d') # 获取当前所在小时 now_h = datetime.now().hour log_.info(f'now_date = {now_date}, now_h = {now_h}.') if now_h < 7: redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d') redis_h = 21 elif now_h > 21: redis_date = now_date redis_h = 21 else: if now_h % 2 == 0: redis_date = now_date redis_h = now_h - 1 else: redis_date = now_date redis_h = now_h log_.info(f'redis_date = {redis_date}, redis_h = {redis_h}.') # 拼接key key_name = f'{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{redis_date}.{redis_h}' # 获取视频 data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("app pool filter end!") return # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status_app(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("app pool filter end!") return redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) log_.info("app pool filter end!") def filter_rov_h(): """过滤小程序小时级数据""" rule_params = config_.RULE_PARAMS log_.info("rov_h pool filter start ...") redis_helper = RedisHelper() # 获取当前日期 now_date = date.today().strftime('%Y%m%d') # 获取当前所在小时 now_h = datetime.now().hour log_.info(f'now_date = {now_date}, now_h = {now_h}.') for key, value in rule_params.items(): log_.info(f"rule = {key}, param = {value}") # 需过滤两个视频列表 key_prefix_list = [ config_.RECALL_KEY_NAME_PREFIX_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_24H_H, config_.RECALL_KEY_NAME_PREFIX_DUP_H ] for i, key_prefix in enumerate(key_prefix_list): # 拼接key key_name = f"{key_prefix}{key}.{now_date}.{now_h}" log_.info(f"key_name: {key_name}") # 获取视频 data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("filter end!") continue # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("filter end!") continue redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) if i == 0: # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中 redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER}{key}", values=filter_videos, expire_time=2*3600) log_.info("rov_h pool filter end!") def filter_rov_day(): """过滤小程序天级数据""" rule_params = config_.RULE_PARAMS_DAY log_.info("rov_day pool filter start ...") redis_helper = RedisHelper() # 获取当前日期 now_date = date.today().strftime('%Y%m%d') log_.info(f'now_date = {now_date}.') for key, value in rule_params.items(): log_.info(f"rule = {key}, param = {value}") # 需过滤三个视频列表 key_prefix_list = [ config_.RECALL_KEY_NAME_PREFIX_BY_DAY, config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE, config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW ] for i, key_prefix in enumerate(key_prefix_list): # 拼接key key_name = f"{key_prefix}{key}.{now_date}" log_.info(f"key_name: {key_name}") # 获取视频 data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1) if data is None: log_.info("data is None") log_.info("filter end!") continue # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("filter end!") continue redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) log_.info("rov_day pool filter end!") def filter_old_videos(): """过滤老视频数据""" log_.info("old videos filter start ...") redis_helper = RedisHelper() # 获取当前日期 now_date = date.today().strftime('%Y%m%d') log_.info(f'now_date = {now_date}.') # 拼接key key_name = f'{config_.RECALL_KEY_NAME_PREFIX_OLD_VIDEOS}{now_date}' # 获取视频 data = redis_helper.get_data_from_set(key_name=key_name) if data is None: log_.info("data is None") log_.info("old videos filter end!") return # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) if len(filter_videos) == 0: log_.info("old videos filter end!") return redis_helper.remove_value_from_set(key_name=key_name, values=filter_videos) log_.info("old videos filter end!") def filter_process_with_region(data_key, rule_key, region, now_date, now_h): log_.info(f"data_key = {data_key}, rule_key = {rule_key}, region = {region}") # 需过滤视频列表 key_prefix_list = [ config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_H_H, config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, # config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H, # config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H, config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H, config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H, config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H, # config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H ] for i, key_prefix in enumerate(key_prefix_list): # 拼接key key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{now_date}:{now_h}" # 获取视频 data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("filter end!") continue # 过滤 video_ids = [int(video_id) for video_id in data] if data_key in ['data7', ]: filtered_result = filter_video_status_app(video_ids=video_ids) else: filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("filter end!") continue redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) # if i == 0: # # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中 # redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER}" # f"{region}.{app_type}.{data_key}.{rule_key}", # values=filter_videos, expire_time=2 * 3600) # elif i == 1: # # 将地域分组24h的数据需要过滤的视频加入到线上过滤应用列表中 # redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}" # f"{region}.{app_type}.{data_key}.{rule_key}", # values=filter_videos, expire_time=2 * 3600) # elif i == 2: # # 将相对24h的数据需要过滤的视频加入到线上过滤应用列表中 # redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER_24H}" # f"{region}.{app_type}.{data_key}.{rule_key}", # values=filter_videos, expire_time=2 * 3600) log_.info(f"data_key = {data_key}, rule_key = {rule_key}, region = {region} videos filter end!") def filter_process_with_param(param, region_code_list, now_date, now_h): data_key = param.get('data') rule_key = param.get('rule') log_.info(f"param = {param} videos filter start... ") task_list = [ gevent.spawn(filter_process_with_region, data_key, rule_key, region, now_date, now_h) for region in region_code_list ] gevent.joinall(task_list) log_.info(f"param = {param} videos filter end!") def filter_region_videos(rule_params): """过滤地域分组规则视频""" region_code_list = [code for region, code in region_code.items()] + \ [code for city, code in config_.CITY_CODE.items()] log_.info("region_h videos filter start ...") # 获取当前日期 now_date = date.today().strftime('%Y%m%d') # 获取当前所在小时 now_h = datetime.now().hour log_.info(f'now_date = {now_date}, now_h = {now_h}.') params_list = rule_params.get('params_list') pool = multiprocessing.Pool(processes=len(params_list)) for param in params_list: pool.apply_async( func=filter_process_with_param, args=(param, region_code_list, now_date, now_h) ) pool.close() pool.join() # task_list = [] # for param in rule_params.get('params_list'): # data_key = param.get('data') # rule_key = param.get('rule') # log_.info(f"data_key = {data_key}, rule_key = {rule_key}") # task_list.extend( # [ # gevent.spawn(filter_process_with_region, data_key, rule_key, region, now_date, now_h) # for region in region_code_list # ] # ) # gevent.joinall(task_list) log_.info("region_h videos filter end!") def filter_region_videos_by_day(): """过滤地域分组天级规则视频""" region_code_list = [code for region, code in region_code.items()] rule_params = config_.RULE_PARAMS_REGION_DAY log_.info("region_day videos filter start ...") redis_helper = RedisHelper() # 获取当前日期 now_date = date.today().strftime('%Y%m%d') log_.info(f'now_date = {now_date}.') for region in region_code_list: log_.info(f"region = {region}") for key, value in rule_params.items(): log_.info(f"rule = {key}, param = {value}") # 需过滤视频列表 key_prefix_list = [ config_.RECALL_KEY_NAME_PREFIX_REGION_BY_DAY ] for i, key_prefix in enumerate(key_prefix_list): # 拼接key key_name = f"{key_prefix}{region}.{key}.{now_date}" log_.info(f"key_name: {key_name}") # 获取视频 data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("filter end!") continue # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("filter end!") continue redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) log_.info(f"region = {region} videos filter end!") log_.info("region_day videos filter end!") def filter_rov_h_24h(): """过滤小程序小时级更新24h数据""" rule_params = config_.RULE_PARAMS_24H log_.info("rov_h_by24h pool filter start ...") redis_helper = RedisHelper() # 获取当前日期 now_date = date.today().strftime('%Y%m%d') # 获取当前所在小时 now_h = datetime.now().hour log_.info(f'now_date = {now_date}, now_h = {now_h}.') for key, value in rule_params.items(): log_.info(f"rule = {key}, param = {value}") # 需过滤两个视频列表 key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_24H, config_.RECALL_KEY_NAME_PREFIX_DUP_24H] for i, key_prefix in enumerate(key_prefix_list): # 拼接key key_name = f"{key_prefix}{key}.{now_date}.{now_h}" log_.info(f"key_name: {key_name}") # 获取视频 data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("filter end!") continue # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("filter end!") continue redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) if i == 0: # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中 redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER_24H}{key}", values=filter_videos, expire_time=2*3600) log_.info("rov_h_by24h pool filter end!") def filter_region_videos_24h(): """过滤地域分组24h规则视频""" region_code_list = [code for region, code in region_code.items()] rule_params = config_.RULE_PARAMS_REGION_24H log_.info("region_24h videos filter start ...") redis_helper = RedisHelper() # 获取当前日期 now_date = date.today().strftime('%Y%m%d') # 获取当前所在小时 now_h = datetime.now().hour log_.info(f'now_date = {now_date}, now_h = {now_h}.') for region in region_code_list: log_.info(f"region = {region}") for key, value in rule_params.items(): log_.info(f"rule = {key}, param = {value}") # 需过滤视频列表 key_prefix_list = [ config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H, config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_DAY_24H, config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_24H ] for i, key_prefix in enumerate(key_prefix_list): # 拼接key key_name = f"{key_prefix}{region}.{key}.{now_date}.{now_h}" log_.info(f"key_name: {key_name}") # 获取视频 data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("filter end!") continue # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("filter end!") continue redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) if i == 0: # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中 redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{key}", values=filter_videos, expire_time=2 * 3600) # filter_data(filter_videos, region) log_.info(f"region = {region} videos filter end!") log_.info("region_24h videos filter end!") def filter_data(videos, region): now_dt = datetime.now().strftime('%Y%m%d%H%M') filepath = './data/filter_data' if not os.path.exists(filepath): os.makedirs(filepath) res = [] mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO) video_status_sql = "SELECT t1.id AS 'video_id', " \ "t1.transcode_status AS 'transcoding_status', " \ "t2.audit_status AS 'audit_status', " \ "t2.video_status AS 'open_status', " \ "t2.recommend_status AS 'applet_rec_status', " \ "t2.app_recommend_status AS 'app_rec_status', " \ "t3.charge AS 'payment_status', " \ "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \ "FROM longvideo.wx_video t1 " \ "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \ "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \ "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id" for video_id in list(videos): sql = "SELECT video_id, audit_status, applet_rec_status, open_status, " \ "payment_status, encryption_status, transcoding_status " \ "FROM ({}) " \ "WHERE video_id = {};".format(video_status_sql, video_id) data = mysql_helper.get_data(sql=sql) res += data columns = ['video_id', 'audit_status', 'applet_rec_status', 'open_status', 'payment_status', 'encryption_status', 'transcoding_status'] filter_df = pd.DataFrame(data=res, columns=columns) filename = f"filter_data_region_{region}_{now_dt}.csv" file = os.path.join(filepath, filename) filter_df.to_csv(file, index=False) def filter_whole_movies(): """过滤完整电影数据""" log_.info("whole movies filter start ...") redis_helper = RedisHelper() # 获取当前日期 now_date = date.today().strftime('%Y%m%d') now_h = datetime.now().hour log_.info(f'now_date = {now_date}, now_h = {now_h}.') # 拼接key key_name = f'{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}{now_date}.{now_h}' # 获取视频 data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("whole movies filter end!") return # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) log_.info({'key_name': key_name, 'filter_videos': filter_videos}) if len(filter_videos) == 0: log_.info("whole movies filter end!") return redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) log_.info("whole movies filter end!") def filter_day_30day(): """过滤小程序天级更新30天数据""" log_.info("day_by_30day pool filter start ...") # 获取当前日期 now_date = date.today().strftime('%Y%m%d') rule_params = config_.RULE_PARAMS_30DAY_APP_TYPE params_list = rule_params.get('params_list') redis_helper = RedisHelper() log_.info(f'now_date = {now_date}.') for param in params_list: data_key = param.get('data') rule_key = param.get('rule') log_.info(f"param = {param} videos filter start... ") # 需过滤视频列表 key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY key_name = f"{key_prefix}{data_key}:{rule_key}:{now_date}" log_.info(f"key_name: {key_name}") # 获取视频 data = redis_helper.get_all_data_from_zset(key_name=key_name) if data is None: log_.info("data is None") log_.info("filter end!") continue # 过滤 video_ids = [int(video_id) for video_id in data] filtered_result = filter_video_status(video_ids=video_ids) # 求差集,获取需要过滤掉的视频,并从redis中移除 filter_videos = set(video_ids) - set(filtered_result) log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids), len(filtered_result), len(filter_videos))) if len(filter_videos) == 0: log_.info("filter end!") continue redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos)) log_.info("day_by_30day pool filter end!") def main(): try: # ROV召回池视频过滤 # filter_rov_pool() # appType = 6,ROV召回池视频过滤 # filter_rov_pool(app_type=config_.APP_TYPE['SHORT_VIDEO']) # appType = 13,票圈视频APP视频过滤 # filter_rov_pool(app_type=config_.APP_TYPE['APP']) # appType = 18, ROV召回池视频过滤 # filter_rov_pool(app_type=config_.APP_TYPE['LAO_HAO_KAN_VIDEO']) # appType = 19, ROV召回池视频过滤 # filter_rov_pool(app_type=config_.APP_TYPE['ZUI_JING_QI']) # 流量池视频过滤 filter_flow_pool() # filter_flow_pool_new() filter_flow_pool_new_with_level() filter_flow_pool_new_with_level_score() # 兜底视频过滤 # filter_bottom() # 修改过ROV的视频过滤 # filter_rov_updated() # filter_rov_updated_app() # 运营强插相关推荐视频过滤 # filter_relevant_videos() # 按位置排序视频过滤 # filter_position_videos() # 过滤票圈视频APP小时级数据 # filter_app_pool() # 过滤小程序小时级数据 # filter_rov_h() # 过滤小程序天级数据 # filter_rov_day() # 过滤老视频数据 # filter_old_videos() # 过滤地域分组小时级视频 filter_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE) # filter_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE_48H) # 过滤地域分组天级视频 # filter_region_videos_by_day() # 过滤小时级更新24h视频 # filter_rov_h_24h() # 过滤地域分组24h规则视频 # filter_region_videos_24h() # 过滤完整电影数据 # filter_whole_movies() # 过滤小程序天级更新30天数据 # filter_day_30day() except Exception as e: log_.error(traceback.format_exc()) send_msg_to_feishu( webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'), key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'), msg_text='{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc()) ) return if __name__ == '__main__': main()