Browse Source

Merge branch 'sorte_recall' into master_05_18

linfan 1 year ago
parent
commit
0ab015b307
4 changed files with 281 additions and 4 deletions
  1. 15 0
      config.py
  2. 151 4
      recommend.py
  3. 78 0
      video_rank.py
  4. 37 0
      video_recall.py

+ 15 - 0
config.py

@@ -148,6 +148,9 @@ class BaseConfig(object):
             'abtest_320': 60049,
             'abtest_322': 60050,
             'abtest_323': 60051,
+            'abtest_326': 60052,
+            'abtest_327': 60053,
+            'abtest_328': 60054,
         },  # 地域分组小时级规则实验
 
         'rank_by_24h': {
@@ -339,6 +342,18 @@ class BaseConfig(object):
         '323': {
             'data_key': 'data10', 'rule_key': 'rule7',
             'ab_code': AB_CODE['region_rank_by_h'].get('abtest_323')
+        },
+        '326': {
+            'data_key': 'data10', 'rule_key': 'rule7',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_326')
+        },
+        '327': {
+            'data_key': 'data10', 'rule_key': 'rule7',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_327')
+        },
+        '328': {
+            'data_key': 'data10', 'rule_key': 'rule7',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_328')
         }
     }
 

+ 151 - 4
recommend.py

@@ -10,7 +10,7 @@ import config
 from log import Log
 from config import set_config
 from video_recall import PoolRecall
-from video_rank import video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
+from video_rank import video_new_rank2,video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
 from db_helper import RedisHelper
 import gevent
 from utils import FilterVideos, get_user_has30day_return
@@ -336,6 +336,129 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #
     # result['rankResult'] = rank_result
 
+    return result
+    # return rank_result, last_rov_recall_key
+
+def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
+                    expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
+                    no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
+                    shield_config=None):
+    """
+    首页线上推荐逻辑
+    :param request_id: request_id
+    :param mid: mid type-string
+    :param uid: uid type-string
+    :param size: 请求视频数量 type-int
+    :param top_K: 保证topK为召回池视频 type-int
+    :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+    :param app_type: 产品标识  type-int
+    :param algo_type: 算法类型  type-string
+    :param client_info: 用户位置信息 {"country": "国家",  "province": "省份",  "city": "城市"}
+    :param expire_time: 末位视频记录redis过期时间
+    :param ab_code: AB实验code
+    :param video_id: 相关推荐头部视频id
+    :param params:
+    :return:
+    """
+    result = {}
+    # ####### 多进程召回
+    start_recall = time.time()
+    # log_.info('====== recall')
+    recall_result_list = []
+    pool_recall = PoolRecall(request_id=request_id,
+                             app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
+                             client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id= video_id)
+
+    if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
+        if ab_code == 60054:
+            t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
+    else:
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
+        if ab_code == 60054:
+            t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
+
+    gevent.joinall(t)
+    recall_result_list = [i.get() for i in t]
+
+    #print(recall_result_list)
+    if len(recall_result_list)<0:
+        result['recallResult']= []
+        result['rankResult'] = []
+        return result
+    if ab_code == 60054:
+        rov_pool_recall = []
+        if len(recall_result_list)>=4:
+            region_recall = recall_result_list[0]
+            sim_recall = recall_result_list[3]
+            #print("sim:",sim_recall)
+            now_video_ids = set('')
+            if len(region_recall)>0:
+                for video in region_recall:
+                    video_id = video.get('videoId')
+                    if video_id not in now_video_ids:
+                        rov_pool_recall.append(video)
+                        now_video_ids.add(video_id)
+            if len(sim_recall)>0:
+                for video in sim_recall:
+                    video_id = video.get('videoId')
+                    #print("sim video_id:", video_id)
+                    if video_id not in now_video_ids:
+                        rov_pool_recall.append(video)
+                        now_video_ids.add(video_id)
+            if len(rov_pool_recall)>0:
+                recall_result_list[0] = rov_pool_recall
+
+    result['recallResult'] = recall_result_list
+    result['recallTime'] = (time.time() - start_recall) * 1000
+
+    # ####### 排序
+    start_rank = time.time()
+    # log_.info('====== rank')
+    if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
+        if ab_code in [
+            config_.AB_CODE['rov_rank_appType_18_19'],
+            config_.AB_CODE['rov_rank_appType_19'],
+            config_.AB_CODE['top_video_relevant_appType_19']
+        ]:
+            data = {
+                'rov_pool_recall': recall_result_list[0],
+                'flow_pool_recall': recall_result_list[1]
+            }
+        else:
+            data = {
+                'rov_pool_recall': recall_result_list[0],
+                'flow_pool_recall': []
+            }
+    else:
+        if recall_result_list[1]:
+            redis_helper = RedisHelper()
+            quick_flow_pool_P = redis_helper.get_data_from_redis(
+                key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
+            )
+            if quick_flow_pool_P:
+                flow_pool_P = quick_flow_pool_P
+            data = {
+                'rov_pool_recall': recall_result_list[0],
+                'flow_pool_recall': recall_result_list[1]
+            }
+        else:
+            data = {
+                'rov_pool_recall': recall_result_list[0],
+                'flow_pool_recall': recall_result_list[2]
+            }
+    #if ab_code=="ab_new_test":
+    #print("before data:", data)
+    rank_result = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code)
+    #print(rank_result)
+
+    result['rankResult'] = rank_result
+    result['rankTime'] = (time.time() - start_rank) * 1000
+
+
     return result
     # return rank_result, last_rov_recall_key
 
