|
- import os
- import time
- import json
- import traceback
- import ast
- import pandas as pd
- from datetime import date, timedelta, datetime
- from region_rule_rank_h import region_code
- from utils import filter_video_status, send_msg_to_feishu, filter_video_status_app
- from db_helper import RedisHelper, MysqlHelper
- from config import set_config
- from log import Log
# Load the environment-specific configuration once at import time; `config_`
# provides the Redis key names/prefixes and rule parameters used by the filters below.
config_, env = set_config()
# Module-wide logger shared by all filter routines.
log_ = Log()
def filter_position_videos():
    """Re-filter the position-ranked video lists stored in Redis.

    For each configured position key, the stored list of video ids is parsed,
    passed through ``filter_video_status`` and written back with a 30h expiry.
    Keys with no data are skipped.
    """
    log_.info("position videos filter start...")
    store = RedisHelper()
    for position_key in (config_.RECALL_POSITION1_KEY_NAME, config_.RECALL_POSITION2_KEY_NAME):
        # The position label is the last dot-separated segment of the key.
        pos = position_key.split('.')[-1]
        log_.info("position = {}".format(pos))
        raw = store.get_data_from_redis(key_name=position_key)
        if raw is None:
            log_.info('position {} videos is None!'.format(pos))
            continue
        # The list is stored as its Python repr (parsed with literal_eval, written with str()).
        candidate_ids = list(map(int, ast.literal_eval(raw)))
        kept_ids = filter_video_status(video_ids=candidate_ids)
        store.set_data_to_redis(key_name=position_key,
                                value=str(kept_ids),
                                expire_time=30 * 3600)
        log_.info('position {} videos filter end!'.format(pos))
    log_.info("position videos filter end!")
def filter_relevant_videos():
    """Re-filter operator-pinned relevant-recommendation lists stored in Redis.

    For every head video id in the ``RELEVANT_TOP_VIDEOS_KEY_NAME`` set, the
    JSON list under ``RELEVANT_VIDEOS_WITH_OP_KEY_NAME + head_vid`` is reloaded.
    Only items whose ``recommend_vid`` still passes ``filter_video_status`` and
    whose ``finish_time`` is in the future are kept. The list is written back
    with a TTL ending 5 seconds after the latest remaining ``finish_time``.
    Head ids are removed from the head set in three cases:
    - their key is missing;
    - their key has no data;
    - nothing survives filtering (their key is also deleted).
    """
    log_.info("relevant videos with op filter filter start...")
    redis_helper = RedisHelper()
    # Head video ids whose relevant-video lists need to be checked.
    head_videos = redis_helper.get_data_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME)
    if not head_videos:
        log_.info("relevant videos with op filter end! head_videos = {}".format(head_videos))
        return
    remove_head_vids = []
    for head_vid in head_videos:
        key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
        # Key for this head video no longer exists: drop it from the head set.
        if not redis_helper.key_exists(key_name=key_name):
            remove_head_vids.append(head_vid)
            log_.info('head_vid = {} relevant redis key not exist!'.format(head_vid))
            continue
        relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
        # No relevant videos configured for this head video: drop it as well.
        if relevant_videos is None:
            remove_head_vids.append(head_vid)
            log_.info('head_vid = {} not have relevant videos!'.format(head_vid))
            continue
        relevant_videos = json.loads(relevant_videos)
        relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos]
        # Set gives O(1) membership tests in the comprehension below.
        recommendable = set(filter_video_status(video_ids=relevant_video_ids))
        # Read the clock once so the liveness filter and the TTL agree.
        now = int(time.time())
        # Keep items that are still recommendable and whose window has not ended.
        relevant_videos_new = [
            item for item in relevant_videos
            if int(item['recommend_vid']) in recommendable and int(item['finish_time']) > now
        ]
        # Nothing left: drop the head id and delete its relevant-video key.
        if not relevant_videos_new:
            remove_head_vids.append(head_vid)
            redis_helper.del_keys(key_name=key_name)
            log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
                head_vid, len(relevant_videos_new)))
            continue
        # TTL = latest remaining finish_time - now + 5s. finish_time is int()-converted
        # here too (as in the filter above) so string values neither compare
        # lexicographically in max() nor break the subtraction.
        expire_time = max(int(item['finish_time']) for item in relevant_videos_new) - now + 5
        if expire_time <= 0:
            log_.info('head_vid = {} expire_time <= 0!'.format(head_vid))
            continue
        redis_helper.set_data_to_redis(key_name=key_name,
                                       value=json.dumps(relevant_videos_new),
                                       expire_time=expire_time)
        log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
            head_vid, len(relevant_videos_new)))
    # Remove the head ids collected above from the head-video set.
    if remove_head_vids:
        redis_helper.remove_value_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME,
                                           values=tuple(remove_head_vids))
    log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
    log_.info("relevant videos with op filter end!")
