Browse Source

update recall

linfan 1 year ago
parent
commit
c95311c83e
4 changed files with 184 additions and 3 deletions
  1. 10 0
      config.py
  2. 18 2
      recommend.py
  3. 117 0
      video_rank.py
  4. 39 1
      video_recall.py

+ 10 - 0
config.py

@@ -154,6 +154,8 @@ class BaseConfig(object):
             'abtest_332': 60055,
             'abtest_332': 60055,
             'abtest_333': 60056,
             'abtest_333': 60056,
             'abtest_334': 60057,
             'abtest_334': 60057,
+            'abtest_335': 60058,
+            'abtest_336': 60059,
         },  # 地域分组小时级规则实验
         },  # 地域分组小时级规则实验
 
 
         'rank_by_24h': {
         'rank_by_24h': {
@@ -369,6 +371,14 @@ class BaseConfig(object):
         '334': {
         '334': {
             'data_key': 'data10', 'rule_key': 'rule7',
             'data_key': 'data10', 'rule_key': 'rule7',
             'ab_code': AB_CODE['region_rank_by_h'].get('abtest_334')
             'ab_code': AB_CODE['region_rank_by_h'].get('abtest_334')
+        },
+        '335': {
+            'data_key': 'data10', 'rule_key': 'rule7',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_335')
+        },
+        '336': {
+            'data_key': 'data10', 'rule_key': 'rule7',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_336')
         }
         }
     }
     }
 
 

+ 18 - 2
recommend.py

@@ -10,7 +10,7 @@ import config
 from log import Log
 from log import Log
 from config import set_config
 from config import set_config
 from video_recall import PoolRecall
 from video_recall import PoolRecall
-from video_rank import video_new_rank2,video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
+from video_rank import video_new_rank2, video_sanke_rank,video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
 from db_helper import RedisHelper
 from db_helper import RedisHelper
 import gevent
 import gevent
 from utils import FilterVideos, get_user_has30day_return
 from utils import FilterVideos, get_user_has30day_return
@@ -197,10 +197,18 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
 
 
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
+        if ab_code==60058:
+            t.append(gevent.spawn(pool_recall.get_hot_item_reall))
+        elif  ab_code==60059:
+            t.append(gevent.spawn(pool_recall.get_word2vec_item_reall))
     else:
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
              gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
              gevent.spawn(pool_recall.flow_pool_recall, size)]
              gevent.spawn(pool_recall.flow_pool_recall, size)]
+        if ab_code==60058:
+            t.append(gevent.spawn(pool_recall.get_hot_item_reall))
+        elif ab_code == 60059:
+            t.append(gevent.spawn(pool_recall.get_word2vec_item_reall))
 
 
     # 最惊奇相关推荐实验
     # 最惊奇相关推荐实验
     # elif ab_code == config_.AB_CODE['top_video_relevant_appType_19']:
     # elif ab_code == config_.AB_CODE['top_video_relevant_appType_19']:
@@ -281,10 +289,18 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                 'rov_pool_recall': recall_result_list[0],
                 'rov_pool_recall': recall_result_list[0],
                 'flow_pool_recall': recall_result_list[2]
                 'flow_pool_recall': recall_result_list[2]
             }
             }
+
+    if ab_code == 60058:
+        data['hot_recall'] = recall_result_list[3]
+    elif ab_code == 60059:
+        data['w2v_recall'] = recall_result_list[3]
     #if ab_code=="ab_new_test":
     #if ab_code=="ab_new_test":
     #    rank_result = video_new_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
     #    rank_result = video_new_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
     #else:
     #else:
-    rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+    if ab_code == 60058 or ab_code == 60059:
+        rank_result = video_sanke_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_Code=ab_code)
+    else:
+        rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
 
 
     # 老视频实验
     # 老视频实验
     # if ab_code in [config_.AB_CODE['old_video']]:
     # if ab_code in [config_.AB_CODE['old_video']]:

+ 117 - 0
video_rank.py

@@ -676,6 +676,123 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code):
         i += 1
         i += 1
     return rank_result[:size]
     return rank_result[:size]
 
 
