Browse Source

add region_h_by24h recall test

liqian 2 years ago
parent
commit
a6817f6221
4 changed files with 66 additions and 35 deletions
  1. 11 0
      config.py
  2. 8 4
      utils.py
  3. 9 2
      video_rank.py
  4. 38 29
      video_recall.py

+ 11 - 0
config.py

@@ -121,6 +121,7 @@ class BaseConfig(object):
         'old_video': 'old_video_recall',  # 老视频
         'rov_recall_region_h': 'recall_pool_region_h',  # 地域分组小时级更新列表
         'rov_recall_region_day': 'recall_pool_region_day',  # 地域分组天级更新列表
+        'rov_recall_region_24h': 'recall_pool_region_24h',  # 地域分组小时级更新24h列表
     }
 
     # category id mapping
@@ -149,6 +150,11 @@ class BaseConfig(object):
     # 记录 mid-小时级key 中数据所属(date,h),完整格式:com.weiqu.video.h.record.mid.{mid}
     H_WITH_MID_RECORD_KEY_NAME_PREFIX = 'com.weiqu.video.h.record.mid.'
 
+    # 每个mid存储对应小时级更新结果 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.h.region.24h.mid.{mid}
+    H_WITH_MID_RECALL_KEY_NAME_PREFIX_REGION_24H = 'com.weiqu.video.recall.hot.item.score.h.region.24h.mid.'
+    # 记录 mid-小时级key 中数据所属(date,h),完整格式:com.weiqu.video.h.region.24h.record.mid.{mid}
+    H_WITH_MID_RECORD_KEY_NAME_PREFIX_REGION_24H = 'com.weiqu.video.h.region.24h.record.mid.'
+
     # 小时级视频状态不符合推荐要求的列表 redis key,完整格式:com.weiqu.video.filter.h.item.{rule_key}
     H_VIDEO_FILER = 'com.weiqu.video.filter.h.item.'
 
@@ -174,6 +180,9 @@ class BaseConfig(object):
     # 小程序地域分组天级更新结果与小程序地域分组小时级更新结果去重后 存放 redis key前缀,
     # 完整格式:com.weiqu.video.recall.hot.item.score.dup1.region.day.h.{region}.{rule_key}.{date}.{h}
     RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H = 'com.weiqu.video.recall.hot.item.score.dup1.region.day.h.'
+    # 小程序地域分组小时级更新24h结果与小程序地域分组小时级更新结果去重后 存放 redis key前缀,
+    # 完整格式:com.weiqu.video.recall.hot.item.score.dup1.region.day.h.{region}.{rule_key}.{date}.{h}
+    RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H = 'com.weiqu.video.recall.hot.item.score.dup1.region.24h.h.'
     # 小程序天级更新结果与 小程序地域分组天级更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
     # 完整格式:com.weiqu.video.recall.hot.item.score.dup2.region.day.h.{region}.{rule_key}.{date}.{h}
     RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H = 'com.weiqu.video.recall.hot.item.score.dup2.region.day.h.'
@@ -182,6 +191,8 @@ class BaseConfig(object):
     RECALL_KEY_NAME_PREFIX_DUP_REGION_H = 'com.weiqu.video.recall.hot.item.score.dup.region.h.'
     # 地域分组小时级视频状态不符合推荐要求的列表 redis key,完整格式:com.weiqu.video.filter.region.h.item.{region}.{rule_key}
     REGION_H_VIDEO_FILER = 'com.weiqu.video.filter.region.h.item.'
+    # 地域分组小时级更新24h视频状态不符合推荐要求的列表 redis key,完整格式:com.weiqu.video.filter.region.h.item.24h.{region}.{rule_key}
+    REGION_H_VIDEO_FILER_24H = 'com.weiqu.video.filter.region.h.item.24h.'
 
     # app应用 小程序离线ROV模型结果存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.app.{date}
     RECALL_KEY_NAME_PREFIX_APP = 'com.weiqu.video.recall.hot.item.score.app.'

+ 8 - 4
utils.py

@@ -148,13 +148,16 @@ class FilterVideos(object):
         self.uid = uid
         self.video_ids = video_ids
 
-    def filter_video_status_h(self, video_ids, rule_key, ab_code, province_code):
+    def filter_video_status_h(self, video_ids, rule_key, ab_code, province_code, key_flag=''):
         """召回小时级更新的视频状态过滤"""
         # 根据Redis缓存中的数据过滤
         redis_helper = RedisHelper()
         # 获取不符合推荐状态的视频
         if ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
