Explorar el Código

update exp config

linfan hace 1 año
padre
commit
dff1dab9e6
Se han modificado 3 ficheros con 98 adiciones y 45 borrados
  1. 38 27
      recommend.py
  2. 10 4
      video_rank.py
  3. 50 14
      video_recall.py

+ 38 - 27
recommend.py

@@ -176,6 +176,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     pool.close()
     pool.join()
     '''
+
     recall_result_list = []
     pool_recall = PoolRecall(request_id=request_id,
                              app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
@@ -195,8 +196,19 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     # 地域分组实验
     # if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
 
+    # load test config
+    exp_config = None
+    if ab_code == 60058 or ab_code == 60059 or ab_code == 60060 or ab_code == 60061:
+        if ab_code == 60058:
+            exp_config = pool_recall.get_hotrecall_config()
+        elif ab_code == 60059:
+            exp_config = pool_recall.get_w2v_config()
+        elif ab_code == 60060:
+            exp_config = pool_recall.get_test_config()
+        elif ab_code == 60061:
+            exp_config = pool_recall.get_simrecall_config()
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
-        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time, ab_code, exp_config)]
         if ab_code==60058:
             t.append(gevent.spawn(pool_recall.get_hot_item_reall))
         elif  ab_code==60059:
@@ -204,7 +216,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
         elif  ab_code==60061:
             t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
     else:
-        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time, ab_code, exp_config),
              gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
              gevent.spawn(pool_recall.flow_pool_recall, size)]
         if ab_code==60058:
@@ -314,22 +326,20 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #else:
     #print("data['hot_recall']", data['hot_recall'])
     if ab_code == 60058 or ab_code == 60059 or ab_code == 60060 or ab_code == 60061 :
-        test_config = None
-        flow_config = None
-        if ab_code == 60058:
-            test_config =pool_recall.get_hotrecall_config()
-            flow_config = pool_recall.get_hotrecall_flow_config()
-        elif ab_code == 60059:
-            test_config = pool_recall.get_w2v_config()
-            flow_config = pool_recall.get_w2v_flow_config()
-        elif ab_code == 60060:
-            test_config = pool_recall.get_test_config()
-            flow_config = pool_recall.get_flow_config()
-        elif ab_code == 60061:
-            test_config = pool_recall.get_simrecall_config()
-            flow_config = pool_recall.get_simrecall_flow_config()
+        # if ab_code == 60058:
+        #     test_config =pool_recall.get_hotrecall_config()
+        #     flow_config = pool_recall.get_hotrecall_flow_config()
+        # elif ab_code == 60059:
+        #     test_config = pool_recall.get_w2v_config()
+        #     flow_config = pool_recall.get_w2v_flow_config()
+        # elif ab_code == 60060:
+        #     test_config = pool_recall.get_test_config()
+        #     flow_config = pool_recall.get_flow_config()
+        # elif ab_code == 60061:
+        #     test_config = pool_recall.get_simrecall_config()
+        #     flow_config = pool_recall.get_simrecall_flow_config()
         #print("test config:",test_config)
-        rank_result, flow_num = video_sanke_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_Code=ab_code, exp_config=test_config, flowConfig=flow_config)
+        rank_result, flow_num = video_sanke_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_Code=ab_code, exp_config=exp_config)
         result['flow_num'] = flow_num
         if rank_result:
             result['rank_num'] = len(rank_result)
@@ -422,8 +432,16 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                              client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                              params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id= video_id)
 
+    exp_config = None
+    if ab_code == 60052:
+        exp_config = pool_recall.get_flow_exp_7_config()
+    elif ab_code == 60053:
+        exp_config = pool_recall.get_flow_exp_8_config()
+    elif ab_code == 60057:
+        exp_config = pool_recall.get_flow_exp_6_config()
+
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
-        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time, ab_code, exp_config)]
         if ab_code ==60054:
             t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
         if ab_code == 60055:
@@ -431,7 +449,7 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         if ab_code == 60056:
             t.append(gevent.spawn(pool_recall.get_hot_item_reall))
     else:
-        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time, ab_code, exp_config),
              gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
              gevent.spawn(pool_recall.flow_pool_recall, size)]
         if  ab_code == 60054:
@@ -572,14 +590,7 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
             }
     #if ab_code=="ab_new_test":
     #print("before data:", data)
-    flow_config = None
-    if ab_code == 60052:
-        flow_config = pool_recall.get_flow_exp_7_config()
-    elif ab_code == 60053:
-        flow_config = pool_recall.get_flow_exp_8_config()
-    elif ab_code == 60057:
-        flow_config = pool_recall.get_flow_exp_6_config()
-    rank_result, flow_num  = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, flowConfig=flow_config)
+    rank_result, flow_num  = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, flowConfig=exp_config)
     #print(rank_result)
     if rank_result:
         result['rank_num'] = len(rank_result)

+ 10 - 4
video_rank.py

@@ -592,7 +592,7 @@ def video_rank_with_old_video(rank_result, old_video_recall, size, top_K, old_vi
     return new_rank_result[:size]
 
 
-def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, flowConfig=None):
+def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, exp_config=None):
     """
         视频分发排序
         :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
@@ -655,6 +655,9 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, flowConfig=None):
         flow_recall_rank = flow_recall_rank[top_K:]
         # 按概率 p 及score排序获取 size - k 个视频
     flow_num = 0
+    flowConfig = "0"
+    if exp_config and exp_config['flowConfig']:
+        flowConfig = exp_config['flowConfig']
     if flowConfig == "1" and len(rov_recall_rank) > 0:
         for recall_item in rank_result:
             flow_recall_name = recall_item.get("flowPool", '')
@@ -707,7 +710,7 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, flowConfig=None):
             i += 1
         return rank_result[:size], flow_num
 
-def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None, flowConfig=None):
+def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
     """
     视频分发排序
     :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
@@ -761,8 +764,8 @@ def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None
 
     recall_list = [('rov_recall_region_h',1, 1),('rov_recall_region_h',0.5, 1),('rov_recall_region_24h',1,1),
                    ('hot_recall',0.5,1), ('w2v_recall',0.5,1),('rov_recall_24h',1,1), ('rov_recall_24h_dup',0.5,1)]
-    if exp_config is not  None:
-        recall_list = exp_config
+    if exp_config  and exp_config['recall_list']:
+        recall_list = exp_config['recall_list']
     #print("recall_config:", recall_list)
     rov_recall_rank = []
     select_ids = set('')
@@ -811,6 +814,9 @@ def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None
         rank_result.extend(flow_recall_rank[:top_K])
         flow_recall_rank = flow_recall_rank[top_K:]
     flow_num = 0
+    flowConfig ="0"
+    if exp_config and exp_config['flowConfig']:
+        flowConfig = exp_config['flowConfig']
     if flowConfig == "1" and len(rov_recall_rank) > 0:
         rank_result.extend(rov_recall_rank[:top_K])
         for recall_item in rank_result:

+ 50 - 14
video_recall.py

@@ -1210,7 +1210,7 @@ class PoolRecall(object):
             idx = 0
         return key_name, last_region_dup_key, idx
 
-    def rov_pool_recall_with_region_process(self, size=4, expire_time=24*3600):
+    def rov_pool_recall_with_region_process(self, size=4, expire_time=24*3600, ab_code=None, exp_config=None):
         """
         地域分组召回视频
         :param size: 获取视频个数
@@ -1274,13 +1274,22 @@ class PoolRecall(object):
         # 将已获取到的视频按顺序去重合并
         now_video_ids = []
         recall_result = []
+        recall_num = size
+        if ab_code and exp_config:
+            if ab_code==60058 or ab_code==60059 or ab_code == 60060 or ab_code == 60061 :
+                try:
+                    recall_num = int(exp_config['recall_num'])
+                except:
+                    recall_num = size
+        if recall_num<size:
+            recall_num = size
         for region_result in region_recall_result_list:
             for video in region_result:
                 video_id = video.get('videoId')
                 if video_id not in now_video_ids:
                     recall_result.append(video)
                     now_video_ids.append(video_id)
-                    if len(recall_result) >= size:
+                    if len(recall_result) >= recall_num:
                         break
                     else:
                         continue
@@ -1310,10 +1319,10 @@ class PoolRecall(object):
         # })
         return recall_result[:size]
 
-    def rov_pool_recall_with_region(self, size=4, expire_time=24*3600):
+    def rov_pool_recall_with_region(self, size=4, expire_time=24*3600, ab_code=None, exp_config=None):
         """召回池召回视频"""
         # 获取召回池中视频
-        videos = self.rov_pool_recall_with_region_process(size=size, expire_time=expire_time)
+        videos = self.rov_pool_recall_with_region_process(size=size, expire_time=expire_time, ab_code=ab_code, exp_config=exp_config)
         # 对在流量池中存在的视频添加标记字段
         result = []
         for item in videos:
@@ -2519,7 +2528,7 @@ class PoolRecall(object):
 
 
     def get_test_config(self):
-        recall_key = "test_config"
+        recall_key = "test_exp_config"
         # print("recall_key:", recall_key)
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
         if data is not None:
@@ -2534,7 +2543,7 @@ class PoolRecall(object):
             return None
 
     def get_w2v_config(self):
-        recall_key = "w2v_config"
+        recall_key = "w2v_exp_config"
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
         if data is not None:
             try:
@@ -2548,7 +2557,7 @@ class PoolRecall(object):
             return None
 
     def get_hotrecall_config(self):
-        recall_key = "ht_config"
+        recall_key = "ht_exp_config"
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
         if data is not None:
             try:
@@ -2562,7 +2571,7 @@ class PoolRecall(object):
             return None
 
     def get_simrecall_config(self):
-        recall_key = "simrecall_config"
+        recall_key = "simrecall_exp_config"
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
         if data is not None:
             try:
@@ -2596,16 +2605,43 @@ class PoolRecall(object):
         return data
 
     def get_flow_exp_7_config(self):
-        recall_key = "exp7_flow_config:"
+        recall_key = "exp7_exp_config"
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
-        return data
+        if data is not None:
+            try:
+                # print(data)
+                json_result = json.loads(data)
+                # print(json_result)
+                return json_result
+            except Exception as e:
+                return None
+        else:
+            return None
 
     def get_flow_exp_8_config(self):
-        recall_key = "exp8_flow_config:"
+        recall_key = "exp8_exp_config"
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
-        return data
+        if data is not None:
+            try:
+                # print(data)
+                json_result = json.loads(data)
+                # print(json_result)
+                return json_result
+            except Exception as e:
+                return None
+        else:
+            return None
 
     def get_flow_exp_6_config(self):
-        recall_key = "exp6_flow_config:"
+        recall_key = "exp6_exp_config"
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
-        return data
+        if data is not None:
+            try:
+                # print(data)
+                json_result = json.loads(data)
+                # print(json_result)
+                return json_result
+            except Exception as e:
+                return None
+        else:
+            return None