Browse Source

opt rule rank by h

liqian 3 years ago
parent
commit
2ad096a140
5 changed files with 190 additions and 24 deletions
  1. 6 2
      config.py
  2. 6 1
      recommend.py
  3. 43 0
      utils.py
  4. 8 1
      video_rank.py
  5. 127 20
      video_recall.py

+ 6 - 2
config.py

@@ -40,6 +40,7 @@ class BaseConfig(object):
         'bottom_last': 'bottom_strategy_last',  # 二层兜底
         'position_insert': 'position_insert',  # 按位置插入
         'relevant_video_op': 'relevant_video_op',  # 相关推荐强插
+        'rov_recall_h': 'recall_pool_h',  # 小时级更新列表
     }
 
     # category id mapping
@@ -55,8 +56,8 @@ class BaseConfig(object):
     # ROV召回池redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.{date}
     RECALL_KEY_NAME_PREFIX = 'com.weiqu.video.recall.hot.item.score.'
 
-    # 小程序小时级更新结果存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.h.{date}.{h}
-    RECALL_KEY_NAME_PREFIX_BY_H = 'com.weiqu.video.recall.hot.item.score.h.'
+    # 小程序小时级更新结果存放 redis key前缀,完整格式:com.weiqu.video.recall.item.score.h.{date}.{h}
+    RECALL_KEY_NAME_PREFIX_BY_H = 'com.weiqu.video.recall.item.score.h.'
 
     # 小程序离线ROV模型结果与小程序小时级更新结果去重后 存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.dup.h.{date}.{h}
     RECALL_KEY_NAME_PREFIX_DUP_H = 'com.weiqu.video.recall.hot.item.score.dup.h.'
@@ -66,6 +67,9 @@ class BaseConfig(object):
     # 记录 mid-小时级key 中数据所属(date,h),完整格式:com.weiqu.video.h.record.mid.{mid}
     H_WITH_MID_RECORD_KEY_NAME_PREFIX = 'com.weiqu.video.h.record.mid.'
 
+    # 小时级视频状态不符合推荐要求的列表 redis key,完整格式:com.weiqu.video.filter.h.item
+    H_VIDEO_FILER = 'com.weiqu.video.filter.h.item'
+
     # app应用 小程序离线ROV模型结果存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.app.{date}
     RECALL_KEY_NAME_PREFIX_APP = 'com.weiqu.video.recall.hot.item.score.app.'
 

+ 6 - 1
recommend.py

@@ -173,7 +173,12 @@ def video_recommend(mid, uid, size, top_K, flow_pool_P, app_type, algo_type, cli
     pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                              client_info=client_info)
     _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
