Prechádzať zdrojové kódy

update video recall

linfan pred 1 rokom
rodič
commit
1be8118341
1 zmenený súbor, v ktorom bolo vykonaných 117 pridaní a 0 odobraní
  1. 117 0
      video_recall.py

+ 117 - 0
video_recall.py

@@ -2237,3 +2237,120 @@ class PoolRecall(object):
                      'abCode': self.ab_code}
                 )
         return  recall_result
+    def new_flow_pool_recall(self, size=10, flow_pool_id=None):
+        """从流量池中获取视频"""
+        start_time = time.time()
+        # 获取存在城市分组数据的城市编码列表
+        city_code_list = [code for _, code in config_.CITY_CODE.items()]
+        # 获取provinceCode
+        province_code = self.client_info.get('provinceCode', '-1')
+        # 获取cityCode
+        city_code = self.client_info.get('cityCode', '-1')
+
+        if city_code in city_code_list:
+            # 分城市数据存在时,获取城市分组数据
+            region_code = city_code
+        else:
+            region_code = province_code
+        if region_code == '':
+            region_code = '-1'
+
+        flow_pool_key = self.get_pool_redis_key('flow', flow_pool_id=flow_pool_id)
+        # print(flow_pool_key)
+        flow_pool_recall_result = []
+        flow_pool_recall_videos = []
+        # 每次获取的视频数
+        get_size = size * 5
+        # 记录获取频次
+        freq = 0
+        idx = 0
+        while len(flow_pool_recall_result) < size:
+            freq += 1
+            if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
+                break
+            # 获取数据
+            # st_get = time.time()
+            data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
+                                                              start=idx, end=idx + get_size - 1,
+                                                              with_scores=True)
+            et_get = time.time()
+            # log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
+            #     freq, data, (et_get - st_get) * 1000))
+            if not data:
+                # log_.info('流量池中的视频已取完')
+                break
+            # 将video_id 与 flow_pool, score做mapping整理
+            video_ids = []
+            video_mapping = {}
+            video_score = {}
+            for value in data:
+                try:
+                    video_id, flow_pool = value[0].split('-')
+                except Exception as e:
+                    log_.error({
+                        'request_id': self.request_id,
+                        'app_type': self.app_type,
+                        'flow_pool_value': value
+                    })
+                    continue
+                video_id = int(video_id)
+                if video_id not in video_ids:
+                    video_ids.append(video_id)
+                    video_score[video_id] = value[1]
+                if video_id not in video_mapping:
+                    video_mapping[video_id] = [flow_pool]
+                else:
+                    video_mapping[video_id].append(flow_pool)
+            # 过滤
+            filter_ = FilterVideos(request_id=self.request_id,
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+            filtered_result = filter_.filter_videos(pool_type='flow', region_code=region_code, shield_config=self.shield_config)
+            print("flow filter time:", (time.time()-et_get)*1000)
+            # 检查可分发数
+            if filtered_result:
+                st_check = time.time()
+                ge = gevent.spawn(self.check_video_counts, video_ids=filtered_result, flow_pool_mapping=video_mapping)
+                ge.join()
+                check_result = ge.get()
+                # log_.info({
+                #     'logTimestamp': int(time.time() * 1000),
+                #     'request_id': self.request_id,
+                #     'app_type': self.app_type,
+                #     'mid': self.mid,
+                #     'uid': self.uid,
+                #     'operation': 'check_video_counts',
+                #     'executeTime': (time.time() - st_check) * 1000
+                # })
+
+                for item in check_result:
+                    video_id = int(item[0])
+                    flow_pool = item[1]
+                    if video_id not in flow_pool_recall_videos:
+                        # 取其中一个 flow_pool 作为召回结果
+                        # 添加视频源参数 pushFrom, abCode
+                        flow_pool_recall_result.append(
+                            {'videoId': video_id, 'flowPool': flow_pool,
+                             'rovScore': video_score[video_id], 'pushFrom': config_.PUSH_FROM['flow_recall'],
+                             'abCode': self.ab_code}
+                        )
+
+                        flow_pool_recall_videos.append(video_id)
+                # et_check = time.time()
+                # log_.info('check result: result = {}, execute time = {}ms'.format(
+                #     check_result, (et_check - st_check) * 1000))
+
+                # # 判断错误标记, True为错误
+                # if error_flag:
+                #     # 结束流量池召回
+                #     break
+
+            idx += get_size
+
+        # log_.info({
+        #     'logTimestamp': int(time.time() * 1000),
+        #     'request_id': self.request_id,
+        #     'operation': 'flow_pool_recall',
+        #     'executeTime': (time.time() - start_time) * 1000
+        # })
+
+        return flow_pool_recall_result[:size]