import multiprocessing
import os
import sys
import time
import traceback
import gevent
import datetime
import pandas as pd
import math
from functools import reduce
from odps import ODPS
from threading import Timer, Thread
from my_utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
    check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
from my_config import set_config
from log import Log
from check_video_limit_distribute import update_limit_video_score

config_, _ = set_config()
log_ = Log()

region_code = config_.REGION_CODE


def dup_data(h_video_ids, initial_key_name, dup_key_name, region, political_filter, shield_config, dup_remove):
    """Filter one recall zset and store the de-duplicated result under a new redis key.

    Reads the zset at ``initial_key_name``, drops shielded videos (per
    ``shield_config[region]``) and optionally political videos, then — when
    ``dup_remove`` is True — also drops ids already claimed by higher-priority
    pools (``h_video_ids``).  Survivors are written to ``dup_key_name`` with a
    2-day TTL and their scores passed through the rate-limit adjustment.

    :param h_video_ids: video ids already used by higher-priority pools;
        extended in place when dup_remove is True
    :param initial_key_name: redis zset key of the source recall pool
    :param dup_key_name: redis zset key the de-duplicated pool is written to
    :param region: region code used to look up the shield key list
    :param political_filter: when True, political videos are filtered out
    :param shield_config: mapping of region code -> list of shield key names
    :param dup_remove: when True, ids already in h_video_ids are skipped
    :return: the (possibly extended) h_video_ids list
    """
    redis_helper = RedisHelper()
    if redis_helper.key_exists(key_name=initial_key_name):
        initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
        # Shielded-video filtering.
        initial_video_ids = [int(video_id) for video_id, _ in initial_data]
        shield_key_name_list = shield_config.get(region, None)
        if shield_key_name_list is not None:
            initial_video_ids = filter_shield_video(video_ids=initial_video_ids,
                                                    shield_key_name_list=shield_key_name_list)
        # Political-video filtering.
        if political_filter is True:
            initial_video_ids = filter_political_videos(video_ids=initial_video_ids)

        # Sets give O(1) membership tests; the original scanned lists per item (O(n^2)).
        allowed_ids = set(initial_video_ids)
        seen_ids = set(h_video_ids)
        dedup_result = {}  # renamed from `dup_data`, which shadowed this function
        # De-duplication logic.
        if dup_remove is True:
            for video_id, score in initial_data:
                vid = int(video_id)
                if vid not in seen_ids and vid in allowed_ids:
                    dedup_result[vid] = score
                    h_video_ids.append(vid)
                    seen_ids.add(vid)
        else:
            for video_id, score in initial_data:
                vid = int(video_id)
                if vid in allowed_ids:
                    dedup_result[vid] = score

        if len(dedup_result) > 0:
            redis_helper.add_data_with_zset(key_name=dup_key_name, data=dedup_result, expire_time=2 * 24 * 3600)
            # Adjust scores of rate-limited videos in the freshly written pool.
            update_limit_video_score(initial_videos=dedup_result, key_name=dup_key_name)
    return h_video_ids


