Browse Source

add flow_set_level_score abtest

liqian 1 year ago
parent
commit
61592879b9
4 changed files with 68 additions and 32 deletions
  1. 2 1
      app.py
  2. 1 0
      config.py
  3. 47 23
      recommend.py
  4. 18 8
      video_recall.py

+ 2 - 1
app.py

@@ -31,7 +31,8 @@ config_ = set_config()
 level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
 flow_pool_abtest_config = {'control_group': [7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
                            'experimental_flow_set_level': [],
-                           'experimental_flow_set_level_score': []}
+                           'experimental_flow_set_level_score': [],
+                           'flow_set_level_score': {'control': [], 'experimental': []}}
 ad_arpu = 0
 ad_roi_param = 0
 

+ 1 - 0
config.py

@@ -779,6 +779,7 @@ class BaseConfig(object):
     # 流量池各层分发概率权重存放 redis key,完整格式 flow:pool:level:
     FLOWPOOL_LEVEL_WEIGHT_KEY_NAME = 'flow:pool:level:recommend:weight'
     # 流量池视频分层存放(按分数排序) redis key前缀,完整格式 flow:pool:level:item:score:{appType}:{level}
+    # 流量池视频分ab实验分层存放(按分数排序) redis key前缀,完整格式 flow:pool:level:item:score:{abKey}:{appType}:{level}
     FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE = 'flow:pool:level:item:score:'
     # 快速曝光流量池数据存放 redis key前缀,完整格式 flow:pool:quick:item:{appType}:{flowPool_id}
     QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET = 'flow:pool:quick:item:'

+ 47 - 23
recommend.py

@@ -141,7 +141,7 @@ def positon_duplicate(pos1_vids, pos2_vids, videos):
 def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                     expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                     no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
-                    shield_config=None, level_weight=None, flow_pool_abtest_group=None):
+                    shield_config=None, level_weight=None, flow_pool_abtest_group=None, flow_pool_ab_key=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -236,12 +236,13 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                           size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level,
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
-    elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
+    elif flow_pool_abtest_group == 'experimental_flow_set_level_score' or \
+            flow_pool_abtest_group == 'flow_set_level_score':
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
                           size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
-                          size, flow_pool_abtest_group=flow_pool_abtest_group)]
+                          size, flow_pool_abtest_group=flow_pool_abtest_group, flow_pool_ab_key=flow_pool_ab_key)]
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall,
@@ -429,7 +430,7 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                     expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                     no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
                     shield_config=None, env_dict=None, level_weight=None, flow_pool_abtest_group=None,
-                        rank_key_prefix=None):
+                        flow_pool_ab_key=None, rank_key_prefix=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -481,12 +482,13 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                           size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level,
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
-    elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
+    elif flow_pool_abtest_group == 'experimental_flow_set_level_score' or \
+            flow_pool_abtest_group == 'flow_set_level_score':
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
                           size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
-                          size, flow_pool_abtest_group=flow_pool_abtest_group)]
+                          size, flow_pool_abtest_group=flow_pool_abtest_group, flow_pool_ab_key=flow_pool_ab_key)]
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall,
@@ -951,7 +953,8 @@ def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
     return rank_result
 
 
-def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_weight=None, flow_pool_abtest_group=None):
+def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_weight=None,
+                      flow_pool_abtest_group=None, flow_pool_ab_key=None):
     """
     根据最终的排序结果更新相关redis数据
     :param result: 排序结果
@@ -1051,8 +1054,9 @@ def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_w
             if flow_recall_video:
                 if flow_pool_abtest_group == 'experimental_flow_set_level':
                     update_local_distribute_count_new_with_level(flow_recall_video, level_weight)
-                elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
-                    update_local_distribute_count_new_with_level_score(flow_recall_video, level_weight)
+                elif flow_pool_abtest_group == 'experimental_flow_set_level_score' \
+                        or flow_pool_abtest_group == 'flow_set_level_score':
+                    update_local_distribute_count_new_with_level_score(flow_recall_video, level_weight, flow_pool_ab_key)
                 else:
                     update_local_distribute_count(flow_recall_video)
                 # update_local_distribute_count_new(flow_recall_video)
@@ -1304,7 +1308,7 @@ def update_local_distribute_count_new_with_level(videos, level_weight):
         log_.error(traceback.format_exc())
 
 
-def update_local_distribute_count_new_with_level_score(videos, level_weight):
+def update_local_distribute_count_new_with_level_score(videos, level_weight, flow_pool_ab_key=None):
     """
     更新本地分发数
     :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
@@ -1339,7 +1343,14 @@ def update_local_distribute_count_new_with_level_score(videos, level_weight):
                         f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}"
                     ]
                     for level in level_list:
-                        flow_pool_key_list.append(f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{app_type}:{level}")
+                        if flow_pool_ab_key is None:
+                            flow_pool_key_list.append(
+                                f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{app_type}:{level}"
+                            )
+                        else:
+                            flow_pool_key_list.append(
+                                f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{flow_pool_ab_key}:{app_type}:{level}"
+                            )
                     for key in flow_pool_key_list:
                         remove_res = redis_helper.remove_value_from_zset(key_name=key, value=f"{video_id}-{flow_pool}")
                         if remove_res > 0:
@@ -1822,13 +1833,20 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
     flow_pool_id_choice = random.choice(config_.FLOWPOOL_ID_LIST)
     # 2. 判断流量id所属实验配置分组
     flow_pool_abtest_group = 'control_group'
-    for key, items in flow_pool_abtest_config.items():
-        if int(flow_pool_id_choice) in items:
-            flow_pool_abtest_group = key
+    flow_pool_ab_key = None  # stays None unless the chosen id falls in a nested (per-ab-key) experiment
+    for key, key_items in flow_pool_abtest_config.items():
+        if isinstance(key_items, dict):  # nested config: {ab_key: [flow_pool_ids]}; type(x) == 'dict' was always False
+            for ab_key, ab_items in key_items.items():
+                if int(flow_pool_id_choice) in ab_items:
+                    flow_pool_abtest_group = key
+                    flow_pool_ab_key = ab_key
+        else:
+            if int(flow_pool_id_choice) in key_items:
+                flow_pool_abtest_group = key
     # log_.info(f"flow_pool_id_choice: {flow_pool_id_choice}, flow_pool_abtest_group: {flow_pool_abtest_group}")
 
     return top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day, \
-           shield_config, flow_pool_abtest_group, rank_key_prefix
+           shield_config, flow_pool_abtest_group, flow_pool_ab_key, rank_key_prefix
 
 
 def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
@@ -1897,7 +1915,8 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
 
     # 普通mid推荐处理
     top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, \
-    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group, rank_key_prefix = \
+    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group, flow_pool_ab_key, \
+    rank_key_prefix = \
         get_recommend_params(recommend_type=0, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, mid=mid,
                              app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config)
     # log_.info({
@@ -1940,6 +1959,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                                                rule_key_30day=rule_key_30day, shield_config=shield_config,
                                                env_dict=env_dict, level_weight=level_weight,
                                                flow_pool_abtest_group=flow_pool_abtest_group,
+                                               flow_pool_ab_key=flow_pool_ab_key,
                                                rank_key_prefix=rank_key_prefix)
         recommend_result['fea_info'] = fea_info
     else:
@@ -1948,7 +1968,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                                  ab_code=ab_code, expire_time=expire_time, rule_key=rule_key, data_key=data_key,
                                  no_op_flag=no_op_flag, old_video_index=old_video_index, params=params,
                                  rule_key_30day=rule_key_30day, shield_config=shield_config, level_weight=level_weight,
-                                 flow_pool_abtest_group=flow_pool_abtest_group)
+                                 flow_pool_abtest_group=flow_pool_abtest_group, flow_pool_ab_key=flow_pool_ab_key)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1977,7 +1997,8 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
     #     update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
 
     update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K,
-                      level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group)
+                      level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group,
+                      flow_pool_ab_key=flow_pool_ab_key)
 
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
@@ -2025,9 +2046,10 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
 
     # 普通mid推荐处理
     top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, \
-    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group, rank_key_prefix = \
-        get_recommend_params(recommend_type=1, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, page_type=page_type,
-                             mid=mid, app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config)
+    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group, flow_pool_ab_key, \
+    rank_key_prefix = \
+        get_recommend_params(recommend_type=1, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, page_type=page_type,
+                             mid=mid, app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -2063,6 +2085,7 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
                                                rule_key_30day=rule_key_30day, shield_config=shield_config,
                                                env_dict=env_dict, level_weight=level_weight,
                                                flow_pool_abtest_group=flow_pool_abtest_group,
+                                               flow_pool_ab_key=flow_pool_ab_key,
                                                rank_key_prefix=rank_key_prefix)
         recommend_result['fea_info'] = fea_info
     else:
@@ -2071,7 +2094,7 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
                                  expire_time=expire_time, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                                  old_video_index=old_video_index, video_id=video_id, params=params,
                                  rule_key_30day=rule_key_30day, shield_config=shield_config, level_weight=level_weight,
-                                 flow_pool_abtest_group=flow_pool_abtest_group)
+                                 flow_pool_abtest_group=flow_pool_abtest_group, flow_pool_ab_key=flow_pool_ab_key)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -2101,7 +2124,8 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
     #      update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
 
     update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K,
-                      level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group)
+                      level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group,
+                      flow_pool_ab_key=flow_pool_ab_key)
 
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),

+ 18 - 8
video_recall.py

@@ -793,7 +793,8 @@ class PoolRecall(object):
 
         return flow_pool_recall_result[:size]
 
-    def flow_pool_recall_new_with_level_score(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
+    def flow_pool_recall_new_with_level_score(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None,
+                                              flow_pool_ab_key=None):
         """从流量池中获取视频"""
         # 获取存在城市分组数据的城市编码列表
         city_code_list = [code for _, code in config_.CITY_CODE.items()]
@@ -810,7 +811,8 @@ class PoolRecall(object):
         if region_code == '':
             region_code = '-1'
 
-        flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
+        flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id,
+                                                       flow_pool_ab_key=flow_pool_ab_key)
         if flow_pool_key is None:
             return []
         # print(flow_pool_key)
@@ -869,7 +871,8 @@ class PoolRecall(object):
             if filtered_result:
                 # st_check = time.time()
                 ge = gevent.spawn(self.check_video_counts_new_with_level_score,
-                                  video_ids=filtered_result, flow_pool_mapping=video_mapping)
+                                  video_ids=filtered_result, flow_pool_mapping=video_mapping,
+                                  flow_pool_ab_key=flow_pool_ab_key)
                 ge.join()
                 check_result = ge.get()
                 # log_.info({
@@ -891,7 +894,8 @@ class PoolRecall(object):
                         flow_pool_recall_result.append(
                             {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
                              'rovScore': video_score[f"{video_id}-{flow_pool}"], 'pushFrom': config_.PUSH_FROM['flow_recall'],
-                             'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
+                             'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group,
+                             'flow_pool_ab_key': flow_pool_ab_key}
                         )
 
                         flow_pool_recall_videos.append(video_id)
@@ -1052,7 +1056,7 @@ class PoolRecall(object):
                         log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
         return check_result
 
-    def check_video_counts_new_with_level_score(self, video_ids, flow_pool_mapping):
+    def check_video_counts_new_with_level_score(self, video_ids, flow_pool_mapping, flow_pool_ab_key=None):
         """
         检查视频剩余可分发数
         :param video_ids: 视频id type-list
@@ -1090,7 +1094,10 @@ class PoolRecall(object):
                     value = '{}-{}'.format(video_id, flow_pool)
                     for item in config_.APP_TYPE:
                         for level in level_list:
-                            flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{config_.APP_TYPE.get(item)}:{level}"
+                            if flow_pool_ab_key is None:
+                                flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{config_.APP_TYPE.get(item)}:{level}"
+                            else:
+                                flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{flow_pool_ab_key}:{config_.APP_TYPE.get(item)}:{level}"
                             remove_res = self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
                             if remove_res > 0:
                                 add_remove_log = True
@@ -1154,7 +1161,7 @@ class PoolRecall(object):
         return check_result, error_flag
     """
 
-    def get_pool_redis_key(self, pool_type, flow_pool_id=None):
+    def get_pool_redis_key(self, pool_type, flow_pool_id=None, flow_pool_ab_key=None):
         """
         拼接key
         :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
@@ -1269,7 +1276,10 @@ class PoolRecall(object):
                 # 2. 判断各层级是否有视频需分发
                 available_level = []
                 for level, weight in level_weight.items():
-                    level_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{self.app_type}:{level}"
+                    if flow_pool_ab_key is None:
+                        level_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{self.app_type}:{level}"
+                    else:
+                        level_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{flow_pool_ab_key}:{self.app_type}:{level}"
                     if self.redis_helper.key_exists(key_name=level_key):
                         available_level.append((level, level_key, weight))
                 if len(available_level) == 0: