Browse Source

add rank abtest: 469, 470, 471

liqian 1 năm trước cách đây
mục cha
commit
25a2f72dad
3 tập tin đã thay đổi với 114 bổ sung và 12 xóa
  1. 16 0
      config.py
  2. 23 12
      recommend.py
  3. 75 0
      video_rank.py

+ 16 - 0
config.py

@@ -176,6 +176,9 @@ class BaseConfig(object):
             'abtest_463': 60077,
             'abtest_465': 60078,
             'abtest_466': 60079,
+            'abtest_469': 60080,
+            'abtest_470': 60081,
+            'abtest_471': 60082,
         },  # 地域分组小时级规则实验
 
         'rank_by_24h': {
@@ -480,6 +483,19 @@ class BaseConfig(object):
             'data_key': 'data1', 'rule_key': 'rule24',
             'ab_code': AB_CODE['region_rank_by_h'].get('abtest_466')
         },  # simrecall+融合+地域召回使用vlog数据+地域小时级召回score2(增加前两小时回流留存特征)+ 回流数据使用 分享限制地域,回流不限制地域 统计数据
+        '469': {
+            'data_key': 'data10', 'rule_key': 'rule7', 'rank_key_prefix': 'rank:score2:',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_469')
+        },  # simrecall + 融合 + rank_score2
+        '470': {
+            'data_key': 'data10', 'rule_key': 'rule7', 'rank_key_prefix': 'rank:score4:',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_470')
+        },  # simrecall + 融合 + rank_score4
+        '471': {
+            'data_key': 'data10', 'rule_key': 'rule7', 'rank_key_prefix': 'rank:score5:',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_471')
+        },  # simrecall + 融合 + rank_score5
+
     }
 
     # APP ab实验配置

+ 23 - 12
recommend.py

@@ -11,7 +11,8 @@ import config
 from log import Log
 from config import set_config
 from video_recall import PoolRecall
-from video_rank import video_new_rank2, video_sank_pos_rank,video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
+from video_rank import video_new_rank2, video_sank_pos_rank, video_new_rank, video_rank, refactor_video_rank, \
+    bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2, video_new_rank3
 from db_helper import RedisHelper
 import gevent
 from utils import FilterVideos, get_user_has30day_return
@@ -421,7 +422,8 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
 def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                     expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                     no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
-                    shield_config=None, env_dict=None, level_weight=None, flow_pool_abtest_group=None):
+                    shield_config=None, env_dict=None, level_weight=None, flow_pool_abtest_group=None,
+                        rank_key_prefix=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -480,7 +482,8 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
              gevent.spawn(pool_recall.flow_pool_recall,
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
     if ab_code == 60054 or ab_code == 60066 or ab_code == 60072 or ab_code == 60073 or ab_code == 60074 \
-            or ab_code == 60075 or ab_code == 60076 or ab_code == 60077 or ab_code == 60078 or ab_code == 60079:
+            or ab_code == 60075 or ab_code == 60076 or ab_code == 60077 or ab_code == 60078 or ab_code == 60079 \
+            or ab_code == 60080 or ab_code == 60081 or ab_code == 60082:
         t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
     elif ab_code == 60056 or ab_code == 60071:
         t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
@@ -502,7 +505,8 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         return result
     #1. merge simrecall or  deepfm
     if ab_code == 60054 or ab_code == 60066 or ab_code == 60072 or ab_code == 60073 or ab_code == 60074 \
-            or ab_code == 60075 or ab_code == 60076 or ab_code == 60077 or ab_code == 60078 or ab_code == 60079:
+            or ab_code == 60075 or ab_code == 60076 or ab_code == 60077 or ab_code == 60078 or ab_code == 60079 \
+            or ab_code == 60080 or ab_code == 60081 or ab_code == 60082:
         rov_pool_recall = []
         if len(recall_result_list) >= 2:
             region_recall = recall_result_list[0]
@@ -673,7 +677,10 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         env_dict['city_code'] = city_code
         env_json = env_dict
     #4.
-    rank_result, flow_num  = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict)
+    # rank_result, flow_num = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict)
+    rank_result, flow_num = video_new_rank3(
+        data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix
+    )
     #print(rank_result)
     if rank_result:
         result['rank_num'] = len(rank_result)