def filter_rov_pool(app_type=None):
    """Remove no-longer-recommendable videos from an ROV recall-pool zset.

    :param app_type: product identifier; None selects the default pool. The APP
        app type is checked with ``filter_video_status_app``, every other type
        with ``filter_video_status``.
    """
    log_.info("rov recall pool filter start ...")
    if app_type is not None:
        log_.info("appType = {}".format(app_type))
        key_name, _ = get_pool_redis_key(pool_type='rov', app_type=app_type)
    else:
        key_name, _ = get_pool_redis_key(pool_type='rov')
    store = RedisHelper()
    members = store.get_all_data_from_zset(key_name=key_name)
    if members is None:
        log_.info("data is None")
        log_.info("rov recall pool filter end!")
        return
    candidates = list(map(int, members))
    status_check = filter_video_status_app if app_type == config_.APP_TYPE['APP'] else filter_video_status
    survivors = status_check(video_ids=candidates)
    # Ids present in the pool but rejected by the status check.
    rejected = set(candidates).difference(survivors)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(candidates), len(survivors), len(rejected)))
    log_.info({'key_name': key_name, 'filter_videos': rejected})
    if rejected:
        store.remove_value_from_zset(key_name=key_name, value=list(rejected))
    log_.info("rov recall pool filter end!")
def filter_flow_pool():
    """Remove no-longer-recommendable videos from every app type's flow pool.

    App types LAO_HAO_KAN_VIDEO / ZUI_JING_QI are delegated to
    ``filter_flow_pool_18_19``. Every other app type stores zset members of the
    form ``"<video_id>-<flow_pool>"`` and is handled by
    ``_filter_flow_pool_by_app_type``.
    """
    log_.info("flow pool filter start ...")
    app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
    # One Redis client for the whole run instead of one per app type.
    redis_helper = RedisHelper()
    for app_type in config_.APP_TYPE.values():
        log_.info('app_type {} videos filter start...'.format(app_type))
        if app_type in app_type_list:
            filter_flow_pool_18_19(app_type=app_type)
        else:
            _filter_flow_pool_by_app_type(redis_helper, app_type)
    log_.info("flow pool filter end!")


def _filter_flow_pool_by_app_type(redis_helper, app_type):
    """Filter one app type's flow pool whose members look like ``"<video_id>-<flow_pool>"``."""
    key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
    data = redis_helper.get_all_data_from_zset(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("app_type {} videos filter end!".format(app_type))
        return
    # Group flow-pool suffixes by video id. The dict keeps first-seen order and
    # replaces the original O(n^2) `not in list` membership check.
    mapping = {}
    for video in data:
        video_id, flow_pool = video.split('-')
        mapping.setdefault(int(video_id), []).append(flow_pool)
    video_ids = list(mapping)
    if not video_ids:
        log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
        log_.info("app_type {} videos filter end!".format(app_type))
        return
    if app_type == config_.APP_TYPE['APP']:
        filtered_result = filter_video_status_app(video_ids=video_ids)
    else:
        filtered_result = filter_video_status(video_ids=video_ids)
    # Ids in the pool that failed the status check.
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(data), len(video_ids), len(filtered_result), len(filter_videos)))
    if filter_videos:
        # Rebuild the exact "<video_id>-<flow_pool>" members so every pool entry of a video is removed.
        remove_videos = ['{}-{}'.format(video_id, flow_pool)
                         for video_id in filter_videos
                         for flow_pool in mapping[video_id]]
        redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
    log_.info("app_type {} videos filter end!".format(app_type))
