import json
import time
import multiprocessing
import traceback
import hashlib
from datetime import datetime
import config
from log import Log
from config import set_config
from video_recall import PoolRecall
from video_rank import video_rank, bottom_strategy, video_rank_by_w_h_rate
from db_helper import RedisHelper
import gevent
from utils import FilterVideos
import ast

log_ = Log()
config_ = set_config()


def relevant_video_top_recommend(app_type, mid, uid, head_vid, videos, size):
    """
    Force operator-specified relevant videos to the top of a relevant-recommendation result.

    :param app_type: product identifier type-int
    :param mid: mid
    :param uid: uid
    :param head_vid: id of the head video of the relevant recommendation type-int
    :param videos: current relevant-recommendation result type-list
    :param size: number of videos to return type-int
    :return: rank_result; the input ``videos`` is returned untouched (and not
             truncated to ``size``) when there is nothing to insert
    """
    # Fetch the operator-configured relevant videos for the head video.
    key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
    redis_helper = RedisHelper()
    relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
    if relevant_videos is None:
        # No relevant videos were configured for this head video.
        return videos
    relevant_videos = json.loads(relevant_videos)
    # Sort by the operator-specified order (ascending).
    relevant_videos_sorted = sorted(relevant_videos, key=lambda x: x['order'])

    # Filter the candidate ids for this user.
    relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos_sorted]
    filter_helper = FilterVideos(app_type=app_type, video_ids=relevant_video_ids, mid=mid, uid=uid)
    filtered_ids = filter_helper.filter_videos()
    if filtered_ids is None:
        return videos

    # Keep only the videos whose [start_time, finish_time] window contains "now"
    # and that survived filtering.
    now = int(time.time())
    relevant_videos_in_effect = [
        {'videoId': int(item['recommend_vid']),
         'pushFrom': config_.PUSH_FROM['relevant_video_op'],
         'abCode': config_.AB_CODE['relevant_video_op']}
        for item in relevant_videos_sorted
        if item['start_time'] < now < item['finish_time']
        and int(item['recommend_vid']) in filtered_ids
    ]
    if not relevant_videos_in_effect:
        return videos

    # Merge with the existing rank result: remember flow-pool videos together
    # with their original positions, and drop anything that is being inserted.
    relevant_ids = {item['videoId'] for item in relevant_videos_in_effect}
    flow_pool_videos = []
    other_videos = []
    for i, item in enumerate(videos):
        if item.get('videoId') in relevant_ids:
            continue
        if item.get('pushFrom', None) == config_.PUSH_FROM['flow_recall']:
            flow_pool_videos.append((i, item))
        else:
            other_videos.append(item)

    # Re-rank while keeping flow-pool videos at their original positions.
    rank_result = relevant_videos_in_effect + other_videos
    for i, item in flow_pool_videos:
        rank_result.insert(i, item)
    return rank_result[:size]


def _parse_position_vids(raw):
    """Parse a Redis value holding a literal collection of video ids into a list of ints.

    ``None`` (missing key, or a literal that evaluates to ``None``) yields ``[]``.
    """
    vids = ast.literal_eval(raw) if raw is not None else None
    if vids is None:
        return []
    return [int(i) for i in vids]


