import datetime
import traceback

import gevent

from db_helper import RedisHelper
from my_utils import send_msg_to_feishu
from my_config import set_config
from log import Log

config_, env = set_config()
log_ = Log()

# initial_param = {'data': 'data1', 'rule': 'rule4'}
# new_param = config_.LHK_RULE_PARAMS
redis_helper = RedisHelper()


def get_religion_videos(now_date, religion_name):
    """Fetch the religion video list for ``religion_name`` from Redis.

    The key is ``<key_name_prefix><YYYYMMDD>`` built from ``now_date``; when
    that key does not exist, the previous day's key is read instead.

    Args:
        now_date: datetime used to build the date part of the key.
        religion_name: key into ``config_.RELIGION_VIDEOS``.

    Returns:
        Whatever ``redis_helper.get_all_data_from_zset`` returns (callers
        unpack it as ``(video_id, score)`` pairs), or ``[]`` when it is None.
    """
    key_name_prefix = config_.RELIGION_VIDEOS[religion_name].get('key_name_prefix')
    religion_key_name = f"{key_name_prefix}{now_date.strftime('%Y%m%d')}"
    if not redis_helper.key_exists(religion_key_name):
        # Today's data is not available: fall back to yesterday's key.
        previous_day = now_date - datetime.timedelta(days=1)
        religion_key_name = f"{key_name_prefix}{previous_day.strftime('%Y%m%d')}"
    religion_videos = redis_helper.get_all_data_from_zset(
        key_name=religion_key_name, desc=True, with_scores=True
    )
    return [] if religion_videos is None else religion_videos


def merge_process(initial_key_name, new_key_name, now_videos, religion_video_id_list, rank_count):
    """Interleave religion videos into one source zset and store the result.

    Reads the video ids stored under ``initial_key_name``, drops those already
    present in ``now_videos``, inserts one pending religion video after every
    source video whose index ``i`` satisfies ``i % rank_count == 1``, assigns
    linearly decreasing scores and writes the merged list to ``new_key_name``.

    Args:
        initial_key_name: Redis key of the source zset.
        new_key_name: Redis key the merged zset is written to.
        now_videos: list of ids already placed by earlier stages; extended
            in place with the ids placed by this call.
        religion_video_id_list: religion video ids still waiting to be
            inserted (not mutated; a new list is returned).
        rank_count: insertion period, see above.

    Returns:
        ``(now_videos, remaining_religion_video_ids)``.
    """
    initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
    if not initial_data:
        return now_videos, religion_video_id_list

    # Set lookups instead of repeated list scans.
    already_placed = set(now_videos)
    initial_video_ids = [
        video_id
        for video_id in (int(raw_id) for raw_id, _ in initial_data)
        if video_id not in already_placed
    ]
    initial_id_set = set(initial_video_ids)
    # Religion videos that already appear in this source list are dropped.
    pending = [video_id for video_id in religion_video_id_list if video_id not in initial_id_set]

    new_video_ids = []
    used = 0  # number of pending religion videos consumed so far
    for i, video_id in enumerate(initial_video_ids):
        new_video_ids.append(video_id)
        if used < len(pending) and i % rank_count == 1:
            new_video_ids.append(pending[used])
            used += 1
    remaining = pending[used:]

    if not new_video_ids:
        # Every source video was already placed by an earlier stage: nothing
        # to write, and the score step below would divide by zero.
        return now_videos, remaining

    # Record everything placed here so later stages are de-duplicated against
    # it, whether or not any religion videos were left to insert.
    now_videos.extend(new_video_ids)

    # Assign scores by rank: start at 100 and decrease by a constant step.
    step = 100 / (len(new_video_ids) * 2)
    new_result = {int(video_id): 100 - i * step for i, video_id in enumerate(new_video_ids)}

    # Write to the new key (expire_time is presumably in seconds, i.e. two
    # days -- confirm against RedisHelper.add_data_with_zset).
    redis_helper.add_data_with_zset(key_name=new_key_name, data=new_result, expire_time=2 * 24 * 3600)
    return now_videos, remaining


def merge_with_region(now_date, now_h, region, religion_video_id_list, initial_param,
                      lhk_data_key, lhk_rule_key, rank_count):
    """Run ``merge_process`` over the four recall lists of one region.

    For each key prefix the source key is
    ``<prefix><region>:<initial data>:<initial rule>:<YYYYMMDD>:<now_h>`` and
    the target key uses ``lhk_data_key`` / ``lhk_rule_key`` in place of the
    initial data/rule. The placed-video list and the remaining religion videos
    are threaded from one stage to the next.

    Args:
        now_date: datetime supplying the date part of the keys.
        now_h: hour part of the keys.
        region: region (or city) code part of the keys.
        religion_video_id_list: religion video ids to insert.
        initial_param: mapping with ``'data'`` and ``'rule'`` entries naming
            the source lists.
        lhk_data_key: data key of the target lists.
        lhk_rule_key: rule key of the target lists.
        rank_count: insertion period passed through to ``merge_process``.
    """
    initial_data_key = initial_param.get('data')
    initial_rule_key = initial_param.get('rule')
    date_str = now_date.strftime('%Y%m%d')
    source_suffix = f"{region}:{initial_data_key}:{initial_rule_key}:{date_str}:{now_h}"
    target_suffix = f"{region}:{lhk_data_key}:{lhk_rule_key}:{date_str}:{now_h}"

    # Order matters: earlier stages take precedence when de-duplicating.
    key_prefixes = (
        config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,        # regional hourly data
        config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,  # regional 24h data
        config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H,  # 24h filtered data
        config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H,  # data left after the 24h filter
    )

    now_videos = []
    for prefix in key_prefixes:
        now_videos, religion_video_id_list = merge_process(
            initial_key_name=f"{prefix}{source_suffix}",
            new_key_name=f"{prefix}{target_suffix}",
            now_videos=now_videos,
            religion_video_id_list=religion_video_id_list,
            rank_count=rank_count,
        )
    log_.info(f"region = {region} update end!")


def _merge_regions_concurrently(region_codes, now_date, now_h, religion_video_id_list,
                                initial_param, lhk_data_key, lhk_rule_key, rank_count):
    """Spawn one ``merge_with_region`` greenlet per code, wait, and log failures."""
    region_codes = list(region_codes)
    tasks = [
        gevent.spawn(
            merge_with_region,
            now_date, now_h, code, religion_video_id_list, initial_param,
            lhk_data_key, lhk_rule_key, rank_count
        )
        for code in region_codes
    ]
    gevent.joinall(tasks)
    # joinall() does not re-raise greenlet errors; report them in the project
    # log while keeping the run best-effort for the other regions.
    for code, task in zip(region_codes, tasks):
        if not task.successful():
            log_.error(f"region = {code} update failed, exception: {task.exception!r}")


def merge_videos(now_date, now_h, param, rule_params):
    """Insert religion videos into the default video lists.

    Args:
        now_date: datetime supplying the date part of the Redis keys.
        now_h: hour part of the Redis keys.
        param: mapping with ``'data'`` and ``'rule'`` entries naming the
            target lists.
        rule_params: mapping from rule key to a config mapping providing
            ``'religion_name'``, ``'initial_param'`` and ``'rank_count'``.
    """
    # Fetch the religion video list.
    lhk_data_key = param.get('data')
    lhk_rule_key = param.get('rule')
    rule_config = rule_params.get(lhk_rule_key)
    religion_name = rule_config.get('religion_name')
    initial_param = rule_config.get('initial_param')
    rank_count = rule_config.get('rank_count')
    religion_videos = get_religion_videos(now_date=now_date, religion_name=religion_name)
    religion_video_id_list = [int(video_id) for video_id, _ in religion_videos]

    # Merge the lists for every region.
    _merge_regions_concurrently(
        config_.REGION_CODE.values(),
        now_date, now_h, religion_video_id_list, initial_param,
        lhk_data_key, lhk_rule_key, rank_count
    )

    # Prepare the video data for the special cities, one batch per region.
    for city_list in config_.REGION_CITY_MAPPING.values():
        _merge_regions_concurrently(
            city_list,
            now_date, now_h, religion_video_id_list, initial_param,
            lhk_data_key, lhk_rule_key, rank_count
        )


def main():
    """Script entry point: update the merged lists for every configured param.

    Any exception is logged and reported through ``send_msg_to_feishu``.
    """
    try:
        log_.info("laohaokan recommend data update start...")
        now_date = datetime.datetime.today()
        # Derive the hour from the same timestamp so the date and hour parts
        # of the keys can never disagree across an hour/midnight boundary.
        now_h = now_date.hour
        log_.info(f"now_date: {now_date.strftime('%Y%m%d%H')}")
        lhk_rule_params = config_.LHK_RULE_PARAMS
        rule_params = lhk_rule_params.get('rule_params', {})
        params_list = lhk_rule_params.get('params_list', [])
        for param in params_list:
            log_.info(f"param = {param} update start...")
            merge_videos(now_date, now_h, param, rule_params)
            log_.info(f"param = {param} update end!")
        log_.info("laohaokan recommend data update end!")
        # Success notification is currently disabled:
        # send_msg_to_feishu(
        #     webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        #     key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        #     msg_text=f"rov-offline{config_.ENV_TEXT} - 老好看推荐视频数据更新完成\n"
        #              f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}\n"
        #              f"now_h: {now_h}\n"
        #              f"finished time: {datetime.datetime.strftime(datetime.datetime.now(), '%Y%m%d %H:%M:%S')}"
        # )
    except Exception as e:
        # Top-level boundary: format the traceback once, log it and alert.
        tb_text = traceback.format_exc()
        log_.error(f"老好看推荐视频数据更新失败, exception: {e}, traceback: {tb_text}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 老好看推荐视频数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {tb_text}"
        )


if __name__ == '__main__':
    main()