linfan пре 2 година
родитељ
комит
c920a63f95
5 измењених фајлова са 102 додато и 3 уклоњено
  1. 1 0
      config.py
  2. 4 0
      db_helper.py
  3. 8 3
      recommend.py
  4. 71 0
      video_rank.py
  5. 18 0
      video_recall.py

+ 1 - 0
config.py

@@ -386,6 +386,7 @@ class BaseConfig(object):
         'talk_videos': 'talk_videos',  # 影视解说
         'talk_videos': 'talk_videos',  # 影视解说
         'special_mid': 'special_mid_videos',  # 特殊mid指定视频
         'special_mid': 'special_mid_videos',  # 特殊mid指定视频
         'rov_recall_30day': 'recall_pool_30day',  # 天级更新相对30天列表
         'rov_recall_30day': 'recall_pool_30day',  # 天级更新相对30天列表
+        'sim_hot_vid_recall': 'sim_hot_vid_recall',  # 相似视频召回
     }
     }
 
 
     # category id mapping
     # category id mapping

+ 4 - 0
db_helper.py

@@ -418,6 +418,10 @@ class RedisHelper(object):
         #         'executeTime': (time.time() - start_time) * 1000
         #         'executeTime': (time.time() - start_time) * 1000
         #     })
         #     })
 
 
+    def get_batch_key(self, name_list):
+        conn = self.connect()
+        res = conn.mget(name_list)
+        return  res
 
 
 #hologres_info = config_.HOLOGRES_INFO
 #hologres_info = config_.HOLOGRES_INFO
 #conn = psycopg2.connect(**hologres_info)
 #conn = psycopg2.connect(**hologres_info)

+ 8 - 3
recommend.py

@@ -10,7 +10,7 @@ import config
 from log import Log
 from log import Log
 from config import set_config
 from config import set_config
 from video_recall import PoolRecall
 from video_recall import PoolRecall
-from video_rank import video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
+from video_rank import video_new_rank,video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
 from db_helper import RedisHelper
 from db_helper import RedisHelper
 import gevent
 import gevent
 from utils import FilterVideos, get_user_has30day_return
 from utils import FilterVideos, get_user_has30day_return
@@ -200,7 +200,8 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     else:
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
              gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
-             gevent.spawn(pool_recall.flow_pool_recall, size)]
+             gevent.spawn(pool_recall.flow_pool_recall, size),
+             gevent.spawn(pool_recall.get_sim_hot_item_reall, size)]
 
 
     # 最惊奇相关推荐实验
     # 最惊奇相关推荐实验
     # elif ab_code == config_.AB_CODE['top_video_relevant_appType_19']:
     # elif ab_code == config_.AB_CODE['top_video_relevant_appType_19']:
@@ -281,7 +282,10 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                 'rov_pool_recall': recall_result_list[0],
                 'rov_pool_recall': recall_result_list[0],
                 'flow_pool_recall': recall_result_list[2]
                 'flow_pool_recall': recall_result_list[2]
             }
             }
-    rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+    if ab_code=="ab_new_test":
+        rank_result = video_new_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+    else:
+        rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
 
 
     # 老视频实验
     # 老视频实验
     # if ab_code in [config_.AB_CODE['old_video']]:
     # if ab_code in [config_.AB_CODE['old_video']]:
@@ -298,6 +302,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #     'rank_result': rank_result,
     #     'rank_result': rank_result,
     #     'executeTime': (time.time() - start_rank) * 1000
     #     'executeTime': (time.time() - start_rank) * 1000
     # })
     # })
+
     result['rankResult'] = rank_result
     result['rankResult'] = rank_result
     result['rankTime'] = (time.time() - start_rank) * 1000
     result['rankTime'] = (time.time() - start_rank) * 1000
 
 

+ 71 - 0
video_rank.py

@@ -1,3 +1,4 @@
+import json
 import random
 import random
 import numpy
 import numpy
 
 
@@ -150,6 +151,76 @@ def video_rank(data, size, top_K, flow_pool_P):
     return rank_result[:size]
     return rank_result[:size]
 
 
 
 
