# pool_manager.py (3.8 KB)
  1. import redis
  2. import random
  3. from models.cache import R
  4. from utils import filter_utils
  5. from configs import config_redis, config_algo, config_basic
  6. class pool_manager:
  7. def __init__(self, k, p, mid, scene='MP-FRONT',counts=10):
  8. self.k = k
  9. self.p = p
  10. self.mid = mid
  11. self.scene = scene
  12. self.counts = counts
  13. #前k个按rov分值排序,后面total-k个,按p的概率从测试池取值,1-p的概率从召回池取
  14. def get_rov_data(self):
  15. res_all = []
  16. res_recall = self._get_data_from_cache(config_redis.ROV_SERVICE_RECALL_VIDS, self.counts)
  17. res_pool = self._get_data_from_cache(config_redis.ROV_SERVICE_POOL_VIDS, self.counts)
  18. head_recall = 0
  19. head_pool = 0
  20. len_recall = len(res_recall)
  21. len_pool = len(res_pool)
  22. i = 0
  23. item = None
  24. vid_from = None
  25. #按分排序取top k
  26. while(i<self.k):
  27. if res_recall[head_recall][1]>res_pool[head_pool][1] and head_recall<len_recall:
  28. item = res_recall[head_recall]
  29. vid_from = config_algo.DISTRIBUTE_FROM_RECALL
  30. head_recall += 1
  31. elif head_pool<len_pool:
  32. item = res_pool[head_pool]
  33. vid_from = config_algo.DISTRIBUTE_FROM_POOL
  34. head_pool += 1
  35. self.check_over_distribute(item[0])
  36. else:
  37. break
  38. res_all.append({'vid':item[0], 'score':item[1], 'from':vid_from})
  39. i += 1
  40. #按概率及分排序取counts-k
  41. i = 0
  42. while(i<self.counts-self.k):
  43. randint = random.randint(1,10)
  44. print(randint)
  45. if randint <self.p and head_recall<len_recall:
  46. item = res_recall[head_recall]
  47. vid_from = config_algo.DISTRIBUTE_FROM_RECALL
  48. head_recall += 1
  49. elif head_pool<len_pool:
  50. item = res_pool[head_pool]
  51. vid_from = config_algo.DISTRIBUTE_FROM_POOL
  52. head_pool += 1
  53. self.check_over_distribute(item[0])
  54. else:
  55. break
  56. res_all.append({'vid':item[0], 'score':item[1], 'from':vid_from})
  57. i+= 1
  58. #兜底
  59. if len(res_all)<self.counts:
  60. res_backup = self._get_data_from_cache(config_redis.ROV_SERVICE_BACKUP_VIDS, self.counts-len(res_all))
  61. res_all.extend(res_backup[:self.counts-len(res_all)])
  62. #TODO
  63. #添加score及from来源
  64. #TODO
  65. #无数据,不再过滤,继续兜底
  66. return res_all
  67. #检查vid是否已超量分发(只对测试池有效)
  68. def check_over_distribute(self, vid):
  69. vid = eval(vid)
  70. #TODO
  71. pass
  72. #从缓存中key取counts个数据
  73. def _get_data_from_cache(self, key, counts):
  74. start = 0
  75. end = counts
  76. res = []
  77. _res = self._many_get_data_from_cache(start, end, key, counts)
  78. res.extend(_res)
  79. return res
  80. #循环取数据
  81. def _many_get_data_from_cache(self, start, end, key, counts):
  82. res = []
  83. while(len(res)<counts):
  84. _res = self._sub_get_data_from_cache(start, end, key, counts)
  85. if len(_res) == 0:
  86. break
  87. res.extend(_res)
  88. start = end
  89. end = start + counts
  90. return res
  91. #从缓存中key取counts个数据
  92. def _sub_get_data_from_cache(self, start, end, key, counts):
  93. # res = R.zrange(key, start, end, withscores=True)
  94. res = R.zrevrange(key, start, end, withscores=True)
  95. '''
  96. res = filter_utils.filter_res_unavailable(res, self.scene)
  97. res = filter_utils.filter_mid_viewed_videos(res, self.scene, self.mid)
  98. res = filter_utils.filter_mid_viewed_videos(res, self.scene, self.mid)
  99. '''
  100. return res