@@ -394,6 +517,8 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
              gevent.spawn(pool_recall.get_selected_recall, size, region_code),
              gevent.spawn(pool_recall.get_no_selected_recall, size, region_code)
              ]
+        if ab_code == 60049:
+            t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall))
     else:
         t = [
              gevent.spawn(pool_recall.get_region_hour_recall, size, region_code),
@@ -1351,6 +1476,15 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                                  rule_key=rule_key, data_key=data_key,
                                  no_op_flag=no_op_flag, old_video_index=old_video_index,
                                  params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+    elif ab_code == 60052 or ab_code == 60053 or ab_code == 60054:
+        result = video_old_recommend(request_id=request_id,
+                                     mid=mid, uid=uid, app_type=app_type,
+                                     size=size, top_K=top_K, flow_pool_P=flow_pool_P,
+                                     algo_type='', client_info=client_info,
+                                     ab_code=ab_code, expire_time=expire_time,
+                                     rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                                     old_video_index=old_video_index, video_id= None,
+                                     params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
     else:
         result = video_recommend(request_id=request_id,
                              mid=mid, uid=uid, app_type=app_type,
@@ -1384,6 +1518,8 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
         update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
     elif ab_code == 60050 or  ab_code == 60051:
         update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    elif ab_code == 60052 or ab_code == 60053 or ab_code == 60054:
+        update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
     else:
         update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
     # log_.info({
@@ -1466,6 +1602,15 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
                                  rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                                  old_video_index=old_video_index, video_id=video_id,
                                  params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+    elif ab_code == 60052 or ab_code == 60053 or ab_code == 60054:
+        result = video_old_recommend(request_id=request_id,
+                                 mid=mid, uid=uid, app_type=app_type,
+                                 size=size, top_K=top_K, flow_pool_P=flow_pool_P,
+                                 algo_type='', client_info=client_info,
+                                 ab_code=ab_code, expire_time=expire_time,
+                                 rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                                 old_video_index=old_video_index, video_id=video_id,
+                                 params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
     else:
         result = video_recommend(request_id=request_id,
                              mid=mid, uid=uid, app_type=app_type,
@@ -1497,11 +1642,13 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
     # redis数据刷新
     update_redis_st = time.time()
     if ab_code == 60047 or ab_code == 60048 or  ab_code == 60049:
-        update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+         update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
     elif ab_code == 60050 or ab_code == 60051:
-        update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+         update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    elif ab_code == 60052 or ab_code == 60053 or ab_code == 60054:
+         update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
     else:
-        update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+         update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,

+ 78 - 0
video_rank.py

@@ -592,6 +592,84 @@ def video_rank_with_old_video(rank_result, old_video_recall, size, top_K, old_vi
     return new_rank_result[:size]
 
 
+def video_new_rank2(data, size, top_K, flow_pool_P, ab_code):
+    """
+        视频分发排序
+        :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+        :param size: 请求数
+        :param top_K: 保证topK为召回池视频 type-int
+        :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+        :return: rank_result
+        """
+    if not data['rov_pool_recall'] and not data['flow_pool_recall']:
+        return []
+
+    redisObj = RedisHelper()
+    vidKeys = []
+    recall_list = []
+    pre_str = "k_p:"
+    if ab_code==60053 or ab_code==60054:
+        pre_str = "k_p2:"
+    #print("pre_str:", pre_str)
+    for recall_item in data['rov_pool_recall']:
+        if len(recall_item)<=0:
+            continue
+        vid = recall_item.get("videoId",0)
+        vidKeys.append(pre_str+ str(vid))
+        recall_list.append(recall_item)
+    #print("vidKeys:", vidKeys)
+    video_scores = redisObj.get_batch_key(vidKeys)
+    #print("video_score:",video_scores)
+    for i in range(len(video_scores)):
+         try:
+            # print(video_scores[i])
+            if video_scores[i] is None:
+                recall_list[i]['sort_score']= 0.0
+            else:
+                video_score_str = json.loads(video_scores[i])
+                #print("video_score_str:", video_score_str)
+                recall_list[i]['sort_score'] = video_score_str[0]
+         except Exception :
+                recall_list[i]['sort_score'] = 0.0
+    #sort_items = sorted(video_items, key=lambda k: k[1], reverse=True)
+    rov_recall_rank =sorted(recall_list, key=lambda k: k.get('sort_score', 0), reverse=True)
+    flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+    rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
+                                                     top_K=top_K)
+    rank_result = []
+    rank_set = set('')
+
+    # 从ROV召回池中获取top k
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(flow_recall_rank[:top_K])
+        flow_recall_rank = flow_recall_rank[top_K:]
+        # 按概率 p 及score排序获取 size - k 个视频
+    i = 0
+    while i < size - top_K:
+        # 随机生成[0, 1)浮点数
+        rand = random.random()
+        # log_.info('rand: {}'.format(rand))
+        if rand < flow_pool_P:
+            if flow_recall_rank:
+                rank_result.append(flow_recall_rank[0])
+                flow_recall_rank.remove(flow_recall_rank[0])
+            else:
+                rank_result.extend(rov_recall_rank[:size - top_K - i])
+                return rank_result[:size]
+        else:
+            if rov_recall_rank:
+                rank_result.append(rov_recall_rank[0])
+                rov_recall_rank.remove(rov_recall_rank[0])
+            else:
+                rank_result.extend(flow_recall_rank[:size - top_K - i])
+                return rank_result[:size]
+        i += 1
+    return rank_result[:size]
+
+
 if __name__ == '__main__':
     d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
               {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},

+ 37 - 0
video_recall.py

@@ -2124,6 +2124,43 @@ class PoolRecall(object):
                 )
         return recall_result[:200]
 
+    def get_sim_hot_item_reall_filter(self):
+        if self.video_id is None:
+            return  []
+        recall_key = "sim_hot_" + str(self.video_id)
+        # print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+
+        # print(data)
+        recall_result = []
+        recall_dict  = {}
+        video_ids = []
+        if data is not None:
+            json_result = json.loads(data)
+            # print("json_result:", json_result)
+            for per_item in json_result:
+                try:
+                    vid = int(per_item[0])
+                    video_ids.append(vid)
+                    recall_dict[vid] = {'videoId': per_item[0], 'flowPool': '',
+                         'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
+                         'abCode': self.ab_code}
+                except Exception as e:
+                    continue
+        if len(video_ids)<=0:
+            return  recall_result
+        video_ids = video_ids[:50]
+        #print(video_ids)
+        filter_ = FilterVideos(request_id=self.request_id,
+                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+        filtered_viewed_videos = filter_.filter_videos(pool_type='rov')
+        if filtered_viewed_videos is None:
+            return recall_result
+        #print("filtered_viewed_videos:", filtered_viewed_videos)
+        for vid in filtered_viewed_videos:
+            if vid in recall_dict:
+                recall_result.append(recall_dict[vid])
+        return recall_result
     # get region_hour_recall
     def get_region_hour_recall(self, size=4, region_code='-1'):
         pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H