-    t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time), gevent.spawn(pool_recall.flow_pool_recall, size)]
+    if ab_code == config_.AB_CODE['rank_by_h']:
+        t = [gevent.spawn(pool_recall.rov_pool_recall_by_h, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
+    else:
+        t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
     gevent.joinall(t)
     recall_result_list = [i.get() for i in t]
 

+ 43 - 0
utils.py

@@ -148,6 +148,49 @@ class FilterVideos(object):
         self.uid = uid
         self.video_ids = video_ids
 
+    def filter_video_status_h(self, video_ids):
+        """Drop hour-level recall candidates whose status makes them ineligible.
+
+        The ids to exclude are read from the Redis set stored under
+        ``config_.H_VIDEO_FILER``; each member is converted to ``int`` before
+        comparison.
+
+        :param video_ids: candidate video ids
+        :return: ``video_ids`` itself when the exclusion set comes back empty,
+                 otherwise a new list of the surviving ids in their original order
+        """
+        # NOTE(review): the constant is spelled ``H_VIDEO_FILER`` (not FILTER) where
+        # it is defined; the spelling is kept here so both places keep matching.
+        redis_helper = RedisHelper()
+        excluded_raw = redis_helper.get_data_from_set(key_name=config_.H_VIDEO_FILER)
+        if not excluded_raw:
+            return video_ids
+        # A set gives constant-time membership tests; the previous list was
+        # scanned once per candidate.
+        excluded_ids = {int(video) for video in excluded_raw}
+        return [video_id for video_id in video_ids if video_id not in excluded_ids]
+
+    def filter_videos_h(self):
+        """Filter the hour-level recall candidates held in ``self.video_ids``.
+
+        Three filters run in sequence, each one consuming the output of the one
+        before it: pre-exposure, video status, already-viewed. Every stage logs
+        its result and elapsed time.
+
+        :return: list of ``int`` video ids that survived all three filters, or
+                 ``None`` as soon as any stage leaves nothing
+        """
+        # Pre-exposure filter
+        st_pre = time.time()
+        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        et_pre = time.time()
+        log_.info('filter by previewed: app_type = {}, mid = {}, uid = {}, result = {}, execute time = {}ms'.format(
+            self.app_type, self.mid, self.uid, filtered_pre_result, (et_pre - st_pre) * 1000))
+        if not filtered_pre_result:
+            return None
+
+        # Video status filter
+        st_status = time.time()
+        filtered_status_result = self.filter_video_status_h(video_ids=filtered_pre_result)
+        et_status = time.time()
+        log_.info('filter by video status: result = {}, execute time = {}ms'.format(
+            filtered_status_result, (et_status - st_status) * 1000))
+        if not filtered_status_result:
+            return None
+
+        # Already-viewed filter. It must run on the status-filtered list: feeding it
+        # the pre-exposure result would silently throw away the status filter's
+        # work and let ineligible videos through.
+        st_viewed = time.time()
+        filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
+        et_viewed = time.time()
+        log_.info('filter by viewed: app_type = {}, mid = {}, uid = {}, result = {}, execute time = {}ms'.format(
+            self.app_type, self.mid, self.uid, filtered_viewed_result, (et_viewed - st_viewed) * 1000))
+        if not filtered_viewed_result:
+            return None
+        return [int(video_id) for video_id in filtered_viewed_result]
+
     def filter_videos(self):
         """视频过滤"""
         # 预曝光过滤

+ 8 - 1
video_rank.py

@@ -23,8 +23,15 @@ def video_rank(data, size, top_K, flow_pool_P):
     if not data['rov_pool_recall'] and not data['flow_pool_recall']:
         return None
     # 将各路召回的视频按照score从大到小排序
+    # 小时级更新数据
+    h_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_h']]
+    h_recall_rank = sorted(h_recall, key=lambda k: (k.get('rovScore'), 0), reverse=True)
     # ROV召回池
-    rov_recall_rank = sorted(data['rov_pool_recall'], key=lambda k: (k.get('rovScore'), 0), reverse=True)
+    rov_initial_recall = [
+        item for item in data['rov_pool_recall'] if item.get('pushFrom') != config_.PUSH_FROM['rov_recall_h']
+    ]
+    rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: (k.get('rovScore'), 0), reverse=True)
+    rov_recall_rank = h_recall_rank + rov_initial_recall_rank
     # 流量池
     flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: (k.get('rovScore'), 0), reverse=True)
     # 对各路召回的视频进行去重

+ 127 - 20
video_recall.py

@@ -30,7 +30,48 @@ class PoolRecall(object):
         self.client_info = client_info
         self.redis_helper = RedisHelper()
 
+    def copy_redis_zset_data(self, from_key_name, to_key_name):
+        """Copy every member and score of one Redis zset to another key.
+
+        Any existing ``to_key_name`` is deleted before the copy is written, and the
+        copy is stored with ``expire_time=2*3600`` (presumably seconds - confirm
+        against ``RedisHelper.add_data_with_zset``).
+
+        :param from_key_name: key of the zset to read
+        :param to_key_name: key that is overwritten with the copied data
+        :return: ``True`` when data was copied, ``False`` when the source
+                 returned nothing
+        """
+        records = self.redis_helper.get_data_zset_with_index(
+            key_name=from_key_name, start=0, end=-1, with_scores=True)
+        # An empty result is treated like a missing source. Otherwise the
+        # destination would be deleted, nothing would be written in its place, and
+        # the caller would still be told the copy succeeded.
+        if not records:
+            return False
+        data = {int(video_id): score for video_id, score in records}
+        # Replace rather than merge, so members absent from the source do not linger.
+        if self.redis_helper.key_exists(to_key_name):
+            self.redis_helper.del_keys(key_name=to_key_name)
+        self.redis_helper.add_data_with_zset(key_name=to_key_name, data=data, expire_time=2*3600)
+        return True
 
