Browse Source

add flow control

linfan 1 year ago
parent
commit
1df6ba37d3
3 changed files with 166 additions and 48 deletions
  1. 21 3
      recommend.py
  2. 109 44
      video_rank.py
  3. 36 1
      video_recall.py

+ 21 - 3
recommend.py

@@ -315,16 +315,25 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #print("data['hot_recall']", data['hot_recall'])
     if ab_code == 60058 or ab_code == 60059 or ab_code == 60060 or ab_code == 60061 :
         test_config = None
+        flow_config = None
         if ab_code == 60058:
             test_config =pool_recall.get_hotrecall_config()
+            flow_config = pool_recall.get_hotrecall_flow_config()
         elif ab_code == 60059:
             test_config = pool_recall.get_w2v_config()
+            flow_config = pool_recall.get_w2v_flow_config()
         elif ab_code == 60060:
             test_config = pool_recall.get_test_config()
+            flow_config = pool_recall.get_flow_config()
         elif ab_code == 60061:
             test_config = pool_recall.get_simrecall_config()
+            flow_config = pool_recall.get_simrecall_flow_config()
         #print("test config:",test_config)
-        rank_result = video_sanke_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_Code=ab_code, exp_config=test_config)
+        rank_result, flow_num = video_sanke_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_Code=ab_code, exp_config=test_config, flowConfig=flow_config)
+        result['flow_num'] = flow_num
+        result['rank_num'] = len(rank_result)
+        if rank_result:
+            result['rank_num'] = len(rank_result)
     else:
         rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
 
@@ -564,10 +573,19 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
             }
     #if ab_code=="ab_new_test":
     #print("before data:", data)
-    rank_result = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code)
+    flow_config = None
+    if ab_code == 60052:
+        flow_config = pool_recall.get_flow_exp_7_config()
+    elif ab_code == 60053:
+        flow_config = pool_recall.get_flow_exp_8_config()
+    elif ab_code == 60057:
+        flow_config = pool_recall.get_flow_exp_6_config()
+    rank_result, flow_num  = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, flowConfig=flow_config)
     #print(rank_result)
-
+    if rank_result:
+        result['rank_num'] = len(rank_result)
     result['rankResult'] = rank_result
+    result['flow_num'] = flow_num
     result['rankTime'] = (time.time() - start_rank) * 1000
 
 

+ 109 - 44
video_rank.py

@@ -592,7 +592,7 @@ def video_rank_with_old_video(rank_result, old_video_recall, size, top_K, old_vi
     return new_rank_result[:size]
 
 
-def video_new_rank2(data, size, top_K, flow_pool_P, ab_code):
+def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, flowConfig=None):
     """
         视频分发排序
         :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
@@ -602,7 +602,7 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code):
         :return: rank_result
         """
     if not data['rov_pool_recall'] and not data['flow_pool_recall']:
-        return []
+        return [], 0
 
     redisObj = RedisHelper()
     vidKeys = []
@@ -654,29 +654,61 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code):
         rank_result.extend(flow_recall_rank[:top_K])
         flow_recall_rank = flow_recall_rank[top_K:]
         # 按概率 p 及score排序获取 size - k 个视频
-    i = 0
-    while i < size - top_K:
-        # 随机生成[0, 1)浮点数
-        rand = random.random()
-        # log_.info('rand: {}'.format(rand))
-        if rand < flow_pool_P:
-            if flow_recall_rank:
-                rank_result.append(flow_recall_rank[0])
-                flow_recall_rank.remove(flow_recall_rank[0])
+    flow_num = 0
+    if flowConfig == "1" and len(rov_recall_rank) > 0:
+        if flowConfig == "1" and len(rov_recall_rank) > 0:
+            for recall_item in rank_result:
+                flow_recall_name = recall_item.get("flowPool", '')
+                flow_num = flow_num + 1
+            all_recall_rank = rov_recall_rank + flow_recall_rank
+            if flow_num > 0:
+                rank_result.extend(all_recall_rank[:size - top_K])
             else:
-                rank_result.extend(rov_recall_rank[:size - top_K - i])
-                return rank_result[:size]
-        else:
-            if rov_recall_rank:
-                rank_result.append(rov_recall_rank[0])
-                rov_recall_rank.remove(rov_recall_rank[0])
+                i = 0
+                while i < size - top_K:
+                    # 随机生成[0, 1)浮点数
+                    rand = random.random()
+                    # log_.info('rand: {}'.format(rand))
+                    if rand < flow_pool_P:
+                        if flow_recall_rank:
+                            rank_result.append(flow_recall_rank[0])
+                            flow_recall_rank.remove(flow_recall_rank[0])
+                        else:
+                            rank_result.extend(rov_recall_rank[:size - top_K - i])
+                            return rank_result[:size], flow_num
+                    else:
+                        if rov_recall_rank:
+                            rank_result.append(rov_recall_rank[0])
+                            rov_recall_rank.remove(rov_recall_rank[0])
+                        else:
+                            rank_result.extend(flow_recall_rank[:size - top_K - i])
+                            return rank_result[:size], flow_num
+                    i += 1
+
+    else:
+        i = 0
+        while i < size - top_K:
+            # 随机生成[0, 1)浮点数
+            rand = random.random()
+            # log_.info('rand: {}'.format(rand))
+            if rand < flow_pool_P:
+                if flow_recall_rank:
+                    rank_result.append(flow_recall_rank[0])
+                    flow_recall_rank.remove(flow_recall_rank[0])
+                else:
+                    rank_result.extend(rov_recall_rank[:size - top_K - i])
+                    return rank_result[:size], flow_num
             else:
-                rank_result.extend(flow_recall_rank[:size - top_K - i])
-                return rank_result[:size]
-        i += 1
-    return rank_result[:size]
+                if rov_recall_rank:
+                    rank_result.append(rov_recall_rank[0])
+                    rov_recall_rank.remove(rov_recall_rank[0])
+                else:
+                    rank_result.extend(flow_recall_rank[:size - top_K - i])
+                    return rank_result[:size], flow_num
+            i += 1
+        return rank_result[:size], flow_num
 
-def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
+def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None, flowConfig=None):
     """
     视频分发排序
     :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
@@ -687,7 +719,7 @@ def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None
     """
     if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
             and not data['hot_rcall'] and not data['hot_rcall']:
-        return []
+        return [], 0
     # 地域分组小时级规则更新数据
     recall_dict = {}
     region_h_recall = [item for item in data['rov_pool_recall']
@@ -779,29 +811,62 @@ def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None
     else:
         rank_result.extend(flow_recall_rank[:top_K])
         flow_recall_rank = flow_recall_rank[top_K:]
-
-    # 按概率 p 及score排序获取 size - k 个视频
-    i = 0
-    while i < size - top_K:
-        # 随机生成[0, 1)浮点数
-        rand = random.random()
-        # log_.info('rand: {}'.format(rand))
-        if rand < flow_pool_P:
-            if flow_recall_rank:
-                rank_result.append(flow_recall_rank[0])
-                flow_recall_rank.remove(flow_recall_rank[0])
+    flow_num = 0
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        for recall_item in rank_result:
+            flow_recall_name = recall_item.get("flowPool", '')
+            if flow_recall_name is not None and flow_recall_name.find("#") > -1:
+                flow_num = flow_num + 1
+            all_recall_rank = rov_recall_rank + flow_recall_rank
+            if flow_num > 0:
+                rank_result.extend(all_recall_rank[:size - top_K])
+                return rank_result[:size], flow_num
             else:
-                rank_result.extend(rov_recall_rank[:size - top_K - i])
-                return rank_result[:size]
-        else:
-            if rov_recall_rank:
-                rank_result.append(rov_recall_rank[0])
-                rov_recall_rank.remove(rov_recall_rank[0])
+                # 按概率 p 及score排序获取 size - k 个视频
+                i = 0
+                while i < size - top_K:
+                    # 随机生成[0, 1)浮点数
+                    rand = random.random()
+                    # log_.info('rand: {}'.format(rand))
+                    if rand < flow_pool_P:
+                        if flow_recall_rank:
+                            rank_result.append(flow_recall_rank[0])
+                            flow_recall_rank.remove(flow_recall_rank[0])
+                        else:
+                            rank_result.extend(rov_recall_rank[:size - top_K - i])
+                            return rank_result[:size], flow_num
+                    else:
+                        if rov_recall_rank:
+                            rank_result.append(rov_recall_rank[0])
+                            rov_recall_rank.remove(rov_recall_rank[0])
+                        else:
+                            rank_result.extend(flow_recall_rank[:size - top_K - i])
+                            return rank_result[:size], flow_num
+                    i += 1
+    else:
+        # 按概率 p 及score排序获取 size - k 个视频
+        i = 0
+        while i < size - top_K:
+            # 随机生成[0, 1)浮点数
+            rand = random.random()
+            # log_.info('rand: {}'.format(rand))
+            if rand < flow_pool_P:
+                if flow_recall_rank:
+                    rank_result.append(flow_recall_rank[0])
+                    flow_recall_rank.remove(flow_recall_rank[0])
+                else:
+                    rank_result.extend(rov_recall_rank[:size - top_K - i])
+                    return rank_result[:size], flow_num
             else:
-                rank_result.extend(flow_recall_rank[:size - top_K - i])
-                return rank_result[:size]
-        i += 1
-    return rank_result[:size]
+                if rov_recall_rank:
+                    rank_result.append(rov_recall_rank[0])
+                    rov_recall_rank.remove(rov_recall_rank[0])
+                else:
+                    rank_result.extend(flow_recall_rank[:size - top_K - i])
+                    return rank_result[:size],flow_num
+            i += 1
+    return rank_result[:size], flow_num
 
 
 

+ 36 - 1
video_recall.py

@@ -2573,4 +2573,39 @@ class PoolRecall(object):
             except Exception as e:
                 return None
         else:
-            return None
+            return None
+
+    def get_hotrecall_flow_config(self):
+        recall_key = "ht_flow_config:"
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        return data
+
+    def get_w2v_flow_config(self):
+        recall_key = "w2v_flow_config:"
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        return data
+
+    def get_flow_config(self):
+        recall_key = "flow_config:"
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        return data
+
+    def get_simrecall_flow_config(self):
+        recall_key = "flow_config:"
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        return data
+
+    def get_flow_exp_7_config(self):
+        recall_key = "exp7_flow_config:"
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        return data
+
+    def get_flow_exp_8_config(self):
+        recall_key = "exp8_flow_config:"
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        return data
+
+    def get_flow_exp_6_config(self):
+        recall_key = "exp6_flow_config:"
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        return data