liqian 3 anos atrás
pai
commit
6d2dd50cf5
3 arquivos alterados com 188 adições e 4 exclusões
  1. 31 2
      config.py
  2. 11 0
      recommend.py
  3. 146 2
      video_recall.py

+ 31 - 2
config.py

@@ -27,6 +27,7 @@ class BaseConfig(object):
         'rule_rank4': '024',
         'rule_rank5': '025',
         'rov_rank_appType_19': '027',
+        'day_rule_rank1': '026',
     }
 
     # abTest
@@ -51,6 +52,9 @@ class BaseConfig(object):
             'rule_rank4': 20004,
             'rule_rank5': 20005,
         },  # 小时级别更新rov列表实验
+        'rank_by_day': {
+            'day_rule_rank1': 40001,
+        },  # 天级别规则更新rov列表实验
     }
 
     # 小程序小时级列表key不同实验标识
@@ -63,6 +67,12 @@ class BaseConfig(object):
         'rule_rank5': 'rule5',
     }
 
+    # 小程序天级别规则列表key不同实验标识
+    RULE_KEY_DAY = {
+        'initial': '',
+        'day_rule_rank1': 'rule1',
+    }
+
     # pushFrom
     PUSH_FROM = {
         'rov_recall': 'recall_pool',  # rov召回池
@@ -73,6 +83,7 @@ class BaseConfig(object):
         'position_insert': 'position_insert',  # 按位置插入
         'relevant_video_op': 'relevant_video_op',  # 相关推荐强插
         'rov_recall_h': 'recall_pool_h',  # 小时级更新列表
+        'rov_recall_day': 'recall_pool_day',  # 天级规则更新列表
     }
 
     # category id mapping
@@ -104,6 +115,15 @@ class BaseConfig(object):
     # 小时级视频状态不符合推荐要求的列表 redis key,完整格式:com.weiqu.video.filter.h.item.{rule_key}
     H_VIDEO_FILER = 'com.weiqu.video.filter.h.item.'
 
+    # 小程序天级更新结果存放 redis key前缀,完整格式:com.weiqu.video.recall.item.score.day.{rule_key}.{date}
+    RECALL_KEY_NAME_PREFIX_BY_DAY = 'com.weiqu.video.recall.item.score.day.'
+    # 小程序离线ROV模型结果与小程序天级更新结果去重后 存放 redis key前缀,
+    # 完整格式:com.weiqu.video.recall.hot.item.score.dup.day.now.{rule_key}.{date}
+    RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW = 'com.weiqu.video.recall.hot.item.score.dup.day.now.'
+    # 使用前一天小程序离线ROV模型结果与小程序天级更新结果去重后 存放 redis key前缀,
+    # 完整格式:com.weiqu.video.recall.hot.item.score.dup.day.pre.{rule_key}.{date}
+    RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE = 'com.weiqu.video.recall.hot.item.score.dup.day.pre.'
+
     # app应用 小程序离线ROV模型结果存放 redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.app.{date}
     RECALL_KEY_NAME_PREFIX_APP = 'com.weiqu.video.recall.hot.item.score.app.'
 
@@ -128,6 +148,12 @@ class BaseConfig(object):
 
     # 用户上一次在rov召回池对应的位置 redis key前缀,完整key格式:com.weiqu.video.rov.pool.last.{appType}.{mid}.{date}
     LAST_VIDEO_FROM_ROV_POOL_PREFIX = 'com.weiqu.video.rov.pool.last.'
+    # 用户上一次在天级规则更新列表中对应的位置 redis key前缀,完整key格式:com.weiqu.video.rov.pool.last.{appType}.{mid}.{date}
+    LAST_VIDEO_FROM_RULE_DAY_POOL_PREFIX = 'com.weiqu.video.rule.day.pool.last.'
+    # 用户上一次在rov召回池对应的位置 redis key前缀,完整key格式:com.weiqu.video.rov.pool.last.pre.{appType}.{mid}.{date}
+    LAST_VIDEO_FROM_ROV_POOL_PRE_PREFIX = 'com.weiqu.video.rov.pool.last.pre.'
+    # 用户上一次在rov召回池对应的位置 redis key前缀,完整key格式:com.weiqu.video.rov.pool.last.now.{appType}.{mid}.{date}
+    LAST_VIDEO_FROM_ROV_POOL_NOW_PREFIX = 'com.weiqu.video.rov.pool.last.now.'
 
     # 本地记录视频的可分发数,控制分发,完整key格式:com.weiqu.video.flowpool.local.distribute.count.{h}
     # LOCAL_DISTRIBUTE_COUNT_PREFIX = 'com.weiqu.video.flowpool.local.distribute.count.'
@@ -155,6 +181,9 @@ class BaseConfig(object):
     # 小时级召回池更新时间 每个小时的15分更新成功
     ROV_H_UPDATE_MINUTE = 15
 
