baichongyang 3 years ago
parent
commit
fc8e98240a
3 changed files with 26 additions and 11 deletions
  1. 0 0
      add
  2. 8 4
      utils.py
  3. 18 7
      video_recall.py

+ 0 - 0
add


+ 8 - 4
utils.py

@@ -39,22 +39,26 @@ def get_videos_remain_view_count(app_type, videos):
     :param app_type: 产品标识 type-int
     :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
     :return: data type-list,[(video_id, flow_pool, view_count), ...]
+             error_flag 错误标记,True为错误
     """
+    error_flag = False
     if not videos:
-        return []
+        return [], error_flag
 
     request_data = {'appType': app_type, 'videos': videos}
     result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data, timeout=1)
 
     if result is None:
-        return []
+        error_flag = True
+        return [], error_flag
 
     if result['code'] != 0:
         log_.info('获取视频在流量池中的剩余可分发数失败')
-        return []
+        error_flag = True
+        return [], error_flag
 
     data = [(item['videoId'], item['flowPool'], item['distributeCount']) for item in result['data']]
-    return data
+    return data, error_flag
 
 
 def get_videos_local_distribute_count(video_id, flow_pool):

+ 18 - 7
video_recall.py

@@ -92,7 +92,6 @@ class PoolRecall(object):
         # 记录获取频次
         freq = 0
         idx = 0
-        debug_tm_b = time.time()
         while len(flow_pool_recall_result) < size:
             freq += 1
             # 获取数据
@@ -121,7 +120,6 @@ class PoolRecall(object):
                 else:
                     video_mapping[video_id].append(flow_pool)
             # 过滤
-            debug_tm_fb = time.time()
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
             ge = gevent.spawn(filter_.filter_videos)
             ge.join()
@@ -133,7 +131,13 @@ class PoolRecall(object):
                 ge = gevent.spawn(self.check_video_counts, video_ids=filtered_result, flow_pool_mapping=video_mapping)
                 ge.join()
                 check_result = ge.get()
-                #check_result = self.check_video_counts(video_ids=filtered_result, flow_pool_mapping=video_mapping)
+                #check_result, error_flag = self.check_video_counts(video_ids=filtered_result,
+                                                                   flow_pool_mapping=video_mapping)
+                # 判断错误标记, True为错误
+                if error_flag:
+                    # 结束流量池召回
+                    break
+
                 for item in check_result:
                     if item[0] not in flow_pool_recall_videos:
                         # 取其中一个 flow_pool 作为召回结果
@@ -155,15 +159,22 @@ class PoolRecall(object):
         检查视频剩余可分发数
         :param video_ids: 视频id type-list
         :param flow_pool_mapping: 视频id-流量池标记mapping, type-dict
-        :return:
+        :return: check_result, error_flag
         """
         flow_pool_key = self.get_pool_redis_key('flow')
         videos = []
         for video_id in video_ids:
             for flow_pool in flow_pool_mapping[video_id]:
                 videos.append({'videoId': video_id, 'flowPool': flow_pool})
-        view_count_result = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
-        log_.info('view_count_result = {}'.format(view_count_result))
+        view_count_result, error_flag = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
+        log_.info('error_flag = {}, view_count_result = {}'.format(error_flag, view_count_result))
+
+        # 判断返回的错误标记,True为错误
+        if error_flag:
+            # 从流量召回池移除视频videos
+            for item in videos:
+                value = '{}-{}'.format(item['videoId'], item['flowPool'])
+                self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
 
         check_result = []
         for item in view_count_result:
@@ -183,7 +194,7 @@ class PoolRecall(object):
                 # 从流量召回池移除
                 value = '{}-{}'.format(item[0], item[1])
                 self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
-        return check_result
+        return check_result, error_flag
 
     def get_pool_redis_key(self, pool_type):
         """