Browse Source

add ab-test: 095

liqian 2 years ago
parent
commit
13817afe09
4 changed files with 193 additions and 3 deletions
  1. 17 2
      config.py
  2. 15 0
      recommend.py
  3. 6 1
      video_rank.py
  4. 155 0
      video_recall.py

+ 17 - 2
config.py

@@ -58,6 +58,7 @@ class BaseConfig(object):
         'region_rule_rank4_appType_0_data2': '084',
         'region_rule_rank4_appType_19_data2': '089',
         'region_rule_rank4_appType_19_data3': '090',
+        'region_rule_rank5_appType_0_data1': '095',
     }
 
     # abTest
@@ -104,6 +105,7 @@ class BaseConfig(object):
             'region_rule_rank4_appType_0_data2': 60007,
             'region_rule_rank4_appType_19_data2': 60008,
             'region_rule_rank4_appType_19_data3': 60009,
+            'region_rule_rank5_appType_0_data1': 60010
         },  # 地域分组小时级规则实验
 
         'rank_by_24h': {
@@ -140,6 +142,7 @@ class BaseConfig(object):
         'region_rule_rank4_appType_0_data2': {'rule_key': 'rule3', 'data_key': 'data2'},
         'region_rule_rank4_appType_19_data2': {'rule_key': 'rule3', 'data_key': 'data2'},
         'region_rule_rank4_appType_19_data3': {'rule_key': 'rule3', 'data_key': 'data3'},
+        'region_rule_rank5_appType_0_data1': {'rule_key': 'rule4', 'data_key': 'data1'},
     }
 
     # 小程序地域分组小时级列表key不同实验标识
@@ -171,6 +174,7 @@ class BaseConfig(object):
         'rov_recall_region_h': 'recall_pool_region_h',  # 地域分组小时级更新列表
         'rov_recall_region_day': 'recall_pool_region_day',  # 地域分组天级更新列表
         'rov_recall_region_24h': 'recall_pool_region_24h',  # 地域分组小时级更新24h列表
+        'rov_recall_24h_dup': 'rov_recall_24h_dup',  # 小时级更新24h筛选后剩余数据列表
         'top_video_relevant_appType_19': 'relevant_video',  # 相似视频
         'whole_movies': 'whole_movies',  # 完整影视
         'talk_videos': 'talk_videos',  # 影视解说
@@ -215,6 +219,11 @@ class BaseConfig(object):
     # 记录 mid-小时级key 中数据所属(date,h),完整格式:com.weiqu.video.h.region.24h.record.mid.{appType}.{mid}
     H_WITH_MID_RECORD_KEY_NAME_PREFIX_REGION_24H = 'com.weiqu.video.h.region.24h.record.mid.'
 
+    # 用户上一次在dup3 24h更新列表中对应的位置 redis key前缀,完整key格式:com.weiqu.video.region.dup3.24h.last.item.{appType}.{mid}
+    LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX = 'com.weiqu.video.region.dup3.24h.last.item.'
+    # 记录 mid-上一次在dup3 24h更新列表中对应的位置key 中数据所属(date,h),完整格式:com.weiqu.video.h.region.24h.last.record.mid.{appType}.{mid}
+    RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP3_24H = 'com.weiqu.video.h.region.24h.last.record.mid.'
+
     # 小时级视频状态不符合推荐要求的列表 redis key,完整格式:com.weiqu.video.filter.h.item.{rule_key}
     H_VIDEO_FILER = 'com.weiqu.video.filter.h.item.'
 
@@ -251,6 +260,9 @@ class BaseConfig(object):
     # 小程序24h更新结果与 小程序地域分组24h更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
     # 完整格式:com.weiqu.video.recall.hot.item.score.dup2.apptype.region.24h.h.{region}.{appType}.{data_key}.{rule_key}.{date}.{h}
     RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H = 'com.weiqu.video.recall.hot.item.score.dup2.apptype.region.24h.h.'
