Quellcode durchsuchen

Merge remote-tracking branch 'origin' into test

linfan vor 1 Jahr
Ursprung
Commit
fc8f858b83
6 geänderte Dateien mit 162 neuen und 66 gelöschten Zeilen
  1. 1 1
      app.py
  2. 1 1
      config.py
  3. 44 36
      recommend.py
  4. 3 2
      requirements.txt
  5. 19 21
      utils.py
  6. 94 5
      video_recall.py

+ 1 - 1
app.py

@@ -369,7 +369,7 @@ def app_video_hot_list():
 
     except Exception as e:
         log_.error(e)
-        print(traceback.format_exc())
+        # print(traceback.format_exc())
         result = {'code': -1, 'message': 'fail'}
         return json.dumps(result)
 

+ 1 - 1
config.py

@@ -1286,7 +1286,7 @@ class ProductionConfig(BaseConfig):
 def set_config():
     # 获取环境变量 ROV_SERVER_ENV
     env = os.environ.get('ROV_SERVER_ENV')
-    # env = 'pro'
+    # env = 'dev'
     if env is None:
         # log_.error('ENV ERROR: is None!')
         return

+ 44 - 36
recommend.py

@@ -400,8 +400,8 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
              gevent.spawn(pool_recall.get_region_day_recall, size, region_code),
              gevent.spawn(pool_recall.get_selected_recall, size, region_code),
              gevent.spawn(pool_recall.get_no_selected_recall, size, region_code),
-             gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
-             gevent.spawn(pool_recall.flow_pool_recall, size)]
+             gevent.spawn(pool_recall.new_flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
+             gevent.spawn(pool_recall.new_flow_pool_recall, size)]
 
         if ab_code ==60049:
             t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall))
@@ -411,7 +411,7 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     all_recall_result = []
     #print(all_recall_result_list)
     result['recallTime'] = (time.time() - start_recall) * 1000
-
+    #print("recall time:", result['recallTime'])
     if not all_recall_result_list or len(all_recall_result_list)==0:
         return result
     for recall_item in all_recall_result_list:
@@ -422,6 +422,7 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
 
     #print("all_recall_result:", all_recall_result)
     #2. duplicate
+    dup_time = time.time()
     recall_dict = {}
     fast_flow_set = set('')
     flow_flow_set = set('')
@@ -434,48 +435,54 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
     flowFlag_dict = {}
     for per_item in all_recall_result:
         #print(per_item)
-        vId = int(per_item.get("videoId",0))
-        if vId==0:
-            continue
-        recall_name = per_item.get("pushFrom",'')
-        flow_pool = per_item.get("flowPool", '')
-        if flow_pool != '':
-            flow_pool_id = int(flow_pool.split('#')[0])
-            if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
-                fast_flow_set.add(vId)
+        try:
+            vId = int(per_item.get("videoId",0))
+            if vId==0:
+                continue
+            recall_name = per_item.get("pushFrom",'')
+            flow_pool = per_item.get("flowPool", '')
+            if flow_pool != '':
+                flow_pool_id = int(flow_pool.split('#')[0])
+                if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
+                    fast_flow_set.add(vId)
+                else:
+                    flow_flow_set.add(vId)
+                flowFlag_dict[vId] = flow_pool
+
+            #duplicate divide into
+            if vId not in recall_dict:
+                if recall_name == config_.PUSH_FROM['rov_recall_region_h']:
+                    region_h_recall.append(per_item)
+                elif recall_name == config_.PUSH_FROM['rov_recall_region_24h']:
+                    region_day_recall.append(per_item)
+                elif recall_name == config_.PUSH_FROM['rov_recall_24h']:
+                    select_day_recall.append(per_item)
+                elif recall_name == config_.PUSH_FROM['rov_recall_24h_dup']:
+                    no_selected_recall.append(per_item)
+                elif recall_name == config_.PUSH_FROM['sim_hot_vid_recall']:
+                    sim_hot_recall.append(per_item)
+                elif recall_name == config_.PUSH_FROM['flow_recall']:
+                    flow_recall.append(per_item)
+            if vId not in recall_dict:
+                recall_dict[vId] = recall_name
             else:
