123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- import time
- import multiprocessing
- from log import Log
- from config import set_config
- from video_recall import PoolRecall
- from video_rank import video_rank, bottom_strategy
- from db_helper import RedisHelper
- log_ = Log()  # module-level logger; video_recommend writes its stage/timing messages through log_.info
- config_ = set_config()  # module-level configuration; video_recommend reads AB_CODE and PREVIEW_KEY_PREFIX from it
def video_recommend(mid, uid, size, app_type, algo_type):
    """
    Online recommendation logic for the home page.

    Runs the two recall pools in worker processes, ranks the recalled
    videos, falls back to the bottom strategy when ranking yields nothing,
    and finally writes the pre-exposure set and the last ROV recall
    position to Redis.

    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string (not read by this function)
    :return: the ranked result (or the bottom-strategy result when ranking
        is empty); items are indexed by 'videoId' and 'pushFrom' below
    """
    ab_code = config_.AB_CODE

    # ####### Multi-process recall
    start_recall = time.time()
    log_.info('====== recall')
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cores)
    try:
        pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
        _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
        pool_list = [
            # ROV recall pool
            pool.apply_async(pool_recall.rov_pool_recall, (size,)),
            # Flow (traffic) pool
            pool.apply_async(pool_recall.flow_pool_recall, (size,)),
        ]
        # AsyncResult.get() re-raises any exception raised in the worker.
        recall_result_list = [p.get() for p in pool_list]
    finally:
        # Always release the worker processes, even when recall fails;
        # otherwise every failing request would leak a full pool.
        pool.close()
        pool.join()
    end_recall = time.time()
    log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
        mid, uid, recall_result_list, (end_recall - start_recall) * 1000))

    # ####### Ranking
    start_rank = time.time()
    log_.info('====== rank')
    # Results come back in submission order: ROV pool first, flow pool second.
    data = {
        'rov_pool_recall': recall_result_list[0],
        'flow_pool_recall': recall_result_list[1]
    }
    rank_result = video_rank(data=data, size=size)
    end_rank = time.time()
    log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
        mid, uid, rank_result, (end_rank - start_rank) * 1000))

    if not rank_result:
        # Bottom (fallback) strategy when ranking produced nothing
        log_.info('====== bottom strategy')
        start_bottom = time.time()
        rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
        end_bottom = time.time()
        log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
            mid, uid, rank_result, (end_bottom - start_bottom) * 1000))

    # ####### Refresh Redis data
    log_.info('====== update redis')
    # Sync the pre-exposure video ids to Redis with a 0.5h expiry
    redis_helper = RedisHelper()
    preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
    preview_video_ids = [item['videoId'] for item in rank_result]
    if preview_video_ids:
        redis_helper.add_data_with_set(
            key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30*60)
        log_.info('preview redis update success!')

    # Store the id of the last video taken from the ROV recall pool so the
    # next request can quickly locate the recall position.
    # NOTE(review): the original comment states a 1-day expiry, but no
    # expire_time is passed here - presumably the helper's default; confirm.
    rov_recall_video = [item['videoId'] for item in rank_result if item['pushFrom'] == 'recall_pool']
    if rov_recall_video:
        redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
        log_.info('last video redis update success!')

    return rank_result
|