+    def update_mid_data(self):
+        """Refresh this mid's private copy of the hour-level recall list.
+
+        The current hour's list is copied when its Redis key exists; otherwise the
+        previous hour's list is copied. When the copy succeeds, the (date, h) the
+        data came from is written to the mid's record key.
+        """
+        # Redis key of the hour-level video list for this mid
+        h_recall_mid_key = f"{config_.H_WITH_MID_RECALL_KEY_NAME_PREFIX}{self.mid}"
+        # Redis key recording which (date, h) that list was copied from
+        h_record_key = f"{config_.H_WITH_MID_RECORD_KEY_NAME_PREFIX}{self.mid}"
+        # Read the clock once so the date and the hour always describe the same
+        # instant, even when the call straddles an hour or day boundary.
+        now = datetime.now()
+        source_dt = datetime.strftime(now, '%Y%m%d')
+        source_h = now.hour
+        source_key = f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{source_dt}.{source_h}"
+        if not self.redis_helper.key_exists(key_name=source_key):
+            # The current hour has not been published yet: fall back to the previous
+            # hour, which is hour 23 of the previous day when the current hour is 0.
+            previous = now - timedelta(hours=1)
+            source_dt = datetime.strftime(previous, '%Y%m%d')
+            source_h = previous.hour
+            source_key = f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{source_dt}.{source_h}"
+        copied = self.copy_redis_zset_data(from_key_name=source_key, to_key_name=h_recall_mid_key)
+        if copied:
+            # Only record the source once the list has actually been copied.
+            value = {'date': source_dt, 'h': source_h}
+            self.redis_helper.set_data_to_redis(key_name=h_record_key, value=str(value), expire_time=2*3600)
 
     def get_mid_h_key(self):
         # mid对应小时级视频列表 redis-key
@@ -40,7 +81,7 @@ class PoolRecall(object):
         if not self.redis_helper.key_exists(key_name=h_record_key):
             # ###### 记录key不存在,copy列表,更新记录
             self.update_mid_data()
-            return h_recall_mid_key
+            # return h_recall_mid_key
         else:
             # ###### 记录key存在,判断date, h
             now_date = datetime.today()
@@ -52,35 +93,102 @@ class PoolRecall(object):
             now_dt = datetime.strftime(now_date, '%Y%m%d')
             if record_dt == now_dt and int(record_h) == h:
                 # 已获取当前小时数据
-                return h_recall_mid_key
+                pass
+                # return h_recall_mid_key
             elif (record_dt == now_dt and h-int(record_h) == 1) or (h == 0 and int(record_h) == 23):
                 # 记录的h - 当前h = 1,判断当前h数据是否已更新
                 now_h_recall_key = f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{now_dt}.{h}"
-                if not self.redis_helper.key_exists(key_name=now_h_recall_key):
+                # if not self.redis_helper.key_exists(key_name=now_h_recall_key):
                     # 未更新
-                    return h_recall_mid_key
-                else:
+                    # return h_recall_mid_key
+                if self.redis_helper.key_exists(key_name=now_h_recall_key):
                     # 已更新,重新获取更新mid对应列表及记录
-                    self.redis_helper.del_keys(key_name=h_recall_mid_key)
-                    self.redis_helper.del_keys(key_name=h_record_key)
-                    ?????
-
-
-
-
-
-
-
+                    # self.redis_helper.del_keys(key_name=h_recall_mid_key)
+                    # self.redis_helper.del_keys(key_name=h_record_key)
+                    flag = self.copy_redis_zset_data(from_key_name=now_h_recall_key, to_key_name=h_recall_mid_key)
+                    if flag:
+                        new_record = {'date': now_dt, 'h': h}
+                        self.redis_helper.set_data_to_redis(key_name=h_record_key, value=str(new_record), expire_time=2*3600)
+                    # return h_recall_mid_key
+            else:
+                self.update_mid_data()
+                # return h_recall_mid_key
 
+        return h_recall_mid_key
 
-    def rov_h_pool_recall(self, size=10, expire_time=24*3600):
+    def rov_pool_recall_by_h(self, size=10, expire_time=24*3600):
         """
         Fetch videos from the hourly-updated ROV recall pool, topping up from rov_pool_recall when short.
         :param size: number of videos to fetch
         :param expire_time: Redis expiry of the last-position video record (forwarded to rov_pool_recall)
         :return: the recall result truncated to at most ``size`` entries
         """
-        pass
+        # Get the Redis key of the hour-level list for this mid
+        h_recall_mid_key = self.get_mid_h_key()
+        # No per-mid hour-level list available: use the regular ROV pool recall only
+        if not self.redis_helper.key_exists(h_recall_mid_key):
+            recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
+        else:
+            # Videos that were filtered out
+            fil_video_ids = []
+            recall_result = []
+            # Number of videos fetched per round
+            get_size = size * 2
+            # Number of fetch rounds so far
+            freq = 0
+            while len(recall_result) < size:
+                freq += 1
+                # Stop after the configured maximum number of rounds
+                if freq > config_.MAX_FREQ_FROM_ROV_POOL:
+                    break
+                # Fetch the next window of the list by index range
+                data = self.redis_helper.get_data_zset_with_index(key_name=h_recall_mid_key,
+                                                                  start=(freq - 1) * get_size, end=freq * get_size - 1,
+                                                                  with_scores=True)
+                if not data:
+                    log_.info('小时级更新视频已取完')
+                    break
+                # Collect the video ids as int and keep a {videoId: score} mapping
+                video_ids = []
+                video_score = {}
+                for value in data:
+                    video_id = int(value[0])
+                    video_ids.append(video_id)
+                    video_score[video_id] = value[1]
+                # Filter; the greenlet is joined immediately, so this round waits for its result
+                filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                ge = gevent.spawn(filter_.filter_videos_h)
+                ge.join()
+                filtered_result = ge.get()
+
+                if filtered_result:
+                    # Attach the video source parameters pushFrom and abCode
+                    temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
+                                    'pushFrom': config_.PUSH_FROM['rov_recall_h'], 'abCode': self.ab_code}
+                                   for item in filtered_result if video_score.get(int(item)) is not None]
+                    recall_result.extend(temp_result)
+                    # Everything fetched this round that did not make it into the result was filtered out
+                    fil_video_ids.extend(list(set(video_ids) - set([item.get('videoId') for item in temp_result])))
+                else:
+                    fil_video_ids.extend(video_ids)
+            # Remove the filtered-out videos from the mid's list; done after the loop,
+            # so the index windows read above are not shifted while paging
+            for value in fil_video_ids:
+                self.redis_helper.remove_value_from_zset(key_name=h_recall_mid_key, value=value)
+
+            # Check how many hour-level videos were obtained
+            if len(recall_result) < size:
+                # Top up from the regular ROV pool recall
+                rov_recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
+                # Merge, skipping video ids that are already present
+                now_video_ids = [item.get('videoId') for item in recall_result]
+                for video in rov_recall_result:
+                    vid = video.get('videoId')
+                    if vid not in now_video_ids:
+                        recall_result.append(video)
+                        now_video_ids.append(vid)
+                        if len(recall_result) >= size:
+                            break
+                        else:
+                            continue
+
+        return recall_result[:size]
 
     def rov_pool_recall(self, size=10, expire_time=24*3600):
         """
@@ -543,7 +651,7 @@ class PoolRecall(object):
 
             else:
                 # 判断热度列表是否更新,未更新则使用前一小时的热度列表
-                key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{now_date}.{h}"
+                key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H}{now_date}.{h}"
                 if self.redis_helper.key_exists(key_name):
                     return key_name, h
                 else:
@@ -553,13 +661,12 @@ class PoolRecall(object):
                     else:
                         redis_h = h - 1
                         redis_date = now_date
-                    key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{redis_date}.{redis_h}"
+                    key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H}{redis_date}.{redis_h}"
                     # 判断当前时间是否晚于数据正常更新时间,发送消息到飞书
                     now_m = datetime.now().minute
                     feishu_text = '{} —— appType = {}, h = {} 数据未按时更新,请及时查看解决。'.format(
                         config_.ENV_TEXT, self.app_type, h)
                     if now_m > config_.ROV_H_UPDATE_MINUTE:
-                        # 0<=h<8时,数据不做更新,不做报警
                         send_msg_to_feishu(feishu_text)
                     return key_name, redis_h