@@ -1336,6 +1343,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
     data_key = param.get('data_key')
     rule_key_30day = param.get('30day_rule_key')
     shield_config = config_.SHIELD_CONFIG
+    rank_key_prefix = 'rank:score1:'
 
     # 默认使用 095 实验的配置
     # ab_code = config_.AB_EXP_CODE['095'].get('ab_code')
@@ -1450,6 +1458,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
                         data_key = param.get('data_key')
                         rule_key_30day = param.get('30day_rule_key')
                         shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
+                        rank_key_prefix = param.get('rank_key_prefix', 'rank:score1:')
                         break
 
             """
@@ -1749,7 +1758,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_typ
     # log_.info(f"flow_pool_id_choice: {flow_pool_id_choice}, flow_pool_abtest_group: {flow_pool_abtest_group}")
 
     return top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day, \
-           shield_config, flow_pool_abtest_group
+           shield_config, flow_pool_abtest_group, rank_key_prefix
 
 
 def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
@@ -1818,7 +1827,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
 
     # 普通mid推荐处理
     top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, \
-    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group = \
+    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group, rank_key_prefix = \
         get_recommend_params(recommend_type=0, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, mid=mid,
                              app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config)
     # log_.info({
@@ -1851,7 +1860,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
     if ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \
             or ab_code == 60069 or ab_code == 60070 or ab_code == 60071 or ab_code == 60072 or ab_code == 60073 \
             or ab_code == 60074 or ab_code == 60075 or ab_code == 60076 or ab_code == 60077 \
-            or ab_code == 60078 or ab_code == 60079:
+            or ab_code == 60078 or ab_code == 60079 or ab_code == 60080 or ab_code == 60081 or ab_code == 60082:
         result, fea_info = video_old_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size,
                                                top_K=top_K, flow_pool_P=flow_pool_P, algo_type='',
                                                client_info=client_info, ab_code=ab_code, expire_time=expire_time,
@@ -1859,7 +1868,8 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                                                old_video_index=old_video_index, video_id=None, params=params,
                                                rule_key_30day=rule_key_30day, shield_config=shield_config,
                                                env_dict=env_dict, level_weight=level_weight,
-                                               flow_pool_abtest_group=flow_pool_abtest_group)
+                                               flow_pool_abtest_group=flow_pool_abtest_group,
+                                               rank_key_prefix=rank_key_prefix)
         recommend_result['fea_info'] = fea_info
     else:
         result = video_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K,
@@ -1944,7 +1954,7 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
 
     # 普通mid推荐处理
     top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, \
-    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group = \
+    no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group, rank_key_prefix = \
         get_recommend_params(recommend_type=1, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, page_type=page_type,
                              mid=mid, app_type=app_type, flow_pool_abtest_config=flow_pool_abtest_config)
     # log_.info({
@@ -1972,7 +1982,7 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
     if ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \
             or ab_code == 60069 or ab_code == 60070 or ab_code == 60071 or ab_code == 60072 or ab_code == 60073 \
             or ab_code == 60074 or ab_code == 60075 or ab_code == 60076 or ab_code == 60077 \
-            or ab_code == 60078 or ab_code == 60079:
+            or ab_code == 60078 or ab_code == 60079 or ab_code == 60080 or ab_code == 60081 or ab_code == 60082:
         result, fea_info = video_old_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size,
                                                top_K=top_K, flow_pool_P=flow_pool_P, algo_type='',
                                                client_info=client_info, ab_code=ab_code, expire_time=expire_time,
@@ -1980,7 +1990,8 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
                                                old_video_index=old_video_index, video_id=video_id, params=params,
                                                rule_key_30day=rule_key_30day, shield_config=shield_config,
                                                env_dict=env_dict, level_weight=level_weight,
-                                               flow_pool_abtest_group=flow_pool_abtest_group)
+                                               flow_pool_abtest_group=flow_pool_abtest_group,
+                                               rank_key_prefix=rank_key_prefix)
         recommend_result['fea_info'] = fea_info
     else:
         result = video_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K,

+ 75 - 0
video_rank.py

@@ -774,6 +774,81 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=Non
         return rank_result[:size], flow_num
 
 
+def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:'):
+    """
+    Rank recalled videos for distribution using per-video scores looked up by key.
+
+    Each item in ``data['rov_pool_recall']`` is scored by fetching the key
+    ``f"{rank_key_prefix}{videoId}"`` through ``RedisHelper.get_batch_key`` and the
+    pool is sorted by that score, descending. ``data['flow_pool_recall']`` is sorted
+    by each item's ``rovScore``, descending. Both lists are then passed through
+    ``remove_duplicate``. The first ``top_K`` slots are taken from the ROV pool (or
+    from the flow pool when the ROV pool is empty); every remaining slot is drawn
+    from the flow pool with probability ``flow_pool_P``, otherwise from the ROV pool.
+
+    :param data: recalled videos, type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+    :param size: number of videos requested
+    :param top_K: number of leading slots reserved for recall-pool videos, type-int
+    :param flow_pool_P: probability that a slot after the first top_K is filled
+        from the flow pool, type-float
+    :param rank_key_prefix: prefix of the score key; ``None`` falls back to
+        'rank:score1:'
+    :return: tuple ``(rank_result, flow_num)``; ``rank_result`` is truncated to
+        ``size`` items and ``flow_num`` is always 0 because nothing increments it
+    """
+    if not data['rov_pool_recall'] and not data['flow_pool_recall']:
+        return [], 0
+
+    # video_old_recommend defaults its own rank_key_prefix to None and forwards it
+    # explicitly, which bypasses the default above; without this fallback the
+    # lookup keys would be built as "None<videoId>".
+    if rank_key_prefix is None:
+        rank_key_prefix = 'rank:score1:'
+
+    # Falls back to the unsorted recall list when scores cannot be attached below.
+    rov_recall_rank = data['rov_pool_recall']
+    vid_keys = []
+    rec_recall_item_list = []
+    for recall_item in data['rov_pool_recall']:
+        try:
+            vid = int(recall_item.get("videoId", 0))
+        except (AttributeError, TypeError, ValueError):
+            # Skip items that have no .get() or whose videoId is not an integer.
+            continue
+        rec_recall_item_list.append(recall_item)
+        vid_keys.append(f"{rank_key_prefix}{vid}")
+
+    # Only query when there is at least one key: with no keys the sort branch
+    # below could never run, so the lookup would be wasted (or fail outright).
+    video_scores = RedisHelper().get_batch_key(vid_keys) if vid_keys else None
+    if video_scores and len(rec_recall_item_list) == len(video_scores):
+        for scored_item, raw_score in zip(rec_recall_item_list, video_scores):
+            try:
+                scored_item['sort_score'] = 0.0 if raw_score is None else float(raw_score)
+            except (TypeError, ValueError, OverflowError):
+                # A score that cannot be parsed ranks the same as a missing one.
+                scored_item['sort_score'] = 0.0
+        rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
+    flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+    rov_recall_rank, flow_recall_rank = remove_duplicate(
+        rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
+    )
+    rank_result = []
+
+    # Take the top K from the ROV recall pool, or from the flow pool if it is empty.
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(flow_recall_rank[:top_K])
+        flow_recall_rank = flow_recall_rank[top_K:]
+
+    # Fill the remaining size - top_K slots: pick the pool by probability
+    # flow_pool_P, then take that pool's best remaining item.
+    # NOTE(review): flow_num is returned but never incremented here, so callers
+    # always receive 0 - confirm whether it should count flow-pool picks.
+    flow_num = 0
+    for i in range(size - top_K):
+        remaining = size - top_K - i
+        # Random float in [0, 1).
+        rand = random.random()
+        if rand < flow_pool_P:
+            if flow_recall_rank:
+                rank_result.append(flow_recall_rank.pop(0))
+            else:
+                # Flow pool exhausted: fill every remaining slot from the ROV pool.
+                rank_result.extend(rov_recall_rank[:remaining])
+                return rank_result[:size], flow_num
+        else:
+            if rov_recall_rank:
+                rank_result.append(rov_recall_rank.pop(0))
+            else:
+                # ROV pool exhausted: fill every remaining slot from the flow pool.
+                rank_result.extend(flow_recall_rank[:remaining])
+                return rank_result[:size], flow_num
+    return rank_result[:size], flow_num
+
+
+
+
 # 排序服务兜底
 def sup_rank(video_scores, recall_list):
     if video_scores and len(recall_list) > 0: