import json import random import time import multiprocessing import traceback import hashlib from datetime import datetime, timedelta import config from log import Log from config import set_config from video_recall import PoolRecall from video_rank import video_new_rank2, video_sank_pos_rank,video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2 from db_helper import RedisHelper import gevent from utils import FilterVideos, get_user_has30day_return import ast log_ = Log() config_ = set_config() def relevant_video_top_recommend(request_id, app_type, mid, uid, head_vid, videos, size): """ 相关推荐强插 运营给定置顶相关性视频 :param request_id: request_id :param app_type: 产品标识 type-int :param mid: mid :param uid: uid :param head_vid: 相关推荐头部视频id type-int :param videos: 当前相关推荐结果 type-list :param size: 返回视频个数 type-int :return: rank_result """ # 获取头部视频对应的相关性视频 key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid) redis_helper = RedisHelper() relevant_videos = redis_helper.get_data_from_redis(key_name=key_name) if relevant_videos is None: # 该视频没有指定的相关性视频 return videos relevant_videos = json.loads(relevant_videos) # 按照指定顺序排序 relevant_videos_sorted = sorted(relevant_videos, key=lambda x: x['order'], reverse=False) # 过滤 relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos_sorted] filter_helper = FilterVideos(request_id=request_id, app_type=app_type, video_ids=relevant_video_ids, mid=mid, uid=uid) filtered_ids = filter_helper.filter_videos() if filtered_ids is None: return videos # 获取生效中的视频 now = int(time.time()) relevant_videos_in_effect = [ {'videoId': int(item['recommend_vid']), 'pushFrom': config_.PUSH_FROM['relevant_video_op'], 'abCode': config_.AB_CODE['relevant_video_op']} for item in relevant_videos_sorted if item['start_time'] < now < item['finish_time'] and int(item['recommend_vid']) in filtered_ids ] if len(relevant_videos_in_effect) == 0: return videos # 与现有排序结果 进行合并重排 # 获取现有排序结果中流量池视频 及其位置 relevant_ids = [item['videoId'] for item in relevant_videos_in_effect] flow_pool_videos = [] other_videos = [] for i, item in enumerate(videos): if item.get('pushFrom', None) == config_.PUSH_FROM['flow_recall'] and item.get('videoId') not in relevant_ids: flow_pool_videos.append((i, item)) elif item.get('videoId') not in relevant_ids: other_videos.append(item) else: continue # 重排,保持流量池视频位置不变 rank_result = relevant_videos_in_effect + other_videos for i, item in flow_pool_videos: rank_result.insert(i, item) return rank_result[:size] def video_position_recommend(request_id, mid, uid, app_type, videos): # videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, # algo_type=algo_type, client_info=client_info) redis_helper = RedisHelper() pos1_vids = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION1_KEY_NAME) pos2_vids = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION2_KEY_NAME) if pos1_vids is not None: pos1_vids = ast.literal_eval(pos1_vids) if pos2_vids is not None: pos2_vids = ast.literal_eval(pos2_vids) pos1_vids = [] if pos1_vids is None else pos1_vids pos2_vids = [] if pos2_vids is None else pos2_vids pos1_vids = [int(i) for i in pos1_vids] pos2_vids = [int(i) for i in pos2_vids] filter_1 = FilterVideos(request_id=request_id, app_type=app_type, video_ids=pos1_vids, mid=mid, uid=uid) filter_2 = FilterVideos(request_id=request_id, app_type=app_type, video_ids=pos2_vids, mid=mid, uid=uid) t = [gevent.spawn(filter_1.filter_videos), gevent.spawn(filter_2.filter_videos)] gevent.joinall(t) filted_list = [i.get() for i in t] pos1_vids = filted_list[0] pos2_vids = filted_list[1] videos = positon_duplicate(pos1_vids, pos2_vids, videos) if pos1_vids is not None and len(pos1_vids) >0 : videos.insert(0, {'videoId': int(pos1_vids[0]), 'rovScore': 100, 'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']}) if pos2_vids is not None and len(pos2_vids) >0 : videos.insert(1, {'videoId': int(pos2_vids[0]), 'rovScore': 100, 'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']}) return videos[:10] def positon_duplicate(pos1_vids, pos2_vids, videos): s = set() if pos1_vids is not None and len(pos1_vids) >0: s.add(int(pos1_vids[0])) if pos2_vids is not None and len(pos2_vids) >0: s.add(int(pos2_vids[0])) l = [] for item in videos: if item['videoId'] in s: continue else: l.append(item) return l def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info, expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='', no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None, shield_config=None, level_weight=None, flow_pool_abtest_group=None): """ 首页线上推荐逻辑 :param request_id: request_id :param mid: mid type-string :param uid: uid type-string :param size: 请求视频数量 type-int :param top_K: 保证topK为召回池视频 type-int :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float :param app_type: 产品标识 type-int :param algo_type: 算法类型 type-string :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"} :param expire_time: 末位视频记录redis过期时间 :param ab_code: AB实验code :param video_id: 相关推荐头部视频id :param params: :return: """ result = {} # ####### 多进程召回 start_recall = time.time() # log_.info('====== recall') ''' cores = multiprocessing.cpu_count() pool = multiprocessing.Pool(processes=cores) pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code) _, last_rov_recall_key, _ = pool_recall.get_video_last_idx() pool_list = [ # rov召回池 pool.apply_async(pool_recall.rov_pool_recall, (size,)), # 流量池 pool.apply_async(pool_recall.flow_pool_recall, (size,)) ] recall_result_list = [p.get() for p in pool_list] pool.close() pool.join() ''' recall_result_list = [] pool_recall = PoolRecall(request_id=request_id, app_type=app_type, mid=mid, uid=uid, ab_code=ab_code, client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag, params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id= video_id, level_weight=level_weight) # _, last_rov_recall_key, _ = pool_recall.get_video_last_idx() # # 小时级实验 # if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()]: # t = [gevent.spawn(pool_recall.rule_recall_by_h, size, expire_time), # gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID), # gevent.spawn(pool_recall.flow_pool_recall, size)] # # 小时级实验 # elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]: # t = [gevent.spawn(pool_recall.rov_pool_recall_by_h, size, expire_time), # gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID), # gevent.spawn(pool_recall.flow_pool_recall, size)] # 地域分组实验 # if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]: # load test config exp_config = None if ab_code == 60058: exp_config = pool_recall.get_U2I_config() elif ab_code == 60059: exp_config = pool_recall.get_w2v_config() elif ab_code == 60060: exp_config = pool_recall.get_test_config() elif ab_code == 60061: exp_config = pool_recall.get_simrecall_config() elif ab_code == 60062: exp_config = pool_recall.get_u2u2i_config() elif ab_code == 60063: exp_config = pool_recall.get_simrecall_config_new() elif ab_code == 60064: exp_config = pool_recall.get_video_recall_config() #print("exp_config:", exp_config) if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)] # if ab_code == 60058: # t.append(gevent.spawn(pool_recall.get_U2I_reall, mid)) # t.append(gevent.spawn(pool_recall.get_play_reall, mid)) # elif ab_code == 60059: # t.append(gevent.spawn(pool_recall.get_word2vec_item_reall)) # elif ab_code == 60061 or ab_code == 60063: # t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) # elif ab_code == 60062: # t.append(gevent.spawn(pool_recall.get_U2U2I_reall, mid)) # elif ab_code == 60064: # t.append(gevent.spawn(pool_recall.get_return_video_reall)) elif flow_pool_abtest_group == 'experimental_flow_set_level': t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time), gevent.spawn(pool_recall.flow_pool_recall_new_with_level, size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group), gevent.spawn(pool_recall.flow_pool_recall_new_with_level, size, flow_pool_abtest_group=flow_pool_abtest_group)] else: t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time), gevent.spawn(pool_recall.flow_pool_recall, size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group), gevent.spawn(pool_recall.flow_pool_recall, size, flow_pool_abtest_group=flow_pool_abtest_group)] if ab_code == 60058: t.append(gevent.spawn(pool_recall.get_U2I_reall, mid)) t.append(gevent.spawn(pool_recall.get_play_reall, mid)) elif ab_code == 60059: t.append(gevent.spawn(pool_recall.get_word2vec_item_reall)) elif ab_code == 60061 or ab_code == 60063: t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) elif ab_code == 60062: t.append(gevent.spawn(pool_recall.get_U2U2I_reall, mid)) elif ab_code == 60064: t.append(gevent.spawn(pool_recall.get_return_video_reall)) # 最惊奇相关推荐实验 # elif ab_code == config_.AB_CODE['top_video_relevant_appType_19']: # t = [gevent.spawn(pool_recall.relevant_recall_19, video_id, size, expire_time), # gevent.spawn(pool_recall.flow_pool_recall_18_19, size)] # 最惊奇完整影视实验 # elif ab_code == config_.AB_CODE['whole_movies']: # t = [gevent.spawn(pool_recall.rov_pool_recall_19, size, expire_time)] # 最惊奇/老好看实验 # elif app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: # t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time), # gevent.spawn(pool_recall.flow_pool_recall_18_19, size)] # # 天级实验 # elif ab_code in [code for _, code in config_.AB_CODE['rank_by_day'].items()]: # t = [gevent.spawn(pool_recall.rov_pool_recall_by_day, size, expire_time), # gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID), # gevent.spawn(pool_recall.flow_pool_recall, size)] # 老视频实验 # elif ab_code in [config_.AB_CODE['old_video']]: # t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time), # gevent.spawn(pool_recall.flow_pool_recall, size), # gevent.spawn(pool_recall.old_videos_recall, size)] # else: # if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: # t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time)] # else: # t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time), # gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID), # gevent.spawn(pool_recall.flow_pool_recall, size)] gevent.joinall(t) recall_result_list = [i.get() for i in t] # end_recall = time.time() # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'mid': mid, # 'uid': uid, # 'operation': 'recall', # 'recall_result': recall_result_list, # 'executeTime': (time.time() - start_recall) * 1000 # }) result['recallResult'] = recall_result_list result['recallTime'] = (time.time() - start_recall) * 1000 # ####### 排序 start_rank = time.time() # log_.info('====== rank') if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: if ab_code in [ config_.AB_CODE['rov_rank_appType_18_19'], config_.AB_CODE['rov_rank_appType_19'], config_.AB_CODE['top_video_relevant_appType_19'] ]: data = { 'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': recall_result_list[1] } else: data = { 'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': [] } else: if recall_result_list[1]: redis_helper = RedisHelper() quick_flow_pool_P = redis_helper.get_data_from_redis( key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}" ) if quick_flow_pool_P: flow_pool_P = quick_flow_pool_P data = { 'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': recall_result_list[1] } else: data = { 'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': recall_result_list[2] } data['u2i_recall'] = [] data['u2i_play_recall'] = [] data['w2v_recall'] = [] data['sim_recall'] = [] data['u2u2i_recall'] = [] if ab_code == 60058: if len(recall_result_list)>=4: data['u2i_recall'] = recall_result_list[3] if len(recall_result_list)>=5: data['u2i_play_recall'] = recall_result_list[4] elif ab_code == 60059: if len(recall_result_list)>=4: data['w2v_recall'] = recall_result_list[3] elif (ab_code == 60061 or ab_code == 60063): if len(recall_result_list)>=4: data['sim_recall'] = recall_result_list[3] elif ab_code == 60062: if len(recall_result_list)>=4: data['u2u2i_recall'] = recall_result_list[3] elif ab_code == 60064: if len(recall_result_list)>=4: #if ab_code=="ab_new_test": data['return_video_recall'] = recall_result_list[3] # rank_result = video_new_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P)) #else: #print("data['hot_recall']", data['hot_recall']) # 60058: u2itag, 60059:word2vec, 60061: sim_recall, 60062: u2u2i # if ab_code == 60058 or ab_code == 60059 or ab_code == 60060 or ab_code == 60061 \ # or ab_code == 60062 or ab_code== 60063 or ab_code == 60064: # rank_result, flow_num = video_sank_pos_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_Code=ab_code, exp_config=exp_config) # result['flow_num'] = flow_num # if rank_result: # result['rank_num'] = len(rank_result) # else: rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P)) # 老视频实验 # if ab_code in [config_.AB_CODE['old_video']]: # rank_result = video_rank_with_old_video(rank_result=rank_result, old_video_recall=recall_result_list[2], # size=size, top_K=top_K, old_video_index=old_video_index) # end_rank = time.time() # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'mid': mid, # 'uid': uid, # 'operation': 'rank', # 'rank_result': rank_result, # 'executeTime': (time.time() - start_rank) * 1000 # }) result['rankResult'] = rank_result result['rankTime'] = (time.time() - start_rank) * 1000 # if not rank_result: # # 兜底策略 # # log_.info('====== bottom strategy') # start_bottom = time.time() # rank_result = bottom_strategy2( # size=size, app_type=app_type, mid=mid, uid=uid, ab_code=ab_code, client_info=client_info, params=params # ) # # # if ab_code == config_.AB_CODE['region_rank_by_h'].get('abtest_130'): # # rank_result = bottom_strategy2( # # size=size, app_type=app_type, mid=mid, uid=uid, ab_code=ab_code, client_info=client_info, params=params # # ) # # else: # # rank_result = bottom_strategy( # # request_id=request_id, size=size, app_type=app_type, ab_code=ab_code, params=params # # ) # # # log_.info({ # # 'logTimestamp': int(time.time() * 1000), # # 'request_id': request_id, # # 'mid': mid, # # 'uid': uid, # # 'operation': 'bottom', # # 'bottom_result': rank_result, # # 'executeTime': (time.time() - start_bottom) * 1000 # # }) # result['bottomResult'] = rank_result # result['bottomTime'] = (time.time() - start_bottom) * 1000 # # result['rankResult'] = rank_result return result # return rank_result, last_rov_recall_key def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info, expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='', no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None, shield_config=None, env_dict=None, level_weight=None, flow_pool_abtest_group=None): """ 首页线上推荐逻辑 :param request_id: request_id :param mid: mid type-string :param uid: uid type-string :param size: 请求视频数量 type-int :param top_K: 保证topK为召回池视频 type-int :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float :param app_type: 产品标识 type-int :param algo_type: 算法类型 type-string :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"} :param expire_time: 末位视频记录redis过期时间 :param ab_code: AB实验code :param video_id: 相关推荐头部视频id :param params: :return: """ result = {} # ####### 多进程召回 start_recall = time.time() # log_.info('====== recall') #print("abcode",ab_code) recall_result_list = [] pool_recall = PoolRecall(request_id=request_id, app_type=app_type, mid=mid, uid=uid, ab_code=ab_code, client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag, params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id=video_id, level_weight=level_weight) exp_config = pool_recall.get_sort_ab_codel_config() # 60054 全量: simrecall+融合排序 if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)] # if ab_code == 60054 or ab_code == 60066 or ab_code == 60072 or ab_code == 60073: # t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) # if ab_code == 60056 or ab_code == 60071: # t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) # t.append(gevent.spawn(pool_recall.get_U2I_reall, mid)) # if ab_code ==60067 or ab_code == 60069: # t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) # t.append(gevent.spawn(pool_recall.get_return_video_reall)) # if ab_code == 60068 or ab_code == 60070: # t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) # t.append(gevent.spawn(pool_recall.get_return_video_reall, 'rv2:')) elif flow_pool_abtest_group == 'experimental_flow_set_level': t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time), gevent.spawn(pool_recall.flow_pool_recall_new_with_level, size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group), gevent.spawn(pool_recall.flow_pool_recall_new_with_level, size, flow_pool_abtest_group=flow_pool_abtest_group)] else: t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time), gevent.spawn(pool_recall.flow_pool_recall, size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group), gevent.spawn(pool_recall.flow_pool_recall, size, flow_pool_abtest_group=flow_pool_abtest_group)] if ab_code == 60054 or ab_code == 60066 or ab_code == 60072 or ab_code == 60073 or ab_code == 60074: t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) elif ab_code == 60056 or ab_code == 60071: t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) t.append(gevent.spawn(pool_recall.get_U2I_reall, mid)) elif ab_code == 60067 or ab_code == 60069: t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) t.append(gevent.spawn(pool_recall.get_return_video_reall)) elif ab_code == 60068 or ab_code == 60070: t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter)) t.append(gevent.spawn(pool_recall.get_return_video_reall, 'rv2:')) gevent.joinall(t) recall_result_list = [i.get() for i in t] #print("recall:",recall_result_list) if len(recall_result_list)<0: result['recallResult']= [] result['rankResult'] = [] return result #1. merge simrecall or deepfm if ab_code == 60054 or ab_code == 60066 or ab_code == 60072 or ab_code == 60073 or ab_code == 60074: rov_pool_recall = [] if len(recall_result_list) >= 2: region_recall = recall_result_list[0] sim_recall = [] if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: sim_recall = recall_result_list[1] else: if len(recall_result_list)>=4: sim_recall = recall_result_list[3] now_video_ids = set('') if len(region_recall) > 0: for video in region_recall: video_id = video.get('videoId') if video_id not in now_video_ids: rov_pool_recall.append(video) now_video_ids.add(video_id) if len(sim_recall) > 0: for video in sim_recall: video_id = video.get('videoId') # print("sim video_id:", video_id) if video_id not in now_video_ids: rov_pool_recall.append(video) now_video_ids.add(video_id) if len(rov_pool_recall) > 0: recall_result_list[0] = rov_pool_recall # merge simrecall, merge u2i title recall, deepfm if ab_code == 60056 or ab_code == 60071: rov_pool_recall = [] if len(recall_result_list)>=2: region_recall = recall_result_list[0] sim_recall = [] u2i_title_recall = [] if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: sim_recall = recall_result_list[1] if len(recall_result_list) >= 3: u2i_title_recall = recall_result_list[2] else: if len(recall_result_list) >= 4: sim_recall = recall_result_list[3] if len(recall_result_list) >= 5: u2i_title_recall = recall_result_list[4] #print("u2i_title_recall:", u2i_title_recall) now_video_ids = set('') if len(region_recall)>0: for video in region_recall: video_id = video.get('videoId') if video_id not in now_video_ids: rov_pool_recall.append(video) now_video_ids.add(video_id) if len(sim_recall) > 0: for video in sim_recall: video_id = video.get('videoId') if video_id not in now_video_ids: rov_pool_recall.append(video) now_video_ids.add(video_id) if len(u2i_title_recall)>0: for video in u2i_title_recall: video_id = video.get('videoId') #print("sim video_id:", video_id) if video_id not in now_video_ids: rov_pool_recall.append(video) now_video_ids.add(video_id) if len(rov_pool_recall)>0: recall_result_list[0] = rov_pool_recall #2. merge simrecall, return video if ab_code == 60067 or ab_code==60068 or ab_code==60069 or ab_code==60070: rov_pool_recall = [] if len(recall_result_list)>=2: region_recall = recall_result_list[0] return_video_reall = [] sim_recall = [] if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: sim_recall = recall_result_list[1] if len(recall_result_list) >= 3: return_video_reall = recall_result_list[2] else: if len(recall_result_list)>=4: sim_recall = recall_result_list[3] if len(recall_result_list)>=5: return_video_reall = recall_result_list[4] #print("sim:",sim_recall) now_video_ids = set('') if len(region_recall)>0: for video in region_recall: video_id = video.get('videoId') if video_id not in now_video_ids: rov_pool_recall.append(video) now_video_ids.add(video_id) if len(sim_recall) > 0: for video in sim_recall: video_id = video.get('videoId') if video_id not in now_video_ids: rov_pool_recall.append(video) now_video_ids.add(video_id) if len(return_video_reall)>0: for video in return_video_reall: video_id = video.get('videoId') if video_id not in now_video_ids: rov_pool_recall.append(video) now_video_ids.add(video_id) if len(rov_pool_recall)>0: recall_result_list[0] = rov_pool_recall result['recallResult'] = recall_result_list result['recallTime'] = (time.time() - start_recall) * 1000 #print("recall:", recall_result_list) # ####### 排序 start_rank = time.time() # log_.info('====== rank') if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: if ab_code in [ config_.AB_CODE['rov_rank_appType_18_19'], config_.AB_CODE['rov_rank_appType_19'], config_.AB_CODE['top_video_relevant_appType_19'] ]: data = { 'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': recall_result_list[1] } else: data = { 'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': [] } else: if recall_result_list[1]: redis_helper = RedisHelper() quick_flow_pool_P = redis_helper.get_data_from_redis( key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}" ) if quick_flow_pool_P: flow_pool_P = quick_flow_pool_P data = { 'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': recall_result_list[1] } else: data = { 'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': recall_result_list[2] } # 3. 特征回流 # # for recall_item in data['rov_pool_recall']: # if len(recall_item) <= 0: # continue # vid = recall_item.get("videoId", 0) # rec_recall_list.append(vid) # rec_recall_item_list.append(recall_item) # redisObj = RedisHelper() # video_static_info = redisObj.get_batch_key(vidKeys) # video_hour_static_info = redisObj.get_batch_key(hour_vidKeys) # vid_day_fea_list = [] # vid_hour_fea_list = [] # if video_static_info: # vid_day_fea_list = video_static_info # if video_hour_static_info: # vid_hour_fea_list = video_hour_static_info if env_dict: province_code = client_info.get('provinceCode', -1) if province_code and province_code == "": province_code =-1 city_code = client_info.get('cityCode', -1) if city_code and city_code == "": city_code = -1 env_dict['mid'] = mid env_dict['province_code'] = province_code env_dict['city_code'] = city_code env_json = env_dict #4. rank_result, flow_num = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict) #print(rank_result) if rank_result: result['rank_num'] = len(rank_result) day_vidKeys = [] hour_vidKeys = [] rec_recall_list = [] pre_str = "v_ctr:" pre_hour_str = "v_hour_ctr:" if env_dict and len(rank_result)>0: for rec_item in rank_result: vid = rec_item.get("videoId", 0) rec_recall_list.append(vid) day_vidKeys.append(pre_str+str(vid)) hour_vidKeys.append(pre_hour_str+str(vid)) redisObj = RedisHelper() video_static_info = redisObj.get_batch_key(day_vidKeys) video_hour_static_info = redisObj.get_batch_key(hour_vidKeys) vid_day_fea_list = [] vid_hour_fea_list = [] if video_static_info: vid_day_fea_list = video_static_info if video_hour_static_info: vid_hour_fea_list = video_hour_static_info env_dict['recall_list'] = rec_recall_list env_dict['vid_day_fea_list'] = vid_day_fea_list env_dict['vid_hour_fea_list'] = vid_hour_fea_list env_json = env_dict result['rankResult'] = rank_result result['flow_num'] = flow_num result['rankTime'] = (time.time() - start_rank) * 1000 return result, env_json # return rank_result, last_rov_recall_key def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info, expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='', no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None, shield_config=None): """ 首页线上推荐逻辑 :param request_id: request_id :param mid: mid type-string :param uid: uid type-string :param size: 请求视频数量 type-int :param top_K: 保证topK为召回池视频 type-int :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float :param app_type: 产品标识 type-int :param algo_type: 算法类型 type-string :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"} :param expire_time: 末位视频记录redis过期时间 :param ab_code: AB实验code :param video_id: 相关推荐头部视频id :param params: :return: """ #1. recall result = {} result['rankResult'] = [] # ####### 多进程召回 start_recall = time.time() # 1. 根据城市或者省份获取region_code city_code_list = [code for _, code in config_.CITY_CODE.items()] # 获取provinceCode province_code = client_info.get('provinceCode', '-1') # 获取cityCode city_code = client_info.get('cityCode', '-1') if city_code in city_code_list: # 分城市数据存在时,获取城市分组数据 region_code = city_code else: region_code = province_code if region_code == '': region_code = '-1' #print("region_code:", region_code) #size =1000 pool_recall = PoolRecall(request_id=request_id, app_type=app_type, mid=mid, uid=uid, ab_code=ab_code, client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag, params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id= video_id) if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: t = [gevent.spawn(pool_recall.get_region_hour_recall, size, region_code), gevent.spawn(pool_recall.get_region_day_recall, size, region_code), gevent.spawn(pool_recall.get_selected_recall, size, region_code), gevent.spawn(pool_recall.get_no_selected_recall, size, region_code) ] if ab_code == 60049: t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall)) else: t = [ gevent.spawn(pool_recall.get_region_hour_recall, size, region_code), gevent.spawn(pool_recall.get_region_day_recall, size, region_code), gevent.spawn(pool_recall.get_selected_recall, size, region_code), gevent.spawn(pool_recall.get_no_selected_recall, size, region_code), gevent.spawn(pool_recall.new_flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID), gevent.spawn(pool_recall.new_flow_pool_recall, size)] if ab_code ==60049: t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall)) gevent.joinall(t) # all recall_result all_recall_result_list = [i.get() for i in t] all_recall_result = [] #print(all_recall_result_list) result['recallTime'] = (time.time() - start_recall) * 1000 #print("recall time:", result['recallTime']) if not all_recall_result_list or len(all_recall_result_list)==0: return result for recall_item in all_recall_result_list: if not recall_item or len(recall_item)==0: continue for per_item in recall_item: all_recall_result.append(per_item) #print("all_recall_result:", all_recall_result) #2. duplicate dup_time = time.time() recall_dict = {} fast_flow_set = set('') flow_flow_set = set('') region_h_recall = [] region_day_recall = [] select_day_recall = [] no_selected_recall = [] sim_hot_recall = [] flow_recall = [] flowFlag_dict = {} for per_item in all_recall_result: #print(per_item) try: vId = int(per_item.get("videoId",0)) if vId==0: continue recall_name = per_item.get("pushFrom",'') flow_pool = per_item.get("flowPool", '') if flow_pool != '': flow_pool_id = int(flow_pool.split('#')[0]) if flow_pool_id == config_.QUICK_FLOW_POOL_ID: fast_flow_set.add(vId) else: flow_flow_set.add(vId) flowFlag_dict[vId] = flow_pool #duplicate divide into if vId not in recall_dict: if recall_name == config_.PUSH_FROM['rov_recall_region_h']: region_h_recall.append(per_item) elif recall_name == config_.PUSH_FROM['rov_recall_region_24h']: region_day_recall.append(per_item) elif recall_name == config_.PUSH_FROM['rov_recall_24h']: select_day_recall.append(per_item) elif recall_name == config_.PUSH_FROM['rov_recall_24h_dup']: no_selected_recall.append(per_item) elif recall_name == config_.PUSH_FROM['sim_hot_vid_recall']: sim_hot_recall.append(per_item) elif recall_name == config_.PUSH_FROM['flow_recall']: flow_recall.append(per_item) if vId not in recall_dict: recall_dict[vId] = recall_name else: recall_name = recall_dict[vId] + "," + recall_name recall_dict[vId] = recall_name except: continue #print("recall_dict:", recall_dict) #print("recall time:", (time.time()-dup_time)*1000) #3. filter video, 先过预曝光 filter_time = time.time() filter_ = FilterVideos(request_id=request_id, app_type=app_type, mid=mid, uid=uid, video_ids=list(recall_dict.keys())) #print("filer:", list(recall_dict.keys())) #a).expose filter #all_recall_list = list(recall_dict.keys()) all_recall_list = filter_.filter_videos_new(region_code=region_code, shield_config=shield_config, flow_set=flowFlag_dict.keys()) #print("filer after:", all_recall_list) #print("filter_ time:", (time.time() - filter_time) * 1000) #4. sort: old sort: flow 按概率出 start_rank = time.time() #quick_flow_pool_P get from redis redis_helper = RedisHelper() quick_flow_pool_P = redis_helper.get_data_from_redis( key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}" ) if quick_flow_pool_P: flow_pool_P = quick_flow_pool_P rank_result= [] if ab_code==60048 or ab_code==60049: rank_ids, add_flow_set = video_new_rank(videoIds=all_recall_list,fast_flow_set=fast_flow_set, flow_set=flow_flow_set,size=size, top_K=top_K, flow_pool_P=float(flow_pool_P)) #print("rank_ids:", rank_ids) for rank_item in rank_ids: rank_id = rank_item[0] rank_score = rank_item[1] pushFrom = recall_dict.get(rank_id, '') #print(pushFrom, rank_id) flowPoolFlag = '' if rank_id in add_flow_set: flowPoolFlag = flowFlag_dict.get(rank_id,'') rank_result.append({'videoId': rank_id, 'flowPool': flowPoolFlag, 'rovScore': rank_score, 'pushFrom': pushFrom, 'abCode': ab_code}) # #print("rank_result:", rank_result) else: all_dup_recall_result = region_h_recall+region_day_recall+select_day_recall+no_selected_recall+flow_recall rank_result = refactor_video_rank(rov_recall_rank=all_dup_recall_result,fast_flow_set=fast_flow_set, flow_set=flow_flow_set, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P)) result['rankResult'] = rank_result result['rankTime'] = (time.time() - start_rank) * 1000 #print("rank time:", result['rankTime']) return result # return rank_result, last_rov_recall_key def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs): """ 对排序后的结果 按照AB实验进行对应的分组操作 :param rank_result: 排序后的结果 :param ab_code_list: 此次请求参与的 ab实验组 :param app_type: 产品标识 :param mid: mid :param uid: uid :param kwargs: 其他参数 :return: """ # ####### 视频宽高比AB实验 # 对内容精选进行 视频宽高比分发实验 # if config_.AB_CODE['w_h_rate'] in ab_code_list and app_type in config_.AB_TEST.get('w_h_rate', []): # rank_result = video_rank_by_w_h_rate(videos=rank_result) # log_.info('app_type: {}, mid: {}, uid: {}, rank_by_w_h_rate_result: {}'.format( # app_type, mid, uid, rank_result)) # 按position位置排序 if config_.AB_CODE['position_insert'] in ab_code_list and app_type in config_.AB_TEST.get('position_insert', []): rank_result = video_position_recommend(mid, uid, app_type, rank_result) print('===========================') print(rank_result) log_.info('app_type: {}, mid: {}, uid: {}, rank_by_position_insert_result: {}'.format( app_type, mid, uid, rank_result)) # 相关推荐强插 # if config_.AB_CODE['relevant_video_op'] in ab_code_list \ # and app_type in config_.AB_TEST.get('relevant_video_op', []): # head_vid = kwargs['head_vid'] # size = kwargs['size'] # rank_result = relevant_video_top_recommend( # app_type=app_type, mid=mid, uid=uid, head_vid=head_vid, videos=rank_result, size=size # ) # log_.info('app_type: {}, mid: {}, uid: {}, head_vid: {}, rank_by_relevant_video_op_result: {}'.format( # app_type, mid, uid, head_vid, rank_result)) return rank_result def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_weight=None, flow_pool_abtest_group=None): """ 根据最终的排序结果更新相关redis数据 :param result: 排序结果 :param app_type: 产品标识 :param mid: mid :param top_K: 保证topK为召回池视频 type-int :param expire_time: 末位视频记录redis过期时间 :return: None """ # ####### redis数据刷新 try: redis_helper = RedisHelper() # log_.info('====== update redis') if mid and mid != 'null': # mid为空时,不做预曝光和定位数据更新 # 预曝光数据同步刷新到Redis, 过期时间为0.5h preview_key_name = f"{config_.PREVIEW_KEY_PREFIX}{app_type}:{mid}" preview_video_ids = [int(item['videoId']) for item in result] if preview_video_ids: # log_.error('key_name = {} \n values = {}'.format(preview_key_name, tuple(preview_video_ids))) redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30 * 60) # log_.info('preview redis update success!') # # 将此次获取的ROV召回池top_K末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天 # rov_recall_video = [item['videoId'] for item in result[:top_K] # if item['pushFrom'] == config_.PUSH_FROM['rov_recall']] # if len(rov_recall_video) > 0: # if app_type == config_.APP_TYPE['APP']: # key_name = config_.UPDATE_ROV_KEY_NAME_APP # else: # key_name = config_.UPDATE_ROV_KEY_NAME # if not redis_helper.get_score_with_value(key_name=key_name, value=rov_recall_video[-1]): # redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1], # expire_time=expire_time) # log_.info('last video redis update success!') # 将此次获取的 地域分组小时级数据列表 中的视频id同步刷新到redis中,方便下次快速定位到召回位置 rov_recall_h_video = [item['videoId'] for item in result[:top_K] if item['pushFrom'] == config_.PUSH_FROM['rov_recall_region_h']] if len(rov_recall_h_video) > 0: last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_H_PREFIX}{app_type}:{mid}' redis_helper.set_data_to_redis(key_name=last_video_key, value=rov_recall_h_video[-1], expire_time=expire_time) # 将此次获取的 地域分组相对24h数据列表 中的视频id同步刷新到redis中,方便下次快速定位到召回位置 rov_recall_24h_dup1_video = [item['videoId'] for item in result[:top_K] if item['pushFrom'] == config_.PUSH_FROM['rov_recall_region_24h']] if len(rov_recall_24h_dup1_video) > 0: last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_DUP1_24H_PREFIX}{app_type}:{mid}' redis_helper.set_data_to_redis(key_name=last_video_key, value=rov_recall_24h_dup1_video[-1], expire_time=expire_time) # 将此次获取的 相对24h筛选数据列表 中的视频id同步刷新到redis中,方便下次快速定位到召回位置 rov_recall_24h_dup2_video = [item['videoId'] for item in result[:top_K] if item['pushFrom'] == config_.PUSH_FROM['rov_recall_24h']] if len(rov_recall_24h_dup2_video) > 0: last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_DUP2_24H_PREFIX}{app_type}:{mid}' redis_helper.set_data_to_redis(key_name=last_video_key, value=rov_recall_24h_dup2_video[-1], expire_time=expire_time) # 将此次获取的 相对24h筛选后剩余数据列表 中的视频id同步刷新到redis中,方便下次快速定位到召回位置 rov_recall_24h_dup3_video = [item['videoId'] for item in result[:top_K] if item['pushFrom'] == config_.PUSH_FROM['rov_recall_24h_dup']] if len(rov_recall_24h_dup3_video) > 0: last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX}{app_type}:{mid}' redis_helper.set_data_to_redis(key_name=last_video_key, value=rov_recall_24h_dup3_video[-1], expire_time=expire_time) # # 将此次获取的 相对48h筛选数据列表 中的视频id同步刷新到redis中,方便下次快速定位到召回位置 # rov_recall_48h_dup2_video = [item['videoId'] for item in result[:top_K] # if item['pushFrom'] == config_.PUSH_FROM['rov_recall_48h']] # if len(rov_recall_48h_dup2_video) > 0: # last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_DUP2_48H_PREFIX}{app_type}:{mid}' # redis_helper.set_data_to_redis(key_name=last_video_key, value=rov_recall_48h_dup2_video[-1], # expire_time=expire_time) # # # 将此次获取的 相对48h筛选后剩余数据列表 中的视频id同步刷新到redis中,方便下次快速定位到召回位置 # rov_recall_48h_dup3_video = [item['videoId'] for item in result[:top_K] # if item['pushFrom'] == config_.PUSH_FROM['rov_recall_48h_dup']] # if len(rov_recall_48h_dup3_video) > 0: # last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_DUP3_48H_PREFIX}{app_type}:{mid}' # redis_helper.set_data_to_redis(key_name=last_video_key, value=rov_recall_48h_dup3_video[-1], # expire_time=expire_time) # 将此次分发的流量池视频,对 本地分发数-1 进行记录 if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: # 获取本地分发数-1策略开关 switch = redis_helper.get_data_from_redis(key_name=config_.IN_FLOW_POOL_COUNT_SWITCH_KEY_NAME) if switch is not None: if int(switch) == 1: flow_recall_video = [item for item in result if item.get('flowPool', None) is not None] else: flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']] else: flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']] if flow_recall_video: if flow_pool_abtest_group == 'experimental_flow_set_level': update_local_distribute_count_new_with_level(flow_recall_video, level_weight) else: update_local_distribute_count(flow_recall_video) # update_local_distribute_count_new(flow_recall_video) # log_.info('update local distribute count success!') # 限流视频分发数记录 if app_type == config_.APP_TYPE['APP']: # APP 不计入 return limit_video_id_list = redis_helper.get_data_from_set( key_name=f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_SET}{datetime.today().strftime('%Y%m%d')}" ) if limit_video_id_list is not None: limit_video_id_list = [int(item) for item in limit_video_id_list] for item in result: video_id = item['videoId'] if video_id in limit_video_id_list: key_name = f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_DISTRIBUTE_COUNT}{video_id}" redis_helper.setnx_key(key_name=key_name, value=0, expire_time=24*2600) redis_helper.incr_key(key_name=key_name, amount=1, expire_time=24*3600) except Exception as e: log_.error("update redis data fail!") log_.error(traceback.format_exc()) def update_flow_redis_data(result, app_type, mid, top_K, expire_time=24*3600): """ 根据最终的排序结果更新相关redis数据 :param result: 排序结果 :param app_type: 产品标识 :param mid: mid :param top_K: 保证topK为召回池视频 type-int :param expire_time: 末位视频记录redis过期时间 :return: None """ # ####### redis数据刷新 try: redis_helper = RedisHelper() # log_.info('====== update redis') if mid and mid != 'null': # mid为空时,不做预曝光和定位数据更新 # 预曝光数据同步刷新到Redis, 过期时间为0.5h preview_key_name = f"{config_.PREVIEW_KEY_PREFIX}{app_type}:{mid}" preview_video_ids = [int(item['videoId']) for item in result] if preview_video_ids: # log_.error('key_name = {} \n values = {}'.format(preview_key_name, tuple(preview_video_ids))) redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30 * 60) # log_.info('preview redis update success!') # 将此次分发的流量池视频,对 本地分发数-1 进行记录 if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]: # 获取本地分发数-1策略开关 switch = redis_helper.get_data_from_redis(key_name=config_.IN_FLOW_POOL_COUNT_SWITCH_KEY_NAME) if switch is not None: if int(switch) == 1: flow_recall_video = [item for item in result if item.get('flowPool', None) is not None] else: flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']] else: flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']] if flow_recall_video: update_local_distribute_count(flow_recall_video) # log_.info('update local distribute count success!') # 限流视频分发数记录 if app_type == config_.APP_TYPE['APP']: # APP 不计入 return limit_video_id_list = redis_helper.get_data_from_set( key_name=f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_SET}{datetime.today().strftime('%Y%m%d')}" ) if limit_video_id_list is not None: limit_video_id_list = [int(item) for item in limit_video_id_list] for item in result: video_id = item['videoId'] if video_id in limit_video_id_list: key_name = f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_DISTRIBUTE_COUNT}{video_id}" redis_helper.setnx_key(key_name=key_name, value=0, expire_time=24*2600) redis_helper.incr_key(key_name=key_name, amount=1, expire_time=24*3600) except Exception as e: log_.error("update redis data fail!") log_.error(traceback.format_exc()) def update_local_distribute_count(videos): """ 更新本地分发数 :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '', 'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....] :return: """ try: redis_helper = RedisHelper() for item in videos: video_id, flow_pool = item['videoId'], item['flowPool'] key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}" # 本地记录的分发数 - 1 redis_helper.decr_key(key_name=key_name, amount=1, expire_time=15 * 60) # 对该视频做分发数检查 cur_count = redis_helper.get_data_from_redis(key_name=key_name) # 无记录 if cur_count is None: continue # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key if int(cur_count) <= 0: add_remove_log = False redis_helper.del_keys(key_name=key_name) for app_name in config_.APP_TYPE: app_type = config_.APP_TYPE.get(app_name) flow_pool_key_list = [ f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}", f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}" ] for key in flow_pool_key_list: remove_res = redis_helper.remove_value_from_zset(key_name=key, value=f"{video_id}-{flow_pool}") if remove_res > 0: add_remove_log = True video_flow_pool_key_list = [ f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}:{video_id}", f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{video_id}" ] for key in video_flow_pool_key_list: redis_helper.remove_value_from_set(key_name=key, values=(flow_pool, )) if add_remove_log is True: log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool}) # if redis_helper.key_exists(key_name=key_name): # # 该视频本地有记录,本地记录的分发数 - 1 # redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60) # else: # # 该视频本地无记录,接口获取的分发数 - 1 # redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60) except Exception as e: log_.error('update_local_distribute_count error...') log_.error(traceback.format_exc()) def update_local_distribute_count_new(videos): """ 更新本地分发数 :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '', 'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....] :return: """ try: redis_helper = RedisHelper() for item in videos: video_id, flow_pool = item['videoId'], item['flowPool'] key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}" # 本地记录的分发数 - 1 redis_helper.decr_key(key_name=key_name, amount=1, expire_time=15 * 60) # 对该视频做分发数检查 cur_count = redis_helper.get_data_from_redis(key_name=key_name) # 无记录 if cur_count is None: continue # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key if int(cur_count) <= 0: add_remove_log = False redis_helper.del_keys(key_name=key_name) for app_name in config_.APP_TYPE: app_type = config_.APP_TYPE.get(app_name) flow_pool_key_list = [ f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}", f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}:{config_.QUICK_FLOW_POOL_ID}" ] for key in flow_pool_key_list: remove_res = redis_helper.remove_value_from_set(key_name=key, values=(f"{video_id}-{flow_pool}", )) if remove_res > 0: add_remove_log = True video_flow_pool_key_list = [ f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}:{video_id}", f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{video_id}" ] for key in video_flow_pool_key_list: redis_helper.remove_value_from_set(key_name=key, values=(flow_pool, )) if add_remove_log is True: log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool}) # if redis_helper.key_exists(key_name=key_name): # # 该视频本地有记录,本地记录的分发数 - 1 # redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60) # else: # # 该视频本地无记录,接口获取的分发数 - 1 # redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60) except Exception as e: log_.error('update_local_distribute_count error...') log_.error(traceback.format_exc()) def update_local_distribute_count_new_with_level(videos, level_weight): """ 更新本地分发数 :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '', 'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....] :return: """ try: redis_helper = RedisHelper() # level_weight = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME) # level_list = [level for level in json.loads(level_weight)] if level_weight is None: level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1} level_list = [level for level in level_weight] for item in videos: video_id, flow_pool = item['videoId'], item['flowPool'] key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}" # 本地记录的分发数 - 1 redis_helper.decr_key(key_name=key_name, amount=1, expire_time=15 * 60) # 对该视频做分发数检查 cur_count = redis_helper.get_data_from_redis(key_name=key_name) # 无记录 if cur_count is None: continue # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key if int(cur_count) <= 0: add_remove_log = False redis_helper.del_keys(key_name=key_name) for app_name in config_.APP_TYPE: app_type = config_.APP_TYPE.get(app_name) flow_pool_key_list = [ f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}:{config_.QUICK_FLOW_POOL_ID}" ] for level in level_list: flow_pool_key_list.append(f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL}{app_type}:{level}") for key in flow_pool_key_list: remove_res = redis_helper.remove_value_from_set(key_name=key, values=(f"{video_id}-{flow_pool}", )) if remove_res > 0: add_remove_log = True if add_remove_log is True: log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool}) # if redis_helper.key_exists(key_name=key_name): # # 该视频本地有记录,本地记录的分发数 - 1 # redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60) # else: # # 该视频本地无记录,接口获取的分发数 - 1 # redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60) except Exception as e: log_.error('update_local_distribute_count error...') log_.error(traceback.format_exc()) def get_religion_class_with_mid(mid, religion_class_name): """ 判断用户是否属于对应的宗教类型 :param mid: mid type-string :param religion_class_name: 宗教类型, type-string, (catholicism-天主教, christianity-基督教) :return: religion_class_flag, type-int, (0-否,1-是), 默认: 0 """ religion_class_flag = 0 now_date = datetime.today() redis_helper = RedisHelper() if mid: hash_mid = hashlib.md5(mid.encode('utf-8')).hexdigest() hash_tag = hash_mid[-1:] key_name_prefix = config_.KEY_NAME_PREFIX_RELIGION_USER.get(religion_class_name, None) if key_name_prefix is None: return religion_class_flag key_name = f"{key_name_prefix}{hash_tag}:{datetime.strftime(now_date, '%Y%m%d')}" if not redis_helper.key_exists(key_name=key_name): key_name = f"{key_name_prefix}{hash_tag}:{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}" if redis_helper.data_exists_with_set(key_name=key_name, value=mid): religion_class_flag = 1 return religion_class_flag def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_type, page_type=0, flow_pool_abtest_config=None): """ 根据实验分组给定对应的推荐参数 :param recommend_type: 首页推荐和相关推荐区分参数(0-首页推荐,1-相关推荐) :param ab_exp_info: AB实验组参数 :param ab_info_data: app实验组参数 :param mid: mid :param app_type: app_type, type-int :param page_type: 页面区分参数,默认:0(首页) :param flow_pool_abtest_config: 流量池abtest配置 :return: """ top_K = config_.K flow_pool_P = config_.P # 不获取人工干预数据标记 no_op_flag = True expire_time = 3600 old_video_index = -1 # 获取对应的默认配置 ab_initial_config = config_.INITIAL_CONFIG.get(app_type, None) if ab_initial_config is None: ab_initial_config = config_.INITIAL_CONFIG.get('other') param = config_.AB_EXP_CODE[ab_initial_config] ab_code = param.get('ab_code') rule_key = param.get('rule_key') data_key = param.get('data_key') rule_key_30day = param.get('30day_rule_key') shield_config = config_.SHIELD_CONFIG # 默认使用 095 实验的配置 # ab_code = config_.AB_EXP_CODE['095'].get('ab_code') # rule_key = config_.AB_EXP_CODE['095'].get('rule_key') # data_key = config_.AB_EXP_CODE['095'].get('data_key') # rule_key_30day = None # 获取用户近30天是否有回流 # user_30day_return_result = get_user_has30day_return(mid=mid) # 获取实验配置 if ab_exp_info: ab_exp_code_list = [] config_value_dict = {} for _, item in ab_exp_info.items(): if not item: continue for ab_item in item: ab_exp_code = ab_item.get('abExpCode', None) if not ab_exp_code: continue ab_exp_code_list.append(str(ab_exp_code)) config_value_dict[str(ab_exp_code)] = ab_item.get('configValue', None) # 流量池视频分发概率实验 if '211' in ab_exp_code_list: flow_pool_P = 0.9 elif '221' in ab_exp_code_list: flow_pool_P = 0.7 elif '299' in ab_exp_code_list: flow_pool_P = 0.5 elif '300' in ab_exp_code_list: flow_pool_P = 0.4 elif '301' in ab_exp_code_list: flow_pool_P = 0.6 elif '339' in ab_exp_code_list: flow_pool_P = 0 # if '136' in ab_exp_code_list: # # 无回流 - 消费人群 # if user_30day_return_result == 0: # param = config_.AB_EXP_CODE.get('136') # ab_code = param.get('ab_code') # expire_time = 3600 # rule_key = param.get('rule_key') # data_key = param.get('data_key') # no_op_flag = True # elif '137' in ab_exp_code_list: # # 有回流 - 分享人群 # if user_30day_return_result == 1: # param = config_.AB_EXP_CODE.get('137') # ab_code = param.get('ab_code') # expire_time = 3600 # rule_key = param.get('rule_key') # data_key = param.get('data_key') # no_op_flag = True # elif '161' in ab_exp_code_list: # # 无回流 - 消费人群 # if user_30day_return_result == 0: # param = config_.AB_EXP_CODE.get('136') # ab_code = param.get('ab_code') # expire_time = 3600 # rule_key = param.get('rule_key') # data_key = param.get('data_key') # no_op_flag = True # # 有回流 - 分享人群 # else: # param = config_.AB_EXP_CODE.get('137') # ab_code = param.get('ab_code') # expire_time = 3600 # rule_key = param.get('rule_key') # data_key = param.get('data_key') # no_op_flag = True # elif '162' in ab_exp_code_list: # # 有回流 # if user_30day_return_result == 1: # param = config_.AB_EXP_CODE.get('162') # ab_code = param.get('ab_code') # expire_time = 3600 # rule_key = param.get('rule_key') # data_key = param.get('data_key') # no_op_flag = True # 老好看视频 宗教人群实验 if '228' in ab_exp_code_list: # 天主教 religion_param = config_.AB_EXP_CODE['228'] religion_class_name = religion_param.get('religion_class_name') religion_class_flag = get_religion_class_with_mid(mid=mid, religion_class_name=religion_class_name) if religion_class_flag == 1: ab_code = religion_param.get('ab_code') rule_key = religion_param.get('rule_key') data_key = religion_param.get('data_key') rule_key_30day = religion_param.get('30day_rule_key') elif '229' in ab_exp_code_list: # 基督教 religion_param = config_.AB_EXP_CODE['229'] religion_class_name = religion_param.get('religion_class_name') religion_class_flag = get_religion_class_with_mid(mid=mid, religion_class_name=religion_class_name) if religion_class_flag == 1: ab_code = religion_param.get('ab_code') rule_key = religion_param.get('rule_key') data_key = religion_param.get('data_key') rule_key_30day = religion_param.get('30day_rule_key') else: for code, param in config_.AB_EXP_CODE.items(): if code in ab_exp_code_list: ab_code = param.get('ab_code') rule_key = param.get('rule_key') data_key = param.get('data_key') rule_key_30day = param.get('30day_rule_key') shield_config = param.get('shield_config', config_.SHIELD_CONFIG) break """ # 推荐条数 10->4 实验 # if config_.AB_EXP_CODE['rec_size_home'] in ab_exp_code_list: # config_value = config_value_dict.get(config_.AB_EXP_CODE['rec_size_home'], None) # if config_value: # config_value = eval(str(config_value)) # else: # config_value = {} # log_.info(f'config_value: {config_value}, type: {type(config_value)}') # size = int(config_value.get('size', 4)) # top_K = int(config_value.get('K', 3)) # flow_pool_P = float(config_value.get('P', 0.3)) # else: # size = size # top_K = config_.K # flow_pool_P = config_.P # 算法实验相对对照组 # if config_.AB_EXP_CODE['ab_initial'] in ab_exp_code_list: # ab_code = config_.AB_CODE['ab_initial'] # expire_time = 24 * 3600 # rule_key = config_.RULE_KEY['initial'] # no_op_flag = True # 小时级更新-规则1 实验 # elif config_.AB_EXP_CODE['rule_rank1'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank1') # expire_time = 3600 # rule_key = config_.RULE_KEY['rule_rank1'] # no_op_flag = True # elif config_.AB_EXP_CODE['rule_rank2'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank2') # expire_time = 3600 # rule_key = config_.RULE_KEY['rule_rank2'] # elif config_.AB_EXP_CODE['rule_rank3'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank3') # expire_time = 3600 # rule_key = config_.RULE_KEY['rule_rank3'] # no_op_flag = True # elif config_.AB_EXP_CODE['rule_rank4'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank4') # expire_time = 3600 # rule_key = config_.RULE_KEY['rule_rank4'] # elif config_.AB_EXP_CODE['rule_rank5'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank5') # expire_time = 3600 # rule_key = config_.RULE_KEY['rule_rank5'] # elif config_.AB_EXP_CODE['day_rule_rank1'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_day'].get('day_rule_rank1') # expire_time = 24 * 3600 # rule_key = config_.RULE_KEY_DAY['day_rule_rank1'] # no_op_flag = True # if config_.AB_EXP_CODE['rule_rank6'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank6') # expire_time = 3600 # rule_key = config_.RULE_KEY['rule_rank6'] # no_op_flag = True # elif config_.AB_EXP_CODE['day_rule_rank2'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_day'].get('day_rule_rank2') # expire_time = 24 * 3600 # rule_key = config_.RULE_KEY_DAY['day_rule_rank2'] # no_op_flag = True # elif config_.AB_EXP_CODE['region_rule_rank1'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank1') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank1'] # no_op_flag = True # elif config_.AB_EXP_CODE['24h_rule_rank1'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_24h'].get('24h_rule_rank1') # expire_time = 3600 # rule_key = config_.RULE_KEY_24H['24h_rule_rank1'] # no_op_flag = True # elif config_.AB_EXP_CODE['24h_rule_rank2'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rank_by_24h'].get('24h_rule_rank2') # expire_time = 3600 # rule_key = config_.RULE_KEY_24H['24h_rule_rank2'] # no_op_flag = True # elif config_.AB_EXP_CODE['region_rule_rank2'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank2') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank2'] # no_op_flag = True # if config_.AB_EXP_CODE['region_rule_rank3'] in ab_exp_code_list or\ # config_.AB_EXP_CODE['region_rule_rank3_appType_19'] in ab_exp_code_list or\ # config_.AB_EXP_CODE['region_rule_rank3_appType_4'] in ab_exp_code_list or\ # config_.AB_EXP_CODE['region_rule_rank3_appType_6'] in ab_exp_code_list or\ # config_.AB_EXP_CODE['region_rule_rank3_appType_18'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank3') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank3'].get('rule_key') # data_key = config_.RULE_KEY_REGION['region_rule_rank3'].get('data_key') # no_op_flag = True # if config_.AB_EXP_CODE['region_rule_rank4'] in ab_exp_code_list or\ if config_.AB_EXP_CODE['region_rule_rank4_appType_19'] in ab_exp_code_list or \ config_.AB_EXP_CODE['region_rule_rank4_appType_4'] in ab_exp_code_list or\ config_.AB_EXP_CODE['region_rule_rank4_appType_6'] in ab_exp_code_list or\ config_.AB_EXP_CODE['region_rule_rank4_appType_18'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank4'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank4'].get('data_key') no_op_flag = True # elif config_.AB_EXP_CODE['region_rule_rank4_appType_5_data1'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data1'].get('rule_key') # data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data1'].get('data_key') # no_op_flag = True # elif config_.AB_EXP_CODE['region_rule_rank3_appType_5_data2'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank3_appType_5_data2') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank3_appType_5_data2'].get('rule_key') # data_key = config_.RULE_KEY_REGION['region_rule_rank3_appType_5_data2'].get('data_key') # no_op_flag = True elif config_.AB_EXP_CODE['region_rule_rank4_appType_5_data3'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_5_data3') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data3'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data3'].get('data_key') no_op_flag = True elif config_.AB_EXP_CODE['region_rule_rank4_appType_5_data4'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_5_data4') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data4'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data4'].get('data_key') no_op_flag = True elif config_.AB_EXP_CODE['region_rule_rank4_appType_0_data2'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_0_data2') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_0_data2'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_0_data2'].get('data_key') no_op_flag = True # elif config_.AB_EXP_CODE['region_rule_rank4_appType_19_data2'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_19_data2') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data2'].get('rule_key') # data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data2'].get('data_key') # no_op_flag = True # elif config_.AB_EXP_CODE['region_rule_rank4_appType_19_data3'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_19_data3') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data3'].get('rule_key') # data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data3'].get('data_key') # no_op_flag = True elif config_.AB_EXP_CODE['region_rule_rank5_appType_0_data1'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank5_appType_0_data1') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank5_appType_0_data1'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank5_appType_0_data1'].get('data_key') no_op_flag = True elif config_.AB_EXP_CODE['region_rule_rank4_appType_4_data2'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_4_data2') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_4_data2'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_4_data2'].get('data_key') no_op_flag = True elif config_.AB_EXP_CODE['region_rule_rank4_appType_4_data3'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_4_data3') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_4_data3'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_4_data3'].get('data_key') no_op_flag = True elif config_.AB_EXP_CODE['region_rule_rank4_appType_6_data2'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_6_data2') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_6_data2'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_6_data2'].get('data_key') no_op_flag = True elif config_.AB_EXP_CODE['region_rule_rank4_appType_6_data3'] in ab_exp_code_list: ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_6_data3') expire_time = 3600 rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_6_data3'].get('rule_key') data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_6_data3'].get('data_key') no_op_flag = True # elif config_.AB_EXP_CODE['region_rule_rank4_appType_18_data2'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_18_data2') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_18_data2'].get('rule_key') # data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_18_data2'].get('data_key') # no_op_flag = True # elif config_.AB_EXP_CODE['region_rule_rank6_appType_0_data1'] in ab_exp_code_list: # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank6_appType_0_data1') # expire_time = 3600 # rule_key = config_.RULE_KEY_REGION['region_rule_rank6_appType_0_data1'].get('rule_key') # data_key = config_.RULE_KEY_REGION['region_rule_rank6_appType_0_data1'].get('data_key') # no_op_flag = True else: ab_code = config_.AB_CODE['initial'] expire_time = 24 * 3600 rule_key = config_.RULE_KEY_REGION['initial'].get('rule_key') data_key = config_.RULE_KEY_REGION['initial'].get('data_key') # # 老好看视频 / 票圈最惊奇 首页/相关推荐逻辑更新实验 # if config_.AB_EXP_CODE['rov_rank_appType_18_19'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rov_rank_appType_18_19'] # expire_time = 3600 # flow_pool_P = config_.P_18_19 # no_op_flag = True # # elif config_.AB_EXP_CODE['rov_rank_appType_19'] in ab_exp_code_list: # ab_code = config_.AB_CODE['rov_rank_appType_19'] # expire_time = 3600 # top_K = 0 # flow_pool_P = config_.P_18_19 # no_op_flag = True # # elif config_.AB_EXP_CODE['top_video_relevant_appType_19'] in ab_exp_code_list and page_type == 2: # ab_code = config_.AB_CODE['top_video_relevant_appType_19'] # expire_time = 3600 # top_K = 1 # flow_pool_P = config_.P_18_19 # no_op_flag = True # # # 票圈最惊奇完整影视资源实验 # elif config_.AB_EXP_CODE['whole_movies'] in ab_exp_code_list: # ab_code = config_.AB_CODE['whole_movies'] # expire_time = 24 * 3600 # no_op_flag = True # 老视频实验 # if config_.AB_EXP_CODE['old_video'] in ab_exp_code_list: # ab_code = config_.AB_CODE['old_video'] # no_op_flag = True # old_video_index = 2 # else: # old_video_index = -1 """ # APP实验组 if ab_info_data: ab_info_app = {} for page_code, item in json.loads(ab_info_data).items(): if not item: continue ab_info_code = item.get('eventId', None) if ab_info_code: ab_info_app[page_code] = ab_info_code # print(f"======{ab_info_app}") # 首页推荐 if recommend_type == 0: app_ab_code = ab_info_app.get('10003', None) for code, param in config_.APP_AB_CODE['10003'].items(): if code == app_ab_code: ab_code = param.get('ab_code') rule_key = param.get('rule_key') data_key = param.get('data_key') break # # 相关推荐 # elif recommend_type == 1: # if config_.APP_AB_CODE['10037'] == ab_info_app.get('10037', None): # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4') # expire_time = 3600 # rule_key = 'rule3' # data_key = 'data1' # no_op_flag = True # 流量池分发实验组划分 # 1. 随机选取流量池id flow_pool_id_choice = random.choice(config_.FLOWPOOL_ID_LIST) # 2. 判断流量id所属实验配置分组 flow_pool_abtest_group = 'control_group' for key, items in flow_pool_abtest_config.items(): if int(flow_pool_id_choice) in items: flow_pool_abtest_group = key # log_.info(f"flow_pool_id_choice: {flow_pool_id_choice}, flow_pool_abtest_group: {flow_pool_abtest_group}") return top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day, \ shield_config, flow_pool_abtest_group def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type, client_info, ab_exp_info, params, ab_info_data, version_audit_status, env_dict, level_weight, flow_pool_abtest_config): """ 首页线上推荐逻辑 :param request_id: request_id :param mid: mid type-string :param uid: uid type-string :param size: 请求视频数量 type-int :param app_type: 产品标识 type-int :param algo_type: 算法类型 type-string :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"} :param ab_exp_info: ab实验分组参数 [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...] :param params: :param ab_info_data: app实验分组参数 :param version_audit_status: 小程序版本审核参数:1-审核中,2-审核通过 :return: """ # 对 vlog 切换10%的流量做实验 # 对mid进行哈希 # hash_mid = hashlib.md5(mid.encode('utf-8')).hexdigest() # if app_type in config_.AB_TEST['rank_by_h'] and hash_mid[-1:] in ['8', '0', 'a', 'b']: # # 简单召回 - 排序 - 兜底 # rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, # algo_type=algo_type, client_info=client_info, # expire_time=3600, # ab_code=config_.AB_CODE['rank_by_h']) # # ab-test # result = ab_test_op(rank_result=rank_result, # ab_code_list=[config_.AB_CODE['position_insert']], # app_type=app_type, mid=mid, uid=uid) # # redis数据刷新 # update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key, # expire_time=3600) # if app_type == config_.APP_TYPE['APP']: # # 票圈视频APP # top_K = config_.K # flow_pool_P = config_.P # # 简单召回 - 排序 - 兜底 # rank_result, last_rov_recall_key = video_recommend(request_id=request_id, # mid=mid, uid=uid, app_type=app_type, # size=size, top_K=top_K, flow_pool_P=flow_pool_P, # algo_type=algo_type, client_info=client_info, # expire_time=12 * 3600, params=params) # # ab-test # # result = ab_test_op(rank_result=rank_result, # # ab_code_list=[config_.AB_CODE['position_insert']], # # app_type=app_type, mid=mid, uid=uid) # # redis数据刷新 # update_redis_data(result=rank_result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key, # top_K=top_K, expire_time=12 * 3600) # # else: recommend_result = {} param_st = time.time() recommend_result['fea_info'] = env_dict # 特殊mid 和 小程序审核版本推荐处理 if mid in get_special_mid_list() or version_audit_status == 1: rank_result = special_mid_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size) recommend_result['videos'] = rank_result return recommend_result # 普通mid推荐处理 top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, \ no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group = \ get_recommend_params(recommend_type=0, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, mid=mid, app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config) # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'app_type': app_type, # 'mid': mid, # 'uid': uid, # 'operation': 'get_recommend_params', # 'executeTime': (time.time() - param_st) * 1000 # }) recommend_result['getRecommendParamsTime'] = (time.time() - param_st) * 1000 # 简单召回 - 排序 - 兜底 get_result_st = time.time() #print("ab_code:", ab_code) #new pipeline # if ab_code == 60047 or ab_code == 60048 or ab_code == 60049: # result = new_video_recommend(request_id=request_id, # mid=mid, uid=uid, app_type=app_type, # size=size, top_K=top_K, flow_pool_P=flow_pool_P, # algo_type=algo_type, client_info=client_info, # ab_code=ab_code, expire_time=expire_time, # rule_key=rule_key, data_key=data_key, # no_op_flag=no_op_flag, old_video_index=old_video_index, # params=params, rule_key_30day=rule_key_30day, shield_config=shield_config) # simrecal: 60054 +融合, 全量 # return video, return video2 # old video: 60056, test2 if ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \ or ab_code == 60069 or ab_code == 60070 or ab_code == 60071 or ab_code == 60072 or ab_code == 60073 \ or ab_code == 60074: result, fea_info = video_old_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K, flow_pool_P=flow_pool_P, algo_type='', client_info=client_info, ab_code=ab_code, expire_time=expire_time, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag, old_video_index=old_video_index, video_id=None, params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, env_dict=env_dict, level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group) recommend_result['fea_info'] = fea_info else: result = video_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K, flow_pool_P=flow_pool_P, algo_type=algo_type, client_info=client_info, ab_code=ab_code, expire_time=expire_time, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag, old_video_index=old_video_index, params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group) # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'app_type': app_type, # 'mid': mid, # 'uid': uid, # 'operation': 'get_recommend_result', # 'executeTime': (time.time() - get_result_st) * 1000 # }) recommend_result['recommendOperation'] = result rank_result = result.get('rankResult') for i, item in enumerate(rank_result): item['position'] = i + 1 recommend_result['videos'] = rank_result recommend_result['getRecommendResultTime'] = (time.time() - get_result_st) * 1000 # ab-test # result = ab_test_op(rank_result=rank_result, # ab_code_list=[config_.AB_CODE['position_insert']], # app_type=app_type, mid=mid, uid=uid) # redis数据刷新 update_redis_st = time.time() # if ab_code == 60047 or ab_code == 60048 or ab_code == 60049: # update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K) # else: # update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K) update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K, level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group) # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'app_type': app_type, # 'mid': mid, # 'uid': uid, # 'operation': 'update_redis_data', # 'executeTime': (time.time() - update_redis_st) * 1000 # }) recommend_result['updateRedisDataTime'] = (time.time() - update_redis_st) * 1000 return recommend_result # return rank_result def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_exp_info, client_info, page_type, params, ab_info_data, version_audit_status, env_dict, level_weight, flow_pool_abtest_config): """ 相关推荐逻辑 :param request_id: request_id :param video_id: 相关推荐的头部视频id :param mid: mid type-string :param uid: uid type-string :param size: 请求视频数量 type-int :param app_type: 产品标识 type-int :param ab_exp_info: ab实验分组参数 [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...] :param client_info: 地域参数 :param page_type: 页面区分参数 1:详情页;2:分享页 :param params: :param ab_info_data: app实验分组参数 :param version_audit_status: 小程序版本审核参数:1-审核中,2-审核通过 :return: videos type-list """ recommend_result = {} param_st = time.time() recommend_result['fea_info'] = env_dict # 特殊mid 和 小程序审核版本推荐处理 if mid in get_special_mid_list() or version_audit_status == 1: rank_result = special_mid_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size) recommend_result['videos'] = rank_result return recommend_result # return rank_result # 普通mid推荐处理 top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, \ no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group = \ get_recommend_params(recommend_type=1, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, page_type=page_type, mid=mid, app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config) # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'app_type': app_type, # 'mid': mid, # 'uid': uid, # 'operation': 'get_recommend_params', # 'executeTime': (time.time() - param_st) * 1000 # }) recommend_result['getRecommendParamsTime'] = (time.time() - param_st) * 1000 # 简单召回 - 排序 - 兜底 get_result_st = time.time() #print("ab_code:", ab_code) # if ab_code == 60047 or ab_code == 60048 or ab_code == 60049: # result = new_video_recommend(request_id=request_id, # mid=mid, uid=uid, app_type=app_type, # size=size, top_K=top_K, flow_pool_P=flow_pool_P, # algo_type='', client_info=client_info, # ab_code=ab_code, expire_time=expire_time, # rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag, # old_video_index=old_video_index, video_id=video_id, # params=params, rule_key_30day=rule_key_30day, shield_config=shield_config) if ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \ or ab_code == 60069 or ab_code == 60070 or ab_code == 60071 or ab_code == 60072 or ab_code == 60073 \ or ab_code == 60074: result, fea_info = video_old_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K, flow_pool_P=flow_pool_P, algo_type='', client_info=client_info, ab_code=ab_code, expire_time=expire_time, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag, old_video_index=old_video_index, video_id=video_id, params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, env_dict=env_dict, level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group) recommend_result['fea_info'] = fea_info else: result = video_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K, flow_pool_P=flow_pool_P, algo_type='', client_info=client_info, ab_code=ab_code, expire_time=expire_time, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag, old_video_index=old_video_index, video_id=video_id, params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group) # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'app_type': app_type, # 'mid': mid, # 'uid': uid, # 'operation': 'get_recommend_result', # 'executeTime': (time.time() - get_result_st) * 1000 # }) recommend_result['recommendOperation'] = result rank_result = result.get('rankResult') for i, item in enumerate(rank_result): item['position'] = i+1 recommend_result['videos'] = rank_result recommend_result['getRecommendResultTime'] = (time.time() - get_result_st) * 1000 # ab-test # result = ab_test_op(rank_result=rank_result, # ab_code_list=[config_.AB_CODE['position_insert'], config_.AB_CODE['relevant_video_op']], # app_type=app_type, mid=mid, uid=uid, head_vid=video_id, size=size) # redis数据刷新 update_redis_st = time.time() # if ab_code == 60047 or ab_code == 60048 or ab_code == 60049: # update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K) # else: # update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K) update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K, level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group) # log_.info({ # 'logTimestamp': int(time.time() * 1000), # 'request_id': request_id, # 'app_type': app_type, # 'mid': mid, # 'uid': uid, # 'operation': 'update_redis_data', # 'executeTime': (time.time() - update_redis_st) * 1000 # }) recommend_result['updateRedisDataTime'] = (time.time() - update_redis_st) * 1000 return recommend_result # return rank_result def special_mid_recommend(request_id, mid, uid, app_type, size, ab_code=config_.AB_CODE['special_mid'], push_from=config_.PUSH_FROM['special_mid'], expire_time=24*3600): redis_helper = RedisHelper() # 特殊mid推荐指定视频列表 pool_recall = PoolRecall(request_id=request_id, app_type=app_type, mid=mid, uid=uid, ab_code=ab_code) # 获取相关redis key special_key_name, redis_date = pool_recall.get_pool_redis_key(pool_type='special') # 用户上一次在rov召回池对应的位置 last_special_recall_key = f'{config_.LAST_VIDEO_FROM_SPECIAL_POOL_PREFIX}{app_type}:{mid}:{redis_date}' value = redis_helper.get_data_from_redis(last_special_recall_key) if value: idx = redis_helper.get_index_with_data(special_key_name, value) if not idx: idx = 0 else: idx += 1 else: idx = 0 recall_result = [] # 每次获取的视频数 get_size = size * 5 # 记录获取频次 freq = 0 while len(recall_result) < size: freq += 1 if freq > config_.MAX_FREQ_FROM_ROV_POOL: break # 获取数据 data = redis_helper.get_data_zset_with_index(key_name=special_key_name, start=idx, end=idx + get_size - 1, with_scores=True) if not data: break # 获取视频id,并转换类型为int,并存储为key-value{videoId: score} # 添加视频源参数 pushFrom, abCode temp_result = [{'videoId': int(value[0]), 'rovScore': value[1], 'pushFrom': push_from, 'abCode': ab_code} for value in data] recall_result.extend(temp_result) idx += get_size # 将此次获取的末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天 if mid and recall_result: # mid为空时,不做记录 redis_helper.set_data_to_redis(key_name=last_special_recall_key, value=recall_result[:size][-1]['videoId'], expire_time=expire_time) return recall_result[:size] def get_special_mid_list(): redis_helper = RedisHelper() special_mid_list = redis_helper.get_data_from_set(key_name=config_.KEY_NAME_SPECIAL_MID) if special_mid_list: return special_mid_list else: return [] if __name__ == '__main__': videos = [ {"videoId": 10136461, "rovScore": 99.971, "pushFrom": "recall_pool", "abCode": 10000}, {"videoId": 10239014, "rovScore": 99.97, "pushFrom": "recall_pool", "abCode": 10000}, {"videoId": 9851154, "rovScore": 99.969, "pushFrom": "recall_pool", "abCode": 10000}, {"videoId": 10104347, "rovScore": 99.968, "pushFrom": "recall_pool", "abCode": 10000}, {"videoId": 10141507, "rovScore": 99.967, "pushFrom": "recall_pool", "abCode": 10000}, {"videoId": 10292817, "flowPool": "2#6#2#1641780979606", "rovScore": 53.926690610816486, "pushFrom": "flow_pool", "abCode": 10000}, {"videoId": 10224932, "flowPool": "2#5#1#1641800279644", "rovScore": 53.47890460059617, "pushFrom": "flow_pool", "abCode": 10000}, {"videoId": 9943255, "rovScore": 99.966, "pushFrom": "recall_pool", "abCode": 10000}, {"videoId": 10282970, "flowPool": "2#5#1#1641784814103", "rovScore": 52.682815076325575, "pushFrom": "flow_pool", "abCode": 10000}, {"videoId": 10282205, "rovScore": 99.965, "pushFrom": "recall_pool", "abCode": 10000} ] res = relevant_video_top_recommend(app_type=4, mid='', uid=1111, head_vid=123, videos=videos, size=10) print(res)