123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- import time
- import traceback
- from datetime import date, timedelta
- from utils import filter_video_status, send_msg_to_feishu
- from db_helper import RedisHelper
- from config import set_config
- from log import Log
# Module-level configuration: set_config() returns a pair, unpacked here into
# the config object used throughout this file and an environment value.
# NOTE(review): `env` is not referenced in the visible part of this file.
config_, env = set_config()
# Shared logger instance used by every function below.
log_ = Log()
def filter_rov_pool():
    """Filter the videos stored in the ROV recall pool.

    Resolves the pool's redis key with ``get_pool_redis_key(pool_type='rov')``,
    reads its members, passes their integer ids to ``filter_video_status`` and
    removes from redis every id that is missing from the returned result.
    """
    log_.info("rov recall pool filter start ...")
    # Only the key is needed here; the date returned alongside it is ignored.
    pool_key, _ = get_pool_redis_key(pool_type='rov')
    helper = RedisHelper()
    # start=0 / end=-1: presumably the full range of the zset -- confirm in RedisHelper.
    members = helper.get_data_zset_with_index(key_name=pool_key, start=0, end=-1)
    if members is not None:
        candidate_ids = [int(member) for member in members]
        surviving = filter_video_status(video_ids=candidate_ids)
        # Ids that did not come back from the filter are the ones to drop.
        stale = set(candidate_ids) - set(surviving)
        log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
            len(candidate_ids), len(surviving), len(stale)))
        if stale:
            helper.remove_value_from_zset(key_name=pool_key, value=list(stale))
    else:
        log_.info("data is None")
    log_.info("rov recall pool filter end!")
def filter_flow_pool():
    """Filter the videos stored in the flow pool of every app type.

    For each value of ``config_.APP_TYPE`` the flow-pool redis key is built,
    its members are read, and every member whose video id is not returned by
    ``filter_video_status`` is removed from redis.

    Each member is split on ``'-'`` into exactly two parts,
    ``<video_id>-<flow_pool>``; a member with a different number of ``'-'``
    separators raises ``ValueError`` from the unpacking. One video id may
    appear under several flow pools, so all of its members are removed together.
    """
    log_.info("flow pool filter start ...")
    for app_type in config_.APP_TYPE.values():
        log_.info('app_type {} videos filter start...'.format(app_type))
        # Build the redis key for this app type.
        key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
        # Fetch the pool members.
        redis_helper = RedisHelper()
        data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
        if data is None:
            log_.info("data is None")
            log_.info("app_type {} videos filter end!".format(app_type))
            continue
        # Map each video id to the flow pools it appears in. `video_ids` keeps
        # first-seen order; membership is tested against the dict so the loop
        # stays linear instead of rescanning the list for every member.
        video_ids = []
        mapping = {}
        for video in data:
            video_id, flow_pool = video.split('-')
            video_id = int(video_id)
            if video_id not in mapping:
                video_ids.append(video_id)
                mapping[video_id] = [flow_pool]
            else:
                mapping[video_id].append(flow_pool)
        # Nothing to check: skip the status filter entirely.
        if not video_ids:
            log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
            log_.info("app_type {} videos filter end!".format(app_type))
            continue
        filtered_result = filter_video_status(video_ids=video_ids)
        # Ids missing from the filter result are the ones to remove.
        filter_videos = set(video_ids) - set(filtered_result)
        log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
            len(data), len(video_ids), len(filtered_result), len(filter_videos)))
        if not filter_videos:
            log_.info("app_type {} videos filter end!".format(app_type))
            continue
        # Rebuild the original '<video_id>-<flow_pool>' members for removal.
        remove_videos = ['{}-{}'.format(video_id, flow_pool)
                         for video_id in filter_videos
                         for flow_pool in mapping[video_id]]
        redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
        log_.info("app_type {} videos filter end!".format(app_type))
    log_.info("flow pool filter end!")
