Browse Source

update user2tag

linfan 1 year ago
parent
commit
7e78d3b3e3
4 changed files with 264 additions and 14 deletions
  1. 6 0
      config.py
  2. 15 8
      recommend.py
  3. 194 4
      video_rank.py
  4. 49 2
      video_recall.py

+ 6 - 0
config.py

@@ -158,6 +158,7 @@ class BaseConfig(object):
             'abtest_336': 60059,
             'abtest_337': 60060,
             'abtest_338': 60061,
+            'abtest_339': 60062,
         },  # 地域分组小时级规则实验
 
         'rank_by_24h': {
@@ -389,6 +390,10 @@ class BaseConfig(object):
         '338': {
             'data_key': 'data10', 'rule_key': 'rule7',
             'ab_code': AB_CODE['region_rank_by_h'].get('abtest_338')
+        },
+        '339': {
+            'data_key': 'data10', 'rule_key': 'rule7',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_339')
         }
     }
 
@@ -471,6 +476,7 @@ class BaseConfig(object):
         'hot_recall': 'hot_recall',  # hot召回
         'w2v_recall': 'w2v_recall', #word2vec
         'u2i_tag_recall':'u2i_tag_recall', #u2i_tag_recall
+        'u2u2i_recall':'u2u2i_recall', #u2u2i_recall
     }
 
     # category id mapping

+ 15 - 8
recommend.py

@@ -207,6 +207,8 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
         exp_config = pool_recall.get_test_config()
     elif ab_code == 60061:
         exp_config = pool_recall.get_simrecall_config()
+    elif ab_code == 60062:
+        exp_config = pool_recall.get_u2u2i_config()
     #print("exp_config:", exp_config)
     if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time, ab_code, exp_config)]
@@ -216,6 +218,8 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
             t.append(gevent.spawn(pool_recall.get_word2vec_item_reall, exp_config))
         elif  ab_code==60061:
             t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter, exp_config))
+        elif  ab_code==60062:
+            t.append(gevent.spawn(pool_recall.get_U2U2I_reall, exp_config))
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time, ab_code, exp_config),
              gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
@@ -226,6 +230,8 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
             t.append(gevent.spawn(pool_recall.get_word2vec_item_reall, exp_config))
         elif ab_code == 60061:
             t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter, exp_config))
+        elif ab_code == 60062:
+            t.append(gevent.spawn(pool_recall.get_U2U2I_reall, exp_config))
 
     # 最惊奇相关推荐实验
     # elif ab_code == config_.AB_CODE['top_video_relevant_appType_19']:
@@ -306,27 +312,28 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                 'rov_pool_recall': recall_result_list[0],
                 'flow_pool_recall': recall_result_list[2]
             }
-
+    data['u2i_recall'] = []
+    data['w2v_recall'] = []
+    data['sim_recall'] = []
+    data['u2u2i_recall'] = []
     if ab_code == 60058:
         if len(recall_result_list)>=4:
             data['u2i_recall'] = recall_result_list[3]
-        else:
-            data['u2i_recall'] = []
     elif ab_code == 60059:
         if len(recall_result_list)>=4:
             data['w2v_recall'] = recall_result_list[3]
-        else:
-            data['w2v_recall'] = []
     elif ab_code == 60061:
         if len(recall_result_list)>=4:
             data['sim_recall'] = recall_result_list[3]
-        else:
-            data['sim_recall'] = []
+    elif ab_code == 60062:
+        if len(recall_result_list)>=4:
+            data['u2u2i_recall'] = recall_result_list[3]
     #if ab_code=="ab_new_test":
     #    rank_result = video_new_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
     #else:
     #print("data['hot_recall']", data['hot_recall'])
-    if ab_code == 60058 or ab_code == 60059 or ab_code == 60060 or ab_code == 60061 :
+    # 60058: u2itag, 60059:word2vec, 60061: sim_recall, 60062: u2u2i
+    if ab_code == 60058 or ab_code == 60059 or ab_code == 60060 or ab_code == 60061 or ab_code == 60062:
         rank_result, flow_num = video_sanke_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_Code=ab_code, exp_config=exp_config)
         result['flow_num'] = flow_num
         if rank_result:

+ 194 - 4
video_rank.py

@@ -719,10 +719,9 @@ def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None
     :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
     :return: rank_result
     """
-    if not data['rov_pool_recall'] and not data['flow_pool_recall']:
-            # and not data['u2i_recall'] and not data['u2i_recall'] \
-            # and not data['w2v_recall'] and not data['w2v_recall'] \
-            # and not data['sim_recall'] and not data['sim_recall']:
+    if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
+        and len(data['u2i_recall'])==0 and len(data['w2v_recall'])==0 \
+        and len(data['sim_recall']) == 0 and len(data['u2u2i_recall']) == 0 :
         return [], 0
     # 地域分组小时级规则更新数据
     recall_dict = {}
@@ -749,6 +748,7 @@ def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None
     hot_recall = []
     w2v_recall =[]
     sim_recall = []
+    u2u2i_recall = []
     if ab_Code==60058:
         if len(data['u2i_recall'])>0:
             hot_recall = sorted(data['u2i_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
@@ -763,6 +763,11 @@ def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None
             recall_dict['sim_recall'] = data['sim_recall']
         else:
             recall_dict['sim_recall'] = sim_recall
+    elif ab_Code==60062:
+        if len(data['u2u2i_recall'])>0:
+            recall_dict['u2u2i_recall'] = data['u2u2i_recall']
+        else:
+            recall_dict['u2u2i_recall'] = u2u2i_recall
 
     recall_list = [('rov_recall_region_h',1, 1),('rov_recall_region_h',0.5, 1),('rov_recall_region_24h',1,1),
                    ('u2i_recall',0.5,1), ('w2v_recall',0.5,1),('rov_recall_24h',1,1), ('rov_recall_24h_dup',0.5,1)]
@@ -875,6 +880,191 @@ def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None
             i += 1
     return rank_result[:size], flow_num
 
+def video_sank_pos_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
+    """
+    视频分发排序
+    :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+    :param size: 请求数
+    :param top_K: 保证topK为召回池视频 type-int
+    :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+    :return: rank_result
+    """
+    if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
+        and len(data['u2i_recall'])==0 and len(data['w2v_recall'])==0 \
+        and len(data['sim_recall']) == 0 and len(data['u2u2i_recall']) == 0 :
+        return [], 0
+    # 地域分组小时级规则更新数据
+    recall_dict = {}
+    region_h_recall = [item for item in data['rov_pool_recall']
+                         if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
+    region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    recall_dict['rov_recall_region_h'] = region_h_recall_rank
+    # 地域分组小时级更新24h规则更新数据
+    region_24h_recall = [item for item in data['rov_pool_recall']
+                         if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
+    region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    recall_dict['rov_recall_region_24h'] = region_24h_recall_rank
+
+    # 相对24h规则更新数据
+    rule_24h_recall = [item for item in data['rov_pool_recall']
+                       if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
+    rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    recall_dict['rov_recall_24h'] = rule_24h_recall_rank
+    # 相对24h规则筛选后剩余更新数据
+    rule_24h_dup_recall = [item for item in data['rov_pool_recall']
+                           if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
+    rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    recall_dict['rov_recall_24h_dup'] = rule_24h_dup_recall_rank
+    u2i_recall = []
+    w2v_recall =[]
+    sim_recall = []
+    u2u2i_recall = []
+    if ab_Code==60058:
+        if len(data['u2i_recall'])>0:
+            u2i_recall = sorted(data['u2i_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+        recall_dict['u2i_recall'] = u2i_recall
+    elif ab_Code==60059:
+        if len(data['w2v_recall'])>0:
+            recall_dict['w2v_recall'] = data['w2v_recall']
+        else:
+            recall_dict['w2v_recall'] = w2v_recall
+    elif ab_Code==60061:
+        if len(data['sim_recall'])>0:
+            recall_dict['sim_recall'] = data['sim_recall']
+        else:
+            recall_dict['sim_recall'] = sim_recall
+    elif ab_Code==60062:
+        if len(data['u2u2i_recall'])>0:
+            recall_dict['u2u2i_recall'] = data['u2u2i_recall']
+        else:
+            recall_dict['u2u2i_recall'] = u2u2i_recall
+
+    recall_pos1 = [('rov_recall_region_h',0.98),('rov_recall_24h',0.2),('rov_recall_region_24h',1),
+                   ('rov_recall_24h',1),('rov_recall_24h_dup',1)]
+    recall_pos2 =  [('rov_recall_region_h',0.98),('rov_recall_24h',0.2),('rov_recall_region_24h',1),
+                   ('rov_recall_24h',1),('rov_recall_24h_dup',1)]
+    recall_pos3 = [('rov_recall_region_h', 0.98), ('rov_recall_24h', 0.2), ('rov_recall_region_24h', 1),
+                   ('rov_recall_24h', 1), ('rov_recall_24h_dup', 1)]
+    recall_pos4 = [('rov_recall_region_h', 0.98), ('rov_recall_24h', 0.2), ('rov_recall_region_24h', 1),
+                   ('rov_recall_24h', 1), ('rov_recall_24h_dup', 1)]
+    if exp_config  and exp_config['recall_list']:
+        recall_pos1 = exp_config['recall_pos1']
+        recall_pos2 = exp_config['recall_pos2']
+        recall_pos3 = exp_config['recall_pos3']
+        recall_pos4 = exp_config['recall_pos4']
+    #print("recall_config:", recall_list)
+    rov_recall_rank = []
+    recall_list = []
+    recall_list.append(recall_pos1, recall_pos2, recall_pos3,recall_pos4)
+    select_ids = set('')
+    for j in range(3):
+        if len(rov_recall_rank)>12:
+            break
+        for recall_pos_config in recall_list:
+            rand_num = random.random()
+            for per_recall_item in recall_pos_config:
+                per_recall_name = per_recall_item[0]
+                per_recall_freq = per_recall_item[1]
+                if rand_num < per_recall_freq and per_recall_name in recall_dict
+                    per_recall = recall_dict[per_recall_name]
+                    for recall_item in per_recall:
+                        vid = recall_item['videoId']
+                        if vid in select_ids:
+                            continue
+                        rov_recall_rank.append(recall_item)
+                        select_ids.add(vid)
+                        break
+
+    if len(rov_recall_rank)<4:
+        rov_doudi_rank = region_h_recall_rank + sim_recall + u2i_recall + u2u2i_recall + w2v_recall + region_24h_recall_rank + rule_24h_recall_rank + rule_24h_dup_recall_rank
+        for recall_item in rov_doudi_rank:
+            vid = recall_item['videoId']
+            if vid in select_ids:
+                continue
+            rov_recall_rank.append(recall_item)
+            select_ids.add(vid)
+            if len(rov_recall_rank)>12:
+                break
+    # print("rov_recall_rank:")
+    print(rov_recall_rank)
+    # 流量池
+    flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+    # 对各路召回的视频进行去重
+    rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
+                                                         top_K=top_K)
+    # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
+    #     rov_recall_rank, flow_recall_rank))
+
+    # rank_result = relevant_recall_rank
+    rank_result = []
+
+    # 从ROV召回池中获取top k
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(flow_recall_rank[:top_K])
+        flow_recall_rank = flow_recall_rank[top_K:]
+    flow_num = 0
+    flowConfig =0
+    if exp_config and exp_config['flowConfig']:
+        flowConfig = exp_config['flowConfig']
+    if flowConfig == 1 and len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        for recall_item in rank_result:
+            flow_recall_name = recall_item.get("flowPool", '')
+            if flow_recall_name is not None and flow_recall_name.find("#") > -1:
+                flow_num = flow_num + 1
+            all_recall_rank = rov_recall_rank + flow_recall_rank
+            if flow_num > 0:
+                rank_result.extend(all_recall_rank[:size - top_K])
+                return rank_result[:size], flow_num
+            else:
+                # 按概率 p 及score排序获取 size - k 个视频
+                i = 0
+                while i < size - top_K:
+                    # 随机生成[0, 1)浮点数
+                    rand = random.random()
+                    # log_.info('rand: {}'.format(rand))
+                    if rand < flow_pool_P:
+                        if flow_recall_rank:
+                            rank_result.append(flow_recall_rank[0])
+                            flow_recall_rank.remove(flow_recall_rank[0])
+                        else:
+                            rank_result.extend(rov_recall_rank[:size - top_K - i])
+                            return rank_result[:size], flow_num
+                    else:
+                        if rov_recall_rank:
+                            rank_result.append(rov_recall_rank[0])
+                            rov_recall_rank.remove(rov_recall_rank[0])
+                        else:
+                            rank_result.extend(flow_recall_rank[:size - top_K - i])
+                            return rank_result[:size], flow_num
+                    i += 1
+    else:
+        # 按概率 p 及score排序获取 size - k 个视频
+        i = 0
+        while i < size - top_K:
+            # 随机生成[0, 1)浮点数
+            rand = random.random()
+            # log_.info('rand: {}'.format(rand))
+            if rand < flow_pool_P:
+                if flow_recall_rank:
+                    rank_result.append(flow_recall_rank[0])
+                    flow_recall_rank.remove(flow_recall_rank[0])
+                else:
+                    rank_result.extend(rov_recall_rank[:size - top_K - i])
+                    return rank_result[:size], flow_num
+            else:
+                if rov_recall_rank:
+                    rank_result.append(rov_recall_rank[0])
+                    rov_recall_rank.remove(rov_recall_rank[0])
+                else:
+                    rank_result.extend(flow_recall_rank[:size - top_K - i])
+                    return rank_result[:size],flow_num
+            i += 1
+    return rank_result[:size], flow_num
+
 
 
 if __name__ == '__main__':

+ 49 - 2
video_recall.py

@@ -2615,8 +2615,8 @@ class PoolRecall(object):
         else:
             return None
 
-    def get_simrecall_config(self):
-        recall_key = "simrecall_exp_config"
+    def get_u2u2i_config(self):
+        recall_key = "u2u2i_exp_config"
         data = self.redis_helper.get_data_from_redis(key_name=recall_key)
         if data is not None:
             try:
@@ -2737,4 +2737,51 @@ class PoolRecall(object):
             if vid in recall_dict:
                 recall_result.append(recall_dict[vid])
         #print("u2i recall_result:", recall_result)
+        return recall_result
+
+    def get_U2U2I_reall(self, mid, exp_config=None):
+        #recall_key = "hot_video:"
+        if not mid:
+            return  []
+        recall_key = "u2u2i:"+mid
+        #print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+
+        #print(data)
+        recall_result = []
+        recall_dict  = {}
+        video_ids = []
+        if data is not None:
+            json_result = json.loads(data)
+            #print("json_result:", json_result)
+            for per_item in json_result:
+                try:
+                    vid = int(per_item[0])
+                    video_ids.append(vid)
+                    recall_dict[vid] = {'videoId': vid, 'flowPool': '',
+                         'rovScore': float(per_item[1]), 'pushFrom': config_.PUSH_FROM['u2u2i_recall'],
+                         'abCode': self.ab_code}
+                except Exception as e:
+                    continue
+        if len(video_ids)<=0:
+            return  recall_result
+        recall_num = 20
+        try:
+            if exp_config and exp_config['recall_get_num']:
+                recall_num = int(exp_config['recall_get_num'])
+        except:
+            recall_num = 20
+        #print("recall_num:", recall_num)
+        video_ids = video_ids[:recall_num]
+        #print(video_ids)
+        filter_ = FilterVideos(request_id=self.request_id,
+                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+        filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
+        if filtered_viewed_videos is None:
+            return recall_result
+        #print("filtered_viewed_videos:", filtered_viewed_videos)
+        for vid in filtered_viewed_videos:
+            if vid in recall_dict:
+                recall_result.append(recall_dict[vid])
+        #print("u2i recall_result:", recall_result)
         return recall_result