+    # 天级规则更新列表更新时间 00:30更新成功
+    ROV_DAY_UPDATE_MINUTE = 0
+
     # 置顶视频区域 为 全部 的code
     ALL_AREA_CODE = '000000'
 
@@ -396,8 +425,8 @@ class ProductionConfig(BaseConfig):
 
 def set_config():
     # 获取环境变量 ROV_SERVER_ENV
-    env = os.environ.get('ROV_SERVER_ENV')
-    # env = 'dev'
+    # env = os.environ.get('ROV_SERVER_ENV')
+    env = 'dev'
     if env is None:
         # log_.error('ENV ERROR: is None!')
         return

+ 11 - 0
recommend.py

@@ -182,6 +182,9 @@ def video_recommend(mid, uid, size, top_K, flow_pool_P, app_type, algo_type, cli
              gevent.spawn(pool_recall.flow_pool_recall_18_19, size)]
         # else:
         #     t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time)]
+    elif ab_code in [code for _, code in config_.AB_CODE['rank_by_day'].items()]:
+        t = [gevent.spawn(pool_recall.rov_pool_recall_by_day, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall, size)]
@@ -446,6 +449,10 @@ def video_homepage_recommend(mid, uid, size, app_type, algo_type, client_info, a
                 ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank5')
                 expire_time = 3600
                 rule_key = config_.RULE_KEY['rule_rank5']
+            elif config_.AB_EXP_CODE['day_rule_rank1'] in ab_exp_code_list:
+                ab_code = config_.AB_CODE['rank_by_day'].get('day_rule_rank1')
+                expire_time = 24 * 3600
+                rule_key = config_.RULE_KEY_DAY['day_rule_rank1']
             else:
                 ab_code = config_.AB_CODE['initial']
                 expire_time = 24 * 3600
@@ -548,6 +555,10 @@ def video_relevant_recommend(video_id, mid, uid, size, app_type, ab_exp_info):
             ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank5')
             expire_time = 3600
             rule_key = config_.RULE_KEY['rule_rank5']
+        elif config_.AB_EXP_CODE['day_rule_rank1'] in ab_exp_code_list:
+            ab_code = config_.AB_CODE['rank_by_day'].get('day_rule_rank1')
+            expire_time = 24 * 3600
+            rule_key = config_.RULE_KEY_DAY['day_rule_rank1']
         else:
             ab_code = config_.AB_CODE['initial']
             expire_time = 24 * 3600

+ 146 - 2
video_recall.py

@@ -191,6 +191,78 @@ class PoolRecall(object):
 
         return recall_result[:size]
 
+    def rov_pool_recall_by_day(self, size=4, expire_time=24*3600):
+        """
+        从天级规则更新列表中获取视频
+        :param size: 获取视频个数
+        :param expire_time: 末位视频记录redis过期时间
+        :return:
+        """
+        # 获取天级规则更新列表相关redis key, 用户上一次在天级规则更新列表对应的位置
+        rule_key_name, last_rule_day_recall_key, idx = self.get_video_last_idx_day()
+        # 获取天级规则更新列表数据
+        if not rule_key_name:
+            log_.info('天级规则更新列表中无视频')
+            recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
+        else:
+            recall_result = []
+            # 每次获取的视频数
+            get_size = size * 5
+            # 记录获取频次
+            freq = 0
+            while len(recall_result) < size:
+                freq += 1
+                if freq > config_.MAX_FREQ_FROM_ROV_POOL:
+                    break
+                # 获取数据
+                data = self.redis_helper.get_data_zset_with_index(key_name=rule_key_name,
+                                                                  start=idx, end=idx + get_size - 1,
+                                                                  with_scores=True)
+                if not data:
+                    log_.info('天级规则更新视频已取完')
+                    break
+                # 获取视频id,并转换类型为int,并存储为key-value{videoId: score}
+                video_ids = []
+                video_score = {}
+                for value in data:
+                    video_id = int(value[0])
+                    video_ids.append(video_id)
+                    video_score[video_id] = value[1]
+                # 过滤
+                filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                ge = gevent.spawn(filter_.filter_videos)
+                ge.join()
+                filtered_result = ge.get()
+
+                if filtered_result:
+                    # 添加视频源参数 pushFrom, abCode
+                    temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
+                                    'pushFrom': config_.PUSH_FROM['rov_recall_day'], 'abCode': self.ab_code}
+                                   for item in filtered_result if video_score.get(int(item)) is not None]
+                    recall_result.extend(temp_result)
+                else:
+                    # 将此次获取的末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
+                    self.redis_helper.set_data_to_redis(key_name=last_rule_day_recall_key, value=data[-1][0],
+                                                        expire_time=expire_time)
+                idx += get_size
+
+            # 判断获取到的天级规则数据数量
+            if len(recall_result) < size:
+                # 补充数据
+                rov_recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
+                # 去重合并
+                now_video_ids = [item.get('videoId') for item in recall_result]
+                for video in rov_recall_result:
+                    vid = video.get('videoId')
+                    if vid not in now_video_ids:
+                        recall_result.append(video)
+                        now_video_ids.append(vid)
+                        if len(recall_result) >= size:
+                            break
+                        else:
+                            continue
+        return recall_result[:size]
+
     def rov_pool_recall(self, size=10, expire_time=24*3600):
         """
         从ROV召回池中获取视频
@@ -495,12 +567,21 @@ class PoolRecall(object):
                             config_.AB_CODE['rov_rank_appType_19']] or \
                 self.app_type == config_.APP_TYPE['APP']:
             rov_pool_key, redis_date = self.get_pool_redis_key_with_h('rov')
+        elif self.ab_code in [code for _, code in config_.AB_CODE['rank_by_day'].items()]:
+            rov_pool_key, redis_date = self.get_pool_redis_key_with_day('dup')
         else:
             rov_pool_key, redis_date = self.get_pool_redis_key('rov')
         if not rov_pool_key:
             return None, None, None
-        last_rov_recall_key = config_.LAST_VIDEO_FROM_ROV_POOL_PREFIX + '{}.{}.{}'.format(
-            self.app_type, self.mid, redis_date)
+        if self.ab_code in [code for _, code in config_.AB_CODE['rank_by_day'].items()]:
+            now_h = datetime.now().hour
+            if now_h < 7:
+                last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PRE_PREFIX
+            else:
+                last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_NOW_PREFIX
+        else:
+            last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PREFIX
+        last_rov_recall_key = f'{last_key_prefix}{self.app_type}.{self.mid}.{redis_date}'
         value = self.redis_helper.get_data_from_redis(last_rov_recall_key)
         if value:
             idx = self.redis_helper.get_index_with_data(rov_pool_key, value)
@@ -766,3 +847,66 @@ class PoolRecall(object):
             idx += get_size
 
         return flow_pool_recall_result[:size]
+
+    def get_pool_redis_key_with_day(self, pool_type):
+        """
+        拼接key,获取以天级别规则更新的视频列表
+        :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
+        :return: key_name
+        """
+        now_date = datetime.today()
+        now_dt = datetime.strftime(now_date, '%Y%m%d')
+        if pool_type == 'rov':
+            # 判断列表是否更新,未更新则使用前一天的列表
+            rule_key_name = f'{config_.RECALL_KEY_NAME_PREFIX_BY_DAY}{self.rule_key}.{now_dt}'
+            if self.redis_helper.key_exists(key_name=rule_key_name):
+                return rule_key_name, now_dt
+            else:
+                redis_dt = datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d')
+                rule_key_name = f'{config_.RECALL_KEY_NAME_PREFIX_BY_DAY}{self.rule_key}.{redis_dt}'
+
+                # 判断当前时间是否晚于数据正常更新时间,发送消息到飞书
+                now_h = datetime.now().hour
+                feishu_text = f'{config_.ENV_TEXT} —— appType = {self.app_type}, date = {now_dt}, ' \
+                              f'rule_key = {self.rule_key} 数据未按时更新,请及时查看解决。'
+                if now_h > config_.ROV_DAY_UPDATE_MINUTE:
+                    send_msg_to_feishu(feishu_text)
+                return rule_key_name, redis_dt
+        elif pool_type == 'dup':
+            now_h = datetime.now().hour
+            if now_h < 7:
+                dup_key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE
+            else:
+                dup_key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW
+            dup_key_name = f'{dup_key_name_prefix}{self.rule_key}.{now_dt}'
+            if self.redis_helper.key_exists(key_name=dup_key_name):
+                return dup_key_name, now_dt
+            else:
+                redis_dt = datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d')
+                dup_key_name = f'{dup_key_name_prefix}{self.rule_key}.{redis_dt}'
+                return dup_key_name, redis_dt
+
+        elif pool_type == 'flow':
+            return config_.FLOW_POOL_KEY_NAME_PREFIX + str(self.app_type)
+
+        else:
+            log_.error('pool type error')
+            return None, None
+
+    def get_video_last_idx_day(self):
+        """获取用户上一次在天级规则更新列表中对应的位置"""
+        rule_key_name, redis_dt = self.get_pool_redis_key_with_day('rov')
+        if not rule_key_name:
+            return None, None, None
+        last_rule_day_recall_key = \
+            f'{config_.LAST_VIDEO_FROM_RULE_DAY_POOL_PREFIX}{self.app_type}.{self.mid}.{redis_dt}'
+        value = self.redis_helper.get_data_from_redis(last_rule_day_recall_key)
+        if value:
+            idx = self.redis_helper.get_index_with_data(rule_key_name, value)
+            if not idx:
+                idx = 0
+            else:
+                idx += 1
+        else:
+            idx = 0
+        return rule_key_name, last_rule_day_recall_key, idx