def filter_bottom():
    """Filter the fallback ("bottom") videos stored under ``config_.BOTTOM_KEY_NAME``.

    Reads the members of that key, passes their integer ids to
    ``filter_video_status`` and removes from redis every id that is missing
    from the returned result.
    """
    log_.info("bottom videos filter start ...")
    bottom_key = config_.BOTTOM_KEY_NAME
    helper = RedisHelper()
    members = helper.get_data_zset_with_index(key_name=bottom_key, start=0, end=-1)
    if members is None:
        log_.info("data is None")
    else:
        all_ids = [int(member) for member in members]
        kept_ids = filter_video_status(video_ids=all_ids)
        # Set difference: ids that were checked but not returned by the filter.
        to_remove = set(all_ids).difference(kept_ids)
        size_report = "video_ids size = {}, filtered size = {}, filter sizer = {}".format(
            len(all_ids), len(kept_ids), len(to_remove))
        log_.info(size_report)
        if to_remove:
            helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(to_remove))
    log_.info("bottom videos filter end!")
def filter_rov_updated():
    """Filter the videos stored under ``config_.UPDATE_ROV_KEY_NAME``.

    These are the videos whose ROV was modified (per the original docstring).
    Their integer ids are passed to ``filter_video_status`` and every id that
    is missing from the returned result is removed from redis.
    """
    log_.info("update rov videos filter start ...")
    helper = RedisHelper()
    raw_members = helper.get_data_zset_with_index(
        key_name=config_.UPDATE_ROV_KEY_NAME, start=0, end=-1)
    if raw_members is None:
        log_.info("data is None")
        log_.info("update rov videos filter end!")
        return

    checked_ids = list(map(int, raw_members))
    passed_ids = filter_video_status(video_ids=checked_ids)
    # Whatever was checked but did not pass must be removed from redis.
    rejected_ids = set(checked_ids) - set(passed_ids)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(checked_ids), len(passed_ids), len(rejected_ids)))
    if rejected_ids:
        helper.remove_value_from_zset(
            key_name=config_.UPDATE_ROV_KEY_NAME, value=list(rejected_ids))
    log_.info("update rov videos filter end!")
def get_pool_redis_key(pool_type, app_type=None):
    """Build the redis key for a video pool.

    :param pool_type: string, ``'rov'`` for the ROV recall pool or ``'flow'``
        for the flow pool
    :param app_type: product identifier; only used when ``pool_type`` is ``'flow'``
    :return: the shape depends on ``pool_type`` (callers rely on this):
        - ``'rov'``: tuple ``(key_name, redis_date)`` where ``redis_date`` is
          the ``%Y%m%d`` date embedded in the key
        - ``'flow'``: the key name string alone
        - anything else: ``(None, None)`` after logging an error
    """
    if pool_type == 'rov':
        # Read the current date once so the key and the returned date can
        # never disagree if the call straddles midnight.
        today = date.today()
        redis_date = today.strftime('%Y%m%d')
        key_name = config_.RECALL_KEY_NAME_PREFIX + redis_date
        # If today's list has not been written yet, fall back to yesterday's.
        if not RedisHelper().key_exists(key_name):
            redis_date = (today - timedelta(days=1)).strftime('%Y%m%d')
            key_name = config_.RECALL_KEY_NAME_PREFIX + redis_date
        return key_name, redis_date
    if pool_type == 'flow':
        # Flow pool keys are per app type and carry no date.
        return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
    log_.error('pool type error')
    return None, None
def main():
    """Run every pool filter in order, reporting the first failure.

    The steps run as: ROV recall pool, flow pool, fallback videos, videos with
    a modified ROV. If any step raises, the remaining steps are skipped, the
    traceback is logged and the same traceback is sent via ``send_msg_to_feishu``.
    """
    try:
        steps = (
            filter_rov_pool,      # ROV recall pool
            filter_flow_pool,     # flow pool
            filter_bottom,        # fallback videos
            filter_rov_updated,   # videos whose ROV was modified
        )
        for step in steps:
            step()
    except Exception:
        # Top-level boundary: record the failure and notify, do not re-raise.
        failure_trace = traceback.format_exc()
        log_.error(failure_trace)
        send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, failure_trace))
# Script entry point: run all filters only when executed directly, not on import.
if __name__ == '__main__':
    main()
|