import time
import multiprocessing
import traceback
from datetime import datetime
from log import Log
from config import set_config
from video_recall import PoolRecall
from video_rank import video_rank, bottom_strategy, video_rank_by_w_h_rate
from db_helper import RedisHelper
import gevent

log_ = Log()
config_ = set_config()


def video_recommend(mid, uid, size, app_type, algo_type, client_info):
    """
    Online recommendation logic for the home feed.

    Pipeline: concurrent recall (ROV pool + flow pool) -> rank -> fallback
    strategy when ranking yields nothing -> width/height-ratio re-rank for the
    LONG_VIDEO app -> refresh related state in Redis.

    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string (not read by this function)
    :param client_info: user location info
                        {"country": "country", "province": "province", "city": "city"}
    :return: ranked video list; items are dicts read here via 'videoId' and 'pushFrom'
    """
    ab_code = config_.AB_CODE['initial']

    # ####### Recall: run both recall pools concurrently as gevent greenlets
    start_recall = time.time()
    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code, client_info=client_info)
    _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
    recall_jobs = [
        gevent.spawn(pool_recall.rov_pool_recall, size),
        gevent.spawn(pool_recall.flow_pool_recall, size),
    ]
    gevent.joinall(recall_jobs)
    # Greenlet.get() returns each job's result (and re-raises if the job failed).
    recall_result_list = [job.get() for job in recall_jobs]
    end_recall = time.time()
    log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
        mid, uid, recall_result_list, (end_recall - start_recall) * 1000))

    # ####### Rank
    start_rank = time.time()
    data = {
        'rov_pool_recall': recall_result_list[0],
        'flow_pool_recall': recall_result_list[1]
    }
    rank_result = video_rank(data=data, size=size)
    end_rank = time.time()
    log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
        mid, uid, rank_result, (end_rank - start_rank) * 1000))

    if not rank_result:
        # Fallback (bottom) strategy when ranking produced nothing.
        start_bottom = time.time()
        rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
        end_bottom = time.time()
        log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
            mid, uid, rank_result, (end_bottom - start_bottom) * 1000))

    # ####### Video width/height-ratio A/B experiment, applied to the LONG_VIDEO app only
    if app_type == config_.APP_TYPE['LONG_VIDEO']:
        start_w_h = time.time()
        rank_result = video_rank_by_w_h_rate(videos=rank_result)
        end_w_h = time.time()
        # Report the time spent in this re-rank step (previously the rank phase's time was logged).
        log_.info('app_type: {}, mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
            app_type, mid, uid, rank_result, (end_w_h - start_w_h) * 1000))

    # ####### Redis refresh
    _refresh_redis_state(app_type=app_type, mid=mid, rank_result=rank_result,
                         last_rov_recall_key=last_rov_recall_key)

    return rank_result


def _refresh_redis_state(app_type, mid, rank_result, last_rov_recall_key):
    """
    Push post-ranking state for this request into Redis (helper for video_recommend).

    :param app_type: product identifier type-int
    :param mid: mid type-string
    :param rank_result: final ranked video list (dicts with 'videoId' / 'pushFrom')
    :param last_rov_recall_key: Redis key holding the last ROV-recall video id
    :return: None
    """
    redis_helper = RedisHelper()

    # Preview (pre-exposure) video ids are written to a Redis set that expires after 30 minutes.
    preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
    preview_video_ids = [int(item['videoId']) for item in rank_result]
    if preview_video_ids:
        redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids),
                                       expire_time=30 * 60)
        log_.info('preview redis update success!')

    # Store the last ROV-recall video id within the first config_.K results so the next
    # request can resume recall from that position. (The original note says this key
    # lives for 1 day; no expiry is passed here — presumably set_data_to_redis's default, verify.)
    rov_recall_video = [item['videoId'] for item in rank_result[:config_.K]
                        if item['pushFrom'] == config_.PUSH_FROM['rov_recall']]
    if rov_recall_video:
        last_video_id = rov_recall_video[-1]
        # Skip when the id already has a score under UPDATE_ROV_KEY_NAME
        # (presumably the freshly updated ROV set — verify against db_helper/config).
        if not redis_helper.get_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME, value=last_video_id):
            redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=last_video_id)
            log_.info('last video redis update success!')

    # For flow-pool videos served in this response, record "local distribute count - 1".
    flow_recall_video = [item for item in rank_result
                         if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
    if flow_recall_video:
        update_local_distribute_count(flow_recall_video)
        log_.info('update local distribute count success!')


def update_local_distribute_count(videos):
    """
    Decrement the locally recorded distribute count of each video by 1.

    Best-effort: any exception is logged (with traceback) and swallowed; an error
    part-way through leaves the remaining videos un-decremented.

    :param videos: video list type-list
                   [{'videoId': '', 'flowPool': '', 'distributeCount': '', 'rovScore': '',
                     'pushFrom': 'flow_pool', 'abCode': ...}, ....]
    :return: None
    """
    try:
        redis_helper = RedisHelper()
        for item in videos:
            key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
            # Local distribute count - 1; the key is given a 5-minute expiry.
            redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
    except Exception:
        log_.error('update_local_distribute_count error...')
        log_.error(traceback.format_exc())


def video_relevant_recommend(mid, uid, size, app_type):
    """
    Related-video recommendation logic; delegates to video_recommend with no
    algo_type and no client location info.

    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :return: videos type-list
    """
    videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, algo_type='', client_info=None)
    return videos


if __name__ == '__main__':
    # Manual smoke run: exercises update_local_distribute_count against the configured Redis.
    videos = [{'videoId': '12345', 'flowPool': '133#442#2', 'distributeCount': 10}]
    update_local_distribute_count(videos)