def video_position_recommend(mid, uid, app_type, videos):
    """
    Insert the first surviving position-1 / position-2 video at index 0 / 1 of ``videos``.

    :param mid: mid
    :param uid: uid
    :param app_type: product identifier
    :param videos: current rank result type-list
    :return: at most the first 10 videos after insertion
    """
    redis_helper = RedisHelper()
    pos1_vids = _parse_position_vids(
        redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION1_KEY_NAME))
    pos2_vids = _parse_position_vids(
        redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION2_KEY_NAME))

    # Filter both candidate lists concurrently.
    filter_1 = FilterVideos(app_type=app_type, video_ids=pos1_vids, mid=mid, uid=uid)
    filter_2 = FilterVideos(app_type=app_type, video_ids=pos2_vids, mid=mid, uid=uid)
    jobs = [gevent.spawn(filter_1.filter_videos), gevent.spawn(filter_2.filter_videos)]
    gevent.joinall(jobs)
    pos1_vids, pos2_vids = [job.get() for job in jobs]

    # Remove the videos about to be inserted so they do not appear twice.
    videos = positon_duplicate(pos1_vids, pos2_vids, videos)
    if pos1_vids is not None and len(pos1_vids) > 0:
        videos.insert(0, {'videoId': int(pos1_vids[0]), 'rovScore': 100,
                          'pushFrom': config_.PUSH_FROM['position_insert'],
                          'abCode': config_.AB_CODE['position_insert']})
    if pos2_vids is not None and len(pos2_vids) > 0:
        videos.insert(1, {'videoId': int(pos2_vids[0]), 'rovScore': 100,
                          'pushFrom': config_.PUSH_FROM['position_insert'],
                          'abCode': config_.AB_CODE['position_insert']})
    return videos[:10]


def positon_duplicate(pos1_vids, pos2_vids, videos):
    """
    Return ``videos`` without the entries whose id equals the first position-1 or position-2 id.

    The (misspelled) function name is kept because callers reference it.

    :param pos1_vids: candidate ids for position 1, may be None or empty
    :param pos2_vids: candidate ids for position 2, may be None or empty
    :param videos: list of video dicts, each with a 'videoId' key
    :return: new list, original order preserved
    """
    reserved = set()
    if pos1_vids is not None and len(pos1_vids) > 0:
        reserved.add(int(pos1_vids[0]))
    if pos2_vids is not None and len(pos2_vids) > 0:
        reserved.add(int(pos2_vids[0]))
    return [item for item in videos if item['videoId'] not in reserved]


def video_recommend(mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                    expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', no_op_flag=False):
    """
    Homepage online recommendation: recall -> rank -> fallback.

    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param top_K: the top K results are guaranteed to be recall-pool videos type-int
    :param flow_pool_P: probability that the remaining size-top_K videos come from the flow pool type-float
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string
    :param client_info: user location info {"country": "...", "province": "...", "city": "..."}
    :param expire_time: Redis expiry (seconds) of the last-video record
    :param ab_code: AB experiment code
    :param rule_key: rule key forwarded to PoolRecall
    :param no_op_flag: forwarded to PoolRecall
    :return: (rank_result, last_rov_recall_key)
    """
    # ####### Recall (ROV pool and flow pool run concurrently as greenlets)
    start_recall = time.time()
    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                             client_info=client_info, rule_key=rule_key, no_op_flag=no_op_flag)
    _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()

    special_app_types = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
    # Branch order matters: hourly rules win over the app-type check, which wins
    # over daily rules.
    if ab_code in config_.AB_CODE['rank_by_h'].values():
        t = [gevent.spawn(pool_recall.rov_pool_recall_by_h, size, expire_time),
             gevent.spawn(pool_recall.flow_pool_recall, size)]
    elif app_type in special_app_types:
        t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
             gevent.spawn(pool_recall.flow_pool_recall_18_19, size)]
    elif ab_code in config_.AB_CODE['rank_by_day'].values():
        t = [gevent.spawn(pool_recall.rov_pool_recall_by_day, size, expire_time),
             gevent.spawn(pool_recall.flow_pool_recall, size)]
    else:
        t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
             gevent.spawn(pool_recall.flow_pool_recall, size)]
    gevent.joinall(t)
    recall_result_list = [i.get() for i in t]

    end_recall = time.time()
    log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
        mid, uid, recall_result_list, (end_recall - start_recall) * 1000))

    # ####### Rank
    start_rank = time.time()
    # For the two special app types the flow-pool recall is only used inside
    # the dedicated experiments; otherwise it is discarded.
    use_flow_pool = (
        app_type not in special_app_types
        or ab_code in [config_.AB_CODE['rov_rank_appType_18_19'], config_.AB_CODE['rov_rank_appType_19']]
    )
    data = {
        'rov_pool_recall': recall_result_list[0],
        'flow_pool_recall': recall_result_list[1] if use_flow_pool else []
    }
    rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=flow_pool_P)
    end_rank = time.time()
    log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
        mid, uid, rank_result, (end_rank - start_rank) * 1000))

    if not rank_result:
        # Fallback strategy when ranking produced nothing.
        start_bottom = time.time()
        rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
        end_bottom = time.time()
        log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
            mid, uid, rank_result, (end_bottom - start_bottom) * 1000))

    return rank_result, last_rov_recall_key


def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
    """
    Apply the AB-experiment specific post-processing to a ranked result.

    Only the position-insert experiment is active; the aspect-ratio (w_h_rate)
    and relevant-video forced-insert experiments are currently disabled.

    :param rank_result: ranked result
    :param ab_code_list: AB experiment groups this request takes part in
    :param app_type: product identifier
    :param mid: mid
    :param uid: uid
    :param kwargs: other parameters (currently unused)
    :return: rank_result after post-processing
    """
    # Position-insert experiment.
    if config_.AB_CODE['position_insert'] in ab_code_list \
            and app_type in config_.AB_TEST.get('position_insert', []):
        rank_result = video_position_recommend(mid, uid, app_type, rank_result)
        log_.info('app_type: {}, mid: {}, uid: {}, rank_by_position_insert_result: {}'.format(
            app_type, mid, uid, rank_result))

    return rank_result


def update_redis_data(result, app_type, mid, last_rov_recall_key, top_K, expire_time=24*3600):
    """
    Refresh the Redis bookkeeping that depends on the final ranked result.

    Failures are logged and swallowed (best effort).

    :param result: ranked result
    :param app_type: product identifier
    :param mid: mid
    :param last_rov_recall_key: Redis key of the user's last position in the ROV recall pool
    :param top_K: the top K results are guaranteed to be recall-pool videos type-int
    :param expire_time: Redis expiry (seconds) of the last-video record
    :return: None
    """
    try:
        # Pre-exposure data is written to Redis with a 0.5h expiry.
        redis_helper = RedisHelper()
        preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
        preview_video_ids = [int(item['videoId']) for item in result]
        if preview_video_ids:
            redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids),
                                           expire_time=30 * 60)
            log_.info('preview redis update success!')

        # Record the id of the last ROV recall-pool video within top_K so the
        # next request can locate its recall position quickly.
        rov_recall_video = [item['videoId'] for item in result[:top_K]
                            if item['pushFrom'] == config_.PUSH_FROM['rov_recall']]
        if len(rov_recall_video) > 0:
            if app_type == config_.APP_TYPE['APP']:
                key_name = config_.UPDATE_ROV_KEY_NAME_APP
            else:
                key_name = config_.UPDATE_ROV_KEY_NAME
            if not redis_helper.get_score_with_value(key_name=key_name, value=rov_recall_video[-1]):
                redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1],
                                               expire_time=expire_time)
                log_.info('last video redis update success!')

        # For the flow-pool videos distributed this time, decrement the locally
        # recorded distribute count.
        if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
            flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
            if flow_recall_video:
                update_local_distribute_count(flow_recall_video)
                log_.info('update local distribute count success!')

    except Exception:
        log_.error("update redis data fail!")
        log_.error(traceback.format_exc())


def update_local_distribute_count(videos):
    """
    Decrement the locally recorded distribute count of each video by one.

    Failures are logged and swallowed (best effort).

    :param videos: list of video dicts type-list
        [{'videoId':'', 'flowPool':'', 'distributeCount': '', 'rovScore': '',
          'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
    :return: None
    """
    try:
        redis_helper = RedisHelper()
        for item in videos:
            key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
            # Locally recorded distribute count - 1.
            redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
    except Exception:
        log_.error('update_local_distribute_count error...')
        log_.error(traceback.format_exc())


