Browse Source

opt recommend flowpool

liqian 2 years ago
parent
commit
c7297399a6
3 changed files with 113 additions and 35 deletions
  1. 3 0
      config.py
  2. 10 2
      recommend.py
  3. 100 33
      video_recall.py

+ 3 - 0
config.py

@@ -690,6 +690,9 @@ class BaseConfig(object):
         'other': (1, 6, 7,),  # 其他
     }
 
+    # 召回池分发视频在流量池中存在,该视频是否进行本地分发数-1 开关,1-开/0-关
+    IN_FLOW_POOL_COUNT_SWITCH_KEY_NAME = 'recall:videos:in:flow:pool:count:switch'
+
     # 免广告用户组列表
     NO_AD_MID_GROUP_LIST = {
         'class1': ['return25_nmids'],

+ 10 - 2
recommend.py

@@ -382,7 +382,6 @@ def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600):
     :param result: 排序结果
     :param app_type: 产品标识
     :param mid: mid
-    :param last_rov_recall_key: 用户上一次在rov召回池对应的位置 redis key
     :param top_K: 保证topK为召回池视频 type-int
     :param expire_time: 末位视频记录redis过期时间
     :return: None
@@ -464,7 +463,16 @@ def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600):
 
         # 将此次分发的流量池视频,对 本地分发数-1 进行记录
         if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
-            flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
+            # 获取本地分发数-1策略开关
+            switch = redis_helper.get_data_from_redis(key_name=config_.IN_FLOW_POOL_COUNT_SWITCH_KEY_NAME)
+            if switch is not None:
+                if int(switch) == 1:
+                    flow_recall_video = [item for item in result if item.get('flowPool', None) is not None]
+                else:
+                    flow_recall_video = [item for item in result if
+                                         item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
+            else:
+                flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
             if flow_recall_video:
                 update_local_distribute_count(flow_recall_video)
                 # log_.info('update local distribute count success!')

+ 100 - 33
video_recall.py

@@ -1202,7 +1202,7 @@ class PoolRecall(object):
             idx = 0
         return key_name, last_region_dup_key, idx
 
-    def rov_pool_recall_with_region(self, size=4, expire_time=24*3600):
+    def rov_pool_recall_with_region_process(self, size=4, expire_time=24*3600):
         """
         地域分组召回视频
         :param size: 获取视频个数
@@ -1225,38 +1225,39 @@ class PoolRecall(object):
         if region_code == '':
             region_code = '-1'
 
-        if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('abtest_139'):
-            if region_code == '-1':
-                t = [
-                    gevent.spawn(self.recall_update_by_day, size, '30day'),
-                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-                ]
-            else:
-                t = [
-                    gevent.spawn(self.recall_update_by_day, size, '30day'),
-                    gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
-                    gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
-                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-                ]
+        # if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('abtest_139'):
+        #     if region_code == '-1':
+        #         t = [
+        #             gevent.spawn(self.recall_update_by_day, size, '30day'),
+        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
+        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+        #         ]
+        #     else:
+        #         t = [
+        #             gevent.spawn(self.recall_update_by_day, size, '30day'),
+        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
+        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
+        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
+        #             gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+        #         ]
+        # else:
+
+        if region_code == '-1':
+            t = [
+                # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
+                gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
+                gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+            ]
         else:
-            if region_code == '-1':
-                t = [
-                    # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
-                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-                ]
-            else:
-                t = [
-                    # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
-                     # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
-                     # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
-                     gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
-                     gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
-                     gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-                     gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-                ]
+            t = [
+                # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
+                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
+                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
+                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
+                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
+                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
+                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+            ]
 
         gevent.joinall(t)
         region_recall_result_list = [i.get() for i in t]
@@ -1297,9 +1298,75 @@ class PoolRecall(object):
         #     'operation': 'rov_pool_recall_with_region',
         #     'executeTime': (time.time() - start_time) * 1000
         # })
-
         return recall_result[:size]
 
+    def rov_pool_recall_with_region(self, size=4, expire_time=24*3600):
+        t = [
+            gevent.spawn(self.rov_pool_recall_with_region_process, size, expire_time),
+            gevent.spawn(self.get_flow_pool_videos)
+        ]
+        gevent.joinall(t)
+        result_list = [i.get() for i in t]
+        videos = result_list[0]
+        flow_pool_video_id_list, videos_flow_pool = result_list[1]['video_id_list'], result_list[1]['videos_flow_pool']
+        # 对在流量池中存在的视频添加标记字段
+        result = self.add_flow_pool_tag(videos, flow_pool_video_id_list, videos_flow_pool)
+        return result
+
+    def add_flow_pool_tag(self, videos, flow_pool_video_id_list, videos_flow_pool):
+        """对在流量池中存在的视频添加标记字段"""
+        result = []
+        # 判断视频是否在流量池视频中
+        for item in videos:
+            if item['videoId'] in flow_pool_video_id_list:
+                flow_pool_list = videos_flow_pool.get(item['videoId'], [])
+                if len(flow_pool_list) > 0:
+                    flow_pool = flow_pool_list[0]
+                    item['flowPool'] = flow_pool
+                    item['isInFlowPool'] = 1
+            result.append(item)
+        return result
+
+    def get_flow_pool_videos(self):
+        """获取当前可分发的流量池视频,以及对应的标记列表"""
+        video_id_list = []
+        videos_flow_pool = {}
+        redis_helper = RedisHelper()
+        # 快速曝光流量池
+        key_name_quick = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
+        # 其他流量池
+        key_name_other = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
+        for key_name in [key_name_quick, key_name_other]:
+            data = redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=False)
+            if data is None or len(data) == 0:
+                continue
+            for item in data:
+                video_id, flow_pool = item.split('-')
+                video_id = int(video_id)
+                # ### 对该视频做分发数检查
+                cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
+                # 无记录
+                if cur_count is None:
+                    continue
+                # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key
+                if cur_count <= 0:
+                    remain_count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
+                    self.redis_helper.del_keys(remain_count_key)
+                    for app_name in config_.APP_TYPE:
+                        flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{config_.APP_TYPE.get(app_name)}"
+                        quick_flow_pool_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}" \
+                                              f"{config_.APP_TYPE.get(app_name)}:{config_.QUICK_FLOW_POOL_ID}"
+                        self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=item)
+                        self.redis_helper.remove_value_from_zset(key_name=quick_flow_pool_key, value=item)
+                    continue
+                # 本地分发数 cur_count > 0
+                if video_id in video_id_list:
+                    videos_flow_pool[video_id].append(flow_pool)
+                else:
+                    videos_flow_pool[video_id] = [flow_pool]
+                    video_id_list.append(video_id)
+        return {'video_id_list': video_id_list, 'videos_flow_pool': videos_flow_pool}
+
     def rov_pool_recall_with_region_by_h(self, province_code, size=4, key_flag=''):
         """
         地域分组小时级视频召回