Parcourir la source

add rule_rank_24h data in rank_by_h ab test

liqian il y a 2 ans
Parent
commit
45ba3080ba
6 fichiers modifiés avec 139 ajouts et 5 suppressions
  1. 9 1
      config.py
  2. 5 2
      recommend.py
  3. 2 0
      user2new.py
  4. 2 0
      utils.py
  5. 6 1
      video_rank.py
  6. 115 1
      video_recall.py

+ 9 - 1
config.py

@@ -120,6 +120,7 @@ class BaseConfig(object):
         'position_insert': 'position_insert',  # 按位置插入
         'relevant_video_op': 'relevant_video_op',  # 相关推荐强插
         'rov_recall_h': 'recall_pool_h',  # 小时级更新列表
+        'rov_recall_24h': 'recall_pool_24h',  # 小时级更新列表
         'rov_recall_day': 'recall_pool_day',  # 天级规则更新列表
         'old_video': 'old_video_recall',  # 老视频
         'rov_recall_region_h': 'recall_pool_region_h',  # 地域分组小时级更新列表
@@ -143,7 +144,9 @@ class BaseConfig(object):
 
     # 小程序小时级更新结果存放 redis key前缀,完整格式:com.weiqu.video.recall.item.score.h.{rule_key}.{date}.{h}
     RECALL_KEY_NAME_PREFIX_BY_H = 'com.weiqu.video.recall.item.score.h.'
-
+    # 小程序相对24h数据更新结果与 小程序小时级更新结果 去重后 存放 redis key前缀,
+    # 完整格式:com.weiqu.video.recall.hot.item.score.dup.24h.h.{rule_key}.{date}.{h}
+    RECALL_KEY_NAME_PREFIX_DUP_24H_H = 'com.weiqu.video.recall.hot.item.score.dup.24h.h.'
     # 小程序离线ROV模型结果与小程序小时级更新结果去重后 存放 redis key前缀,
     # 完整格式:com.weiqu.video.recall.hot.item.score.dup.h.{rule_key}{date}.{h}
     RECALL_KEY_NAME_PREFIX_DUP_H = 'com.weiqu.video.recall.hot.item.score.dup.h.'
@@ -153,6 +156,11 @@ class BaseConfig(object):
     # 记录 mid-小时级key 中数据所属(date,h),完整格式:com.weiqu.video.h.record.mid.{appType}.{mid}
     H_WITH_MID_RECORD_KEY_NAME_PREFIX = 'com.weiqu.video.h.record.mid.'
 
+    # 每个mid存储对应小时级更新结果 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.h.24h.mid.{appType}.{mid}
+    H_WITH_MID_RECALL_KEY_NAME_PREFIX_24H = 'com.weiqu.video.recall.hot.item.score.h.24h.mid.'
+    # 记录 mid-小时级key 中数据所属(date,h),完整格式:com.weiqu.video.h.24h.record.mid.{appType}.{mid}
+    H_WITH_MID_RECORD_KEY_NAME_PREFIX_24H = 'com.weiqu.video.h.24h.record.mid.'
+
     # 每个mid存储对应小时级更新结果 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.h.region.24h.mid.{appType}.{mid}
     H_WITH_MID_RECALL_KEY_NAME_PREFIX_REGION_24H = 'com.weiqu.video.recall.hot.item.score.h.region.24h.mid.'
     # 记录 mid-小时级key 中数据所属(date,h),完整格式:com.weiqu.video.h.region.24h.record.mid.{appType}.{mid}

+ 5 - 2
recommend.py

@@ -174,8 +174,11 @@ def video_recommend(mid, uid, size, top_K, flow_pool_P, app_type, algo_type, cli
                              client_info=client_info, rule_key=rule_key, no_op_flag=no_op_flag)
     _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     # 小时级实验