def filter_flow_pool_18_19(app_type):
    """Filter the flow pool of app types 18/19, whose zset members are plain video ids.

    :param app_type: product identifier (called for LAO_HAO_KAN_VIDEO / ZUI_JING_QI)
    """
    log_.info('app_type {} videos filter start...'.format(app_type))
    key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
    redis_helper = RedisHelper()
    data = redis_helper.get_all_data_from_zset(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("app_type {} videos filter end!".format(app_type))
        return
    video_ids = [int(video_id) for video_id in data]
    if not video_ids:
        log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
        log_.info("app_type {} videos filter end!".format(app_type))
        return
    filtered_result = filter_video_status(video_ids=video_ids)
    # Ids in the pool that failed the status check.
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(data), len(video_ids), len(filtered_result), len(filter_videos)))
    log_.info({'key_name': key_name, 'filter_videos': filter_videos})
    if filter_videos:
        # Pass a list, like every other remove_value_from_zset call in this module.
        redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    # No "flow pool filter end!" here: filter_flow_pool logs that once its loop finishes.
    log_.info("app_type {} videos filter end!".format(app_type))
def filter_bottom():
    """Drop videos that no longer pass ``filter_video_status`` from the fallback (bottom) zset."""
    log_.info("bottom videos filter start ...")
    store = RedisHelper()
    bottom_key = config_.BOTTOM_KEY_NAME
    members = store.get_all_data_from_zset(key_name=bottom_key)
    if members is None:
        log_.info("data is None")
        log_.info("bottom videos filter end!")
        return
    candidates = [int(member) for member in members]
    survivors = filter_video_status(video_ids=candidates)
    # Ids present in the zset but rejected by the status check.
    rejected = set(candidates).difference(survivors)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(candidates), len(survivors), len(rejected)))
    if rejected:
        store.remove_value_from_zset(key_name=bottom_key, value=list(rejected))
    log_.info("bottom videos filter end!")
def filter_rov_updated():
    """Prune non-recommendable videos from the zset of videos whose ROV was modified.

    Reads ``UPDATE_ROV_KEY_NAME``, runs its ids through ``filter_video_status``
    and removes the ids that did not pass.
    """
    log_.info("update rov videos filter start ...")
    store = RedisHelper()
    zset_key = config_.UPDATE_ROV_KEY_NAME
    members = store.get_all_data_from_zset(key_name=zset_key)
    if members is None:
        log_.info("data is None")
        log_.info("update rov videos filter end!")
        return
    candidates = [int(member) for member in members]
    survivors = filter_video_status(video_ids=candidates)
    rejected = set(candidates).difference(survivors)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(candidates), len(survivors), len(rejected)))
    log_.info({'key_name': zset_key, 'filter_videos': rejected})
    if rejected:
        store.remove_value_from_zset(key_name=zset_key, value=list(rejected))
    log_.info("update rov videos filter end!")
def filter_rov_updated_app():
    """Prune videos failing the APP recommend-status check from the APP modified-ROV zset.

    Reads ``UPDATE_ROV_KEY_NAME_APP``, runs its ids through
    ``filter_video_status_app`` and removes the ids that did not pass.
    """
    log_.info("update rov videos app filter start ...")
    store = RedisHelper()
    zset_key = config_.UPDATE_ROV_KEY_NAME_APP
    members = store.get_all_data_from_zset(key_name=zset_key)
    if members is None:
        log_.info("data is None")
        log_.info("update rov videos app filter end!")
        return
    candidates = [int(member) for member in members]
    survivors = filter_video_status_app(video_ids=candidates)
    rejected = set(candidates).difference(survivors)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(candidates), len(survivors), len(rejected)))
    if rejected:
        store.remove_value_from_zset(key_name=zset_key, value=list(rejected))
    log_.info("update rov videos app filter end!")