+def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code=''):
+    """
+    视频分发排序
+    :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+    :param size: 请求数
+    :param top_K: 保证topK为召回池视频 type-int
+    :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+    :return: rank_result
+    """
+    if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
+            and not data['hot_rcall'] and not data['hot_rcall']:
+        return []
+    # 地域分组小时级规则更新数据
+    recall_dict = {}
+    region_h_recall = [item for item in data['rov_pool_recall']
+                         if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
+    region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    recall_dict['rov_recall_region_h'] = region_h_recall_rank
+    # 地域分组小时级更新24h规则更新数据
+    region_24h_recall = [item for item in data['rov_pool_recall']
+                         if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
+    region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    recall_dict['rov_recall_region_24h'] = region_24h_recall_rank
+
+    # 相对24h规则更新数据
+    rule_24h_recall = [item for item in data['rov_pool_recall']
+                       if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
+    rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    recall_dict['rov_recall_24h'] = rule_24h_recall_rank
+    # 相对24h规则筛选后剩余更新数据
+    rule_24h_dup_recall = [item for item in data['rov_pool_recall']
+                           if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
+    rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    recall_dict['rov_recall_24h_dup'] = rule_24h_dup_recall_rank
+    hot_recall = []
+    w2v_recall =[]
+
+    if ab_Code=='60058':
+        if len(data['hot_recall'])>0:
+            hot_recall = sorted(data['hot_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+            recall_dict['hot_recall'] = hot_recall
+    elif ab_Code=='60059':
+        if len(data['w2v_recall']>0):
+           w2v_recall = sorted(data['w2v_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+           recall_dict['w2v_recall'] = w2v_recall
+
+    recall_list = [('rov_recall_region_h',1, 1),('hot_recall',0.5,1), ('w2v_recall',0.5,1),
+                   ('rov_recall_region_h',0.5, 1),('rov_recall_region_24h',1,1),
+                   ('rov_recall_24h',1,1), ('rov_recall_24h_dup',0.5,1)]
+    rov_recall_rank = []
+    select_ids = set('')
+    for i in  range(3):
+        if len(rov_recall_rank)>8:
+            break
+        for per_recall_item in recall_list:
+            per_recall_name =  per_recall_item[0]
+            per_recall_freq = per_recall_item[1]
+            per_limt_num =  per_recall_item[2]
+            rand_num = random.random()
+            if rand_num<per_recall_freq and per_recall_name in recall_dict:
+                per_recall = recall_dict[per_recall_name]
+                cur_recall_num = 0
+                for recall_item in per_recall:
+                    vid = recall_item['videoId']
+                    if vid in select_ids:
+                        continue
+                    rov_recall_rank.append(recall_item)
+                    select_ids.add(vid)
+                    cur_recall_num+=1
+                    if cur_recall_num>=per_limt_num:
+                        break
+
+    #rov_recall_rank = region_h_recall_rank + region_24h_recall_rank + \
+    #                  rule_24h_recall_rank + rule_24h_dup_recall_rank
+    # 流量池
+    flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+    # 对各路召回的视频进行去重
+    rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
+                                                         top_K=top_K)
+    # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
+    #     rov_recall_rank, flow_recall_rank))
+
+    # rank_result = relevant_recall_rank
+    rank_result = []
+
+    # 从ROV召回池中获取top k
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(flow_recall_rank[:top_K])
+        flow_recall_rank = flow_recall_rank[top_K:]
+
+    # 按概率 p 及score排序获取 size - k 个视频
+    i = 0
+    while i < size - top_K:
+        # 随机生成[0, 1)浮点数
+        rand = random.random()
+        # log_.info('rand: {}'.format(rand))
+        if rand < flow_pool_P:
+            if flow_recall_rank:
+                rank_result.append(flow_recall_rank[0])
+                flow_recall_rank.remove(flow_recall_rank[0])
+            else:
+                rank_result.extend(rov_recall_rank[:size - top_K - i])
+                return rank_result[:size]
+        else:
+            if rov_recall_rank:
+                rank_result.append(rov_recall_rank[0])
+                rov_recall_rank.remove(rov_recall_rank[0])
+            else:
+                rank_result.extend(flow_recall_rank[:size - top_K - i])
+                return rank_result[:size]
+        i += 1
+    return rank_result[:size]
+
+
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
     d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
     d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},

+ 39 - 1
video_recall.py

@@ -2476,4 +2476,42 @@ class PoolRecall(object):
         for vid in filtered_viewed_videos:
         for vid in filtered_viewed_videos:
             if vid in recall_dict:
             if vid in recall_dict:
                 recall_result.append(recall_dict[vid])
                 recall_result.append(recall_dict[vid])
-        return recall_result
+        return recall_result
+
+    def get_word2vec_item_reall(self):
+        if self.video_id is None:
+            return  []
+        recall_key = "w2v:" + str(self.video_id)
+        print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+
+        print(data)
+        recall_result = []
+        recall_dict  = {}
+        video_ids = []
+        if data is not None:
+            json_result = json.loads(data)
+            # print("json_result:", json_result)
+            for per_item in json_result:
+                try:
+                    vid = int(per_item[0])
+                    video_ids.append(vid)
+                    recall_dict[vid] = {'videoId': vid, 'flowPool': '',
+                         'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
+                         'abCode': self.ab_code}
+                except Exception as e:
+                    continue
+        if len(video_ids)<=0:
+            return  recall_result
+        video_ids = video_ids[:50]
+        #print(video_ids)
+        filter_ = FilterVideos(request_id=self.request_id,
+                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+        filtered_viewed_videos = filter_.filter_videos(pool_type='rov')
+        if filtered_viewed_videos is None:
+            return recall_result
+        print("filtered_viewed_videos:", filtered_viewed_videos)
+        for vid in filtered_viewed_videos:
+            if vid in recall_dict:
+                recall_result.append(recall_dict[vid])
+        return recall_result[:30]