Browse Source

add quick_flow_pool update

liqian 2 years ago
parent
commit
8ac3368b12
5 changed files with 137 additions and 60 deletions
  1. 24 0
      config.py
  2. 10 6
      db_helper.py
  3. 50 10
      pool_predict.py
  4. 1 1
      utils.py
  5. 52 43
      videos_filter.py

+ 24 - 0
config.py

@@ -240,6 +240,10 @@ class BaseConfig(object):
 
     # 流量池离线模型结果存放 redis key前缀,完整格式 com.weiqu.video.flowpool.hot.item.score.{appType}
     FLOWPOOL_KEY_NAME_PREFIX = 'com.weiqu.video.flowpool.hot.item.score.'
+    # 快速曝光流量池数据存放 redis key前缀,完整格式 com.weiqu.video.quick.flowpool.hot.item.score.{appType}.{flowPool_id}
+    QUICK_FLOWPOOL_KEY_NAME_PREFIX = 'com.weiqu.video.quick.flowpool.hot.item.score.'
+    # 快速曝光流量池分发概率 redis key前缀,完整格式 com.weiqu.video.quick.flowpool.distribute_rate.{flowPool_id}
+    QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX = 'com.weiqu.video.quick.flowpool.distribute_rate.'
 
     # 兜底视频redis存储key
     BOTTOM_KEY_NAME = 'com.weiqu.video.bottom'
@@ -348,6 +352,11 @@ class DevelopmentConfig(BaseConfig):
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
 
+    # 快速曝光流量池ID
+    QUICK_FLOW_POOL_ID = 3
+
+    # 获取流量池分发配置接口地址
+    GET_FLOW_POOL_RECOMMEND_CONFIG_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/recommend/getConfig'
     # 从流量池获取视频接口地址
     GET_VIDEOS_FROM_POOL_URL = 'http://testapi-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # 获取视频在流量池中的剩余可分发数接口地址
@@ -419,6 +428,11 @@ class TestConfig(BaseConfig):
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo_test.dwd_mdm_item_video_stat'
 
+    # 快速曝光流量池ID
+    QUICK_FLOW_POOL_ID = 3
+
+    # 获取流量池分发配置接口地址
+    GET_FLOW_POOL_RECOMMEND_CONFIG_URL = 'http://videotest-internal.yishihui.com/longvideoapi/openapi/recommend/getConfig'
     # 从流量池获取视频接口地址
     GET_VIDEOS_FROM_POOL_URL = 'http://testapi-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # 获取视频在流量池中的剩余可分发数接口地址
@@ -490,6 +504,11 @@ class PreProductionConfig(BaseConfig):
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
+    # 快速曝光流量池ID
+    QUICK_FLOW_POOL_ID = 3
+
+    # 获取流量池分发配置接口地址
+    GET_FLOW_POOL_RECOMMEND_CONFIG_URL = 'http://prespeed-internal.piaoquantv.com/longvideoapi/openapi/recommend/getConfig'
     # 从流量池获取视频接口地址
     GET_VIDEOS_FROM_POOL_URL = 'http://preapi-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # 获取视频在流量池中的剩余可分发数接口地址
@@ -561,6 +580,11 @@ class ProductionConfig(BaseConfig):
     # Hologres视频状态存储表名
     VIDEO_STATUS = 'longvideo.dwd_mdm_item_video_stat'
 
+    # 快速曝光流量池ID
+    QUICK_FLOW_POOL_ID = 3
+
+    # 获取流量池分发配置接口地址
+    GET_FLOW_POOL_RECOMMEND_CONFIG_URL = 'http://recommend-common-internal.piaoquantv.com/longvideoapi/openapi/recommend/getConfig'
     # 从流量池获取视频接口地址
     GET_VIDEOS_FROM_POOL_URL = 'http://api-internal.piaoquantv.com/flowpool/video/getAllVideo'
     # 获取视频在流量池中的剩余可分发数接口地址

+ 10 - 6
db_helper.py

@@ -7,6 +7,7 @@ from log import Log
 
 config_, _ = set_config()
 log = Log()
+conn_redis = None
 
 
 class RedisHelper(object):
@@ -25,12 +26,15 @@ class RedisHelper(object):
         连接redis
         :return: conn
         """
-        pool = redis.ConnectionPool(host=self.host,
-                                    port=self.port,
-                                    password=self.password,
-                                    decode_responses=True)
-        conn = redis.Redis(connection_pool=pool)
-        return conn
+        global conn_redis
+        if conn_redis is None:
+            pool = redis.ConnectionPool(host=self.host,
+                                        port=self.port,
+                                        password=self.password,
+                                        decode_responses=True)
+            conn = redis.Redis(connection_pool=pool)
+            conn_redis = conn
+        return conn_redis
 
     def key_exists(self, key_name):
         """