def get_recommend_params(ab_exp_info):
    """
    Derive the recommendation parameters from the AB experiment assignment.

    :param ab_exp_info: mapping whose values are lists of dicts carrying
                        'abExpCode' and 'configValue'; may be empty/None
    :return: (top_K, flow_pool_P, ab_code, rule_key, expire_time, no_op_flag)
    """
    top_K = config_.K
    flow_pool_P = config_.P
    # Flag: do not fetch manual-intervention (operator) data.
    no_op_flag = False

    if not ab_exp_info:
        ab_code = config_.AB_CODE['initial']
        expire_time = 24 * 3600
        rule_key = config_.RULE_KEY['initial']
        return top_K, flow_pool_P, ab_code, rule_key, expire_time, no_op_flag

    # Collect every experiment code (as str) the request belongs to.
    ab_exp_code_list = []
    config_value_dict = {}
    for _, item in ab_exp_info.items():
        if not item:
            continue
        for ab_item in item:
            ab_exp_code = ab_item.get('abExpCode', None)
            if not ab_exp_code:
                continue
            ab_exp_code_list.append(str(ab_exp_code))
            config_value_dict[str(ab_exp_code)] = ab_item.get('configValue', None)

    # Algorithm experiments; the first matching one wins.
    # rule_rank2 / rule_rank4 / rule_rank5 are currently not mapped.
    if config_.AB_EXP_CODE['ab_initial'] in ab_exp_code_list:
        # Control group relative to the algorithm experiments.
        ab_code = config_.AB_CODE['ab_initial']
        expire_time = 24 * 3600
        rule_key = config_.RULE_KEY['initial']
        no_op_flag = True
    elif config_.AB_EXP_CODE['rule_rank1'] in ab_exp_code_list:
        # Hourly update - rule 1 experiment.
        ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank1')
        expire_time = 3600
        rule_key = config_.RULE_KEY['rule_rank1']
        no_op_flag = True
    elif config_.AB_EXP_CODE['rule_rank3'] in ab_exp_code_list:
        ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank3')
        expire_time = 3600
        rule_key = config_.RULE_KEY['rule_rank3']
        no_op_flag = True
    elif config_.AB_EXP_CODE['day_rule_rank1'] in ab_exp_code_list:
        ab_code = config_.AB_CODE['rank_by_day'].get('day_rule_rank1')
        expire_time = 24 * 3600
        rule_key = config_.RULE_KEY_DAY['day_rule_rank1']
        no_op_flag = True
    elif config_.AB_EXP_CODE['day_rule_rank2'] in ab_exp_code_list:
        ab_code = config_.AB_CODE['rank_by_day'].get('day_rule_rank2')
        expire_time = 24 * 3600
        rule_key = config_.RULE_KEY_DAY['day_rule_rank2']
        no_op_flag = True
    else:
        ab_code = config_.AB_CODE['initial']
        expire_time = 24 * 3600
        rule_key = config_.RULE_KEY['initial']

    # LAO_HAO_KAN_VIDEO / ZUI_JING_QI homepage and relevant-recommendation
    # experiments override ab_code / expire_time chosen above; rule_key is
    # left as selected by the chain above.
    if config_.AB_EXP_CODE['rov_rank_appType_18_19'] in ab_exp_code_list:
        ab_code = config_.AB_CODE['rov_rank_appType_18_19']
        expire_time = 3600
        flow_pool_P = config_.P_18_19
        no_op_flag = True
    elif config_.AB_EXP_CODE['rov_rank_appType_19'] in ab_exp_code_list:
        ab_code = config_.AB_CODE['rov_rank_appType_19']
        expire_time = 3600
        top_K = 2
        flow_pool_P = config_.P_18_19
        no_op_flag = True

    return top_K, flow_pool_P, ab_code, rule_key, expire_time, no_op_flag


