liqian 2 роки тому
батько
коміт
041635de62
3 змінених файлів з 91 додано та 48 видалено
  1. 14 0
      config.py
  2. 25 5
      pool_predict.py
  3. 52 43
      videos_filter.py

+ 14 - 0
config.py

@@ -240,6 +240,8 @@ class BaseConfig(object):
 
     # 流量池离线模型结果存放 redis key前缀,完整格式 com.weiqu.video.flowpool.hot.item.score.{appType}
     FLOWPOOL_KEY_NAME_PREFIX = 'com.weiqu.video.flowpool.hot.item.score.'
+    # 快速曝光流量池数据存放 redis key前缀,完整格式 com.weiqu.video.quick.flowpool.hot.item.score.{appType}.{flowPool_id}
+    QUICK_FLOWPOOL_KEY_NAME_PREFIX = 'com.weiqu.video.quick.flowpool.hot.item.score.'
 
     # 兜底视频redis存储key
     BOTTOM_KEY_NAME = 'com.weiqu.video.bottom'
@@ -348,6 +350,9 @@ class DevelopmentConfig(BaseConfig):
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
 
+    # 快速曝光流量池ID
+    QUICK_FLOW_POOL_ID = 3
+
     # 从流量池获取视频接口地址
     GET_VIDEOS_FROM_POOL_URL = 'http://testapi-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # 获取视频在流量池中的剩余可分发数接口地址
@@ -419,6 +424,9 @@ class TestConfig(BaseConfig):
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
 
+    # 快速曝光流量池ID
+    QUICK_FLOW_POOL_ID = 3
+
     # 从流量池获取视频接口地址
     GET_VIDEOS_FROM_POOL_URL = 'http://testapi-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # 获取视频在流量池中的剩余可分发数接口地址
@@ -490,6 +498,9 @@ class PreProductionConfig(BaseConfig):
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
+    # 快速曝光流量池ID
+    QUICK_FLOW_POOL_ID = 3
+
     # 从流量池获取视频接口地址
     GET_VIDEOS_FROM_POOL_URL = 'http://preapi-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # 获取视频在流量池中的剩余可分发数接口地址
@@ -561,6 +572,9 @@ class ProductionConfig(BaseConfig):
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
+    # 快速曝光流量池ID
+    QUICK_FLOW_POOL_ID = 3
+
     # 从流量池获取视频接口地址
     GET_VIDEOS_FROM_POOL_URL = 'http://api-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # 获取视频在流量池中的剩余可分发数接口地址

+ 25 - 5
pool_predict.py

@@ -104,19 +104,39 @@ def predict(app_type):
         log_.info('predict finished!')
         # 上传数据到redis
         redis_data = {}
+        quick_flow_pool_redis_data = {}
         for i in range(len(video_score)):
             video_id = filtered_videos[i]
             score = video_score[i]
             for flow_pool in mapping.get(video_id):
+                # 判断是否为快速曝光流量池视频
                 value = '{}-{}'.format(video_id, flow_pool)
-                redis_data[value] = score
-        key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+                flow_pool_id = int(flow_pool.split('#')[0])  # flowPool: 流量池ID#分级ID#级别Level#生命周期ID
+                if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
+                    quick_flow_pool_redis_data[value] = score
+                else:
+                    redis_data[value] = score
+
+        # 快速曝光流量池视频写入redis
         redis_helper = RedisHelper()
+        quick_flow_pool_key_name = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}.{config_.QUICK_FLOW_POOL_ID}"
         # 如果key已存在,删除key
-        if redis_helper.key_exists(key_name):
-            redis_helper.del_keys(key_name)
+        if redis_helper.key_exists(quick_flow_pool_key_name):
+            redis_helper.del_keys(quick_flow_pool_key_name)
         # 写入redis
-        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
+        if quick_flow_pool_redis_data:
+            log_.info(f"quick_flow_pool_redis_data = {quick_flow_pool_redis_data}")
+            redis_helper.add_data_with_zset(key_name=quick_flow_pool_key_name, data=quick_flow_pool_redis_data,
+                                            expire_time=24 * 3600)
+        # 普通流量池视频写入redis
+        flow_pool_key_name = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}"
+        # 如果key已存在,删除key
+        if redis_helper.key_exists(flow_pool_key_name):
+            redis_helper.del_keys(flow_pool_key_name)
+        # 写入redis
+        if redis_data:
+            redis_helper.add_data_with_zset(key_name=flow_pool_key_name, data=redis_data, expire_time=24 * 3600)
+
         log_.info('data to redis finished!')
     except Exception as e:
         log_.error('流量池更新失败, appType: {} exception: {}, traceback: {}'.format(

+ 52 - 43
videos_filter.py

@@ -156,48 +156,53 @@ def filter_flow_pool():
         if app_type in app_type_list:
             filter_flow_pool_18_19(app_type=app_type)
         else:
-            # 拼接redis-key
-            key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
-            # 获取视频
-            redis_helper = RedisHelper()
-            data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
-            if data is None:
-                log_.info("data is None")
-                log_.info("app_type {} videos filter end!".format(app_type))
-                continue
-            # videoId与flowPool做mapping
-            video_ids = []
-            mapping = {}
-            for video in data:
-                video_id, flow_pool = video.split('-')
-                video_id = int(video_id)
-                if video_id not in video_ids:
-                    video_ids.append(video_id)
-                    mapping[video_id] = [flow_pool]
+            for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]:
+                log_.info(f"flow_pool_id = {flow_pool_id}")
+                # 拼接redis-key
+                key_name = get_pool_redis_key(pool_type='flow', app_type=app_type, flow_pool_id=flow_pool_id)
+                # 获取视频
+                redis_helper = RedisHelper()
+                data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
+                if data is None:
+                    log_.info(f"flow_pool_id = {flow_pool_id}, data is None")
+                    log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
+                    continue
+                # videoId与flowPool做mapping
+                video_ids = []
+                mapping = {}
+                for video in data:
+                    video_id, flow_pool = video.split('-')
+                    video_id = int(video_id)
+                    if video_id not in video_ids:
+                        video_ids.append(video_id)
+                        mapping[video_id] = [flow_pool]
+                    else:
+                        mapping[video_id].append(flow_pool)
+                # 过滤
+                if len(video_ids) == 0:
+                    log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}")
+                    log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
+                    continue
+                if app_type == config_.APP_TYPE['APP']:
+                    filtered_result = filter_video_status_app(video_ids=video_ids)
                 else:
-                    mapping[video_id].append(flow_pool)
-            # 过滤
-            if len(video_ids) == 0:
-                log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
-                log_.info("app_type {} videos filter end!".format(app_type))
-                continue
-            if app_type == config_.APP_TYPE['APP']:
-                filtered_result = filter_video_status_app(video_ids=video_ids)
-            else:
-                filtered_result = filter_video_status(video_ids=video_ids)
-            # 求差集,获取需要过滤掉的视频,并从redis中移除
-            filter_videos = set(video_ids) - set(filtered_result)
-            log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
-                len(data), len(video_ids), len(filtered_result), len(filter_videos)))
-            # 移除
-            if len(filter_videos) == 0:
-                log_.info("app_type {} videos filter end!".format(app_type))
-                continue
-            remove_videos = ['{}-{}'.format(video_id, flow_pool)
-                             for video_id in filter_videos
-                             for flow_pool in mapping[video_id]]
-            redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
-            log_.info("app_type {} videos filter end!".format(app_type))
+                    filtered_result = filter_video_status(video_ids=video_ids)
+                # 求差集,获取需要过滤掉的视频,并从redis中移除
+                filter_videos = set(video_ids) - set(filtered_result)
+                log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
+                    len(data), len(video_ids), len(filtered_result), len(filter_videos)))
+                # 移除
+                if len(filter_videos) == 0:
+                    log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
+                    continue
+                remove_videos = ['{}-{}'.format(video_id, flow_pool)
+                                 for video_id in filter_videos
+                                 for flow_pool in mapping[video_id]]
+                redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
+                log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
+
+            log_.info(f"app_type = {app_type} videos filter end!")
+
     log_.info("flow pool filter end!")
 
 
@@ -313,11 +318,12 @@ def filter_rov_updated_app():
     log_.info("update rov videos app filter end!")
 
 
-def get_pool_redis_key(pool_type, app_type=None):
+def get_pool_redis_key(pool_type, app_type=None, flow_pool_id=None):
     """
     拼接key
     :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
     :param app_type: 产品标识
+    :param flow_pool_id: 流量池ID
     :return: key_name
     """
     redis_helper = RedisHelper()
@@ -380,7 +386,10 @@ def get_pool_redis_key(pool_type, app_type=None):
 
     elif pool_type == 'flow':
         # 流量池
-        return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+        if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
+            return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}.{flow_pool_id}"
+        else:
+            return f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}"
 
     else:
         log_.error('pool type error')