+ 50 - 10
pool_predict.py

@@ -64,6 +64,24 @@ def get_videos_remain_view_count(app_type, videos_info):
     return data
 
 
+def get_flow_pool_recommend_config(flow_pool_id):
+    """获取流量池推荐分发配置"""
+    result = request_post(request_url=config_.GET_FLOW_POOL_RECOMMEND_CONFIG_URL)
+    if result is None:
+        return None
+    if result['code'] != 0:
+        return None
+    flow_pool_distribute_config = result['data'].get('flowPoolDistributeConfig')
+    if flow_pool_distribute_config:
+        print(flow_pool_distribute_config)
+        if int(eval(flow_pool_distribute_config).get('flowPoolId')) == flow_pool_id:
+            return eval(eval(flow_pool_distribute_config).get('distributeRate'))
+        else:
+            return None
+    else:
+        return None
+
+
 def get_score(video_ids):
     # 以[0, 100]之间的随机浮点数作为score
     return [random.uniform(0, 100) for _ in range(len(video_ids))]
@@ -104,19 +122,46 @@ def predict(app_type):
         log_.info('predict finished!')
         # 上传数据到redis
         redis_data = {}
+        quick_flow_pool_redis_data = {}
         for i in range(len(video_score)):
             video_id = filtered_videos[i]
             score = video_score[i]
             for flow_pool in mapping.get(video_id):
+                # 判断是否为快速曝光流量池视频
                 value = '{}-{}'.format(video_id, flow_pool)
-                redis_data[value] = score
-        key_name = config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+                flow_pool_id = int(flow_pool.split('#')[0])  # flowPool: 流量池ID#分级ID#级别Level#生命周期ID
+                if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
+                    quick_flow_pool_redis_data[value] = score
+                else:
+                    redis_data[value] = score
+
+        # 快速曝光流量池视频写入redis
         redis_helper = RedisHelper()
+        quick_flow_pool_key_name = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}.{config_.QUICK_FLOW_POOL_ID}"
         # 如果key已存在,删除key
-        if redis_helper.key_exists(key_name):
-            redis_helper.del_keys(key_name)
+        if redis_helper.key_exists(quick_flow_pool_key_name):
+            redis_helper.del_keys(quick_flow_pool_key_name)
         # 写入redis
-        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=24 * 3600)
+        if quick_flow_pool_redis_data:
+            log_.info(f"quick_flow_pool_redis_data = {quick_flow_pool_redis_data}")
+            redis_helper.add_data_with_zset(key_name=quick_flow_pool_key_name, data=quick_flow_pool_redis_data,
+                                            expire_time=24 * 3600)
+            # 快速流量池分发概率存入redis
+            distribute_rate_key_name = f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
+            distribute_rate = get_flow_pool_recommend_config(flow_pool_id=config_.QUICK_FLOW_POOL_ID)
+            if distribute_rate is not None:
+                redis_helper.set_data_to_redis(key_name=distribute_rate_key_name, value=distribute_rate,
+                                               expire_time=15 * 60)
+
+        # 普通流量池视频写入redis
+        flow_pool_key_name = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}"
+        # 如果key已存在,删除key
+        if redis_helper.key_exists(flow_pool_key_name):
+            redis_helper.del_keys(flow_pool_key_name)
+        # 写入redis
+        if redis_data:
+            redis_helper.add_data_with_zset(key_name=flow_pool_key_name, data=redis_data, expire_time=24 * 3600)
+
         log_.info('data to redis finished!')
     except Exception as e:
         log_.error('流量池更新失败, appType: {} exception: {}, traceback: {}'.format(
@@ -245,10 +290,6 @@ def predict_19(app_type):
 
 
 if __name__ == '__main__':
-    # res = get_videos_from_pool(app_type=0)
-    # res = get_videos_remain_view_count(app_type=0, videos_info=[('12345', '#2#1#111')])
-    # print(res)
-
     app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
     log_.info('flow pool predict start...')
     for app_name, app_type in config_.APP_TYPE.items():
@@ -263,7 +304,6 @@ if __name__ == '__main__':
     log_.info('flow pool predict end...')
 
 
-
     # 将日志上传到oss
     # log_cmd = "ossutil cp -r -f {} oss://{}/{}".format(log_.logname, config_.BUCKET_NAME,
     #                                                    config_.OSS_FOLDER_LOGS + 'flow_pool/')

+ 1 - 1
utils.py

@@ -100,7 +100,7 @@ def send_msg_to_feishu(webhook, key_word, msg_text):
     print(response.text)
 
 
-def request_post(request_url, request_data):
+def request_post(request_url, request_data=None):
     """
     post 请求 HTTP接口
     :param request_url: 接口URL

+ 52 - 43
videos_filter.py

@@ -156,48 +156,53 @@ def filter_flow_pool():
         if app_type in app_type_list:
             filter_flow_pool_18_19(app_type=app_type)
         else:
-            # 拼接redis-key
-            key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
-            # 获取视频
-            redis_helper = RedisHelper()
-            data = redis_helper.get_all_data_from_zset(key_name=key_name)
-            if data is None:
-                log_.info("data is None")
-                log_.info("app_type {} videos filter end!".format(app_type))
-                continue
-            # videoId与flowPool做mapping
-            video_ids = []
-            mapping = {}
-            for video in data:
-                video_id, flow_pool = video.split('-')
-                video_id = int(video_id)
-                if video_id not in video_ids:
-                    video_ids.append(video_id)
-                    mapping[video_id] = [flow_pool]
+            for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]:
+                log_.info(f"flow_pool_id = {flow_pool_id}")
+                # 拼接redis-key
+                key_name = get_pool_redis_key(pool_type='flow', app_type=app_type, flow_pool_id=flow_pool_id)
+                # 获取视频
+                redis_helper = RedisHelper()
+                data = redis_helper.get_all_data_from_zset(key_name=key_name)
+                if data is None:
+                    log_.info(f"flow_pool_id = {flow_pool_id}, data is None")
+                    log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
+                    continue
+                # videoId与flowPool做mapping
+                video_ids = []
+                mapping = {}
+                for video in data:
+                    video_id, flow_pool = video.split('-')
+                    video_id = int(video_id)
+                    if video_id not in video_ids:
+                        video_ids.append(video_id)
+                        mapping[video_id] = [flow_pool]
+                    else:
+                        mapping[video_id].append(flow_pool)
+                # 过滤
+                if len(video_ids) == 0:
+                    log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}")
+                    log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
+                    continue
+                if app_type == config_.APP_TYPE['APP']:
+                    filtered_result = filter_video_status_app(video_ids=video_ids)
                 else:
-                    mapping[video_id].append(flow_pool)
-            # 过滤
-            if len(video_ids) == 0:
-                log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
-                log_.info("app_type {} videos filter end!".format(app_type))
-                continue
-            if app_type == config_.APP_TYPE['APP']:
-                filtered_result = filter_video_status_app(video_ids=video_ids)
-            else:
-                filtered_result = filter_video_status(video_ids=video_ids)
-            # 求差集,获取需要过滤掉的视频,并从redis中移除
-            filter_videos = set(video_ids) - set(filtered_result)
-            log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
-                len(data), len(video_ids), len(filtered_result), len(filter_videos)))
-            # 移除
-            if len(filter_videos) == 0:
-                log_.info("app_type {} videos filter end!".format(app_type))
-                continue
-            remove_videos = ['{}-{}'.format(video_id, flow_pool)
-                             for video_id in filter_videos
-                             for flow_pool in mapping[video_id]]
-            redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
-            log_.info("app_type {} videos filter end!".format(app_type))
+                    filtered_result = filter_video_status(video_ids=video_ids)
+                # 求差集,获取需要过滤掉的视频,并从redis中移除
+                filter_videos = set(video_ids) - set(filtered_result)
+                log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
+                    len(data), len(video_ids), len(filtered_result), len(filter_videos)))
+                # 移除
+                if len(filter_videos) == 0:
+                    log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
+                    continue
+                remove_videos = ['{}-{}'.format(video_id, flow_pool)
+                                 for video_id in filter_videos
+                                 for flow_pool in mapping[video_id]]
+                redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
+                log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
+
+            log_.info(f"app_type = {app_type} videos filter end!")
+
     log_.info("flow pool filter end!")
 
 
@@ -313,11 +318,12 @@ def filter_rov_updated_app():
     log_.info("update rov videos app filter end!")
 
 
-def get_pool_redis_key(pool_type, app_type=None):
+def get_pool_redis_key(pool_type, app_type=None, flow_pool_id=None):
     """
     拼接key
     :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
     :param app_type: 产品标识
+    :param flow_pool_id: 流量池ID
     :return: key_name
     """
     redis_helper = RedisHelper()
@@ -380,7 +386,10 @@ def get_pool_redis_key(pool_type, app_type=None):
 
     elif pool_type == 'flow':
         # 流量池
-        return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
+        if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
+            return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}.{flow_pool_id}"
+        else:
+            return f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}"
 
     else:
         log_.error('pool type error')