def get_pool_redis_key(pool_type, app_type=None):
    """Build the Redis key of a recall pool.

    :param pool_type: 'rov' (ROV recall pool) or 'flow' (flow pool)
    :param app_type: product identifier (a ``config_.APP_TYPE`` value); None for the default pool
    :return: for 'rov', a tuple ``(key_name, redis_date)``. ``redis_date`` is the hour
        (int) for the hourly app types, or the ``YYYYMMDD`` string for the daily pools.
        If the current hour's/day's key does not exist, the previous hour's/day's key
        is returned without checking whether it exists.
        For 'flow', the key name string.
        For any other pool_type, ``(None, None)`` after logging an error.
    """
    redis_helper = RedisHelper()
    # Read the clock once: separate date/hour reads can straddle an hour or midnight
    # boundary and produce a key for the wrong hour or day.
    now = datetime.now()
    previous_hour = now - timedelta(hours=1)
    if pool_type == 'rov':
        # appType = 6: key is <prefix><app_type>.<hour>
        if app_type == config_.APP_TYPE['SHORT_VIDEO']:
            key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, now.hour)
            if redis_helper.key_exists(key_name):
                return key_name, now.hour
            # Current hour not generated yet: fall back to the previous hour (0 -> 23).
            redis_date = previous_hour.hour
            key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date)
            return key_name, redis_date
        # appType: [18, 19]: key is <prefix><app_type>.<YYYYMMDD>.<hour>
        if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
            key_name_prefix = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.'
            now_h = now.hour
            key_name = f"{key_name_prefix}{now.strftime('%Y%m%d')}.{now_h}"
            if redis_helper.key_exists(key_name):
                return key_name, now_h
            # Previous hour; at hour 0 this is 23 o'clock of the previous day.
            redis_h = previous_hour.hour
            key_name = f"{key_name_prefix}{previous_hour.strftime('%Y%m%d')}.{redis_h}"
            return key_name, redis_h
        # appType = 13 (票圈视频 app) uses its own prefix; every other type uses the default one.
        if app_type == config_.APP_TYPE['APP']:
            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_APP
        else:
            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX
        # Use today's list if it exists, otherwise yesterday's.
        today = now.strftime('%Y%m%d')
        if redis_helper.key_exists(key_name_prefix + today):
            redis_date = today
        else:
            redis_date = (now - timedelta(days=1)).strftime('%Y%m%d')
        return key_name_prefix + redis_date, redis_date
    elif pool_type == 'flow':
        # Flow pool: one key per app type.
        return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
    else:
        log_.error('pool type error')
        return None, None
def filter_app_pool():
    """Filter the 票圈视频 APP hourly final-recall zset.

    The key is ``APP_FINAL_RECALL_KEY_NAME_PREFIX<YYYYMMDD>.<hour>``. The hour is
    chosen as follows:
    - before 07:00: yesterday's 21h list;
    - from 22:00 on: today's 21h list;
    - in between: the current hour if odd, otherwise the hour before.
    Ids failing ``filter_video_status_app`` are removed.
    """
    log_.info("app pool filter start ...")
    redis_helper = RedisHelper()
    # One clock read so the date and hour are consistent across midnight.
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    if now_h < 7:
        redis_date = (now - timedelta(days=1)).strftime('%Y%m%d')
        redis_h = 21
    elif now_h > 21:
        redis_date = now_date
        redis_h = 21
    else:
        redis_date = now_date
        # Presumably the hourly list is only produced on odd hours — confirm with the producer job.
        redis_h = now_h - 1 if now_h % 2 == 0 else now_h
    log_.info(f'redis_date = {redis_date}, redis_h = {redis_h}.')
    key_name = f'{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{redis_date}.{redis_h}'
    data = redis_helper.get_all_data_from_zset(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("app pool filter end!")
        return
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status_app(video_ids=video_ids)
    # Ids in the zset that failed the APP status check.
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    log_.info({'key_name': key_name, 'filter_videos': filter_videos})
    if filter_videos:
        redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    log_.info("app pool filter end!")
def filter_rov_h():
    """Filter the mini-program hourly recall lists for every rule in ``RULE_PARAMS``.

    For each rule, three zsets keyed ``<prefix><rule>.<YYYYMMDD>.<hour>`` are
    checked, and ids failing ``filter_video_status`` are removed. Ids removed
    from the primary hourly list (``RECALL_KEY_NAME_PREFIX_BY_H``) are also
    added to the online filter set ``H_VIDEO_FILER<rule>`` with a 2h TTL.
    """
    rule_params = config_.RULE_PARAMS
    log_.info("rov_h pool filter start ...")
    redis_helper = RedisHelper()
    # One clock read so the date and hour in the key are consistent across midnight.
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    # The three lists to filter, each paired with its online filter-set prefix (None = none).
    key_prefix_list = [
        (config_.RECALL_KEY_NAME_PREFIX_BY_H, config_.H_VIDEO_FILER),
        (config_.RECALL_KEY_NAME_PREFIX_DUP_24H_H, None),
        (config_.RECALL_KEY_NAME_PREFIX_DUP_H, None),
    ]
    for key, value in rule_params.items():
        log_.info(f"rule = {key}, param = {value}")
        for key_prefix, online_filter_prefix in key_prefix_list:
            key_name = f"{key_prefix}{key}.{now_date}.{now_h}"
            log_.info(f"key_name: {key_name}")
            data = redis_helper.get_all_data_from_zset(key_name=key_name)
            if data is None:
                log_.info("data is None")
                log_.info("filter end!")
                continue
            video_ids = [int(video_id) for video_id in data]
            filtered_result = filter_video_status(video_ids=video_ids)
            # Ids in the zset that failed the status check.
            filter_videos = set(video_ids) - set(filtered_result)
            log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                          len(filtered_result),
                                                                                          len(filter_videos)))
            log_.info({'key_name': key_name, 'filter_videos': filter_videos})
            if not filter_videos:
                log_.info("filter end!")
                continue
            redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
            if online_filter_prefix is not None:
                # Also add the removed ids to the online filter set.
                redis_helper.add_data_with_set(key_name=f"{online_filter_prefix}{key}",
                                               values=filter_videos, expire_time=2 * 3600)
    log_.info("rov_h pool filter end!")
def filter_rov_day():
    """Filter the mini-program daily recall lists for every rule in ``RULE_PARAMS_DAY``.

    For each rule, three zsets keyed ``<prefix><rule>.<YYYYMMDD>`` are read in
    full, and ids failing ``filter_video_status`` are removed.
    """
    rule_params = config_.RULE_PARAMS_DAY
    log_.info("rov_day pool filter start ...")
    store = RedisHelper()
    now_date = date.today().strftime('%Y%m%d')
    log_.info(f'now_date = {now_date}.')
    for key, value in rule_params.items():
        log_.info(f"rule = {key}, param = {value}")
        # The three daily lists to filter.
        prefixes = (
            config_.RECALL_KEY_NAME_PREFIX_BY_DAY,
            config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE,
            config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW,
        )
        for key_prefix in prefixes:
            key_name = f"{key_prefix}{key}.{now_date}"
            log_.info(f"key_name: {key_name}")
            members = store.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
            if members is None:
                log_.info("data is None")
                log_.info("filter end!")
                continue
            candidates = [int(member) for member in members]
            survivors = filter_video_status(video_ids=candidates)
            rejected = set(candidates).difference(survivors)
            log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
                len(candidates), len(survivors), len(rejected)))
            log_.info({'key_name': key_name, 'filter_videos': rejected})
            if not rejected:
                log_.info("filter end!")
                continue
            store.remove_value_from_zset(key_name=key_name, value=list(rejected))
    log_.info("rov_day pool filter end!")
def filter_old_videos():
    """Drop videos failing ``filter_video_status`` from today's old-videos set.

    The Redis set lives at ``RECALL_KEY_NAME_PREFIX_OLD_VIDEOS<YYYYMMDD>``.
    """
    log_.info("old videos filter start ...")
    store = RedisHelper()
    now_date = date.today().strftime('%Y%m%d')
    log_.info(f'now_date = {now_date}.')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX_OLD_VIDEOS}{now_date}'
    members = store.get_data_from_set(key_name=key_name)
    if members is None:
        log_.info("data is None")
        log_.info("old videos filter end!")
        return
    candidates = list(map(int, members))
    survivors = filter_video_status(video_ids=candidates)
    stale = set(candidates).difference(survivors)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
        len(candidates), len(survivors), len(stale)))
    if stale:
        store.remove_value_from_set(key_name=key_name, values=stale)
    log_.info("old videos filter end!")
def filter_region_videos():
    """Filter the hourly region-grouped recall lists for every region and rule.

    For each region code and each rule in ``RULE_PARAMS_REGION``, four zsets keyed
    ``<prefix><region>.<rule>.<YYYYMMDD>.<hour>`` are checked, and ids failing
    ``filter_video_status`` are removed. For the first three lists, the removed
    ids are also added (2h TTL) to the paired online filter set
    ``<filter_prefix><region>.<rule>``.
    """
    region_code_list = list(region_code.values())
    rule_params = config_.RULE_PARAMS_REGION
    log_.info("region_h videos filter start ...")
    redis_helper = RedisHelper()
    # One clock read so the date and hour in the keys are consistent across midnight.
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    # Lists to filter, paired with the online filter-set prefix that receives the
    # removed ids (None = no online filter set). The pairing replaces the fragile
    # `i == 0/1/2` checks that were tied to list positions.
    key_prefix_list = [
        (config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, config_.REGION_H_VIDEO_FILER),
        (config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, config_.REGION_H_VIDEO_FILER_24H),
        (config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, config_.H_VIDEO_FILER_24H),
        (config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H, None),
    ]
    for region in region_code_list:
        log_.info(f"region = {region}")
        for key, value in rule_params.items():
            log_.info(f"rule = {key}, param = {value}")
            for key_prefix, online_filter_prefix in key_prefix_list:
                key_name = f"{key_prefix}{region}.{key}.{now_date}.{now_h}"
                log_.info(f"key_name: {key_name}")
                data = redis_helper.get_all_data_from_zset(key_name=key_name)
                if data is None:
                    log_.info("data is None")
                    log_.info("filter end!")
                    continue
                video_ids = [int(video_id) for video_id in data]
                filtered_result = filter_video_status(video_ids=video_ids)
                # Ids in the zset that failed the status check.
                filter_videos = set(video_ids) - set(filtered_result)
                log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
                    len(video_ids), len(filtered_result), len(filter_videos)))
                log_.info({'key_name': key_name, 'filter_videos': filter_videos})
                if not filter_videos:
                    log_.info("filter end!")
                    continue
                redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
                if online_filter_prefix is not None:
                    # Add the removed ids to the online filter set.
                    redis_helper.add_data_with_set(key_name=f"{online_filter_prefix}{region}.{key}",
                                                   values=filter_videos, expire_time=2 * 3600)
        log_.info(f"region = {region} videos filter end!")
    log_.info("region_h videos filter end!")
def filter_region_videos_by_day():
    """Filter the daily region-grouped recall list for every region and rule.

    For each region code and each rule in ``RULE_PARAMS_REGION_DAY``, the zset
    ``RECALL_KEY_NAME_PREFIX_REGION_BY_DAY<region>.<rule>.<YYYYMMDD>`` is checked,
    and ids failing ``filter_video_status`` are removed.
    """
    region_code_list = [code for _, code in region_code.items()]
    rule_params = config_.RULE_PARAMS_REGION_DAY
    log_.info("region_day videos filter start ...")
    store = RedisHelper()
    now_date = date.today().strftime('%Y%m%d')
    log_.info(f'now_date = {now_date}.')
    for region in region_code_list:
        log_.info(f"region = {region}")
        for key, value in rule_params.items():
            log_.info(f"rule = {key}, param = {value}")
            # Only the daily region list is filtered here.
            key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_DAY
            key_name = f"{key_prefix}{region}.{key}.{now_date}"
            log_.info(f"key_name: {key_name}")
            members = store.get_all_data_from_zset(key_name=key_name)
            if members is None:
                log_.info("data is None")
                log_.info("filter end!")
                continue
            candidates = [int(member) for member in members]
            survivors = filter_video_status(video_ids=candidates)
            rejected = set(candidates).difference(survivors)
            log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
                len(candidates), len(survivors), len(rejected)))
            log_.info({'key_name': key_name, 'filter_videos': rejected})
            if not rejected:
                log_.info("filter end!")
                continue
            store.remove_value_from_zset(key_name=key_name, value=list(rejected))
        log_.info(f"region = {region} videos filter end!")
    log_.info("region_day videos filter end!")
def filter_rov_h_24h():
    """Filter the mini-program hourly-updated 24h recall lists for every rule in ``RULE_PARAMS_24H``.

    For each rule, two zsets keyed ``<prefix><rule>.<YYYYMMDD>.<hour>`` are
    checked, and ids failing ``filter_video_status`` are removed. Ids removed
    from the primary list (``RECALL_KEY_NAME_PREFIX_BY_24H``) are also added to
    the online filter set ``H_VIDEO_FILER_24H<rule>`` with a 2h TTL.
    """
    rule_params = config_.RULE_PARAMS_24H
    log_.info("rov_h_by24h pool filter start ...")
    redis_helper = RedisHelper()
    # One clock read so the date and hour in the key are consistent across midnight.
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    # The two lists to filter, each paired with its online filter-set prefix (None = none).
    key_prefix_list = [
        (config_.RECALL_KEY_NAME_PREFIX_BY_24H, config_.H_VIDEO_FILER_24H),
        (config_.RECALL_KEY_NAME_PREFIX_DUP_24H, None),
    ]
    for key, value in rule_params.items():
        log_.info(f"rule = {key}, param = {value}")
        for key_prefix, online_filter_prefix in key_prefix_list:
            key_name = f"{key_prefix}{key}.{now_date}.{now_h}"
            log_.info(f"key_name: {key_name}")
            data = redis_helper.get_all_data_from_zset(key_name=key_name)
            if data is None:
                log_.info("data is None")
                log_.info("filter end!")
                continue
            video_ids = [int(video_id) for video_id in data]
            filtered_result = filter_video_status(video_ids=video_ids)
            # Ids in the zset that failed the status check.
            filter_videos = set(video_ids) - set(filtered_result)
            log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                          len(filtered_result),
                                                                                          len(filter_videos)))
            log_.info({'key_name': key_name, 'filter_videos': filter_videos})
            if not filter_videos:
                log_.info("filter end!")
                continue
            redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
            if online_filter_prefix is not None:
                # Also add the removed ids to the online filter set.
                redis_helper.add_data_with_set(key_name=f"{online_filter_prefix}{key}",
                                               values=filter_videos, expire_time=2 * 3600)
    log_.info("rov_h_by24h pool filter end!")
def filter_region_videos_24h():
    """Filter the region-grouped 24h recall lists for every region and rule.

    For each region code and each rule in ``RULE_PARAMS_REGION_24H``, three zsets
    keyed ``<prefix><region>.<rule>.<YYYYMMDD>.<hour>`` are checked, and ids
    failing ``filter_video_status`` are removed. Ids removed from the primary
    list (``RECALL_KEY_NAME_PREFIX_REGION_BY_24H``) are also added (2h TTL) to
    the online filter set ``REGION_H_VIDEO_FILER_24H<region>.<rule>``.
    """
    region_code_list = list(region_code.values())
    rule_params = config_.RULE_PARAMS_REGION_24H
    log_.info("region_24h videos filter start ...")
    redis_helper = RedisHelper()
    # One clock read so the date and hour in the keys are consistent across midnight.
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    # Lists to filter, each paired with its online filter-set prefix (None = none).
    key_prefix_list = [
        (config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H, config_.REGION_H_VIDEO_FILER_24H),
        (config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_DAY_24H, None),
        (config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_24H, None),
    ]
    for region in region_code_list:
        log_.info(f"region = {region}")
        for key, value in rule_params.items():
            log_.info(f"rule = {key}, param = {value}")
            for key_prefix, online_filter_prefix in key_prefix_list:
                key_name = f"{key_prefix}{region}.{key}.{now_date}.{now_h}"
                log_.info(f"key_name: {key_name}")
                data = redis_helper.get_all_data_from_zset(key_name=key_name)
                if data is None:
                    log_.info("data is None")
                    log_.info("filter end!")
                    continue
                video_ids = [int(video_id) for video_id in data]
                filtered_result = filter_video_status(video_ids=video_ids)
                # Ids in the zset that failed the status check.
                filter_videos = set(video_ids) - set(filtered_result)
                log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(
                    len(video_ids), len(filtered_result), len(filter_videos)))
                log_.info({'key_name': key_name, 'filter_videos': filter_videos})
                if not filter_videos:
                    log_.info("filter end!")
                    continue
                redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
                if online_filter_prefix is not None:
                    # Add the removed ids to the online filter set.
                    redis_helper.add_data_with_set(key_name=f"{online_filter_prefix}{region}.{key}",
                                                   values=filter_videos, expire_time=2 * 3600)
        log_.info(f"region = {region} videos filter end!")
    log_.info("region_24h videos filter end!")
def filter_data(videos, region):
    """Dump the status fields of the given videos from MySQL to a CSV for inspection.

    Writes ``./data/filter_data/filter_data_region_<region>_<YYYYmmddHHMM>.csv``.
    The file has one row per video found via ``config_.FILTER_MYSQL_INFO``.
    An empty ``videos`` produces a header-only CSV without querying MySQL.

    :param videos: iterable of video ids
    :param region: region code; used only in the output filename
    """
    now_dt = datetime.now().strftime('%Y%m%d%H%M')
    filepath = './data/filter_data'
    # exist_ok avoids the check-then-create race of os.path.exists + os.makedirs.
    os.makedirs(filepath, exist_ok=True)
    columns = ['video_id', 'audit_status', 'applet_rec_status', 'open_status',
               'payment_status', 'encryption_status', 'transcoding_status']
    # int() guarantees only numeric literals are interpolated into the SQL below.
    video_ids = [int(video_id) for video_id in videos]
    res = []
    if video_ids:
        mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
        video_status_sql = "SELECT t1.id AS 'video_id', " \
                           "t1.transcode_status AS 'transcoding_status', " \
                           "t2.audit_status AS 'audit_status', " \
                           "t2.video_status AS 'open_status', " \
                           "t2.recommend_status AS 'applet_rec_status', " \
                           "t2.app_recommend_status AS 'app_rec_status', " \
                           "t3.charge AS 'payment_status', " \
                           "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
                           "FROM longvideo.wx_video t1 " \
                           "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
                           "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
                           "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
        # MySQL rejects a derived table without an alias ("Every derived table must
        # have its own alias"). Fetch all ids in one round trip instead of one query per id.
        sql = "SELECT {} FROM ({}) AS video_status WHERE video_id IN ({});".format(
            ', '.join(columns), video_status_sql, ', '.join(str(video_id) for video_id in video_ids))
        data = mysql_helper.get_data(sql=sql)
        # NOTE(review): get_data may return None when no rows match — guarded either way.
        if data:
            res.extend(data)
    filter_df = pd.DataFrame(data=res, columns=columns)
    filename = f"filter_data_region_{region}_{now_dt}.csv"
    file = os.path.join(filepath, filename)
    filter_df.to_csv(file, index=False)
def filter_whole_movies():
    """Filter the hourly whole-movie recall zset.

    The key is ``RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES<YYYYMMDD>.<hour>``. Ids
    failing ``filter_video_status`` are removed.
    """
    log_.info("whole movies filter start ...")
    redis_helper = RedisHelper()
    # One clock read so the date and hour in the key are consistent across midnight.
    now = datetime.now()
    now_date = now.strftime('%Y%m%d')
    now_h = now.hour
    log_.info(f'now_date = {now_date}, now_h = {now_h}.')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}{now_date}.{now_h}'
    data = redis_helper.get_all_data_from_zset(key_name=key_name)
    if data is None:
        log_.info("data is None")
        log_.info("whole movies filter end!")
        return
    video_ids = [int(video_id) for video_id in data]
    filtered_result = filter_video_status(video_ids=video_ids)
    # Ids in the zset that failed the status check.
    filter_videos = set(video_ids) - set(filtered_result)
    log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
                                                                                  len(filtered_result),
                                                                                  len(filter_videos)))
    log_.info({'key_name': key_name, 'filter_videos': filter_videos})
    if filter_videos:
        redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
    log_.info("whole movies filter end!")
def main():
    """Run every enabled filter in sequence.

    Any exception aborts the remaining filters for this run. It is logged with
    its traceback and reported to the Feishu server robot.
    """
    try:
        # ROV recall pool (default app type)
        filter_rov_pool()
        # appType = 6, ROV recall pool
        # filter_rov_pool(app_type=config_.APP_TYPE['SHORT_VIDEO'])
        # appType = 13, 票圈视频 APP
        filter_rov_pool(app_type=config_.APP_TYPE['APP'])
        # appType = 18, ROV recall pool
        filter_rov_pool(app_type=config_.APP_TYPE['LAO_HAO_KAN_VIDEO'])
        # appType = 19, ROV recall pool
        filter_rov_pool(app_type=config_.APP_TYPE['ZUI_JING_QI'])
        # Flow pools
        filter_flow_pool()
        # Fallback (bottom) videos
        filter_bottom()
        # Videos whose ROV was modified
        filter_rov_updated()
        filter_rov_updated_app()
        # Operator-pinned relevant recommendations
        # filter_relevant_videos()
        # Position-ranked videos
        # filter_position_videos()
        # 票圈视频 APP hourly data
        filter_app_pool()
        # Mini-program hourly data
        filter_rov_h()
        # Mini-program daily data
        # filter_rov_day()
        # Old videos
        # filter_old_videos()
        # Region-grouped hourly videos
        filter_region_videos()
        # Region-grouped daily videos
        filter_region_videos_by_day()
        # Hourly-updated 24h videos
        filter_rov_h_24h()
        # Region-grouped 24h videos
        filter_region_videos_24h()
        # Whole movies
        filter_whole_movies()
    except Exception:
        # Top-level boundary: capture the traceback once, log it and alert the robot.
        error_trace = traceback.format_exc()
        log_.error(error_trace)
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text='{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, error_trace)
        )
# Run all enabled filters once when this module is executed directly.
if __name__ == '__main__':
    main()
|