def dup_to_redis(now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key, by_48h_rule_key,
                 region, data_key, rule_rank_h_flag, political_filter, shield_config, dup_remove):
    """De-duplicate the region-grouped hourly data against the other recall video
    pools and store each de-duplicated pool under its own redis key.

    Pools are processed in priority order: region-hourly ids seed
    ``h_video_ids``; each subsequent pool is de-duplicated against everything
    accepted so far.  ``rule_rank_h_flag`` selects the 48h branch instead of the
    default 24h branch.
    """
    log_.info(f"region = {region} dup start ...")
    # Hoisted: the same date string is interpolated into every key below.
    date_str = datetime.datetime.strftime(now_date, '%Y%m%d')

    # ##### Region-grouped hourly data: its ids seed the de-dup set.
    region_h_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
    redis_helper = RedisHelper()
    if redis_helper.key_exists(key_name=region_h_key_name):
        region_h_data = redis_helper.get_all_data_from_zset(key_name=region_h_key_name, with_scores=True)
        h_video_ids = [int(video_id) for video_id, _ in region_h_data]
    else:
        h_video_ids = []

    # ##### De-duplicate the region-agnostic hourly list and store it separately.
    if h_rule_key is not None:
        h_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{h_rule_key}:{date_str}:{now_h}"
        h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H_H}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_key_name,
                               dup_key_name=h_dup_key_name, region=region,
                               political_filter=political_filter, shield_config=shield_config,
                               dup_remove=dup_remove)

    # ##### De-duplicate the region-grouped 24h list and store it separately.
    region_24h_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{region_24h_rule_key}:{date_str}:{now_h}"
    region_24h_dup_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
    h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=region_24h_key_name,
                           dup_key_name=region_24h_dup_key_name, region=region,
                           political_filter=political_filter, shield_config=shield_config,
                           dup_remove=dup_remove)

    if rule_rank_h_flag == '48h':
        # ##### De-duplicate the mini-program relative-48h result and store it separately.
        h_48h_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{data_key}:{by_48h_rule_key}:{date_str}:{now_h}"
        h_48h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_48h_key_name,
                               dup_key_name=h_48h_dup_key_name, region=region,
                               political_filter=political_filter, shield_config=shield_config,
                               dup_remove=dup_remove)

        # ##### De-duplicate the relative-48h "remaining after filter" result and store it separately.
        if by_48h_rule_key == 'rule1':
            other_h_48h_key_name = \
                f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{data_key}:{by_48h_rule_key}:{date_str}:{now_h}"
            other_h_48h_dup_key_name = \
                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
            h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_48h_key_name,
                                   dup_key_name=other_h_48h_dup_key_name, region=region,
                                   political_filter=political_filter, shield_config=shield_config,
                                   dup_remove=dup_remove)
    else:
        # ##### De-duplicate the mini-program relative-24h result and store it separately.
        h_24h_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{by_24h_rule_key}:{date_str}:{now_h}"
        h_24h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_24h_key_name,
                               dup_key_name=h_24h_dup_key_name, region=region,
                               political_filter=political_filter, shield_config=shield_config,
                               dup_remove=dup_remove)

        # ##### De-duplicate the relative-24h "remaining after filter" result and store it separately.
        # NOTE: the rule-key condition was deliberately disabled upstream; this now runs unconditionally.
        # if by_24h_rule_key in ['rule3', 'rule4']:
        other_h_24h_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:{by_24h_rule_key}:{date_str}:{now_h}"
        other_h_24h_dup_key_name = \
            f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name,
                               dup_key_name=other_h_24h_dup_key_name, region=region,
                               political_filter=political_filter, shield_config=shield_config,
                               dup_remove=dup_remove)

    log_.info(f"region = {region} dup end!")


def copy_data_for_city(region, city_code, data_key, rule_key, now_date, now_h, shield_config):
    """Copy a region's de-duplicated pools to the corresponding city keys,
    applying the city's own shield-video filtering on the way."""
    log_.info(f"city_code = {city_code} start ...")
    redis_helper = RedisHelper()
    key_prefix_list = [
        config_.RECALL_KEY_NAME_PREFIX_DUP_H_H,            # region-agnostic hourly
        config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,  # region relative-24h
        config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H,  # region-agnostic relative-24h
        config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H,  # region-agnostic relative-24h, post-filter remainder
    ]
    date_str = datetime.datetime.strftime(now_date, '%Y%m%d')
    for key_prefix in key_prefix_list:
        region_key = f"{key_prefix}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
        city_key = f"{key_prefix}{city_code}:{data_key}:{rule_key}:{date_str}:{now_h}"
        if not redis_helper.key_exists(key_name=region_key):
            continue
        region_data = redis_helper.get_all_data_from_zset(key_name=region_key, with_scores=True)
        if not region_data:
            continue

        # Shielded-video filtering keyed by the CITY code, not the region.
        region_video_ids = [int(video_id) for video_id, _ in region_data]
        shield_key_name_list = shield_config.get(city_code, None)
        if shield_key_name_list is not None:
            filtered_video_ids = filter_shield_video(video_ids=region_video_ids,
                                                     shield_key_name_list=shield_key_name_list)
        else:
            filtered_video_ids = region_video_ids

        # Set lookup instead of per-item list scan.
        filtered_id_set = set(filtered_video_ids)
        city_data = {}
        for video_id, score in region_data:
            if int(video_id) in filtered_id_set:
                city_data[int(video_id)] = score

        if len(city_data) > 0:
            redis_helper.add_data_with_zset(key_name=city_key, data=city_data, expire_time=2 * 24 * 3600)
    log_.info(f"city_code = {city_code} end!")


def dup_with_param(param, data_params_item, rule_params_item,
                   region_code_list, now_date, now_h, rule_rank_h_flag):
    """Run de-duplication for one (data, rule) parameter combination across all
    regions concurrently, then fan the results out to the mapped cities."""
    log_.info(f"param = {param} start...")

    data_key = param.get('data')
    data_param = data_params_item.get(data_key)
    log_.info(f"data_key = {data_key}, data_param = {data_param}")
    rule_key = param.get('rule')
    rule_param = rule_params_item.get(rule_key)
    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")

    h_rule_key = rule_param.get('h_rule_key', None)
    region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
    by_24h_rule_key = rule_param.get('24h_rule_key', None)
    by_48h_rule_key = rule_param.get('48h_rule_key', None)
    dup_remove = rule_param.get('dup_remove', True)
    # Shielded-video filtering config (falls back to the global default).
    shield_config = rule_param.get('shield_config', config_.SHIELD_CONFIG)
    # Political-video filtering switch.
    political_filter = rule_param.get('political_filter', None)

    task_list = [
        gevent.spawn(dup_to_redis, now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key,
                     by_48h_rule_key, region, data_key, rule_rank_h_flag, political_filter,
                     shield_config, dup_remove)
        for region in region_code_list
    ]
    gevent.joinall(task_list)

    # Prepare video data for the special cities.
    for region, city_list in config_.REGION_CITY_MAPPING.items():
        t = [
            gevent.spawn(
                copy_data_for_city,
                region, city_code, data_key, rule_key, now_date, now_h, shield_config
            )
            for city_code in city_list
        ]
        gevent.joinall(t)

    log_.info(f"param = {param} end!")


def dup_task(now_date, now_h, region_code_list, rule_rank_h_flag, rule_params):
    """Fan out `dup_with_param` over every entry of params_list, one process each."""
    data_params_item = rule_params.get('data_params')
    rule_params_item = rule_params.get('rule_params')
    params_list = rule_params.get('params_list')
    pool = multiprocessing.Pool(processes=len(params_list))
    for param in params_list:
        # NOTE(review): apply_async without collecting results silently drops
        # worker exceptions — presumably intentional (failures are best-effort
        # per param), but confirm before relying on the feishu alert path.
        pool.apply_async(
            func=dup_with_param,
            args=(param, data_params_item, rule_params_item, region_code_list, now_date, now_h, rule_rank_h_flag)
        )
    pool.close()
    pool.join()


def h_timer_check():
    """Entry point: run the dup task once all four upstream data-status flags are
    set for the current hour, otherwise re-check in 60 seconds.  Any failure is
    logged and reported to feishu."""
    try:
        rule_rank_h_flag = sys.argv[1]
        if rule_rank_h_flag == '48h':
            rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
        else:
            rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
        now_date = datetime.datetime.today()
        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, "
                  f"rule_rank_h_flag: {rule_rank_h_flag}")
        now_h = datetime.datetime.now().hour
        region_code_list = [code for region, code in region_code.items()]

        # Check whether the upstream data updates have completed.
        redis_helper = RedisHelper()
        hour_str = datetime.datetime.strftime(now_date, '%Y%m%d%H')
        rule_24h_status = redis_helper.get_data_from_redis(
            key_name=f"{config_.RULE_24H_DATA_STATUS}:{hour_str}")
        region_24h_status = redis_helper.get_data_from_redis(
            key_name=f"{config_.REGION_24H_DATA_STATUS}:{hour_str}")
        rule_h_status = redis_helper.get_data_from_redis(
            key_name=f"{config_.RULE_H_DATA_STATUS}:{hour_str}")
        region_h_status = redis_helper.get_data_from_redis(
            key_name=f"{config_.REGION_H_DATA_STATUS}:{hour_str}")
        log_.info(f"rule_24h_status: {rule_24h_status}, region_24h_status: {region_24h_status}, "
                  f"rule_h_status: {rule_h_status}, region_h_status: {region_h_status}")

        if rule_24h_status == '1' and region_24h_status == '1' and rule_h_status == '1' and region_h_status == '1':
            dup_task(now_date, now_h, region_code_list, rule_rank_h_flag, rule_params)
            log_.info(f"recommend region data dup end!")
        else:
            # Data not ready yet: re-check in one minute.
            Timer(60, h_timer_check).start()

    except Exception as e:
        log_.error(f"地域相关推荐数据去重更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 地域相关推荐数据去重更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    log_.info(f"recommend region data dup start...")
    h_timer_check()