import time
import multiprocessing
import traceback
from datetime import datetime

from log import Log
from config import set_config
from video_recall import PoolRecall
from video_rank import video_rank, bottom_strategy
from db_helper import RedisHelper

log_ = Log()
config_ = set_config()


def video_recommend(mid, uid, size, app_type, algo_type):
    """
    Online recommendation logic for the home page.

    Recalls candidates from the ROV recall pool and the flow pool in parallel
    worker processes, ranks them with ``video_rank``, falls back to
    ``bottom_strategy`` when ranking yields nothing, and finally refreshes the
    related Redis state (preview set, last ROV recall position, local
    distribute counts of flow-pool videos).

    :param mid: mid, type-string
    :param uid: uid, type-string
    :param size: number of videos requested, type-int
    :param app_type: product identifier, type-int
    :param algo_type: algorithm type, type-string (not used in this function)
    :return: the ranked video list; each item is a dict read here through its
        'videoId' and 'pushFrom' keys
    """
    ab_code = config_.AB_CODE

    # ####### multi-process recall
    start_recall = time.time()
    log_.info('====== recall')
    # Exactly two recall jobs are submitted below (ROV pool + flow pool); more
    # workers than that would only add per-request process-spawn overhead.
    recall_job_count = 2
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=min(cores, recall_job_count))
    try:
        pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
        _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
        pool_list = [
            # ROV recall pool
            pool.apply_async(pool_recall.rov_pool_recall, (size,)),
            # flow pool
            pool.apply_async(pool_recall.flow_pool_recall, (size,)),
        ]
        recall_result_list = [p.get() for p in pool_list]
    finally:
        # Release the worker processes even when a recall step raises;
        # previously an exception here leaked the whole pool.
        pool.close()
        pool.join()
    end_recall = time.time()
    log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
        mid, uid, recall_result_list, (end_recall - start_recall) * 1000))

    # ####### ranking
    start_rank = time.time()
    log_.info('====== rank')
    data = {
        'rov_pool_recall': recall_result_list[0],
        'flow_pool_recall': recall_result_list[1]
    }
    rank_result = video_rank(data=data, size=size)
    end_rank = time.time()
    log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
        mid, uid, rank_result, (end_rank - start_rank) * 1000))

    if not rank_result:
        # Fallback (bottom) strategy when ranking produced nothing
        log_.info('====== bottom strategy')
        start_bottom = time.time()
        rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
        end_bottom = time.time()
        log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
            mid, uid, rank_result, (end_bottom - start_bottom) * 1000))

    # ####### refresh redis data
    log_.info('====== update redis')
    # Sync the pre-exposure (preview) video ids to Redis, expiring after 0.5h
    redis_helper = RedisHelper()
    preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
    preview_video_ids = [item['videoId'] for item in rank_result]
    if preview_video_ids:
        redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids),
                                       expire_time=30 * 60)
        log_.info('preview redis update success!')

    # Store the id of the last ROV-recall video in this response (at most the
    # config_.K-th one) so the next request can locate where to continue recall.
    # NOTE(review): the original comment mentions a 1-day expiry; no expire_time
    # is passed here, so presumably set_data_to_redis applies it — confirm.
    rov_recall_video = [item['videoId'] for item in rank_result if item['pushFrom'] == 'recall_pool']
    if 0 < len(rov_recall_video) <= config_.K:
        redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
    elif len(rov_recall_video) > config_.K:
        redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[config_.K - 1])
    log_.info('last video redis update success!')

    # For flow-pool videos distributed in this response, record "local distribute count - 1"
    flow_recall_video = [item for item in rank_result if item['pushFrom'] == 'flow_pool']
    if flow_recall_video:
        update_local_distribute_count(flow_recall_video)
        log_.info('update local distribute count success!')

    return rank_result


def update_local_distribute_count(videos):
    """
    Update the locally tracked distribute count of flow-pool videos.

    Counts are kept in a Redis zset keyed by the current half-hour slot:
    ``config_.LOCAL_DISTRIBUTE_COUNT_PREFIX`` + ``'10'`` for 10:00-10:29 and
    + ``'10.5'`` for 10:30-10:59. Each member is ``'<videoId>-<flowPool>'``.
    Its new score is the locally stored score - 1 when one exists, otherwise
    ``int(item['distributeCount']) - 1``.

    Best effort: any exception is logged and not propagated to the caller.

    :param videos: video list, type-list
        [{'videoId':'', 'flowPool':'', 'distributeCount': '', 'rovScore': '',
          'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
    :return: None
    """
    try:
        # Read the clock once so hour and minute come from the same instant;
        # two datetime.now() calls can straddle an hour boundary.
        now = datetime.now()
        redis_h = now.hour + 0.5 if now.minute >= 30 else now.hour
        key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
        redis_helper = RedisHelper()
        update_data = {}
        for item in videos:
            video = '{}-{}'.format(item['videoId'], item['flowPool'])
            current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
            if current_count is not None:
                # Video already has a local record: local count - 1
                new_count = current_count - 1
            else:
                # No local record yet: count supplied with the video - 1
                new_count = int(item['distributeCount']) - 1
            update_data[video] = new_count
        log_.info('now update video local distribute count: {}, key: {}'.format(update_data, key_name))
        # Write back to redis with an integer 30-minute expiry. Redis EXPIRE
        # only accepts integer seconds; the previous float (1800.0) may be
        # rejected if RedisHelper forwards it unchanged.
        redis_helper.add_data_with_zset(key_name=key_name, data=update_data, expire_time=30 * 60)
    except Exception:
        log_.error(traceback.format_exc())


if __name__ == '__main__':
    videos = [{'videoId': '12345', 'flowPool': '133#442#2', 'distributeCount': 10}]
    update_local_distribute_count(videos)