123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- import multiprocessing
- import os
- import sys
- import time
- import traceback
- import gevent
- import datetime
- import pandas as pd
- import math
- from functools import reduce
- from odps import ODPS
- from threading import Timer, Thread
- from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
- check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
- from config import set_config
- from log import Log
- from check_video_limit_distribute import update_limit_video_score
# Load project configuration; the second value returned by set_config() is not used in this file.
config_, _ = set_config()
# Logger shared by every function in this module.
log_ = Log()
# Region mapping from config; in this file only its values (region codes) are used, to build the region list.
region_code = config_.REGION_CODE
def dup_data(h_video_ids, initial_key_name, dup_key_name, region, political_filter, shield_config, dup_remove):
    """Filter one recall pool and store the surviving videos under a new redis key.

    Reads the zset ``initial_key_name`` (with scores) if it exists. Videos blocked by
    the region's shield lists (``shield_config[region]``, when present) are dropped.
    When ``political_filter`` is exactly ``True``, politically sensitive videos are
    dropped too. When ``dup_remove`` is exactly ``True``, videos already in
    ``h_video_ids`` are also dropped, and every kept id is appended to ``h_video_ids``,
    so pools processed later are de-duplicated against this one.

    The kept ``{video_id: score}`` mapping is written to ``dup_key_name`` with
    ``expire_time=2 * 24 * 3600`` (2 days, assuming seconds). It is then passed to
    ``update_limit_video_score``. Nothing is written when no video survives.

    :param h_video_ids: list of int video ids claimed by earlier pools; mutated in place
        when ``dup_remove`` is True.
    :param initial_key_name: redis zset key of the source pool.
    :param dup_key_name: redis key that receives the filtered result.
    :param region: key used to look up the shield lists in ``shield_config``.
    :param political_filter: apply the political-content filter only when ``is True``.
    :param shield_config: mapping of region -> list of shield key names.
    :param dup_remove: de-duplicate against ``h_video_ids`` only when ``is True``.
    :return: ``h_video_ids`` (the same list object, possibly extended).
    """
    redis_helper = RedisHelper()
    if not redis_helper.key_exists(key_name=initial_key_name):
        return h_video_ids
    initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)

    # Shield-list filtering
    allowed_ids = [int(video_id) for video_id, _ in initial_data]
    shield_key_name_list = shield_config.get(region, None)
    if shield_key_name_list is not None:
        allowed_ids = filter_shield_video(video_ids=allowed_ids,
                                          shield_key_name_list=shield_key_name_list)
    # Political-content filtering
    if political_filter is True:
        allowed_ids = filter_political_videos(video_ids=allowed_ids)

    # Sets make each membership test O(1); scanning the lists made this loop O(n*m).
    allowed_set = set(allowed_ids)
    claimed = set(h_video_ids)
    kept = {}
    # De-duplication: zset order is preserved, and the first occurrence wins when dup_remove is on.
    for raw_id, score in initial_data:
        video_id = int(raw_id)
        if video_id not in allowed_set:
            continue
        if dup_remove is True:
            if video_id in claimed:
                continue
            claimed.add(video_id)
            h_video_ids.append(video_id)
        kept[video_id] = score

    if kept:
        redis_helper.add_data_with_zset(key_name=dup_key_name, data=kept, expire_time=2 * 24 * 3600)
        # Score adjustment for rate-limited videos
        update_limit_video_score(initial_videos=kept, key_name=dup_key_name)
    return h_video_ids
def dup_to_redis(now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key, by_48h_rule_key,
                 region, data_key, rule_rank_h_flag, political_filter, shield_config, dup_remove):
    """De-duplicate one region's hourly data against the other recall pools and store each result in redis.

    The region's own hourly pool (``RECALL_KEY_NAME_PREFIX_REGION_BY_H``) seeds the list of
    already-claimed video ids. The following pools are then filtered in this order,
    each through ``dup_data`` against everything claimed before it. Each result is
    written under a region-specific DUP key:

    1. the non-regional hourly pool (only when ``h_rule_key`` is not None);
    2. the regional 24h pool;
    3. if ``rule_rank_h_flag == '48h'``: the 48h pool, plus its "other" remainder when
       ``by_48h_rule_key == 'rule1'``. Otherwise: the 24h pool and its "other" remainder.

    Every key has the form ``<prefix><parts joined by ':'>:<YYYYMMDD of now_date>:<now_h>``.
    """
    day = datetime.datetime.strftime(now_date, '%Y%m%d')

    def build_key(prefix, *parts):
        # Same shape as an inline f"{prefix}{a}:{b}:...:{day}:{now_h}".
        tail = ":".join(f"{part}" for part in (*parts, day, now_h))
        return f"{prefix}{tail}"

    def dedup(source_key, target_key, claimed_ids):
        return dup_data(h_video_ids=claimed_ids, initial_key_name=source_key, dup_key_name=target_key,
                        region=region, political_filter=political_filter,
                        shield_config=shield_config, dup_remove=dup_remove)

    # Seed the claimed ids with the region's own hourly pool.
    seed_key = build_key(config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, region, data_key, rule_key)
    helper = RedisHelper()
    claimed = []
    if helper.key_exists(key_name=seed_key):
        claimed = [int(vid) for vid, _ in helper.get_all_data_from_zset(key_name=seed_key, with_scores=True)]

    # Non-regional hourly pool
    if h_rule_key is not None:
        claimed = dedup(build_key(config_.RECALL_KEY_NAME_PREFIX_BY_H_H, data_key, h_rule_key),
                        build_key(config_.RECALL_KEY_NAME_PREFIX_DUP_H_H, region, data_key, rule_key),
                        claimed)

    # Regional 24h pool
    claimed = dedup(build_key(config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H, region, data_key, region_24h_rule_key),
                    build_key(config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, region, data_key, rule_key),
                    claimed)

    if rule_rank_h_flag != '48h':
        # Relative-24h pool, then what remains of it after screening.
        claimed = dedup(build_key(config_.RECALL_KEY_NAME_PREFIX_BY_24H, data_key, by_24h_rule_key),
                        build_key(config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, region, data_key, rule_key),
                        claimed)
        dedup(build_key(config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER, data_key, by_24h_rule_key),
              build_key(config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H, region, data_key, rule_key),
              claimed)
        return

    # Relative-48h pool, then (for rule1 only) what remains of it after screening.
    claimed = dedup(build_key(config_.RECALL_KEY_NAME_PREFIX_BY_48H, data_key, by_48h_rule_key),
                    build_key(config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H, region, data_key, rule_key),
                    claimed)
    if by_48h_rule_key == 'rule1':
        dedup(build_key(config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER, data_key, by_48h_rule_key),
              build_key(config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H, region, data_key, rule_key),
              claimed)
def copy_data_for_city(region, city_code, data_key, rule_key, now_date, now_h, shield_config):
    """Copy a region's de-duplicated pools to one of its cities, applying the city's shield lists.

    For each DUP key prefix below, reads the zset
    ``<prefix><region>:<data_key>:<rule_key>:<YYYYMMDD>:<now_h>``. If it exists and is
    non-empty, videos blocked by ``shield_config[city_code]`` are removed (when that
    entry exists). The remaining ``{video_id: score}`` pairs are written to the same key
    with ``city_code`` in place of ``region``, using ``expire_time=2 * 24 * 3600``
    (2 days, assuming seconds).
    """
    log_.info(f"city_code = {city_code} start ...")
    redis_helper = RedisHelper()
    # NOTE(review): only the 24h-mode DUP keys are copied. In '48h' mode the region
    # dedup writes RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H / DUP3_REGION_48H_H keys
    # instead, and those never reach the cities. Confirm this is intended.
    key_prefix_list = [
        config_.RECALL_KEY_NAME_PREFIX_DUP_H_H,  # non-regional hourly
        config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,  # regional, relative 24h
        config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H,  # non-regional, relative 24h
        config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H,  # non-regional, relative 24h after screening
    ]
    # Loop-invariant: format the date once instead of twice per prefix.
    date_str = datetime.datetime.strftime(now_date, '%Y%m%d')
    for key_prefix in key_prefix_list:
        region_key = f"{key_prefix}{region}:{data_key}:{rule_key}:{date_str}:{now_h}"
        city_key = f"{key_prefix}{city_code}:{data_key}:{rule_key}:{date_str}:{now_h}"
        if not redis_helper.key_exists(key_name=region_key):
            continue
        region_data = redis_helper.get_all_data_from_zset(key_name=region_key, with_scores=True)
        if not region_data:
            continue
        # Shield-list filtering for this city
        region_video_ids = [int(video_id) for video_id, _ in region_data]
        shield_key_name_list = shield_config.get(city_code, None)
        if shield_key_name_list is not None:
            filtered_video_ids = filter_shield_video(video_ids=region_video_ids,
                                                     shield_key_name_list=shield_key_name_list)
        else:
            filtered_video_ids = region_video_ids
        # A set makes each membership test O(1); scanning the list made this loop O(n^2).
        allowed = set(filtered_video_ids)
        city_data = {}
        for video_id, score in region_data:
            if int(video_id) in allowed:
                city_data[int(video_id)] = score
        if city_data:
            redis_helper.add_data_with_zset(key_name=city_key, data=city_data, expire_time=2 * 24 * 3600)
    log_.info(f"city_code = {city_code} end!")
