123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import redis
- import random
- from models.cache import R
- from utils import filter_utils
- from configs import config_redis, config_algo, config_basic
class pool_manager:
    """Build a ranked distribution list from the recall pool, the test pool and a backup key.

    All sources are Redis sorted sets read with ``R.zrevrange`` (highest score
    first); the keys come from ``config_redis``.
    """

    def __init__(self, k, p, mid, scene='MP-FRONT', counts=10):
        """Store the selection parameters.

        Args:
            k: number of leading slots filled strictly by score across the recall
                pool and the test pool.
            p: threshold for the random draw that picks the source of the
                remaining slots (see ``get_rov_data``).
            mid: only referenced by the viewed-video filter, which is currently
                disabled in ``_sub_get_data_from_cache``.
            scene: only referenced by the (currently disabled) filters in
                ``_sub_get_data_from_cache``.
            counts: target length of the result list; also how many items are
                fetched from each source.
        """
        self.k = k
        self.p = p
        self.mid = mid
        self.scene = scene
        self.counts = counts

    def get_rov_data(self):
        """Return at most ``self.counts`` items merged from the recall and test pools.

        The first ``k`` slots take whichever source's next item has the higher
        score (ties go to the test pool). The remaining slots pick their source
        with a random draw. If the chosen source is exhausted, the other source is
        used. Once both are exhausted, the list is topped up from the backup key.

        Returns:
            list: ``{'vid', 'score', 'from'}`` dicts for recall/test-pool items,
            followed by raw items from ``_get_data_from_cache`` for backup entries.
        """
        res_all = []
        res_recall = self._get_data_from_cache(config_redis.ROV_SERVICE_RECALL_VIDS, self.counts)
        res_pool = self._get_data_from_cache(config_redis.ROV_SERVICE_POOL_VIDS, self.counts)
        head_recall = 0
        head_pool = 0
        len_recall = len(res_recall)
        len_pool = len(res_pool)

        # One pass per output slot, so the list never grows past self.counts
        # (even when k > counts).
        for position in range(self.counts):
            has_recall = head_recall < len_recall
            has_pool = head_pool < len_pool
            if not (has_recall or has_pool):
                break
            if position < self.k:
                # Top-k slots: the higher score wins. Availability is checked before
                # indexing, so an exhausted source falls back to the other one.
                use_recall = has_recall and (
                    not has_pool or res_recall[head_recall][1] > res_pool[head_pool][1])
            else:
                # Random slots. The stated intent is "test pool with probability p,
                # recall pool with probability 1 - p". However, the code takes recall
                # when randint(1, 10) < p: probability (p - 1) / 10 for integer p in
                # 1..10, and never for p <= 1.
                # NOTE(review): confirm the intended scale and direction of p.
                draw = random.randint(1, 10)
                use_recall = has_recall and (draw < self.p or not has_pool)
            if use_recall:
                item = res_recall[head_recall]
                head_recall += 1
                vid_from = config_algo.DISTRIBUTE_FROM_RECALL
            else:
                item = res_pool[head_pool]
                head_pool += 1
                vid_from = config_algo.DISTRIBUTE_FROM_POOL
                # The over-distribution check only applies to test-pool items.
                self.check_over_distribute(item[0])
            res_all.append({'vid': item[0], 'score': item[1], 'from': vid_from})

        # Fallback: top up from the backup key when the two pools ran dry.
        missing = self.counts - len(res_all)
        if missing > 0:
            res_backup = self._get_data_from_cache(config_redis.ROV_SERVICE_BACKUP_VIDS, missing)
            # TODO: wrap backup items as {'vid', 'score', 'from'} dicts like the entries above.
            # TODO: when there is no data, skip filtering and keep falling back.
            res_all.extend(res_backup[:missing])
        return res_all

    def check_over_distribute(self, vid):
        """Check whether ``vid`` has already been over-distributed (test pool only).

        Not implemented yet: this is a no-op placeholder that always returns None.
        The vid comes straight from Redis, so it must be parsed (e.g. ``int``),
        never passed to ``eval``.
        """
        # TODO: implement the over-distribution check.
        return None

    def _get_data_from_cache(self, key, counts):
        """Return up to ``counts`` items (highest score first) from sorted set ``key``."""
        if counts <= 0:
            return []
        # ZREVRANGE's stop index is inclusive, so the first window is [0, counts - 1].
        res = self._many_get_data_from_cache(0, counts - 1, key, counts)
        return res[:counts]

    def _many_get_data_from_cache(self, start, end, key, counts):
        """Fetch inclusive windows of ``key`` until at least ``counts`` items are collected.

        The loop stops early when a window comes back empty. Looping only matters
        once filtering in ``_sub_get_data_from_cache`` can shrink a window.
        """
        res = []
        while len(res) < counts:
            batch = self._sub_get_data_from_cache(start, end, key, counts)
            if not batch:
                break
            res.extend(batch)
            # Next window starts right after the inclusive end of this one.
            start = end + 1
            end = start + counts - 1
        return res

    def _sub_get_data_from_cache(self, start, end, key, counts):
        """Return the inclusive window [start, end] of ``key`` with scores, highest first.

        ``counts`` is unused; it is kept so existing callers keep working.
        """
        res = R.zrevrange(key, start, end, withscores=True)
        # Filtering is currently disabled. When re-enabled it is meant to run:
        #   res = filter_utils.filter_res_unavailable(res, self.scene)
        #   res = filter_utils.filter_mid_viewed_videos(res, self.scene, self.mid)
        return res
|