recommend.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. import time
  2. import multiprocessing
  3. import traceback
  4. from datetime import datetime
  5. from log import Log
  6. from config import set_config
  7. from video_recall import PoolRecall
  8. from video_rank import video_rank, bottom_strategy
  9. from db_helper import RedisHelper
  10. import gevent
# Module-level logger instance shared by the functions below.
log_ = Log()
# Configuration object returned by set_config(); the functions below read
# AB_CODE, K and the Redis key names/prefixes from it.
config_ = set_config()
  13. def video_recommend(mid, uid, size, app_type, algo_type):
  14. """
  15. 首页线上推荐逻辑
  16. :param mid: mid type-string
  17. :param uid: uid type-string
  18. :param size: 请求视频数量 type-int
  19. :param app_type: 产品标识 type-int
  20. :param algo_type: 算法类型 type-string
  21. :return:
  22. """
  23. ab_code = config_.AB_CODE
  24. # ####### 多进程召回
  25. start_recall = time.time()
  26. log_.info('====== recall')
  27. '''
  28. cores = multiprocessing.cpu_count()
  29. pool = multiprocessing.Pool(processes=cores)
  30. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
  31. _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
  32. pool_list = [
  33. # rov召回池
  34. pool.apply_async(pool_recall.rov_pool_recall, (size,)),
  35. # 流量池
  36. pool.apply_async(pool_recall.flow_pool_recall, (size,))
  37. ]
  38. recall_result_list = [p.get() for p in pool_list]
  39. pool.close()
  40. pool.join()
  41. '''
  42. recall_result_list = []
  43. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
  44. _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
  45. t = [gevent.spawn(pool_recall.rov_pool_recall, size), gevent.spawn(pool_recall.flow_pool_recall, size) ]
  46. gevent.joinall(t)
  47. recall_result_list = [i.get() for i in t]
  48. end_recall = time.time()
  49. log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
  50. mid, uid, recall_result_list, (end_recall - start_recall) * 1000))
  51. # ####### 排序
  52. start_rank = time.time()
  53. log_.info('====== rank')
  54. data = {
  55. 'rov_pool_recall': recall_result_list[0],
  56. 'flow_pool_recall': recall_result_list[1]
  57. }
  58. rank_result = video_rank(data=data, size=size)
  59. end_rank = time.time()
  60. log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
  61. mid, uid, rank_result, (end_rank - start_rank) * 1000))
  62. if not rank_result:
  63. # 兜底策略
  64. log_.info('====== bottom strategy')
  65. start_bottom = time.time()
  66. rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
  67. end_bottom = time.time()
  68. log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
  69. mid, uid, rank_result, (end_bottom - start_bottom) * 1000))
  70. # ####### redis数据刷新
  71. log_.info('====== update redis')
  72. # 预曝光数据同步刷新到Redis, 过期时间为0.5h
  73. redis_helper = RedisHelper()
  74. preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
  75. preview_video_ids = [int(item['videoId']) for item in rank_result]
  76. if preview_video_ids:
  77. log_.error('key_name = {} \n values = {}'.format(preview_key_name, tuple(preview_video_ids)))
  78. redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30*60)
  79. log_.info('preview redis update success!')
  80. # 将此次获取的ROV召回池config_.K末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
  81. rov_recall_video = [item['videoId'] for item in rank_result[:3] if item['pushFrom'] == 'recall_pool']
  82. if 0 < len(rov_recall_video) <= config_.K:
  83. if not redis_helper.get_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME, value=rov_recall_video[-1]):
  84. redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
  85. elif len(rov_recall_video) > config_.K:
  86. if not redis_helper.get_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME, value=rov_recall_video[config_.K - 1]):
  87. redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[config_.K - 1])
  88. log_.info('last video redis update success!')
  89. # 将此次分发的流量池视频,对 本地分发数-1 进行记录
  90. flow_recall_video = [item for item in rank_result if item['pushFrom'] == 'flow_pool']
  91. if flow_recall_video:
  92. update_local_distribute_count(flow_recall_video)
  93. log_.info('update local distribute count success!')
  94. return rank_result
  95. def update_local_distribute_count(videos):
  96. """
  97. 更新本地分发数
  98. :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
  99. 'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
  100. :return:
  101. """
  102. try:
  103. redis_helper = RedisHelper()
  104. for item in videos:
  105. key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
  106. # 本地记录的分发数 - 1
  107. redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
  108. # if redis_helper.key_exists(key_name=key_name):
  109. # # 该视频本地有记录,本地记录的分发数 - 1
  110. # redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
  111. # else:
  112. # # 该视频本地无记录,接口获取的分发数 - 1
  113. # redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
  114. except Exception as e:
  115. log_.error('update_local_distribute_count error...')
  116. log_.error(traceback.format_exc())
  117. def video_relevant_recommend(mid, uid, size, app_type):
  118. """
  119. 相关推荐逻辑
  120. :param mid: mid type-string
  121. :param uid: uid type-string
  122. :param size: 请求视频数量 type-int
  123. :param app_type: 产品标识 type-int
  124. :param algo_type: 算法类型 type-string
  125. :return: videos type-list
  126. """
  127. videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, algo_type='')
  128. return videos
  129. if __name__ == '__main__':
  130. videos = [{'videoId': '12345', 'flowPool': '133#442#2', 'distributeCount': 10}]
  131. update_local_distribute_count(videos)