+    # 小程序小时级24h数据 筛选后的剩余数据 更新结果 与 小程序24h更新结果/小程序地域分组24h更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
+    # 完整格式:com.weiqu.video.recall.hot.item.score.dup3.apptype.region.24h.h.{region}.{appType}.{data_key}.{rule_key}.{date}.{h}
+    RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H = 'com.weiqu.video.recall.hot.item.score.dup3.apptype.region.24h.h.'
     # 小程序离线ROV模型结果与 小程序天级更新结果/小程序地域分组天级更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
     # 完整格式:com.weiqu.video.recall.hot.item.score.dup.apptype.region.h.{region}.{appType}.{data_key}.{rule_key}.{date}.{h}
     RECALL_KEY_NAME_PREFIX_DUP_REGION_H = 'com.weiqu.video.recall.hot.item.score.dup.apptype.region.h.'
@@ -385,13 +397,15 @@ class BaseConfig(object):
                 {"dataListCode": 1, "keyPrefix": RECALL_KEY_NAME_PREFIX_REGION_BY_H},
                 {"dataListCode": 2, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H},
                 {"dataListCode": 3, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H},
-                {"dataListCode": 4, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP_REGION_H},
+                {"dataListCode": 4, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H},
+                {"dataListCode": 5, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP_REGION_H},
             ]
         },
         1: {"dataListDesc": "地域小时级", "dataListCode": 1, "keyPrefix": RECALL_KEY_NAME_PREFIX_REGION_BY_H},
         2: {"dataListDesc": "地域相对24小时级", "dataListCode": 2, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H},
         3: {"dataListDesc": "非地域相对24小时级", "dataListCode": 3, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H},
-        4: {"dataListDesc": "大列表", "dataListCode": 4, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP_REGION_H},
+        4: {"dataListDesc": "非地域相对24小时级列表2", "dataListCode": 4, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H},
+        5: {"dataListDesc": "大列表", "dataListCode": 5, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP_REGION_H},
     }
     VIDEO_DATA_LIST_AB_EXP_CODE_MAPPING = {
         # "055": {"app_type": 0, "data_key": "data1", "rule_key": "rule2"},
@@ -407,6 +421,7 @@ class BaseConfig(object):
         "084": {"app_type": 0, "data_key": "data2", "rule_key": "rule3"},
         "089": {"app_type": 19, "data_key": "data2", "rule_key": "rule3"},
         "090": {"app_type": 19, "data_key": "data3", "rule_key": "rule3"},
+        "095": {"app_type": 0, "data_key": "data1", "rule_key": "rule4"},
     }
     REGION_CODE = {
         '北京': '110000', '天津': '120000', '河北省': '130000', '山西省': '140000', '内蒙古': '150000',

+ 15 - 0
recommend.py

@@ -388,6 +388,14 @@ def update_redis_data(result, app_type, mid, last_rov_recall_key, top_K, expire_
                                                    expire_time=expire_time)
                 # log_.info('last video redis update success!')
 
+            # 将此次获取的 相对24h筛选后剩余数据列表 中的视频id同步刷新到redis中,方便下次快速定位到召回位置
+            rov_recall_24h_dup_video = [item['videoId'] for item in result[:top_K]
+                                        if item['pushFrom'] == config_.PUSH_FROM['rov_recall_24h_dup']]
+            if len(rov_recall_24h_dup_video) > 0:
+                last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX}{app_type}.{mid}'
+                redis_helper.set_data_to_redis(key_name=last_video_key, value=rov_recall_24h_dup_video[-1],
+                                               expire_time=expire_time)
+
         # 将此次分发的流量池视频,对 本地分发数-1 进行记录
         if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
             flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
@@ -638,6 +646,13 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, page_type=0)
             data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data3'].get('data_key')
             no_op_flag = True
 
+        elif config_.AB_EXP_CODE['region_rule_rank5_appType_0_data1'] in ab_exp_code_list:
+            ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank5_appType_0_data1')
+            expire_time = 3600
+            rule_key = config_.RULE_KEY_REGION['region_rule_rank5_appType_0_data1'].get('rule_key')
+            data_key = config_.RULE_KEY_REGION['region_rule_rank5_appType_0_data1'].get('data_key')
+            no_op_flag = True
+
         else:
             ab_code = config_.AB_CODE['initial']
             expire_time = 24 * 3600

+ 6 - 1
video_rank.py