-            key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
+            if key_flag == 'region_24h':
+                key_prefix = f"{config_.REGION_H_VIDEO_FILER_24H}{province_code}."
+            else:
+                key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
         elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
             key_prefix = config_.H_VIDEO_FILER_24H
         else:
@@ -166,7 +169,7 @@ class FilterVideos(object):
         filtered_videos = [video_id for video_id in video_ids if video_id not in filter_videos]
         return filtered_videos
 
-    def filter_videos_h(self, rule_key, ab_code, province_code):
+    def filter_videos_h(self, rule_key, ab_code, province_code, key_flag=''):
         """召回小时级更新的视频过滤"""
         # 预曝光过滤
         st_pre = time.time()
@@ -180,7 +183,8 @@ class FilterVideos(object):
         # 视频状态过滤
         st_status = time.time()
         filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result, rule_key=rule_key,
-                                                            ab_code=ab_code, province_code=province_code)
+                                                            ab_code=ab_code, province_code=province_code,
+                                                            key_flag=key_flag)
         et_status = time.time()
         log_.info('filter by video status: result = {}, execute time = {}ms'.format(
             filtered_status_result, (et_status - st_status) * 1000))

+ 9 - 2
video_rank.py

@@ -30,6 +30,11 @@ def video_rank(data, size, top_K, flow_pool_P):
     region_h_recall = [item for item in data['rov_pool_recall']
                          if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
     region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # 地域分组小时级更新24h规则更新数据
+    region_24h_recall = [item for item in data['rov_pool_recall']
+                         if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
+    region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+
     # 地域分组天级规则更新数据
     region_day_recall = [item for item in data['rov_pool_recall']
                          if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_day']]
@@ -43,12 +48,14 @@ def video_rank(data, size, top_K, flow_pool_P):
         if item.get('pushFrom') not in
            [config_.PUSH_FROM['rov_recall_h'],
             config_.PUSH_FROM['rov_recall_region_h'],
+            config_.PUSH_FROM['rov_recall_region_24h'],
             config_.PUSH_FROM['rov_recall_region_day'],
             config_.PUSH_FROM['rov_recall_day']]
     ]
     rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
-    rov_recall_rank = h_recall_rank + region_h_recall_rank + region_day_recall_rank \
-                      + day_recall_rank + rov_initial_recall_rank
+    rov_recall_rank = h_recall_rank + \
+                      region_h_recall_rank + region_24h_recall_rank + region_day_recall_rank + \
+                      day_recall_rank + rov_initial_recall_rank
     # 流量池
     flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
     # 对各路召回的视频进行去重

+ 38 - 29
video_recall.py

@@ -49,21 +49,11 @@ class PoolRecall(object):
         else:
             return False
 
-    def update_mid_data(self, province_code):
-        # mid对应小时级视频列表 redis-key
-        h_recall_mid_key = f"{config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX}{self.mid}"
-        # 判断mid对应小时级视频列表 时间记录
-        h_record_key = f"{config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX}{self.mid}"
+    def update_mid_data(self, h_recall_mid_key, h_record_key, key_prefix):
         # 判断当前小时的小时级列表是否更新
         now_date = datetime.today()
         h = datetime.now().hour
         now_dt = datetime.strftime(now_date, '%Y%m%d')
-        if self.ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
-            key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{province_code}."
-        elif self.ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
-            key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_24H
-        else:
-            key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H
         now_h_recall_key = f"{key_prefix}{self.rule_key}.{now_dt}.{h}"
         if self.redis_helper.key_exists(key_name=now_h_recall_key):
             flag = self.copy_redis_zset_data(from_key_name=now_h_recall_key, to_key_name=h_recall_mid_key)
@@ -83,14 +73,32 @@ class PoolRecall(object):
                 value = {'date': redis_dt, 'h': redis_h}
                 self.redis_helper.set_data_to_redis(key_name=h_record_key, value=str(value), expire_time=2*3600)
 
-    def get_mid_h_key(self, province_code):
-        # mid对应小时级视频列表 redis-key
-        h_recall_mid_key = f"{config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX}{self.mid}"
-        # 判断mid对应小时级视频列表 时间记录
-        h_record_key = f"{config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX}{self.mid}"
+    def get_mid_h_key(self, province_code, key_flag=''):
+        if key_flag == 'region_24h':
+            # mid对应小时级视频列表 redis-key
+            h_recall_mid_key = f"{config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_REGION_24H}{self.mid}"
+            # 判断mid对应小时级视频列表 时间记录
+            h_record_key = f"{config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_REGION_24H}{self.mid}"
+        else:
+            # mid对应小时级视频列表 redis-key
+            h_recall_mid_key = f"{config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX}{self.mid}"
+            # 判断mid对应小时级视频列表 时间记录
+            h_record_key = f"{config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX}{self.mid}"
+
+        # 列表存储 redis-key prefix
+        if self.ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
+            if key_flag == 'region_24h':
+                key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{province_code}."
+            else:
+                key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{province_code}."
+        elif self.ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
+            key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_24H
+        else:
+            key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H
+
         if not self.redis_helper.key_exists(key_name=h_record_key):
             # ###### 记录key不存在,copy列表,更新记录
-            self.update_mid_data(province_code=province_code)
+            self.update_mid_data(h_recall_mid_key=h_recall_mid_key, h_record_key=h_record_key, key_prefix=key_prefix)
             # return h_recall_mid_key
         else:
             # ###### 记录key存在,判断date, h
@@ -107,12 +115,6 @@ class PoolRecall(object):
                 # return h_recall_mid_key
             elif (record_dt == now_dt and h-int(record_h) == 1) or (h == 0 and int(record_h) == 23):
                 # 记录的h - 当前h = 1,判断当前h数据是否已更新
-                if self.ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
-                    key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{province_code}."
-                elif self.ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
-                    key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_24H
-                else:
-                    key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H
                 now_h_recall_key = f"{key_prefix}{self.rule_key}.{now_dt}.{h}"
                 # if not self.redis_helper.key_exists(key_name=now_h_recall_key):
                     # 未更新
@@ -127,7 +129,8 @@ class PoolRecall(object):
                         self.redis_helper.set_data_to_redis(key_name=h_record_key, value=str(new_record), expire_time=2*3600)
                     # return h_recall_mid_key
             else:
-                self.update_mid_data(province_code=province_code)
+                self.update_mid_data(h_recall_mid_key=h_recall_mid_key, h_record_key=h_record_key, key_prefix=key_prefix)
+
                 # return h_recall_mid_key
 
         return h_recall_mid_key
@@ -1030,7 +1033,8 @@ class PoolRecall(object):
                  gevent.spawn(self.region_dup_recall, province_code, 2, size, expire_time)]
         else:
             t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
-                 gevent.spawn(self.region_dup_recall, province_code, 1, size, expire_time),
+                 gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
+                 # gevent.spawn(self.region_dup_recall, province_code, 1, size, expire_time),
                  gevent.spawn(self.region_dup_recall, province_code, 2, size, expire_time)]
         gevent.joinall(t)
         region_recall_result_list = [i.get() for i in t]
@@ -1064,15 +1068,20 @@ class PoolRecall(object):
 
         return recall_result[:size]
 
-    def rov_pool_recall_with_region_by_h(self, province_code, size=4):
+    def rov_pool_recall_with_region_by_h(self, province_code, size=4, key_flag=''):
         """
         地域分组小时级视频召回
         :param size: 视频数
         :param province_code: 省份code
+        :param key_flag:
         :return:
         """
+        if key_flag == 'region_24h':
+            push_from = config_.PUSH_FROM['rov_recall_region_24h']
+        else:
+            push_from = config_.PUSH_FROM['rov_recall_region_h']
         # 获取mid对应的小时级列表redis-key
-        h_recall_mid_key = self.get_mid_h_key(province_code=province_code)
+        h_recall_mid_key = self.get_mid_h_key(province_code=province_code, key_flag=key_flag)
         if not self.redis_helper.key_exists(h_recall_mid_key):
             recall_result = []
         else:
@@ -1103,14 +1112,14 @@ class PoolRecall(object):
                     video_score[video_id] = value[1]
                 # 过滤
                 filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-                ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, province_code)
+                ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, province_code, key_flag)
                 ge.join()
                 filtered_result = ge.get()
 
                 if filtered_result:
                     # 添加视频源参数 pushFrom, abCode
                     temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
-                                    'pushFrom': config_.PUSH_FROM['rov_recall_region_h'], 'abCode': self.ab_code}
+                                    'pushFrom': push_from, 'abCode': self.ab_code}
                                    for item in filtered_result if video_score.get(int(item)) is not None]
                     recall_result.extend(temp_result)
                     fil_video_ids.extend(list(set(video_ids) - set([item.get('videoId') for item in temp_result])))