def dup_with_param(param, data_params_item, rule_params_item, region_code_list, now_date, now_h, rule_rank_h_flag):
    """Run the regional dedup for one param set, then copy the results to the configured cities.

    ``param`` is a dict read with these keys:
    - 'data', 'rule': looked up in ``data_params_item`` / ``rule_params_item`` (logged only);
    - 'h_rule_key' (default None), 'region_24h_rule_key' (default 'rule1'),
      '24h_rule_key' (default None) and '48h_rule_key' (default None);
    - 'dup_remove' (default True);
    - 'shield_config' (default ``config_.SHIELD_CONFIG``);
    - 'political_filter' (default None).

    One greenlet per region in ``region_code_list`` runs ``dup_to_redis``. After all of
    them are joined, each region in ``config_.REGION_CITY_MAPPING`` gets one greenlet
    per city, which runs ``copy_data_for_city``.

    :raises RuntimeError: if any greenlet raised. Every greenlet is still joined first,
        so one failure does not stop the other regions or the city copies.
    """
    log_.info(f"param = {param} start...")
    data_key = param.get('data')
    data_param = data_params_item.get(data_key)
    log_.info(f"data_key = {data_key}, data_param = {data_param}")
    rule_key = param.get('rule')
    rule_param = rule_params_item.get(rule_key)
    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
    h_rule_key = param.get('h_rule_key', None)
    region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
    by_24h_rule_key = param.get('24h_rule_key', None)
    by_48h_rule_key = param.get('48h_rule_key', None)
    dup_remove = param.get('dup_remove', True)
    # Shield-list filtering config
    shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
    # Political-content filtering switch
    political_filter = param.get('political_filter', None)

    failed = []
    region_tasks = [
        gevent.spawn(dup_to_redis,
                     now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key,
                     by_48h_rule_key, region, data_key, rule_rank_h_flag, political_filter, shield_config, dup_remove)
        for region in region_code_list
    ]
    gevent.joinall(region_tasks)
    # joinall() does not raise greenlet errors by default; record them so they are not lost.
    failed.extend(task for task in region_tasks if task.exception is not None)

    # Prepare video data for special cities
    for region, city_list in config_.REGION_CITY_MAPPING.items():
        city_tasks = [
            gevent.spawn(
                copy_data_for_city,
                region, city_code, data_key, rule_key, now_date, now_h, shield_config
            )
            for city_code in city_list
        ]
        gevent.joinall(city_tasks)
        failed.extend(task for task in city_tasks if task.exception is not None)

    if failed:
        for task in failed:
            log_.error(f"param = {param} greenlet failed, exception: {task.exception}")
        raise RuntimeError(f"param = {param}: {len(failed)} greenlet(s) failed") from failed[0].exception
    log_.info(f"param = {param} end!")