-    if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()] + \
-            [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
+    if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()]:
+        t = [gevent.spawn(pool_recall.rule_recall_by_h, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
+    # 小时级实验
+    elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
         t = [gevent.spawn(pool_recall.rov_pool_recall_by_h, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall, size)]
     # 地域分组实验

+ 2 - 0
user2new.py

@@ -31,6 +31,8 @@ def user2new(app_type, mid, uid):
         h_key_prefix_list = [
             config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX,
             config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX,
+            config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_24H,
+            config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_24H,
             config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_REGION_24H,
             config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_REGION_24H
         ]

+ 2 - 0
utils.py

@@ -160,6 +160,8 @@ class FilterVideos(object):
                 key_prefix = f"{config_.REGION_H_VIDEO_FILER}{province_code}."
         elif ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
             key_prefix = config_.H_VIDEO_FILER_24H
+        elif key_flag == '24h':
+            key_prefix = config_.H_VIDEO_FILER_24H
         else:
             key_prefix = config_.H_VIDEO_FILER
         filter_videos_list = redis_helper.get_data_from_set(key_name=f"{key_prefix}{rule_key}")

+ 6 - 1
video_rank.py

@@ -39,6 +39,10 @@ def video_rank(data, size, top_K, flow_pool_P):
     region_day_recall = [item for item in data['rov_pool_recall']
                          if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_day']]
     region_day_recall_rank = sorted(region_day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # 相对24h规则更新数据
+    rule_24h_recall = [item for item in data['rov_pool_recall']
+                       if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
+    rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     # 天级规则更新数据
     day_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_day']]
     day_recall_rank = sorted(day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
@@ -50,12 +54,13 @@ def video_rank(data, size, top_K, flow_pool_P):
             config_.PUSH_FROM['rov_recall_region_h'],
             config_.PUSH_FROM['rov_recall_region_24h'],
             config_.PUSH_FROM['rov_recall_region_day'],
+            config_.PUSH_FROM['rov_recall_24h'],
             config_.PUSH_FROM['rov_recall_day']]
     ]
     rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     rov_recall_rank = h_recall_rank + \
                       region_h_recall_rank + region_24h_recall_rank + region_day_recall_rank + \
-                      day_recall_rank + rov_initial_recall_rank
+                      rule_24h_recall_rank + day_recall_rank + rov_initial_recall_rank
     # 流量池
     flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
     # 对各路召回的视频进行去重

+ 115 - 1
video_recall.py

@@ -79,6 +79,11 @@ class PoolRecall(object):
             h_recall_mid_key = f"{config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_REGION_24H}{self.app_type}.{self.mid}"
             # 判断mid对应小时级视频列表 时间记录
             h_record_key = f"{config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_REGION_24H}{self.app_type}.{self.mid}"
+        elif key_flag == '24h':
+            # mid对应小时级视频列表 redis-key
+            h_recall_mid_key = f"{config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX_24H}{self.app_type}.{self.mid}"
+            # 判断mid对应小时级视频列表 时间记录
+            h_record_key = f"{config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX_24H}{self.app_type}.{self.mid}"
         else:
             # mid对应小时级视频列表 redis-key
             h_recall_mid_key = f"{config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX}{self.app_type}.{self.mid}"
@@ -96,6 +101,8 @@ class PoolRecall(object):
                 key_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{province_code}."
         elif self.ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
             key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_24H
+        elif key_flag == '24h':
+            key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_24H_H
         else:
             key_prefix = config_.RECALL_KEY_NAME_PREFIX_BY_H
 
@@ -149,6 +156,10 @@ class PoolRecall(object):
         province_code = self.client_info.get('provinceCode', '-1')
         if province_code == '':
             province_code = '-1'
+        if self.ab_code in [code for _, code in config_.AB_CODE['rank_by_24h'].items()]:
+            push_from = config_.PUSH_FROM['rov_recall_24h']
+        else:
+            push_from = config_.PUSH_FROM['rov_recall_h']
         # 获取mid对应的小时级列表redis-key
         h_recall_mid_key = self.get_mid_h_key(province_code=province_code)
         if not self.redis_helper.key_exists(h_recall_mid_key):
@@ -188,7 +199,7 @@ class PoolRecall(object):
                 if filtered_result:
                     # 添加视频源参数 pushFrom, abCode
                     temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
-                                    'pushFrom': config_.PUSH_FROM['rov_recall_h'], 'abCode': self.ab_code}
+                                    'pushFrom': push_from, 'abCode': self.ab_code}
                                    for item in filtered_result if video_score.get(int(item)) is not None]
                     recall_result.extend(temp_result)
                     fil_video_ids.extend(list(set(video_ids) - set([item.get('videoId') for item in temp_result])))
@@ -1210,3 +1221,106 @@ class PoolRecall(object):
                 idx += get_size
 
         return recall_result[:size]
+
+    def rule_recall_by_h(self, size=4, expire_time=24*3600):
+        """
+        小时级召回视频
+        :param size: 获取视频个数
+        :param expire_time: 末位视频记录redis过期时间
+        :return:
+        """
+        t = [gevent.spawn(self.rov_pool_recall_h, size),
+             gevent.spawn(self.rov_pool_recall_h, size, '24h')]
+        gevent.joinall(t)
+        h_recall_result_list = [i.get() for i in t]
+        # 将已获取到的视频按顺序去重合并
+        now_video_ids = []
+        recall_result = []
+        for h_result in h_recall_result_list:
+            for video in h_result:
+                video_id = video.get('videoId')
+                if video_id not in now_video_ids:
+                    recall_result.append(video)
+                    now_video_ids.append(video_id)
+                    if len(recall_result) >= size:
+                        break
+                    else:
+                        continue
+        # 判断获取到的小时级数据数量
+        if len(recall_result) < size:
+            # 补充数据
+            rov_recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
+            # 去重合并
+            for video in rov_recall_result:
+                vid = video.get('videoId')
+                if vid not in now_video_ids:
+                    recall_result.append(video)
+                    now_video_ids.append(vid)
+                    if len(recall_result) >= size:
+                        break
+                    else:
+                        continue
+
+        return recall_result[:size]
+
+    def rov_pool_recall_h(self, size=4, key_flag=''):
+        """
+        小时级视频召回
+        :param size: 视频数
+        :param key_flag:
+        :return:
+        """
+        if key_flag == '24h':
+            push_from = config_.PUSH_FROM['rov_recall_24h']
+        else:
+            push_from = config_.PUSH_FROM['rov_recall_h']
+        # 获取mid对应的小时级列表redis-key
+        h_recall_mid_key = self.get_mid_h_key(province_code='', key_flag=key_flag)
+        if not self.redis_helper.key_exists(h_recall_mid_key):
+            recall_result = []
+        else:
+            # 过滤的视频
+            fil_video_ids = []
+            recall_result = []
+            # 每次获取的视频数
+            get_size = size * 5
+            # 记录获取频次
+            freq = 0
+            while len(recall_result) < size:
+                freq += 1
+                if freq > config_.MAX_FREQ_FROM_ROV_POOL:
+                    break
+                # 获取数据
+                data = self.redis_helper.get_data_zset_with_index(key_name=h_recall_mid_key,
+                                                                  start=(freq - 1) * get_size, end=freq * get_size - 1,
+                                                                  with_scores=True)
+                if not data:
+                    log_.info('小时级更新视频已取完')
+                    break
+                # 获取视频id,并转换类型为int,并存储为key-value{videoId: score}
+                video_ids = []
+                video_score = {}
+                for value in data:
+                    video_id = int(value[0])
+                    video_ids.append(video_id)
+                    video_score[video_id] = value[1]
+                # 过滤
+                filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, '', key_flag)
+                ge.join()
+                filtered_result = ge.get()
+
+                if filtered_result:
+                    # 添加视频源参数 pushFrom, abCode
+                    temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
+                                    'pushFrom': push_from, 'abCode': self.ab_code}
+                                   for item in filtered_result if video_score.get(int(item)) is not None]
+                    recall_result.extend(temp_result)
+                    fil_video_ids.extend(list(set(video_ids) - set([item.get('videoId') for item in temp_result])))
+                else:
+                    fil_video_ids.extend(video_ids)
+            # 将被过滤的视频进行移除
+            for value in fil_video_ids:
+                self.redis_helper.remove_value_from_zset(key_name=h_recall_mid_key, value=value)
+
+        return recall_result[:size]