# recommend.py (4.8 KB)
import asyncio
import multiprocessing
import time
import traceback
from datetime import datetime

from log import Log
from config import set_config
from video_recall import PoolRecall
from video_rank import video_rank, bottom_strategy
import aiocache
  10. log_ = Log()
  11. config_ = set_config()
  12. async def video_recommend(mid, uid, size, app_type, algo_type):
  13. """
  14. 首页线上推荐逻辑
  15. :param mid: mid type-string
  16. :param uid: uid type-string
  17. :param size: 请求视频数量 type-int
  18. :param app_type: 产品标识 type-int
  19. :param algo_type: 算法类型 type-string
  20. :return:
  21. """
  22. ab_code = config_.AB_CODE
  23. # ####### 多进程召回
  24. start_recall = time.time()
  25. log_.info('====== recall')
  26. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
  27. _, last_rov_recall_key, _ = await pool_recall.get_video_last_idx()
  28. recall_1 = await pool_recall.rov_pool_recall(size)
  29. recall_2 = await pool_recall.flow_pool_recall(size)
  30. end_recall = time.time()
  31. log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
  32. mid, uid, [recall_1, recall_2], (end_recall - start_recall) * 1000))
  33. # ####### 排序
  34. start_rank = time.time()
  35. log_.info('====== rank')
  36. data = {
  37. 'rov_pool_recall': recall_1,
  38. 'flow_pool_recall': recall_2
  39. }
  40. rank_result = video_rank(data=data, size=size)
  41. end_rank = time.time()
  42. log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
  43. mid, uid, rank_result, (end_rank - start_rank) * 1000))
  44. if not rank_result:
  45. # 兜底策略
  46. log_.info('====== bottom strategy')
  47. start_bottom = time.time()
  48. rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
  49. end_bottom = time.time()
  50. log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
  51. mid, uid, rank_result, (end_bottom - start_bottom) * 1000))
  52. # ####### redis数据刷新
  53. log_.info('====== update redis')
  54. # 预曝光数据同步刷新到Redis, 过期时间为0.5h
  55. preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
  56. preview_video_ids = [item['videoId'] for item in rank_result]
  57. if preview_video_ids:
  58. await aiocache.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30*60)
  59. log_.info('preview redis update success!')
  60. # 将此次获取的ROV召回池config_.K末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
  61. rov_recall_video = [item['videoId'] for item in rank_result if item['pushFrom'] == 'recall_pool']
  62. if 0 < len(rov_recall_video) <= config_.K:
  63. await aiocache.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
  64. elif len(rov_recall_video) > config_.K:
  65. await aiocache.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[config_.K - 1])
  66. log_.info('last video redis update success!')
  67. # 将此次分发的流量池视频,对 本地分发数-1 进行记录
  68. flow_recall_video = [item for item in rank_result if item['pushFrom'] == 'flow_pool']
  69. if flow_recall_video:
  70. update_local_distribute_count(flow_recall_video)
  71. log_.info('update local distribute count success!')
  72. return rank_result
  73. async def update_local_distribute_count(videos):
  74. """
  75. 更新本地分发数
  76. :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
  77. 'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
  78. :return:
  79. """
  80. try:
  81. redis_h = datetime.now().hour
  82. if datetime.now().minute >= 30:
  83. redis_h += 0.5
  84. key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
  85. print(key_name)
  86. update_data = {}
  87. for item in videos:
  88. video = '{}-{}'.format(item['videoId'], item['flowPool'])
  89. current_count = await aiocache.get_score_with_value(key_name=key_name, value=video)
  90. if current_count is not None:
  91. # 该视频本地有记录,本地记录的分发数 - 1
  92. new_count = current_count - 1
  93. else:
  94. # 该视频本地无记录,接口获取的分发数 - 1
  95. new_count = int(item['distributeCount']) - 1
  96. update_data[video] = new_count
  97. log_.info('now update video local distribute count: {}, key: {}'.format(update_data, key_name))
  98. # 更新redis中的数据
  99. await aiocache.add_data_with_zset(key_name=key_name, data=update_data, expire_time=0.5*3600)
  100. except Exception as e:
  101. log_.error(traceback.format_exc())
  102. if __name__ == '__main__':
  103. videos = [{'videoId': '12345', 'flowPool': '133#442#2', 'distributeCount': 10}]
  104. update_local_distribute_count(videos)