def dup_task(now_date, now_h, region_code_list, rule_rank_h_flag, rule_params):
    """Run ``dup_with_param`` for every entry of ``rule_params['params_list']``, one process each.

    ``rule_params`` is read for the keys 'data_params', 'rule_params' and 'params_list'.
    Returns immediately when 'params_list' is missing or empty.

    :raises RuntimeError: after every worker has finished, if any worker raised. The
        first worker error is chained as the cause.
    """
    # Read the data/rule configuration
    data_params_item = rule_params.get('data_params')
    rule_params_item = rule_params.get('rule_params')
    params_list = rule_params.get('params_list')
    if not params_list:
        # Nothing to do, and multiprocessing.Pool(processes=0) would raise ValueError.
        return
    pool = multiprocessing.Pool(processes=len(params_list))
    try:
        async_results = [
            pool.apply_async(
                func=dup_with_param,
                args=(param, data_params_item, rule_params_item, region_code_list, now_date, now_h, rule_rank_h_flag)
            )
            for param in params_list
        ]
    finally:
        pool.close()
        pool.join()

    # apply_async() keeps worker exceptions inside the AsyncResult. Read every result so
    # failures reach the caller's error handling instead of being silently dropped.
    failures = []
    for param, async_result in zip(params_list, async_results):
        try:
            async_result.get()
        except Exception as e:
            log_.error(f"param = {param} dup failed, exception: {e}")
            failures.append(e)
    if failures:
        raise RuntimeError(f"{len(failures)} of {len(params_list)} dup tasks failed") from failures[0]
def h_timer_check():
    """Run the regional dedup once all upstream data for the current hour is ready.

    Reads the rank mode from ``sys.argv[1]``: '48h' selects
    ``config_.RULE_PARAMS_REGION_APP_TYPE_48H``, and anything else selects
    ``config_.RULE_PARAMS_REGION_APP_TYPE``. Then reads four status keys suffixed with
    the current ``%Y%m%d%H``. If all of them equal '1', it calls ``dup_task``. Otherwise
    it schedules itself again in 60 seconds on a ``threading.Timer``. Any exception is
    logged and reported to the Feishu server robot.
    """
    try:
        rule_rank_h_flag = sys.argv[1]
        if rule_rank_h_flag == '48h':
            rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
        else:
            rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
        # Read the clock once. Taking the hour from a second datetime.now() call could
        # roll over into the next hour (or day) and disagree with the '%Y%m%d%H'
        # status-key suffix and with the date used in the redis key names.
        now_date = datetime.datetime.today()
        now_h = now_date.hour
        date_hour = datetime.datetime.strftime(now_date, '%Y%m%d%H')
        log_.info(f"now_date: {date_hour}, rule_rank_h_flag: {rule_rank_h_flag}")
        region_code_list = list(region_code.values())
        # Fetch the upstream data-update status flags
        redis_helper = RedisHelper()
        rule_24h_status = redis_helper.get_data_from_redis(
            key_name=f"{config_.RULE_24H_DATA_STATUS}:{date_hour}")
        region_24h_status = redis_helper.get_data_from_redis(
            key_name=f"{config_.REGION_24H_DATA_STATUS}:{date_hour}")
        rule_h_status = redis_helper.get_data_from_redis(
            key_name=f"{config_.RULE_H_DATA_STATUS}:{date_hour}")
        region_h_status = redis_helper.get_data_from_redis(
            key_name=f"{config_.REGION_H_DATA_STATUS}:{date_hour}")
        log_.info(f"rule_24h_status: {rule_24h_status}, region_24h_status: {region_24h_status}, "
                  f"rule_h_status: {rule_h_status}, region_h_status: {region_h_status}")
        statuses = (rule_24h_status, region_24h_status, rule_h_status, region_h_status)
        if all(status == '1' for status in statuses):
            dup_task(now_date, now_h, region_code_list, rule_rank_h_flag, rule_params)
        else:
            # Upstream data not ready yet; check again in 1 minute
            Timer(60, h_timer_check).start()
    except Exception as e:
        log_.error(f"地域相关推荐数据去重更新失败, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 地域相关推荐数据去重更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )
if __name__ == '__main__':
    # Script entry point. h_timer_check() reads the rank mode from sys.argv[1] and
    # re-schedules itself until the upstream data is ready.
    log_.info("recommend region data dup start...")
    h_timer_check()
|