|  | @@ -973,6 +973,114 @@ class PoolRecall(object):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          return flow_pool_recall_result[:size], flow_pool_recall_process
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    def flow_pool_recall_new_with_level_score2(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
 | 
	
		
			
				|  |  | +        """从流量池中获取视频"""
 | 
	
		
			
				|  |  | +        # add_flow_pool_recall_log
 | 
	
		
			
				|  |  | +        flow_pool_recall_process = {}
 | 
	
		
			
				|  |  | +        # 获取存在城市分组数据的城市编码列表
 | 
	
		
			
				|  |  | +        city_code_list = [code for _, code in config_.CITY_CODE.items()]
 | 
	
		
			
				|  |  | +        # 获取provinceCode
 | 
	
		
			
				|  |  | +        province_code = self.client_info.get('provinceCode', '-1')
 | 
	
		
			
				|  |  | +        # 获取cityCode
 | 
	
		
			
				|  |  | +        city_code = self.client_info.get('cityCode', '-1')
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if city_code in city_code_list:
 | 
	
		
			
				|  |  | +            # 分城市数据存在时,获取城市分组数据
 | 
	
		
			
				|  |  | +            region_code = city_code
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            region_code = province_code
 | 
	
		
			
				|  |  | +        if region_code == '':
 | 
	
		
			
				|  |  | +            region_code = '-1'
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # add_flow_pool_recall_log
 | 
	
		
			
				|  |  | +        flow_pool_recall_process['flow_pool_key'] = flow_pool_key
 | 
	
		
			
				|  |  | +        flow_pool_recall_process['level'] = level
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if flow_pool_key is None:
 | 
	
		
			
				|  |  | +            return [], flow_pool_recall_process
 | 
	
		
			
				|  |  | +        # print(flow_pool_key)
 | 
	
		
			
				|  |  | +        flow_pool_recall_result = []
 | 
	
		
			
				|  |  | +        flow_pool_recall_videos = []
 | 
	
		
			
				|  |  | +        # 每次获取的视频数
 | 
	
		
			
				|  |  | +        get_size = size * 5
 | 
	
		
			
				|  |  | +        # 获取数据
 | 
	
		
			
				|  |  | +        idx = 0
 | 
	
		
			
				|  |  | +        data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
 | 
	
		
			
				|  |  | +                                                          start=idx, end=idx + get_size * 5 - 1,
 | 
	
		
			
				|  |  | +                                                          with_scores=True)
 | 
	
		
			
				|  |  | +        flow_pool_recall_process['initial_data'] = data
 | 
	
		
			
				|  |  | +        if not data:
 | 
	
		
			
				|  |  | +            return [], flow_pool_recall_process
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # 将video_id 与 flow_pool, score做mapping整理
 | 
	
		
			
				|  |  | +        video_ids = []
 | 
	
		
			
				|  |  | +        video_mapping = {}
 | 
	
		
			
				|  |  | +        video_score = {}
 | 
	
		
			
				|  |  | +        for value in data:
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                video_id, flow_pool = value[0].split('-')
 | 
	
		
			
				|  |  | +            except Exception as e:
 | 
	
		
			
				|  |  | +                log_.error({
 | 
	
		
			
				|  |  | +                    'request_id': self.request_id,
 | 
	
		
			
				|  |  | +                    'app_type': self.app_type,
 | 
	
		
			
				|  |  | +                    'flow_pool_value': value
 | 
	
		
			
				|  |  | +                })
 | 
	
		
			
				|  |  | +                continue
 | 
	
		
			
				|  |  | +            video_id = int(video_id)
 | 
	
		
			
				|  |  | +            video_score[value[0]] = value[1]
 | 
	
		
			
				|  |  | +            if video_id not in video_ids:
 | 
	
		
			
				|  |  | +                video_ids.append(video_id)
 | 
	
		
			
				|  |  | +            if video_id not in video_mapping:
 | 
	
		
			
				|  |  | +                video_mapping[video_id] = [flow_pool]
 | 
	
		
			
				|  |  | +            else:
 | 
	
		
			
				|  |  | +                video_mapping[video_id].append(flow_pool)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # 检查可分发数
 | 
	
		
			
				|  |  | +        ge = gevent.spawn(self.check_video_counts_new_with_level_score,
 | 
	
		
			
				|  |  | +                          video_ids=video_ids, flow_pool_mapping=video_mapping)
 | 
	
		
			
				|  |  | +        ge.join()
 | 
	
		
			
				|  |  | +        check_result = ge.get()
 | 
	
		
			
				|  |  | +        # add_flow_pool_recall_log
 | 
	
		
			
				|  |  | +        flow_pool_recall_process['check_counts_data'] = check_result
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        check_result_mapping = {}
 | 
	
		
			
				|  |  | +        check_result_items = []
 | 
	
		
			
				|  |  | +        if check_result:
 | 
	
		
			
				|  |  | +            # 获取score top20 视频进入过滤
 | 
	
		
			
				|  |  | +            for item in check_result:
 | 
	
		
			
				|  |  | +                video_id = int(item[0])
 | 
	
		
			
				|  |  | +                flow_pool = item[1]
 | 
	
		
			
				|  |  | +                score = video_score[f"{video_id}-{flow_pool}"]
 | 
	
		
			
				|  |  | +                if video_id not in flow_pool_recall_videos:
 | 
	
		
			
				|  |  | +                    check_result_mapping[video_id] = [flow_pool, score]
 | 
	
		
			
				|  |  | +                    check_result_items.append([video_id, flow_pool, score])
 | 
	
		
			
				|  |  | +            check_result_items = sorted(check_result_items, key=lambda x: x[2], reverse=True)
 | 
	
		
			
				|  |  | +            to_filter_videos = [item[0] for item in check_result_items[:get_size]]
 | 
	
		
			
				|  |  | +            # 过滤
 | 
	
		
			
				|  |  | +            filter_ = FilterVideos(request_id=self.request_id,
 | 
	
		
			
				|  |  | +                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=to_filter_videos)
 | 
	
		
			
				|  |  | +            ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
 | 
	
		
			
				|  |  | +                              region_code=region_code, shield_config=self.shield_config)
 | 
	
		
			
				|  |  | +            ge.join()
 | 
	
		
			
				|  |  | +            filtered_result = ge.get()
 | 
	
		
			
				|  |  | +            # add_flow_pool_recall_log
 | 
	
		
			
				|  |  | +            flow_pool_recall_process['filtered_data'] = filtered_result
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            if filtered_result:
 | 
	
		
			
				|  |  | +                for item in filtered_result:
 | 
	
		
			
				|  |  | +                    video_id = int(item)
 | 
	
		
			
				|  |  | +                    # 添加视频源参数 pushFrom, abCode
 | 
	
		
			
				|  |  | +                    flow_pool_recall_result.append(
 | 
	
		
			
				|  |  | +                        {'videoId': video_id, 'flowPool': check_result_mapping[video_id][0], 'level': level,
 | 
	
		
			
				|  |  | +                         'rovScore': check_result_mapping[video_id][1],
 | 
	
		
			
				|  |  | +                         'pushFrom': config_.PUSH_FROM['flow_recall'],
 | 
	
		
			
				|  |  | +                         'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
 | 
	
		
			
				|  |  | +                    )
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        return flow_pool_recall_result[:size], flow_pool_recall_process
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      def check_video_counts(self, video_ids, flow_pool_mapping):
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          检查视频剩余可分发数
 |