@@ -56,6 +56,10 @@ def video_rank(data, size, top_K, flow_pool_P):
     rule_24h_recall = [item for item in data['rov_pool_recall']
                        if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
     rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # 相对24h规则筛选后剩余更新数据
+    rule_24h_dup_recall = [item for item in data['rov_pool_recall']
+                           if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
+    rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     # 天级规则更新数据
     day_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_day']]
     day_recall_rank = sorted(day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
@@ -69,6 +73,7 @@ def video_rank(data, size, top_K, flow_pool_P):
             config_.PUSH_FROM['rov_recall_region_24h'],
             config_.PUSH_FROM['rov_recall_region_day'],
             config_.PUSH_FROM['rov_recall_24h'],
+            config_.PUSH_FROM['rov_recall_24h_dup'],
             config_.PUSH_FROM['rov_recall_day'],
             config_.PUSH_FROM['whole_movies'],
             config_.PUSH_FROM['talk_videos']]
@@ -76,7 +81,7 @@ def video_rank(data, size, top_K, flow_pool_P):
     rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     rov_recall_rank = whole_movies_recall_rank + talk_videos_recall_rank + h_recall_rank + \
                       region_h_recall_rank + region_24h_recall_rank + region_day_recall_rank + \
-                      rule_24h_recall_rank + day_recall_rank + rov_initial_recall_rank
+                      rule_24h_recall_rank + rule_24h_dup_recall_rank + day_recall_rank + rov_initial_recall_rank
     # 流量池
     flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
     # 对各路召回的视频进行去重

+ 155 - 0
video_recall.py

@@ -1182,6 +1182,15 @@ class PoolRecall(object):
             else:
                 t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
                      gevent.spawn(self.region_dup_recall, province_code, 2, size, expire_time)]
+        elif self.ab_code == config_.AB_CODE['region_rank_by_h'].get('region_rule_rank5_appType_0_data1'):
+            if province_code == '-1':
+                t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
+                     gevent.spawn(self.recall_region_dup_24h, province_code, size)]
+            else:
+                t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
+                     gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
+                     gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
+                     gevent.spawn(self.recall_region_dup_24h, province_code, size)]
         else:
             if province_code == '-1':
                 t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h')]
@@ -1577,3 +1586,149 @@ class PoolRecall(object):
 
         return recall_result[:size]
 
+
+    def update_last_video_record(self, record_key, pool_key_prefix, province_code):
+        # 判断当前小时的小时级列表是否更新
+        now_date = datetime.today()
+        h = datetime.now().hour
+        now_dt = datetime.strftime(now_date, '%Y%m%d')
+        now_pool_recall_key = f"{pool_key_prefix}{province_code}.{self.app_type}.{self.data_key}.{self.rule_key}.{now_dt}.{h}"
+        if self.redis_helper.key_exists(key_name=now_pool_recall_key):
+            value = {'date': now_dt, 'h': h}
+            self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=2 * 3600)
+        else:
+            if h == 0:
+                redis_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
+                redis_h = 23
+            else:
+                redis_dt = now_dt
+                redis_h = h - 1
+            now_pool_recall_key = f"{pool_key_prefix}{province_code}.{self.app_type}.{self.data_key}.{self.rule_key}.{redis_dt}.{redis_h}"
+            value = {'date': redis_dt, 'h': redis_h}
+            self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=2 * 3600)
+        return now_pool_recall_key
+
+    def get_video_idx(self, pool_recall_key):
+        """
+        获取上次视频所在位置
+        :param pool_recall_key: 视频所在列表 key
+        :return: idx
+        """
+        last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX}{self.app_type}.{self.mid}'
+        value = self.redis_helper.get_data_from_redis(last_video_key)
+        if value:
+            idx = self.redis_helper.get_index_with_data(key_name=pool_recall_key, value=value)
+            if not idx:
+                idx = 0
+            else:
+                idx += 1
+        else:
+            idx = 0
+        return idx
+
+    def get_last_recommend_video_idx(self, province_code):
+        # 判断mid对应上一次视频位置 时间记录
+        record_key = f"{config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP3_24H}{self.app_type}.{self.mid}"
+        pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H
+        if not self.redis_helper.key_exists(key_name=record_key):
+            # ###### 记录key不存在
+            idx = 0
+            pool_recall_key = self.update_last_video_record(record_key=record_key, pool_key_prefix=pool_key_prefix,
+                                                            province_code=province_code)
+        else:
+            # ###### 记录key存在,判断date, h
+            now_date = datetime.today()
+            h = datetime.now().hour
+            # 获取记录的date, h
+            record = self.redis_helper.get_data_from_redis(key_name=record_key)
+            record_dt = eval(record).get('date')
+            record_h = eval(record).get('h')
+            now_dt = datetime.strftime(now_date, '%Y%m%d')
+            if record_dt == now_dt and int(record_h) == h:
+                # 已获取当前小时数据
+                pool_recall_key = f"{pool_key_prefix}{province_code}.{self.app_type}.{self.data_key}.{self.rule_key}.{now_dt}.{h}"
+                idx = self.get_video_idx(pool_recall_key=pool_recall_key)
+            elif (record_dt == now_dt and h-int(record_h) == 1) or (h == 0 and int(record_h) == 23):
+                # 记录的h - 当前h = 1,判断当前h数据是否已更新
+                now_pool_recall_key = f"{pool_key_prefix}{province_code}.{self.app_type}.{self.data_key}.{self.rule_key}.{now_dt}.{h}"
+                if self.redis_helper.key_exists(key_name=now_pool_recall_key):
+                    new_record = {'date': now_dt, 'h': h}
+                    self.redis_helper.set_data_to_redis(key_name=record_key, value=str(new_record), expire_time=2*3600)
+                    idx = 0
+                    pool_recall_key = now_pool_recall_key
+                else:
+                    pool_recall_key = f"{pool_key_prefix}{province_code}.{self.app_type}.{self.data_key}.{self.rule_key}.{record_dt}.{record_h}"
+                    idx = self.get_video_idx(pool_recall_key=pool_recall_key)
+            else:
+                idx = 0
+                pool_recall_key = self.update_last_video_record(record_key=record_key, pool_key_prefix=pool_key_prefix,
+                                                                province_code=province_code)
+
+        return pool_recall_key, idx
+
+    def recall_region_dup_24h(self, province_code, size=4, expire_time=2*3600, push_from=config_.PUSH_FROM['rov_recall_24h_dup']):
+        """
+        从小程序小时级24h数据 筛选后的剩余数据 更新结果中获取视频
+        :param size: 获取视频个数
+        :param expire_time: 末位视频记录redis过期时间
+        :param push_from: 视频来源标记
+        :return:
+        """
+        start_time = time.time()
+        # 获取相关redis key, 用户上一次在rov召回池对应的位置
+        pool_key, idx = self.get_last_recommend_video_idx(province_code=province_code)
+        if not pool_key:
+            return []
+        pool_recall_result = []
+        # 每次获取的视频数
+        get_size = size * 5
+        # 记录获取频次
+        freq = 0
+        while len(pool_recall_result) < size:
+            freq += 1
+            if freq > config_.MAX_FREQ_FROM_ROV_POOL:
+                break
+            # 获取数据
+            data = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
+                                                              start=idx, end=idx + get_size - 1,
+                                                              with_scores=True)
+            if not data:
+                break
+            # 获取视频id,并转换类型为int,并存储为key-value{videoId: score}
+            video_ids = []
+            video_score = {}
+            for value in data:
+                video_id = int(value[0])
+                video_ids.append(video_id)
+                video_score[video_id] = value[1]
+            # 过滤
+            filter_ = FilterVideos(request_id=self.request_id,
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+            ge = gevent.spawn(filter_.filter_videos)
+            ge.join()
+            filtered_result = ge.get()
+
+            if filtered_result:
+                # 添加视频源参数 pushFrom, abCode
+                temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
+                                'pushFrom': push_from, 'abCode': self.ab_code}
+                               for item in filtered_result if video_score.get(int(item)) is not None]
+                pool_recall_result.extend(temp_result)
+            else:
+                # 将此次获取的末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
+                if self.mid:
+                    # mid为空时,不做记录
+                    last_video_key = f'{config_.LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX}{self.app_type}.{self.mid}'
+                    self.redis_helper.set_data_to_redis(key_name=last_video_key, value=data[-1][0],
+                                                        expire_time=expire_time)
+            idx += get_size
+
+        pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
+
+        log_.info({
+            'logTimestamp': int(time.time() * 1000),
+            'request_id': self.request_id,
+            'operation': 'rov_recall_24h_dup',
+            'executeTime': (time.time() - start_time) * 1000
+        })
+        return pool_recall_result[:size]