+def video_new_rank(data, size, top_K, flow_pool_P):
+    """
+        视频分发排序
+        :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+        :param size: 请求数
+        :param top_K: 保证topK为召回池视频 type-int
+        :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+        :return: rank_result
+        """
+    if not data['rov_pool_recall'] and not data['flow_pool_recall']:
+        return []
+
+    redisObj = RedisHelper()
+    video_id_list = []
+    for d in data:
+        video_id_list.append(d.get('rovScore', '0'))
+    video_scores = redisObj.get_batch_key(video_id_list)
+    video_items = []
+    for i in range(len(video_id_list)):
+        try:
+            video_score_str = json.load(video_scores[i])
+            video_items.append((video_id_list[i], video_score_str[0]))
+        except Exception:
+            video_items.append((video_id_list[i], 0.0))
+    sort_items = sorted(video_items, key=lambda k: k[1], reverse=True)
+    rov_recall_rank = sort_items
+    # 流量池
+    flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
+    # 对各路召回的视频进行去重
+    rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
+                                                         top_K=top_K)
+    # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
+    #     rov_recall_rank, flow_recall_rank))
+
+    # rank_result = relevant_recall_rank
+    rank_result = []
+
+    # 从ROV召回池中获取top k
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(flow_recall_rank[:top_K])
+        flow_recall_rank = flow_recall_rank[top_K:]
+
+    # 按概率 p 及score排序获取 size - k 个视频
+    i = 0
+    while i < size - top_K:
+        # 随机生成[0, 1)浮点数
+        rand = random.random()
+        # log_.info('rand: {}'.format(rand))
+        if rand < flow_pool_P:
+            if flow_recall_rank:
+                rank_result.append(flow_recall_rank[0])
+                flow_recall_rank.remove(flow_recall_rank[0])
+            else:
+                rank_result.extend(rov_recall_rank[:size - top_K - i])
+                return rank_result[:size]
+        else:
+            if rov_recall_rank:
+                rank_result.append(rov_recall_rank[0])
+                rov_recall_rank.remove(rov_recall_rank[0])
+            else:
+                rank_result.extend(flow_recall_rank[:size - top_K - i])
+                return rank_result[:size]
+        i += 1
+    return rank_result[:size]
+
+
+
 def remove_duplicate(rov_recall, flow_recall, top_K):
 def remove_duplicate(rov_recall, flow_recall, top_K):
     """
     """
     对多路召回的视频去重
     对多路召回的视频去重

+ 18 - 0
video_recall.py

@@ -8,6 +8,7 @@ from db_helper import RedisHelper
 from config import set_config
 from config import set_config
 from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count, send_msg_to_feishu
 from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count, send_msg_to_feishu
 import gevent
 import gevent
+import  json
 
 
 log_ = Log()
 log_ = Log()
 config_ = set_config()
 config_ = set_config()
@@ -1256,6 +1257,7 @@ class PoolRecall(object):
             ]
             ]
         else:
         else:
             t = [
             t = [
+                #add recall video
                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
                  # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
                  # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
                  # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
                  # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
@@ -1263,6 +1265,7 @@ class PoolRecall(object):
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+                 #
             ]
             ]
 
 
         gevent.joinall(t)
         gevent.joinall(t)
@@ -2101,3 +2104,18 @@ class PoolRecall(object):
         #     'executeTime': (time.time() - start_time) * 1000
         #     'executeTime': (time.time() - start_time) * 1000
         # })
         # })
         return pool_recall_result[:size]
         return pool_recall_result[:size]
+
+    #linfan
+    def get_sim_hot_item_reall(self, mainVid):
+        recall_key = "sim_hot_"+mainVid
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        json_result =json.load(data)
+        # print(rec_json_list)
+        recall_result = []
+        for per_item in json_result:
+            recall_result.append(
+                {'videoId': per_item[0], 'flowPool': '',
+                 'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
+                 'abCode': self.ab_code}
+            )
+        return recall_result