def video_homepage_recommend(mid, uid, size, app_type, algo_type, client_info, ab_exp_info):
    """
    Homepage online recommendation entry point.

    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string
    :param client_info: user location info {"country": "...", "province": "...", "city": "..."}
    :param ab_exp_info: AB experiment assignment [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...]
    :return: rank_result
    """
    # NOTE: ab_test_op post-processing is currently not applied on this path.
    if app_type == config_.APP_TYPE['APP']:
        # The APP product uses fixed parameters and a 12h expiry.
        top_K = config_.K
        flow_pool_P = config_.P
        # Recall - rank - fallback.
        rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, app_type=app_type,
                                                           size=size, top_K=top_K, flow_pool_P=flow_pool_P,
                                                           algo_type=algo_type, client_info=client_info,
                                                           expire_time=12 * 3600)
        # Refresh Redis data.
        update_redis_data(result=rank_result, app_type=app_type, mid=mid,
                          last_rov_recall_key=last_rov_recall_key, top_K=top_K, expire_time=12 * 3600)
    else:
        top_K, flow_pool_P, ab_code, rule_key, expire_time, no_op_flag = \
            get_recommend_params(ab_exp_info=ab_exp_info)
        # Recall - rank - fallback.
        rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, app_type=app_type,
                                                           size=size, top_K=top_K, flow_pool_P=flow_pool_P,
                                                           algo_type=algo_type, client_info=client_info,
                                                           ab_code=ab_code, expire_time=expire_time,
                                                           rule_key=rule_key, no_op_flag=no_op_flag)
        # Refresh Redis data. The experiment's expire_time is forwarded so the
        # last-video record uses the same TTL as the recall step (previously it
        # silently fell back to the 24h default, even for hourly experiments).
        update_redis_data(result=rank_result, app_type=app_type, mid=mid,
                          last_rov_recall_key=last_rov_recall_key, top_K=top_K, expire_time=expire_time)

    return rank_result


def video_relevant_recommend(video_id, mid, uid, size, app_type, ab_exp_info):
    """
    Relevant-recommendation entry point.

    :param video_id: id of the head video of the relevant recommendation
    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :param ab_exp_info: AB experiment assignment [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...]
    :return: videos type-list
    """
    # NOTE: ab_test_op post-processing (position insert / relevant-video forced
    # insert) is currently not applied, so video_id is unused on this path.
    top_K, flow_pool_P, ab_code, rule_key, expire_time, no_op_flag = \
        get_recommend_params(ab_exp_info=ab_exp_info)
    # Recall - rank - fallback.
    rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, app_type=app_type,
                                                       size=size, top_K=top_K, flow_pool_P=flow_pool_P,
                                                       algo_type='', client_info=None,
                                                       ab_code=ab_code, expire_time=expire_time,
                                                       rule_key=rule_key, no_op_flag=no_op_flag)
    # Refresh Redis data with the same TTL that the recall step used.
    update_redis_data(result=rank_result, app_type=app_type, mid=mid,
                      last_rov_recall_key=last_rov_recall_key, top_K=top_K, expire_time=expire_time)

    return rank_result


if __name__ == '__main__':
    # Manual smoke run of the relevant-video forced insert against sample data.
    videos = [
        {"videoId": 10136461, "rovScore": 99.971, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10239014, "rovScore": 99.97, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 9851154, "rovScore": 99.969, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10104347, "rovScore": 99.968, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10141507, "rovScore": 99.967, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10292817, "flowPool": "2#6#2#1641780979606", "rovScore": 53.926690610816486,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 10224932, "flowPool": "2#5#1#1641800279644", "rovScore": 53.47890460059617,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 9943255, "rovScore": 99.966, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10282970, "flowPool": "2#5#1#1641784814103", "rovScore": 52.682815076325575,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 10282205, "rovScore": 99.965, "pushFrom": "recall_pool", "abCode": 10000}
    ]
    res = relevant_video_top_recommend(app_type=4, mid='', uid=1111, head_vid=123, videos=videos, size=10)
    print(res)