Quellcode durchsuchen

add flow pool recall log

liqian vor 1 Jahr
Ursprung
Commit
c0a3022d87
3 geänderte Dateien mit 162 neuen und 26 gelöschten Zeilen
  1. 68 11
      recommend.py
  2. 31 10
      video_rank.py
  3. 63 5
      video_recall.py

+ 68 - 11
recommend.py

@@ -303,6 +303,9 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     result['recallResult'] = recall_result_list
     result['recallTime'] = (time.time() - start_recall) * 1000
 
+    # add_flow_pool_recall_log
+    flow_pool_recall_process = {}
+
     # ####### 排序
     start_rank = time.time()
     # log_.info('====== rank')
@@ -314,7 +317,10 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
         ]:
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                'flow_pool_recall': recall_result_list[1]
+                # add_flow_pool_recall_log
+                'flow_pool_recall': recall_result_list[1][0]
+                # 'flow_pool_recall': recall_result_list[1]
+
             }
         else:
             data = {
@@ -322,7 +328,9 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                 'flow_pool_recall': []
             }
     else:
-        if recall_result_list[1]:
+        # add_flow_pool_recall_log
+        if recall_result_list[1][0]:
+        # if recall_result_list[1]:
             redis_helper = RedisHelper()
             quick_flow_pool_P = redis_helper.get_data_from_redis(
                 key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
@@ -331,13 +339,22 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                 flow_pool_P = quick_flow_pool_P
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                'flow_pool_recall': recall_result_list[1]
+                # add_flow_pool_recall_log
+                'flow_pool_recall': recall_result_list[1][0]
+                # 'flow_pool_recall': recall_result_list[1]
             }
+            # add_flow_pool_recall_log
+            flow_pool_recall_process = recall_result_list[1][1]
         else:
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                'flow_pool_recall': recall_result_list[2]
+                # add_flow_pool_recall_log
+                'flow_pool_recall': recall_result_list[2][0]
+                # 'flow_pool_recall': recall_result_list[2]
             }
+            # add_flow_pool_recall_log
+            flow_pool_recall_process = recall_result_list[2][1]
+
     data['u2i_recall'] = []
     data['u2i_play_recall'] = []
     data['w2v_recall'] = []
@@ -370,7 +387,13 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #     if rank_result:
     #         result['rank_num'] = len(rank_result)
     # else:
-    rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+
+    # rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+
+    # add_flow_pool_recall_log
+    rank_result, flow_pool_recall_process = video_rank(data=data, size=size, top_K=top_K,
+                                                       flow_pool_P=float(flow_pool_P),
+                                                       flow_pool_recall_process=flow_pool_recall_process)
 
     # 老视频实验
     # if ab_code in [config_.AB_CODE['old_video']]:
@@ -422,6 +445,10 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #
     # result['rankResult'] = rank_result
 
+    # add_flow_pool_recall_log
+    flow_pool_recall_process['rank_result'] = rank_result
+    result['flow_pool_recall_process'] = flow_pool_recall_process
+
     return result
     # return rank_result, last_rov_recall_key
 
@@ -626,6 +653,10 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     result['recallResult'] = recall_result_list
     result['recallTime'] = (time.time() - start_recall) * 1000
     #print("recall:", recall_result_list)
+
+    # add_flow_pool_recall_log
+    flow_pool_recall_process = {}
+
     # ####### 排序
     start_rank = time.time()
     # log_.info('====== rank')
@@ -637,7 +668,9 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         ]:
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                'flow_pool_recall': recall_result_list[1]
+                # add_flow_pool_recall_log
+                'flow_pool_recall': recall_result_list[1][0]
+                # 'flow_pool_recall': recall_result_list[1]
             }
         else:
             data = {
@@ -645,7 +678,9 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                 'flow_pool_recall': []
             }
     else:
-        if recall_result_list[1]:
+        # add_flow_pool_recall_log
+        if recall_result_list[1][0]:
+        # if recall_result_list[1]:
             redis_helper = RedisHelper()
             quick_flow_pool_P = redis_helper.get_data_from_redis(
                 key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
@@ -654,13 +689,22 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                 flow_pool_P = quick_flow_pool_P
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                'flow_pool_recall': recall_result_list[1]
+                # add_flow_pool_recall_log
+                'flow_pool_recall': recall_result_list[1][0]
+                # 'flow_pool_recall': recall_result_list[1]
             }
+            # add_flow_pool_recall_log
+            flow_pool_recall_process = recall_result_list[1][1]
         else:
             data = {
                 'rov_pool_recall': recall_result_list[0],
-                'flow_pool_recall': recall_result_list[2]
+                # add_flow_pool_recall_log
+                'flow_pool_recall': recall_result_list[2][0]
+                # 'flow_pool_recall': recall_result_list[2]
             }
+            # add_flow_pool_recall_log
+            flow_pool_recall_process = recall_result_list[2][1]
+
     # 3. 特征回流
     #
     # for recall_item in data['rov_pool_recall']:
@@ -691,9 +735,17 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         env_json = env_dict
     #4.
     # rank_result, flow_num = video_new_rank2(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), ab_code=ab_code, mid=mid, exp_config=exp_config, env_dict=env_dict)
-    rank_result, flow_num = video_new_rank3(
-        data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix
+
+    # rank_result, flow_num = video_new_rank3(
+    #     data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix
+    # )
+
+    # add_flow_pool_recall_log
+    rank_result, flow_num, flow_pool_recall_process = video_new_rank3(
+        data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix,
+        flow_pool_recall_process=flow_pool_recall_process
     )
+
     #print(rank_result)
     if rank_result:
         result['rank_num'] = len(rank_result)
@@ -724,6 +776,11 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     result['rankResult'] = rank_result
     result['flow_num'] = flow_num
     result['rankTime'] = (time.time() - start_rank) * 1000
+
+    # add_flow_pool_recall_log
+    flow_pool_recall_process['rank_result'] = rank_result
+    result['flow_pool_recall_process'] = flow_pool_recall_process
+
     return result, env_json
     # return rank_result, last_rov_recall_key
 

+ 31 - 10
video_rank.py

@@ -13,7 +13,7 @@ log_ = Log()
 config_ = set_config()
 
 
-def video_rank(data, size, top_K, flow_pool_P):
+def video_rank(data, size, top_K, flow_pool_P, flow_pool_recall_process=None):
     """
     视频分发排序
     :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
@@ -23,7 +23,7 @@ def video_rank(data, size, top_K, flow_pool_P):
     :return: rank_result
     """
     if not data['rov_pool_recall'] and not data['flow_pool_recall']:
-        return []
+        return [], flow_pool_recall_process
     # 将各路召回的视频按照score从大到小排序
     # 最惊奇相关推荐相似视频
     # relevant_recall = [item for item in data['rov_pool_recall']
@@ -120,6 +120,12 @@ def video_rank(data, size, top_K, flow_pool_P):
     # rank_result = relevant_recall_rank
     rank_result = []
 
+    # add_flow_pool_recall_log
+    if flow_pool_recall_process is None:
+        flow_pool_recall_process = {}
+    flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
+                                                        'flow_recall_rank': flow_recall_rank}
+
     # 从ROV召回池中获取top k
     if len(rov_recall_rank) > 0:
         rank_result.extend(rov_recall_rank[:top_K])
@@ -133,6 +139,11 @@ def video_rank(data, size, top_K, flow_pool_P):
     while i < size - top_K:
         # 随机生成[0, 1)浮点数
         rand = random.random()
+
+        # add_flow_pool_recall_log
+        flow_pool_recall_process['flow_pool_P'] = flow_pool_P
+        flow_pool_recall_process[f'{i}_rand'] = rand
+
         # log_.info('rand: {}'.format(rand))
         if rand < flow_pool_P:
             if flow_recall_rank:
@@ -140,16 +151,16 @@ def video_rank(data, size, top_K, flow_pool_P):
                 flow_recall_rank.remove(flow_recall_rank[0])
             else:
                 rank_result.extend(rov_recall_rank[:size - top_K - i])
-                return rank_result[:size]
+                return rank_result[:size], flow_pool_recall_process
         else:
             if rov_recall_rank:
                 rank_result.append(rov_recall_rank[0])
                 rov_recall_rank.remove(rov_recall_rank[0])
             else:
                 rank_result.extend(flow_recall_rank[:size - top_K - i])
-                return rank_result[:size]
+                return rank_result[:size], flow_pool_recall_process
         i += 1
-    return rank_result[:size]
+    return rank_result[:size], flow_pool_recall_process
 
 
 def video_new_rank(videoIds, fast_flow_set, flow_set, size, top_K, flow_pool_P):
@@ -774,7 +785,7 @@ def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=Non
         return rank_result[:size], flow_num
 
 
-def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:'):
+def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:', flow_pool_recall_process=None):
     """
         视频分发排序
         :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
@@ -786,7 +797,7 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     """
     redis_helper = RedisHelper()
     if not data['rov_pool_recall'] and not data['flow_pool_recall']:
-        return [], 0
+        return [], 0, flow_pool_recall_process
 
     rov_recall_rank = data['rov_pool_recall']
     vid_keys = []
@@ -817,6 +828,11 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     )
     rank_result = []
 
+    # add_flow_pool_recall_log
+    if flow_pool_recall_process is None:
+        flow_pool_recall_process = {}
+    flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank, 'flow_recall_rank': flow_recall_rank}
+
     # 从ROV召回池中获取top k
     if len(rov_recall_rank) > 0:
         rank_result.extend(rov_recall_rank[:top_K])
@@ -830,6 +846,11 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
     while i < size - top_K:
         # 随机生成[0, 1)浮点数
         rand = random.random()
+
+        # add_flow_pool_recall_log
+        flow_pool_recall_process['flow_pool_P'] = flow_pool_P
+        flow_pool_recall_process[f'{i}_rand'] = rand
+
         # log_.info('rand: {}'.format(rand))
         if rand < flow_pool_P:
             if flow_recall_rank:
@@ -837,16 +858,16 @@ def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1
                 flow_recall_rank.remove(flow_recall_rank[0])
             else:
                 rank_result.extend(rov_recall_rank[:size - top_K - i])
-                return rank_result[:size], flow_num
+                return rank_result[:size], flow_num, flow_pool_recall_process
         else:
             if rov_recall_rank:
                 rank_result.append(rov_recall_rank[0])
                 rov_recall_rank.remove(rov_recall_rank[0])
             else:
                 rank_result.extend(flow_recall_rank[:size - top_K - i])
-                return rank_result[:size], flow_num
+                return rank_result[:size], flow_num, flow_pool_recall_process
         i += 1
-    return rank_result[:size], flow_num
+    return rank_result[:size], flow_num, flow_pool_recall_process
 
 
 # 排序服务兜底

+ 63 - 5
video_recall.py

@@ -439,6 +439,9 @@ class PoolRecall(object):
 
     def flow_pool_recall(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
         """从流量池中获取视频"""
+        # add_flow_pool_recall_log
+        flow_pool_recall_process = {}
+
         start_time = time.time()
         # 获取存在城市分组数据的城市编码列表
         city_code_list = [code for _, code in config_.CITY_CODE.items()]
@@ -456,6 +459,10 @@ class PoolRecall(object):
             region_code = '-1'
 
         flow_pool_key = self.get_pool_redis_key('flow', flow_pool_id=flow_pool_id)
+
+        # add_flow_pool_recall_log
+        flow_pool_recall_process['flow_pool_key'] = flow_pool_key
+
         # print(flow_pool_key)
         flow_pool_recall_result = []
         flow_pool_recall_videos = []
@@ -473,6 +480,10 @@ class PoolRecall(object):
             data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
                                                               start=idx, end=idx + get_size - 1,
                                                               with_scores=True)
+
+            # add_flow_pool_recall_log
+            flow_pool_recall_process['initial_data'] = data
+
             # et_get = time.time()
             # log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
             #     freq, data, (et_get - st_get) * 1000))
@@ -508,12 +519,20 @@ class PoolRecall(object):
                               region_code=region_code, shield_config=self.shield_config)
             ge.join()
             filtered_result = ge.get()
+
+            # add_flow_pool_recall_log
+            flow_pool_recall_process['filtered_data'] = filtered_result
+
             # 检查可分发数
             if filtered_result:
                 st_check = time.time()
                 ge = gevent.spawn(self.check_video_counts, video_ids=filtered_result, flow_pool_mapping=video_mapping)
                 ge.join()
                 check_result = ge.get()
+
+                # add_flow_pool_recall_log
+                flow_pool_recall_process['check_counts_data'] = check_result
+
                 # log_.info({
                 #     'logTimestamp': int(time.time() * 1000),
                 #     'request_id': self.request_id,
@@ -555,7 +574,7 @@ class PoolRecall(object):
         #     'executeTime': (time.time() - start_time) * 1000
         # })
 
-        return flow_pool_recall_result[:size]
+        return flow_pool_recall_result[:size], flow_pool_recall_process
 
     def flow_pool_recall_new(self, size=10, flow_pool_id=None):
         """从流量池中获取视频"""
@@ -676,6 +695,9 @@ class PoolRecall(object):
 
     def flow_pool_recall_new_with_level(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
         """从流量池中获取视频"""
+        # add_flow_pool_recall_log
+        flow_pool_recall_process = {}
+
         start_time = time.time()
         # 获取存在城市分组数据的城市编码列表
         city_code_list = [code for _, code in config_.CITY_CODE.items()]
@@ -693,8 +715,13 @@ class PoolRecall(object):
             region_code = '-1'
 
         flow_pool_key, level = self.get_pool_redis_key('flow_set_level', flow_pool_id=flow_pool_id)
+
+        # add_flow_pool_recall_log
+        flow_pool_recall_process['flow_pool_key'] = flow_pool_key
+        flow_pool_recall_process['level'] = level
+
         if flow_pool_key is None:
-            return []
+            return [], flow_pool_recall_process
         # print(flow_pool_key)
         flow_pool_recall_result = []
         flow_pool_recall_videos = []
@@ -710,6 +737,10 @@ class PoolRecall(object):
             # 获取数据
             # st_get = time.time()
             data = self.redis_helper.get_data_with_count_from_set(key_name=flow_pool_key, count=get_size)
+
+            # add_flow_pool_recall_log
+            flow_pool_recall_process['initial_data'] = data
+
             # et_get = time.time()
             # log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
             #     freq, data, (et_get - st_get) * 1000))
@@ -744,12 +775,20 @@ class PoolRecall(object):
                               region_code=region_code, shield_config=self.shield_config)
             ge.join()
             filtered_result = ge.get()
+
+            # add_flow_pool_recall_log
+            flow_pool_recall_process['filtered_data'] = filtered_result
+
             # 检查可分发数
             if filtered_result:
                 # st_check = time.time()
                 ge = gevent.spawn(self.check_video_counts_new_with_level, video_ids=filtered_result, flow_pool_mapping=video_mapping)
                 ge.join()
                 check_result = ge.get()
+
+                # add_flow_pool_recall_log
+                flow_pool_recall_process['check_counts_data'] = check_result
+
                 # log_.info({
                 #     'logTimestamp': int(time.time() * 1000),
                 #     'request_id': self.request_id,
@@ -791,10 +830,12 @@ class PoolRecall(object):
         #     'executeTime': (time.time() - start_time) * 1000
         # })
 
-        return flow_pool_recall_result[:size]
+        return flow_pool_recall_result[:size], flow_pool_recall_process
 
     def flow_pool_recall_new_with_level_score(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
         """从流量池中获取视频"""
+        # add_flow_pool_recall_log
+        flow_pool_recall_process = {}
         # 获取存在城市分组数据的城市编码列表
         city_code_list = [code for _, code in config_.CITY_CODE.items()]
         # 获取provinceCode
@@ -811,8 +852,13 @@ class PoolRecall(object):
             region_code = '-1'
 
         flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
+
+        # add_flow_pool_recall_log
+        flow_pool_recall_process['flow_pool_key'] = flow_pool_key
+        flow_pool_recall_process['level'] = level
+
         if flow_pool_key is None:
-            return []
+            return [], flow_pool_recall_process
         # print(flow_pool_key)
         flow_pool_recall_result = []
         flow_pool_recall_videos = []
@@ -830,6 +876,10 @@ class PoolRecall(object):
             data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
                                                               start=idx, end=idx + get_size - 1,
                                                               with_scores=True)
+            # add_flow_pool_recall_log
+            print(data)
+            flow_pool_recall_process['initial_data'] = data
+
             # et_get = time.time()
             # log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
             #     freq, data, (et_get - st_get) * 1000))
@@ -865,6 +915,10 @@ class PoolRecall(object):
                               region_code=region_code, shield_config=self.shield_config)
             ge.join()
             filtered_result = ge.get()
+
+            # add_flow_pool_recall_log
+            flow_pool_recall_process['filtered_data'] = filtered_result
+
             # 检查可分发数
             if filtered_result:
                 # st_check = time.time()
@@ -872,6 +926,10 @@ class PoolRecall(object):
                                   video_ids=filtered_result, flow_pool_mapping=video_mapping)
                 ge.join()
                 check_result = ge.get()
+
+                # add_flow_pool_recall_log
+                flow_pool_recall_process['check_counts_data'] = check_result
+
                 # log_.info({
                 #     'logTimestamp': int(time.time() * 1000),
                 #     'request_id': self.request_id,
@@ -913,7 +971,7 @@ class PoolRecall(object):
         #     'executeTime': (time.time() - start_time) * 1000
         # })
 
-        return flow_pool_recall_result[:size]
+        return flow_pool_recall_result[:size], flow_pool_recall_process
 
     def check_video_counts(self, video_ids, flow_pool_mapping):
         """