baichongyang 3 år sedan
förälder
incheckning
d15c0d6102
7 ändrade filer med 101 tillägg och 39 borttagningar
  1. 2 2
      configs/config_algo.py
  2. 3 1
      configs/config_basic.py
  3. 11 7
      configs/config_redis.py
  4. 58 25
      controller/pool_manager.py
  5. 19 0
      mock/mock_redis.py
  6. 4 0
      models/cache.py
  7. 4 4
      utils/filter_utils.py

+ 2 - 2
configs/config_algo.py

@@ -1,4 +1,4 @@
 DISTRIBUTE_FROM=''
-DISTRIBUTE_FROM_RECALL = ''
-DISTRIBUTE_FROM_POOL = ''
+DISTRIBUTE_FROM_RECALL = 'from_recall'
+DISTRIBUTE_FROM_POOL = 'from_pool'
 ABTEST_CODE = ''

+ 3 - 1
configs/config_basic.py

@@ -1,2 +1,4 @@
+IS_DEBUG = True
+
 DISTRIBUTE_X = 10 #按X倍的曝光率去分发视频
-RECALL_VIDS_SURPLUS = 3 #按X倍的量去召回视频,进行过滤
+RECALL_VIDS_SURPLUS = 3 #按X倍的量去召回视频,进行过滤

+ 11 - 7
configs/config_redis.py

@@ -1,15 +1,19 @@
 #redis ip
-REDIS_HOST = ''
+REDIS_HOST = 'r-bp1u0cbv5pp2treyex140.redis.rds.aliyuncs.com'
 #redis 端口
-REDIS_PORT = ''
+REDIS_PORT = '6379'
+#redis 密码
+REDIS_PWD = 'Wqsd@2019'
 
 #召回池
-ROV_SERVICE_RECALL_VIDS = ''
+ROV_SERVICE_RECALL_VIDS = 'ROV_SERVICE_RECALL_VIDS'
 #测试池
-ROV_SERVICE_POOL_VIDS = ''
+ROV_SERVICE_POOL_VIDS = 'ROV_SERVICE_POOL_VIDS'
+#测试池
+ROV_SERVICE_BACKUP_VIDS = 'ROV_SERVICE_BACKUP_VIDS'
 #测试池视频已分发量
-ROV_SERVICE_POOL_VIDS_DISTRIBUTE_COUNTS = ''
+ROV_SERVICE_POOL_VIDS_DISTRIBUTE_COUNTS = 'ROV_SERVICE_POOL_VIDS_DISTRIBUTE_COUNTS'
 #测试池视频分发上限
-ROV_SERVICE_POOL_VIDS_DISTRIBUTE_LIMIT = ''
+ROV_SERVICE_POOL_VIDS_DISTRIBUTE_LIMIT = 'ROV_SERVICE_POOL_VIDS_DISTRIBUTE_LIMIT'
 #已分发视频(preview)
-ROV_SERVICE_MID_DISTRIBUTED = ''
+ROV_SERVICE_MID_DISTRIBUTED = 'ROV_SERVICE_MID_DISTRIBUTED'

+ 58 - 25
controller/pool_manager.py

@@ -1,77 +1,110 @@
 import redis
 import random
-from models.model import R
+from models.cache import R
 from utils import filter_utils
-from configs import config_redis, config_algo
+from configs import config_redis, config_algo, config_basic
 
 class pool_manager:
-    def __init__(self, k, p, mid, scene,counts=10):
+    def __init__(self, k, p, mid, scene='MP-FRONT',counts=10):
         self.k = k
         self.p = p
         self.mid = mid
         self.scene = scene
         self.counts = counts
 
+
     #前k个按rov分值排序,后面total-k个,按p的概率从测试池取值,1-p的概率从召回池取
-    def get_rov_data(self, k,p):
+    def get_rov_data(self):
         res_all = []
-        res_recall = self._get_data_from_cache(config_redis.ROV_SERVICE_RECALL_VIDS, self.counts, self.mid)
-        res_pool = self._get_data_from_cache(config_redis.ROV_SERVICE_POOL_VIDS, self.counts, self.mid)
+        res_recall = self._get_data_from_cache(config_redis.ROV_SERVICE_RECALL_VIDS, self.counts)
+        res_pool = self._get_data_from_cache(config_redis.ROV_SERVICE_POOL_VIDS, self.counts)
         head_recall = 0
         head_pool = 0
-        #TODO
-        #添加score及from来源
+        len_recall = len(res_recall)
+        len_pool = len(res_pool)
+
         i = 0
         item = None
-        while(i<k):
-            if res_recall[head_recall]['score']>res_pool[head_pool]['score']:
+        vid_from = None
+        #按分排序取top k
+        while(i<self.k):
+            if res_recall[head_recall][1]>res_pool[head_pool][1] and head_recall<len_recall:
                 item = res_recall[head_recall]
+                vid_from = config_algo.DISTRIBUTE_FROM_RECALL
                 head_recall += 1
-            else:
+            elif head_pool<len_pool:
                 item = res_pool[head_pool]
+                vid_from = config_algo.DISTRIBUTE_FROM_POOL
                 head_pool += 1
-            res_all.append(item)
+                self.check_over_distribute(item[0])
+            else:
+                break
+            res_all.append({'vid':item[0], 'score':item[1], 'from':vid_from})
             i += 1
-
+        #按概率及分排序取counts-k
         i = 0
-        while(i<self.counts-k):
-            if random.randint(1,10)<p:
+        while(i<self.counts-self.k):
+            randint = random.randint(1,10)
+            print(randint)
+            if randint <self.p and head_recall<len_recall:
                 item = res_recall[head_recall]
+                vid_from = config_algo.DISTRIBUTE_FROM_RECALL
                 head_recall += 1
-            else:
+            elif head_pool<len_pool:
                 item = res_pool[head_pool]
+                vid_from = config_algo.DISTRIBUTE_FROM_POOL
                 head_pool += 1
-            res_all.append(item)
+                self.check_over_distribute(item[0])
+            else:
+                break
+            res_all.append({'vid':item[0], 'score':item[1], 'from':vid_from})
             i+= 1
 
+        #兜底
+        if len(res_all)<self.counts:
+            res_backup = self._get_data_from_cache(config_redis.ROV_SERVICE_BACKUP_VIDS, self.counts-len(res_all))    
+            res_all.extend(res_backup[:self.counts-len(res_all)])
+        #TODO
+        #添加score及from来源
+        #TODO
+        #无数据,不再过滤,继续兜底
         return res_all
 
-    #获取兜底数据
-    def get_backup_data(self, vid, start, counts):
-        pass
 
     #检查vid是否已超量分发(只对测试池有效)
     def check_over_distribute(self, vid):
+        vid = eval(vid)
+        #TODO
         pass
 
     #从缓存中key取counts个数据
-    def _get_data_from_cache(self, key, counts, mid):
+    def _get_data_from_cache(self, key, counts):
         start = 0
         end = counts
+        res = []
+        _res = self._many_get_data_from_cache(start, end, key, counts)
+        res.extend(_res)
+        return res
+
+    #循环取数据
+    def _many_get_data_from_cache(self, start, end, key, counts):
         res = []
         while(len(res)<counts):
-            _res = self._sub_get_data_from_cache(self, start, end, key, counts)
+            _res = self._sub_get_data_from_cache(start, end, key, counts)
+            if len(_res) == 0:
+                break
             res.extend(_res)
             start = end
             end = start + counts
-        #TODO
-        #数量不足用backup数据补齐
         return res
 
     #从缓存中key取counts个数据
     def _sub_get_data_from_cache(self, start, end, key, counts):
-        res = R.zrange(key, start, end, withscores=True)
+        # res = R.zrange(key, start, end, withscores=True)
+        res = R.zrevrange(key, start, end, withscores=True)
+        '''
         res = filter_utils.filter_res_unavailable(res, self.scene)
         res = filter_utils.filter_mid_viewed_videos(res, self.scene, self.mid)
         res = filter_utils.filter_mid_viewed_videos(res, self.scene, self.mid)
+        '''
         return res

+ 19 - 0
mock/mock_redis.py

@@ -0,0 +1,19 @@
+import sys 
+sys.path.append("..") 
+from models import cache
+from configs import config_redis
+import json
+
+cache.R.zremrangebyscore(config_redis.ROV_SERVICE_RECALL_VIDS, 0, 10000)
+cache.R.zremrangebyscore(config_redis.ROV_SERVICE_POOL_VIDS, 0, 10000)
+
+item = {}
+for i in range(1,100):
+    item[i] = i+10
+cache.R.zadd(config_redis.ROV_SERVICE_RECALL_VIDS, item)
+
+item = {}
+for i in range(1,200):
+    item[i] = i+10
+
+cache.R.zadd(config_redis.ROV_SERVICE_POOL_VIDS, item)

+ 4 - 0
models/cache.py

@@ -0,0 +1,4 @@
+from configs import config_redis
+import redis
+
+R = redis.Redis(host=config_redis.REDIS_HOST, port=config_redis.REDIS_PORT, password=config_redis.REDIS_PWD)

+ 4 - 4
utils/filter_utils.py

@@ -1,15 +1,15 @@
 #检查视频是否不可用
-def check_res_unavailable(vids):
+def filter_res_unavailable(vids):
     pass
 
 #移除不可用视频
-def rm_unavailable_from_db(vids):
+def _rm_unavailable_from_db(vids):
     pass
 
 #已看过的视频
-def check_mid_viewed_videos(mid):
+def filter_mid_viewed_videos(mid):
     pass
 
 #暂时分发过的视频
-def check_mid_previewed_videos(mid):
+def filter_mid_previewed_videos(mid):
     pass