liqian 2 lat temu
rodzic
commit
8dbc662963
4 zmienionych plików z 94 dodań i 26 usunięć
  1. 9 0
      config.py
  2. 15 2
      db_helper.py
  3. 24 1
      recommend.py
  4. 46 23
      video_recall.py

+ 9 - 0
config.py

@@ -693,6 +693,15 @@ class BaseConfig(object):
     # 召回池分发视频在流量池中存在,该视频是否进行本地分发数-1 开关,1-开/0-关
     IN_FLOW_POOL_COUNT_SWITCH_KEY_NAME = 'recall:videos:in:flow:pool:count:switch'
 
+    # 流量池videoId redis key前缀,完整格式 flow:pool:video:ids:{appType}
+    FLOWPOOL_VIDEO_ID_KEY_NAME_PREFIX = 'flow:pool:video:ids:'
+    # 快速曝光流量池videoId redis key前缀,完整格式 flow:pool:quick:video:ids:{appType}:{flowPool_id}
+    QUICK_FLOWPOOL_VIDEO_ID_KEY_NAME_PREFIX = 'flow:pool:quick:video:ids:'
+    # 流量池视频标记flowPool redis key前缀,完整格式 flow:pool:video:{appType}:{videoId}
+    FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX = 'flow:pool:video:'
+    # 快速曝光流量池视频标记flowPool redis key前缀,完整格式 flow:pool:quick:video:{appType}:{flowPool_id}:{videoId}
+    QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX = 'flow:pool:quick:video:'
+
     # 免广告用户组列表
     NO_AD_MID_GROUP_LIST = {
         'class1': ['return25_nmids'],

+ 15 - 2
db_helper.py

@@ -329,6 +329,17 @@ class RedisHelper(object):
         #     })
         return res
 
+    def get_data_with_count_from_set(self, key_name, count=1):
+        """
+        从set中随机获取元素,并放回
+        :param key_name: key
+        :param count: 获取个数, 默认为1
+        :return:
+        """
+        conn = self.connect()
+        data = conn.srandmember(name=key_name, number=count)
+        return data
+
     def remove_value_from_set(self, key_name, values):
         """
         删除set中的指定元素
@@ -489,8 +500,10 @@ if __name__ == '__main__':
     # print(len(data))
     # key_name = 'com.weiqu.video.hot.recommend.previewed.4.weixin_openid_otjoB5VG780SB4aVjYqBBNLb - X6M'
     # values = (6134455, 9772930, 9912678, 9901969, 9926876, 9904203, 2384831, 9932272, 9737653, 9925240)
-    key_name = 'com.weiqu.video.hot.recommend.previewed.4.abcd1'
+    # key_name = 'com.weiqu.video.hot.recommend.previewed.4.abcd1'
     # values = (9902612, 9905573, 9928264, 9932148, 9809440, 9919900, 6093379, 9917093, 9793537, 9814345)
     # redis_helper.add_data_with_set(key_name=key_name, values=values, expire_time=30 * 60)
-    res = redis_helper.get_data_from_set(key_name=key_name)
+    # res = redis_helper.get_data_from_set(key_name=key_name)
+
+    res = redis_helper.data_exists_with_set(key_name="flow:pool:video:0:161694911", value='1')
     print(res)

+ 24 - 1
recommend.py

@@ -508,9 +508,32 @@ def update_local_distribute_count(videos):
     try:
         redis_helper = RedisHelper()
         for item in videos:
-            key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{item['videoId']}:{item['flowPool']}"
+            video_id, flow_pool = item['videoId'], item['flowPool']
+            key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
             # 本地记录的分发数 - 1
             redis_helper.decr_key(key_name=key_name, amount=1, expire_time=15 * 60)
+            # 对该视频做分发数检查
+            cur_count = redis_helper.get_data_from_redis(key_name=key_name)
+            # 无记录
+            if cur_count is None:
+                continue
+            # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key
+            if int(cur_count) <= 0:
+                redis_helper.del_keys(key_name=key_name)
+                for app_name in config_.APP_TYPE:
+                    app_type = config_.APP_TYPE.get(app_name)
+                    flow_pool_key_list = [
+                        f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}",
+                        f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}"
+                    ]
+                    for key in flow_pool_key_list:
+                        redis_helper.remove_value_from_zset(key_name=key, value=f"{video_id}-{flow_pool}")
+                    video_flow_pool_key_list = [
+                        f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}:{video_id}",
+                        f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{video_id}"
+                    ]
+                    for key in video_flow_pool_key_list:
+                        redis_helper.remove_value_from_set(key_name=key, values=(flow_pool, ))
 
             # if redis_helper.key_exists(key_name=key_name):
             #     # 该视频本地有记录,本地记录的分发数 - 1

+ 46 - 23
video_recall.py

@@ -452,7 +452,7 @@ class PoolRecall(object):
             region_code = '-1'
 
         flow_pool_key = self.get_pool_redis_key('flow', flow_pool_id=flow_pool_id)
-        print(flow_pool_key)
+        # print(flow_pool_key)
         flow_pool_recall_result = []
         flow_pool_recall_videos = []
         # 每次获取的视频数
@@ -1301,43 +1301,66 @@ class PoolRecall(object):
         return recall_result[:size]
 
     def rov_pool_recall_with_region(self, size=4, expire_time=24*3600):
-        t = [
-            gevent.spawn(self.rov_pool_recall_with_region_process, size, expire_time),
-            gevent.spawn(self.get_flow_pool_videos)
-        ]
-        gevent.joinall(t)
-        result_list = [i.get() for i in t]
-        videos = result_list[0]
-        flow_pool_video_id_list, videos_flow_pool = result_list[1]['video_id_list'], result_list[1]['videos_flow_pool']
+        """召回池召回视频"""
+        # 获取召回池中视频
+        videos = self.rov_pool_recall_with_region_process(size=size, expire_time=expire_time)
         # 对在流量池中存在的视频添加标记字段
-        result = self.add_flow_pool_tag(videos, flow_pool_video_id_list, videos_flow_pool)
-        return result
-
-    def add_flow_pool_tag(self, videos, flow_pool_video_id_list, videos_flow_pool):
-        """对在流量池中存在的视频添加标记字段"""
         result = []
-        # 判断视频是否在流量池视频中
         for item in videos:
-            if item['videoId'] in flow_pool_video_id_list:
-                flow_pool_list = videos_flow_pool.get(item['videoId'], [])
-                if len(flow_pool_list) > 0:
-                    flow_pool = flow_pool_list[0]
-                    item['flowPool'] = flow_pool
-                    item['isInFlowPool'] = 1
+            video_id = item['videoId']
+            t = [
+                gevent.spawn(self.get_video_flow_pool, video_id, True),
+                gevent.spawn(self.get_video_flow_pool, video_id, False)
+            ]
+            gevent.joinall(t)
+            flow_pool_list = [i.get() for i in t]
+            flow_pool_list = [item for item in flow_pool_list if item != '']
+            if len(flow_pool_list) > 0:
+                flow_pool = flow_pool_list[0]
+                item['flowPool'] = flow_pool
+                item['isInFlowPool'] = 1
             result.append(item)
         return result
 
+    def get_video_flow_pool(self, video_id, quick_flow_pool=False):
+        """
+        获取videoId对应的任意一个flowPool
+        :param video_id: videoId
+        :param quick_flow_pool: 是否为快速曝光流量池标识,默认:否 False
+        :return: flow_pool
+        """
+        if quick_flow_pool is True:
+            isin_flow_pool_key = \
+                f"{config_.QUICK_FLOWPOOL_VIDEO_ID_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
+            flow_pool_key = \
+                f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{self.app_type}:" \
+                f"{config_.QUICK_FLOW_POOL_ID}:{video_id}"
+        else:
+            isin_flow_pool_key = \
+                f"{config_.FLOWPOOL_VIDEO_ID_KEY_NAME_PREFIX}{self.app_type}"
+            flow_pool_key = \
+                f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{self.app_type}:{video_id}"
+
+        # 判断是否在流量池中
+        isin_flow_pool = self.redis_helper.data_exists_with_set(key_name=isin_flow_pool_key, value=video_id)
+        flow_pool = ''
+        if isin_flow_pool:
+            # 随机获取一个flowPool标记
+            flow_pool_list = self.redis_helper.get_data_with_count_from_set(key_name=flow_pool_key, count=1)
+            if len(flow_pool_list) > 0:
+                flow_pool = flow_pool_list[0]
+        return flow_pool
+
     def get_flow_pool_videos(self):
         """获取当前可分发的流量池视频,以及对应的标记列表"""
         video_id_list = []
         videos_flow_pool = {}
-        redis_helper = RedisHelper()
         # 快速曝光流量池
         key_name_quick = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
         # 其他流量池
         key_name_other = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
         for key_name in [key_name_quick, key_name_other]:
-            data = redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=False)
+            data = self.redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=False)
             if data is None or len(data) == 0:
                 continue
             for item in data: