Browse Source

Merge branch 'region-h-ab' into dev

liqian 2 years ago
parent
commit
84c0b84884
4 changed files with 256 additions and 15 deletions
  1. 14 1
      config.py
  2. 12 12
      recommend.py
  3. 15 2
      video_rank.py
  4. 215 0
      video_recall.py

+ 14 - 1
config.py

@@ -108,6 +108,8 @@ class BaseConfig(object):
         'rov_recall_h': 'recall_pool_h',  # 小时级更新列表
         'rov_recall_day': 'recall_pool_day',  # 天级规则更新列表
         'old_video': 'old_video_recall',  # 老视频
+        'rov_recall_region_h': 'recall_pool_region_h',  # 地域分组小时级更新列表
+        'rov_recall_region_day': 'recall_pool_region_day',  # 地域分组天级更新列表
     }
 
     # category id mapping
@@ -150,7 +152,13 @@ class BaseConfig(object):
 
     # 小程序地域分组小时级更新结果存放 redis key前缀,完整格式:com.weiqu.video.recall.item.score.region.h.{region}.{rule_key}.{date}.{h}
     RECALL_KEY_NAME_PREFIX_REGION_BY_H = 'com.weiqu.video.recall.item.score.region.h.'
-    # 小程序离线ROV模型结果与小程序地域分组小时级更新结果去重后 存放 redis key前缀,
+    # 小程序地域分组天级更新结果与小程序地域分组小时级更新结果去重后 存放 redis key前缀,
+    # 完整格式:com.weiqu.video.recall.hot.item.score.dup1.region.day.h.{region}.{rule_key}.{date}.{h}
+    RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H = 'com.weiqu.video.recall.hot.item.score.dup1.region.day.h.'
+    # 小程序天级更新结果与 小程序地域分组天级更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
+    # 完整格式:com.weiqu.video.recall.hot.item.score.dup2.region.day.h.{region}.{rule_key}.{date}.{h}
+    RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H = 'com.weiqu.video.recall.hot.item.score.dup2.region.day.h.'
+    # 小程序离线ROV模型结果与 小程序天级更新结果/小程序地域分组天级更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
     # 完整格式:com.weiqu.video.recall.hot.item.score.dup.region.h.{region}.{rule_key}.{date}.{h}
     RECALL_KEY_NAME_PREFIX_DUP_REGION_H = 'com.weiqu.video.recall.hot.item.score.dup.region.h.'
     # 地域分组小时级视频状态不符合推荐要求的列表 redis key,完整格式:com.weiqu.video.filter.region.h.item.{region}.{rule_key}
@@ -189,6 +197,8 @@ class BaseConfig(object):
     LAST_VIDEO_FROM_ROV_POOL_PRE_PREFIX = 'com.weiqu.video.rov.pool.last.pre.'
     # 用户上一次在rov召回池对应的位置 redis key前缀,完整key格式:com.weiqu.video.rov.pool.last.now.{appType}.{mid}.{date}
     LAST_VIDEO_FROM_ROV_POOL_NOW_PREFIX = 'com.weiqu.video.rov.pool.last.now.'
+    # 用户上一次在region dup更新列表中对应的位置 redis key前缀,完整key格式:com.weiqu.video.region.dup.last.{dup}.{appType}.{mid}.{date}
+    LAST_VIDEO_FROM_REGION_DUP_PREFIX = 'com.weiqu.video.region.dup.last.'
 
     # 本地记录视频的可分发数,控制分发,完整key格式:com.weiqu.video.flowpool.local.distribute.count.{h}
     # LOCAL_DISTRIBUTE_COUNT_PREFIX = 'com.weiqu.video.flowpool.local.distribute.count.'
@@ -216,6 +226,9 @@ class BaseConfig(object):
     # 小时级召回池更新时间 每个小时的15分更新成功
     ROV_H_UPDATE_MINUTE = 15
 
+    # 地域分组小时级召回池更新时间
+    REGION_H_UPDATE_MINUTE = 30
+
     # 天级规则更新列表更新时间 00:30更新成功
     ROV_DAY_UPDATE_MINUTE = 0
 

+ 12 - 12
recommend.py

@@ -174,26 +174,26 @@ def video_recommend(mid, uid, size, top_K, flow_pool_P, app_type, algo_type, cli
                              client_info=client_info, rule_key=rule_key, no_op_flag=no_op_flag)
     _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     # 小时级实验
-    if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()] + \
-            [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
+    if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()]:
         t = [gevent.spawn(pool_recall.rov_pool_recall_by_h, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall, size)]
+    # 地域分组实验
+    elif ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
     # 最惊奇/老好看实验
     elif app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
-        # if ab_code == config_.AB_CODE['rov_rank_appType_18_19']:
         t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall_18_19, size)]
-        # else:
-        #     t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time)]
     # 天级实验
     elif ab_code in [code for _, code in config_.AB_CODE['rank_by_day'].items()]:
         t = [gevent.spawn(pool_recall.rov_pool_recall_by_day, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall, size)]
     # 老视频实验
-    elif ab_code in [config_.AB_CODE['old_video']]:
-        t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
-             gevent.spawn(pool_recall.flow_pool_recall, size),
-             gevent.spawn(pool_recall.old_videos_recall, size)]
+    # elif ab_code in [config_.AB_CODE['old_video']]:
+    #     t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
+    #          gevent.spawn(pool_recall.flow_pool_recall, size),
+    #          gevent.spawn(pool_recall.old_videos_recall, size)]
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall, size)]
@@ -226,9 +226,9 @@ def video_recommend(mid, uid, size, top_K, flow_pool_P, app_type, algo_type, cli
     rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=flow_pool_P)
 
     # 老视频实验
-    if ab_code in [config_.AB_CODE['old_video']]:
-        rank_result = video_rank_with_old_video(rank_result=rank_result, old_video_recall=recall_result_list[2],
-                                                size=size, top_K=top_K, old_video_index=old_video_index)
+    # if ab_code in [config_.AB_CODE['old_video']]:
+    #     rank_result = video_rank_with_old_video(rank_result=rank_result, old_video_recall=recall_result_list[2],
+    #                                             size=size, top_K=top_K, old_video_index=old_video_index)
 
     end_rank = time.time()
     log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(

+ 15 - 2
video_rank.py

@@ -26,16 +26,29 @@ def video_rank(data, size, top_K, flow_pool_P):
     # 小时级更新数据
     h_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_h']]
     h_recall_rank = sorted(h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # 地域分组小时级规则更新数据
+    region_h_recall = [item for item in data['rov_pool_recall']
+                         if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
+    region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # 地域分组天级规则更新数据
+    region_day_recall = [item for item in data['rov_pool_recall']
+                         if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_day']]
+    region_day_recall_rank = sorted(region_day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     # 天级规则更新数据
     day_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_day']]
     day_recall_rank = sorted(day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     # ROV召回池
     rov_initial_recall = [
         item for item in data['rov_pool_recall']
-        if item.get('pushFrom') not in [config_.PUSH_FROM['rov_recall_h'], config_.PUSH_FROM['rov_recall_day']]
+        if item.get('pushFrom') not in
+           [config_.PUSH_FROM['rov_recall_h'],
+            config_.PUSH_FROM['rov_recall_region_h'],
+            config_.PUSH_FROM['rov_recall_region_day'],
+            config_.PUSH_FROM['rov_recall_day']]
     ]
     rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
-    rov_recall_rank = h_recall_rank + day_recall_rank + rov_initial_recall_rank
+    rov_recall_rank = h_recall_rank + region_h_recall_rank + region_day_recall_rank \
+                      + day_recall_rank + rov_initial_recall_rank
     # 流量池
     flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
     # 对各路召回的视频进行去重

+ 215 - 0
video_recall.py

@@ -958,3 +958,218 @@ class PoolRecall(object):
         # 随机抽取 size+1 条数据
         random.shuffle(old_video_result)
         return old_video_result[:size+1]
+
+    def get_region_dup_video_last_idx_h(self, province_code, region_dup=None):
+        """获取用户上一次在 地域分组 相关去重列表中对应的位置"""
+        now_date = date.today().strftime('%Y%m%d')
+        h = datetime.now().hour
+
+        if region_dup == 1:
+            # 小程序地域分组天级更新结果与小程序地域分组小时级更新结果去重
+            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H
+        elif region_dup == 2:
+            # 小程序天级更新结果与 小程序地域分组天级更新结果/小程序地域分组小时级更新结果 去重
+            key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H
+        else:
+            key_name_prefix = ''
+        key_name = f"{key_name_prefix}{province_code}.{self.rule_key}.{now_date}.{h}"
+        last_region_dup_key = \
+            f'{config_.LAST_VIDEO_FROM_REGION_DUP_PREFIX}{region_dup}.{self.app_type}.{self.mid}.{h}'
+
+        if not self.redis_helper.key_exists(key_name=key_name):
+            if h == 0:
+                redis_h = 23
+                redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
+            else:
+                redis_h = h - 1
+                redis_date = now_date
+            key_name = f"{key_name_prefix}{province_code}.{self.rule_key}.{redis_date}.{redis_h}"
+            last_region_dup_key = \
+                f"{config_.LAST_VIDEO_FROM_REGION_DUP_PREFIX}{region_dup}.{self.app_type}.{self.mid}.{redis_h}"
+
+            # 判断当前时间是否晚于数据正常更新时间,发送消息到飞书
+            now_m = datetime.now().minute
+            feishu_text = f"{config_.ENV_TEXT} —— appType = {self.app_type}, h = {h}, key = {key_name}, " \
+                          f"province_code = {province_code} 数据未按时更新,请及时查看解决。"
+            if now_m > config_.REGION_H_UPDATE_MINUTE:
+                send_msg_to_feishu(feishu_text)
+
+        value = self.redis_helper.get_data_from_redis(last_region_dup_key)
+        if value:
+            idx = self.redis_helper.get_index_with_data(key_name, value)
+            if not idx:
+                idx = 0
+            else:
+                idx += 1
+        else:
+            idx = 0
+        return key_name, last_region_dup_key, idx
+
+    def rov_pool_recall_with_region(self, size=4, expire_time=24*3600):
+        """
+        地域分组召回视频
+        :param size: 获取视频个数
+        :param expire_time: 末位视频记录redis过期时间
+        :return:
+        """
+        # 获取provinceCode
+        province_code = self.client_info.get('provinceCode', '-1')
+        if province_code == '':
+            province_code = '-1'
+        t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
+             gevent.spawn(self.region_dup_recall, province_code, 1, size, expire_time),
+             gevent.spawn(self.region_dup_recall, province_code, 2, size, expire_time)]
+        gevent.joinall(t)
+        region_recall_result_list = [i.get() for i in t]
+        # 将已获取到的视频按顺序去重合并
+        now_video_ids = []
+        recall_result = []
+        for region_result in region_recall_result_list:
+            for video in region_result:
+                video_id = video.get('videoId')
+                if video_id not in now_video_ids:
+                    recall_result.append(video)
+                    now_video_ids.append(video_id)
+                    if len(recall_result) >= size:
+                        break
+                    else:
+                        continue
+        # 判断获取到的小时级数据数量
+        if len(recall_result) < size:
+            # 补充数据
+            rov_recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
+            # 去重合并
+            for video in rov_recall_result:
+                vid = video.get('videoId')
+                if vid not in now_video_ids:
+                    recall_result.append(video)
+                    now_video_ids.append(vid)
+                    if len(recall_result) >= size:
+                        break
+                    else:
+                        continue
+
+        return recall_result[:size]
+
+    def rov_pool_recall_with_region_by_h(self, province_code, size=4):
+        """
+        地域分组小时级视频召回
+        :param size: 视频数
+        :param province_code: 省份code
+        :return:
+        """
+        # 获取mid对应的小时级列表redis-key
+        h_recall_mid_key = self.get_mid_h_key(province_code=province_code)
+        if not self.redis_helper.key_exists(h_recall_mid_key):
+            recall_result = []
+        else:
+            # 过滤的视频
+            fil_video_ids = []
+            recall_result = []
+            # 每次获取的视频数
+            get_size = size * 5
+            # 记录获取频次
+            freq = 0
+            while len(recall_result) < size:
+                freq += 1
+                if freq > config_.MAX_FREQ_FROM_ROV_POOL:
+                    break
+                # 获取数据
+                data = self.redis_helper.get_data_zset_with_index(key_name=h_recall_mid_key,
+                                                                  start=(freq - 1) * get_size, end=freq * get_size - 1,
+                                                                  with_scores=True)
+                if not data:
+                    log_.info('地域分组小时级更新视频已取完')
+                    break
+                # 获取视频id,并转换类型为int,并存储为key-value{videoId: score}
+                video_ids = []
+                video_score = {}
+                for value in data:
+                    video_id = int(value[0])
+                    video_ids.append(video_id)
+                    video_score[video_id] = value[1]
+                # 过滤
+                filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, province_code)
+                ge.join()
+                filtered_result = ge.get()
+
+                if filtered_result:
+                    # 添加视频源参数 pushFrom, abCode
+                    temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
+                                    'pushFrom': config_.PUSH_FROM['rov_recall_region_h'], 'abCode': self.ab_code}
+                                   for item in filtered_result if video_score.get(int(item)) is not None]
+                    recall_result.extend(temp_result)
+                    fil_video_ids.extend(list(set(video_ids) - set([item.get('videoId') for item in temp_result])))
+                else:
+                    fil_video_ids.extend(video_ids)
+            # 将被过滤的视频进行移除
+            for value in fil_video_ids:
+                self.redis_helper.remove_value_from_zset(key_name=h_recall_mid_key, value=value)
+
+        return recall_result[:size]
+
+    def region_dup_recall(self, province_code, region_dup, size=4, expire_time=23*3600):
+        """
+        region dup 更新列表视频召回
+        :param province_code:
+        :param region_dup:
+        :param size:
+        :param expire_time:
+        :return:
+        """
+        if region_dup == 1:
+            push_from = config_.PUSH_FROM['rov_recall_region_day']
+        elif region_dup == 2:
+            push_from = config_.PUSH_FROM['rov_recall_day']
+
+        # 获取region dup更新列表相关redis key, 用户上一次在列表对应的位置
+        key_name, last_region_dup_key, idx = self.get_region_dup_video_last_idx_h(
+            province_code=province_code, region_dup=region_dup)
+        # 获取天级规则更新列表数据
+        if not key_name:
+            log_.info(f'region dup 更新列表中无视频, region_dup = {region_dup}')
+            recall_result = []
+        else:
+            recall_result = []
+            # 每次获取的视频数
+            get_size = size * 5
+            # 记录获取频次
+            freq = 0
+            while len(recall_result) < size:
+                freq += 1
+                if freq > config_.MAX_FREQ_FROM_ROV_POOL:
+                    break
+                # 获取数据
+                data = self.redis_helper.get_data_zset_with_index(key_name=key_name,
+                                                                  start=idx, end=idx + get_size - 1,
+                                                                  with_scores=True)
+                if not data:
+                    log_.info(f'region dup 更新视频已取完, region_dup = {region_dup}')
+                    break
+                # 获取视频id,并转换类型为int,并存储为key-value{videoId: score}
+                video_ids = []
+                video_score = {}
+                for value in data:
+                    video_id = int(value[0])
+                    video_ids.append(video_id)
+                    video_score[video_id] = value[1]
+                # 过滤
+                filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                ge = gevent.spawn(filter_.filter_videos)
+                ge.join()
+                filtered_result = ge.get()
+
+                if filtered_result:
+                    # 添加视频源参数 pushFrom, abCode
+                    temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
+                                    'pushFrom': push_from, 'abCode': self.ab_code}
+                                   for item in filtered_result if video_score.get(int(item)) is not None]
+                    recall_result.extend(temp_result)
+                else:
+                    # 将此次获取的末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
+                    self.redis_helper.set_data_to_redis(key_name=last_region_dup_key, value=data[-1][0],
+                                                        expire_time=expire_time)
+                idx += get_size
+
+        return recall_result[:size]