-                flow_flow_set.add(vId)
-            flowFlag_dict[vId] = flow_pool
-
-        #duplicate divide into
-        if vId not in recall_dict:
-            if recall_name == config_.PUSH_FROM['rov_recall_region_h']:
-                region_h_recall.append(per_item)
-            elif recall_name == config_.PUSH_FROM['rov_recall_region_24h']:
-                region_day_recall.append(per_item)
-            elif recall_name == config_.PUSH_FROM['rov_recall_24h']:
-                select_day_recall.append(per_item)
-            elif recall_name == config_.PUSH_FROM['rov_recall_24h_dup']:
-                no_selected_recall.append(per_item)
-            elif recall_name == config_.PUSH_FROM['sim_hot_vid_recall']:
-                sim_hot_recall.append(per_item)
-            elif recall_name == config_.PUSH_FROM['flow_recall']:
-                flow_recall.append(per_item)
-        if vId not in recall_dict:
-            recall_dict[vId] = recall_name
-        else:
-            recall_name = recall_dict[vId] + "," + recall_name
-            recall_dict[vId] = recall_name
+                recall_name = recall_dict[vId] + "," + recall_name
+                recall_dict[vId] = recall_name
+        except:
+            continue
     #print("recall_dict:", recall_dict)
+    #print("recall time:", (time.time()-dup_time)*1000)
     #3. filter video, 先过预曝光
+    filter_time = time.time()
     filter_ = FilterVideos(request_id=request_id,
                            app_type=app_type, mid=mid, uid=uid, video_ids=list(recall_dict.keys()))
 
     #print("filer:", list(recall_dict.keys()))
     #a).expose filter
     #all_recall_list = list(recall_dict.keys())
-    all_recall_list = filter_.filter_videos_new(pool_type='rov', region_code=region_code, shield_config=shield_config)
+    all_recall_list = filter_.filter_videos_new(region_code=region_code, shield_config=shield_config, flow_set=flowFlag_dict.keys())
     #print("filer after:", all_recall_list)
+    #print("filter_ time:", (time.time() - filter_time) * 1000)
     #4. sort: old sort: flow 按概率出
     start_rank = time.time()
     #quick_flow_pool_P get from redis
@@ -509,6 +516,7 @@ def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
 
     result['rankResult'] = rank_result
     result['rankTime'] = (time.time() - start_rank) * 1000
+    #print("rank time:", result['rankTime'])
     return result
     # return rank_result, last_rov_recall_key
 
@@ -1324,7 +1332,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
 
     # 简单召回 - 排序 - 兜底
     get_result_st = time.time()
-    print("ab_code:", ab_code)
+    #print("ab_code:", ab_code)
     if ab_code == 60047 or ab_code == 60048 or ab_code == 60049:
         result = new_video_recommend(request_id=request_id,
                              mid=mid, uid=uid, app_type=app_type,

+ 3 - 2
requirements.txt

@@ -1,7 +1,8 @@
-gevent==20.9.0
 numpy==1.19.2
 Flask==1.1.2
+pandas==1.1.3
 redis==3.5.3
-PyMySQL==1.0.2
+gevent==20.9.0
 requests==2.24.0
+PyMySQL==1.0.2
 aliyun_python_sdk==2.2.0

+ 19 - 21
utils.py

@@ -295,6 +295,7 @@ class FilterVideos(object):
         # 预曝光过滤
         st_pre = time.time()
         filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        # print("filtered_pre:", (time.time()-st_pre)*1000)
         # et_pre = time.time()
         # log_.info({
         #     'logTimestamp': int(time.time() * 1000),
@@ -323,6 +324,7 @@ class FilterVideos(object):
         # 视频已曝光过滤
         st_viewed = time.time()
         filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
+        # print("filtered_pre:", (time.time() - st_viewed) * 1000)
         # et_viewed = time.time()
         # log_.info({
         #     'logTimestamp': int(time.time() * 1000),
@@ -361,6 +363,7 @@ class FilterVideos(object):
                         'shield_filter_result': filtered_shield_video_ids,
                         'executeTime': (time.time() - st_viewed) * 1000
                     })
+                    # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
                     return filtered_shield_video_ids
                 else:
                     return filtered_viewed_videos
@@ -447,13 +450,13 @@ class FilterVideos(object):
                         "uid": self.uid,
                         "types": list(types),
                         "videoIds": video_ids}
-        print(request_data)
+        # print(request_data)
         # 调用http接口
         result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
 
-        print("result:", result)
+        # print("result:", result)
         if result is None:
-            print("result is None")
+            # print("result is None")
             # log_.info('过滤失败,types: {}'.format(types))
             return []
 
@@ -508,7 +511,7 @@ class FilterVideos(object):
         :param shield_key_name_list: 过滤视频 redis-key
         :return: filtered_videos  过滤后的列表  type-list
         """
-        print("filter_shield_video:", len(filter_shield_video))
+        # print("filter_shield_video:", len(filter_shield_video))
         if len(video_ids) == 0:
             return video_ids
         # 根据Redis缓存中的数据过滤
@@ -523,7 +526,7 @@ class FilterVideos(object):
             #     continue
             # shield_videos = [int(video) for video in shield_videos_list]
             # video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
-        print("video_ids:", len(video_ids))
+        # print("video_ids:", len(video_ids))
         return video_ids
 
     def new_filter_video(self):
@@ -606,7 +609,7 @@ class FilterVideos(object):
             #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
             return video_ids
 
-    def filter_videos_new(self, pool_type='rov', region_code=None, shield_config=None):
+    def filter_videos_new(self, region_code=None, shield_config=None, flow_set=None):
         """视频过滤"""
         # 预曝光过滤
         st_pre = time.time()
@@ -638,31 +641,26 @@ class FilterVideos(object):
             return None
         filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
         #print("result:", filtered_viewed_videos)
-        if pool_type != 'flow':
+        if flow_set is None:
             return  filtered_viewed_videos
         else:
             # 流量池视频需过滤屏蔽视频
             if region_code is None or shield_config is None:
                 return filtered_viewed_videos
             else:
+                normal_recall_ids = []
+                left_flow_ids = []
+                for vid in filtered_viewed_videos:
+                    if vid in flow_set:
+                        left_flow_ids.append(vid)
+                    else:
+                        normal_recall_ids.append(vid)
                 shield_key_name_list = shield_config.get(region_code, None)
                 if shield_key_name_list is not None:
                     filtered_shield_video_ids = self.filter_shield_video(
-                       video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
+                       video_ids=left_flow_ids, shield_key_name_list=shield_key_name_list
                     )
-                    # log_.info({
-                    #     'logTimestamp': int(time.time() * 1000),
-                    #     'pool_type': pool_type,
-                    #     'request_id': self.request_id,
-                    #     'app_type': self.app_type,
-                    #     'mid': self.mid,
-                    #     'uid': self.uid,
-                    #     'operation': 'shield_filter',
-                    #     'request_videos': filtered_viewed_videos,
-                    #     'shield_filter_result': filtered_shield_video_ids,
-                    #     'executeTime': (time.time() - st_viewed) * 1000
-                    #  })
-                    return filtered_shield_video_ids
+                    return normal_recall_ids+filtered_shield_video_ids
                 else:
                     return filtered_viewed_videos
 

+ 94 - 5
video_recall.py

@@ -2130,7 +2130,7 @@ class PoolRecall(object):
                      'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
                      'abCode': self.ab_code}
                 )
-        return recall_result[:100]
+        return recall_result[:200]
 
     # get region_hour_recall
     def get_region_hour_recall(self, size=4, region_code='-1'):
@@ -2149,7 +2149,7 @@ class PoolRecall(object):
                      'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_region_h'],
                      'abCode': self.ab_code}
                 )
-        return recall_result[:100]
+        return recall_result[:200]
 
     # get region_day_recall
     def get_region_day_recall(self, size=4,region_code='-1'):
@@ -2169,7 +2169,7 @@ class PoolRecall(object):
                      'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_region_24h'],
                      'abCode': self.ab_code}
                 )
-        return recall_result[:100]
+        return recall_result[:200]
 
 
     def get_selected_recall(self, size=4, region_code='-1'):
@@ -2190,7 +2190,7 @@ class PoolRecall(object):
                      'abCode': self.ab_code}
                 )
         #print("recall_result:", recall_result)
-        return recall_result[:100]
+        return recall_result[:200]
 
     def get_no_selected_recall(self, size=4, region_code='-1'):
         """未选择召回池召回视频"""
@@ -2209,7 +2209,7 @@ class PoolRecall(object):
                      'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_24h_dup'],
                      'abCode': self.ab_code}
                 )
-        return recall_result[:100]
+        return recall_result[:200]
 
     def get_fast_flow_pool_recall(self, size=4):
         """快速流量池召回视频"""
@@ -2245,3 +2245,92 @@ class PoolRecall(object):
                      'abCode': self.ab_code}
                 )
         return  recall_result
+    def new_flow_pool_recall(self, size=10, flow_pool_id=None):
+        """从流量池中获取视频"""
+        start_time = time.time()
+        flow_pool_key = self.get_pool_redis_key('flow', flow_pool_id=flow_pool_id)
+
+        flow_pool_recall_result = []
+        flow_pool_recall_videos = []
+        # 每次获取的视频数
+        get_size = size * 5
+        # 记录获取频次
+        freq = 0
+        idx = 0
+        while len(flow_pool_recall_result) < size:
+            freq += 1
+            if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
+                break
+            # 获取数据
+            # st_get = time.time()
+            data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
+                                                              start=idx, end=idx + get_size - 1,
+                                                              with_scores=True)
+            et_get = time.time()
+            # log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
+            #     freq, data, (et_get - st_get) * 1000))
+            if not data:
+                # log_.info('流量池中的视频已取完')
+                break
+            # 将video_id 与 flow_pool, score做mapping整理
+            video_ids = []
+            video_mapping = {}
+            video_score = {}
+            for value in data:
+                try:
+                    video_id, flow_pool = value[0].split('-')
+                except Exception as e:
+                    log_.error({
+                        'request_id': self.request_id,
+                        'app_type': self.app_type,
+                        'flow_pool_value': value
+                    })
+                    continue
+                video_id = int(video_id)
+                if video_id not in video_ids:
+                    video_ids.append(video_id)
+                    video_score[video_id] = value[1]
+                if video_id not in video_mapping:
+                    video_mapping[video_id] = [flow_pool]
+                else:
+                    video_mapping[video_id].append(flow_pool)
+            # 过滤
+            # filter_ = FilterVideos(request_id=self.request_id,
+            #                        app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+            # filtered_result = filter_.filter_videos(pool_type='flow', region_code=region_code, shield_config=self.shield_config)
+            # print("flow filter time:", (time.time()-et_get)*1000)
+            # 检查可分发数
+            if video_ids and len(video_ids)>0:
+                check_result = self.check_video_counts(video_ids=video_ids, flow_pool_mapping=video_mapping)
+                for item in check_result:
+                    video_id = int(item[0])
+                    flow_pool = item[1]
+                    if video_id not in flow_pool_recall_videos:
+                        # 取其中一个 flow_pool 作为召回结果
+                        # 添加视频源参数 pushFrom, abCode
+                        flow_pool_recall_result.append(
+                            {'videoId': video_id, 'flowPool': flow_pool,
+                             'rovScore': video_score[video_id], 'pushFrom': config_.PUSH_FROM['flow_recall'],
+                             'abCode': self.ab_code}
+                        )
+
+                        flow_pool_recall_videos.append(video_id)
+                # et_check = time.time()
+                # log_.info('check result: result = {}, execute time = {}ms'.format(
+                #     check_result, (et_check - st_check) * 1000))
+
+                # # 判断错误标记, True为错误
+                # if error_flag:
+                #     # 结束流量池召回
+                #     break
+
+            idx += get_size
+
+        # log_.info({
+        #     'logTimestamp': int(time.time() * 1000),
+        #     'request_id': self.request_id,
+        #     'operation': 'flow_pool_recall',
+        #     'executeTime': (time.time() - start_time) * 1000
+        # })
+
+        return flow_pool_recall_result[:size]