recommend.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import time
  2. import multiprocessing
  3. from log import Log
  4. from config import set_config
  5. from video_recall import PoolRecall
  6. from video_rank import video_rank, bottom_strategy
  7. from db_helper import RedisHelper
  # Module-wide logger and loaded configuration, used by video_recommend below.
  # Evaluated left to right: the logger is created before the config is loaded.
  log_, config_ = Log(), set_config()
  def video_recommend(mid, uid, size, app_type, algo_type):
      """
      Home-page online recommendation logic.

      Pipeline: parallel recall (ROV pool + flow pool) -> video_rank ->
      bottom_strategy fallback when ranking returns nothing -> Redis refresh.

      :param mid: mid type-string
      :param uid: uid type-string
      :param size: number of videos requested type-int
      :param app_type: product identifier type-int
      :param algo_type: algorithm type type-string (accepted for interface
                        compatibility; not read by this function)
      :return: video_rank's result, or bottom_strategy's result when video_rank
               returns a falsy value. Each item is read below as a mapping with
               'videoId' and 'pushFrom' keys.
      """
      ab_code = config_.AB_CODE
      # ####### multi-process recall
      start_recall = time.time()
      log_.info('====== recall')
      pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
      # Only the second element is used here: the Redis key that records where
      # this request's ROV recall ended (written in _refresh_redis).
      _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
      recall_result_list = _parallel_recall(pool_recall, size)
      end_recall = time.time()
      log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
          mid, uid, recall_result_list, (end_recall - start_recall) * 1000))

      # ####### rank
      start_rank = time.time()
      log_.info('====== rank')
      data = {
          'rov_pool_recall': recall_result_list[0],
          'flow_pool_recall': recall_result_list[1],
      }
      rank_result = video_rank(data=data, size=size)
      end_rank = time.time()
      log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
          mid, uid, rank_result, (end_rank - start_rank) * 1000))

      if not rank_result:
          # Bottom (fallback) strategy when ranking produced nothing.
          log_.info('====== bottom strategy')
          start_bottom = time.time()
          rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
          end_bottom = time.time()
          log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
              mid, uid, rank_result, (end_bottom - start_bottom) * 1000))

      # ####### refresh redis
      _refresh_redis(rank_result=rank_result, app_type=app_type, mid=mid,
                     last_rov_recall_key=last_rov_recall_key)
      return rank_result


  def _parallel_recall(pool_recall, size):
      """
      Run the ROV-pool and flow-pool recalls concurrently in worker processes.

      :param pool_recall: PoolRecall instance whose bound recall methods are
                          dispatched to the workers
      :param size: number of videos requested type-int
      :return: [rov_pool_recall result, flow_pool_recall result], in that order
      """
      recall_tasks = [
          pool_recall.rov_pool_recall,   # ROV recall pool
          pool_recall.flow_pool_recall,  # flow pool
      ]
      # Only len(recall_tasks) jobs are submitted, so one process per CPU core
      # would just fork idle workers on every request.
      processes = min(multiprocessing.cpu_count(), len(recall_tasks))
      # The context manager terminates the pool even if a worker raises and
      # .get() re-raises here; close()/join() keep a graceful shutdown on the
      # success path (terminate() in __exit__ is then harmless).
      with multiprocessing.Pool(processes=processes) as pool:
          async_results = [pool.apply_async(task, (size,)) for task in recall_tasks]
          recall_result_list = [res.get() for res in async_results]
          pool.close()
          pool.join()
      return recall_result_list


  def _refresh_redis(rank_result, app_type, mid, last_rov_recall_key):
      """
      Write this request's outcome back to Redis.

      - Pre-exposure (preview) video ids go into a set keyed by app_type and mid,
        expiring after 30 minutes.
      - The ROV-recall-pool video at position config_.K (or the last such video
        if fewer were returned) is stored under last_rov_recall_key, so the next
        request can quickly locate its recall position.

      :param rank_result: final ranked list (items read via 'videoId'/'pushFrom')
      :param app_type: product identifier type-int
      :param mid: mid type-string
      :param last_rov_recall_key: Redis key from PoolRecall.get_video_last_idx()
      """
      log_.info('====== update redis')
      redis_helper = RedisHelper()
      preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
      preview_video_ids = [item['videoId'] for item in rank_result]
      if preview_video_ids:
          redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids),
                                         expire_time=30 * 60)
          log_.info('preview redis update success!')

      # NOTE(review): the original comment says this key expires after 1 day, but
      # no expire_time is passed; presumably set_data_to_redis applies a default —
      # TODO confirm. Also confirm video_rank tags ROV-pool items 'recall_pool'.
      rov_recall_video = [item['videoId'] for item in rank_result if item['pushFrom'] == 'recall_pool']
      if rov_recall_video:
          # K-th video (1-based), or the last one when fewer than K came from the
          # pool; equivalent to the former `0 < n <= K` / `n > K` branches.
          last_video = rov_recall_video[min(len(rov_recall_video), config_.K) - 1]
          redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=last_video)
          # Log success only when a write actually happened.
          log_.info('last video redis update success!')