|
@@ -958,3 +958,203 @@ class PoolRecall(object):
|
|
|
|
|
|
random.shuffle(old_video_result)
|
|
|
return old_video_result[:size+1]
|
|
|
+
|
|
|
+ def get_region_dup_video_last_idx_h(self, province_code, region_dup=None):
|
|
|
+ """获取用户上一次在 地域分组 相关去重列表中对应的位置"""
|
|
|
+ now_date = date.today().strftime('%Y%m%d')
|
|
|
+ h = datetime.now().hour
|
|
|
+
|
|
|
+ if region_dup == 1:
|
|
|
+
|
|
|
+ key_name = \
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H}{province_code}.{self.rule_key}.{now_date}.{h}"
|
|
|
+ elif region_dup == 2:
|
|
|
+
|
|
|
+ key_name = \
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H}{province_code}.{self.rule_key}.{now_date}.{h}"
|
|
|
+ else:
|
|
|
+ key_name = None
|
|
|
+
|
|
|
+ if not self.redis_helper.key_exists(key_name=key_name):
|
|
|
+ return None, None, None
|
|
|
+ last_region_dup_key = \
|
|
|
+ f'{config_.LAST_VIDEO_FROM_REGION_DUP_PREFIX}{region_dup}.{self.app_type}.{self.mid}.{h}'
|
|
|
+ value = self.redis_helper.get_data_from_redis(last_region_dup_key)
|
|
|
+ if value:
|
|
|
+ idx = self.redis_helper.get_index_with_data(key_name, value)
|
|
|
+ if not idx:
|
|
|
+ idx = 0
|
|
|
+ else:
|
|
|
+ idx += 1
|
|
|
+ else:
|
|
|
+ idx = 0
|
|
|
+ return key_name, last_region_dup_key, idx
|
|
|
+
|
|
|
+ def rov_pool_recall_with_region(self, size=4, expire_time=24*3600):
|
|
|
+ """
|
|
|
+ 地域分组召回视频
|
|
|
+ :param size: 获取视频个数
|
|
|
+ :param expire_time: 末位视频记录redis过期时间
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+
|
|
|
+ province_code = self.client_info.get('provinceCode', '-1')
|
|
|
+ if province_code == '':
|
|
|
+ province_code = '-1'
|
|
|
+ t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
|
|
|
+ gevent.spawn(self.region_dup_recall, province_code, 1, size, expire_time),
|
|
|
+ gevent.spawn(self.region_dup_recall, province_code, 2, size, expire_time)]
|
|
|
+ gevent.joinall(t)
|
|
|
+ region_recall_result_list = [i.get() for i in t]
|
|
|
+
|
|
|
+ now_video_ids = []
|
|
|
+ recall_result = []
|
|
|
+ for region_result in region_recall_result_list:
|
|
|
+ for video in region_result:
|
|
|
+ video_id = video.get('videoId')
|
|
|
+ if video_id not in now_video_ids:
|
|
|
+ recall_result.append(video)
|
|
|
+ now_video_ids.append(video_id)
|
|
|
+ if len(recall_result) >= size:
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if len(recall_result) < size:
|
|
|
+
|
|
|
+ rov_recall_result = self.rov_pool_recall(size=size, expire_time=expire_time)
|
|
|
+
|
|
|
+ for video in rov_recall_result:
|
|
|
+ vid = video.get('videoId')
|
|
|
+ if vid not in now_video_ids:
|
|
|
+ recall_result.append(video)
|
|
|
+ now_video_ids.append(vid)
|
|
|
+ if len(recall_result) >= size:
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+
|
|
|
+ return recall_result[:size]
|
|
|
+
|
|
|
+ def rov_pool_recall_with_region_by_h(self, province_code, size=4):
|
|
|
+ """
|
|
|
+ 地域分组小时级视频召回
|
|
|
+ :param size: 视频数
|
|
|
+ :param province_code: 省份code
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+
|
|
|
+ h_recall_mid_key = self.get_mid_h_key(province_code=province_code)
|
|
|
+ if not self.redis_helper.key_exists(h_recall_mid_key):
|
|
|
+ recall_result = []
|
|
|
+ else:
|
|
|
+
|
|
|
+ fil_video_ids = []
|
|
|
+ recall_result = []
|
|
|
+
|
|
|
+ get_size = size * 5
|
|
|
+
|
|
|
+ freq = 0
|
|
|
+ while len(recall_result) < size:
|
|
|
+ freq += 1
|
|
|
+ if freq > config_.MAX_FREQ_FROM_ROV_POOL:
|
|
|
+ break
|
|
|
+
|
|
|
+ data = self.redis_helper.get_data_zset_with_index(key_name=h_recall_mid_key,
|
|
|
+ start=(freq - 1) * get_size, end=freq * get_size - 1,
|
|
|
+ with_scores=True)
|
|
|
+ if not data:
|
|
|
+ log_.info('地域分组小时级更新视频已取完')
|
|
|
+ break
|
|
|
+
|
|
|
+ video_ids = []
|
|
|
+ video_score = {}
|
|
|
+ for value in data:
|
|
|
+ video_id = int(value[0])
|
|
|
+ video_ids.append(video_id)
|
|
|
+ video_score[video_id] = value[1]
|
|
|
+
|
|
|
+ filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
|
|
|
+ ge = gevent.spawn(filter_.filter_videos_h, self.rule_key, self.ab_code, province_code)
|
|
|
+ ge.join()
|
|
|
+ filtered_result = ge.get()
|
|
|
+
|
|
|
+ if filtered_result:
|
|
|
+
|
|
|
+ temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
|
|
|
+ 'pushFrom': config_.PUSH_FROM['rov_recall_region_h'], 'abCode': self.ab_code}
|
|
|
+ for item in filtered_result if video_score.get(int(item)) is not None]
|
|
|
+ recall_result.extend(temp_result)
|
|
|
+ fil_video_ids.extend(list(set(video_ids) - set([item.get('videoId') for item in temp_result])))
|
|
|
+ else:
|
|
|
+ fil_video_ids.extend(video_ids)
|
|
|
+
|
|
|
+ for value in fil_video_ids:
|
|
|
+ self.redis_helper.remove_value_from_zset(key_name=h_recall_mid_key, value=value)
|
|
|
+
|
|
|
+ return recall_result[:size]
|
|
|
+
|
|
|
+ def region_dup_recall(self, province_code, region_dup, size=4, expire_time=23*3600):
|
|
|
+ """
|
|
|
+ region dup 更新列表视频召回
|
|
|
+ :param province_code:
|
|
|
+ :param region_dup:
|
|
|
+ :param size:
|
|
|
+ :param expire_time:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ if region_dup == 1:
|
|
|
+ push_from = config_.PUSH_FROM['rov_recall_region_day']
|
|
|
+ elif region_dup == 2:
|
|
|
+ push_from = config_.PUSH_FROM['rov_recall_day']
|
|
|
+
|
|
|
+
|
|
|
+ key_name, last_region_dup_key, idx = self.get_region_dup_video_last_idx_h(
|
|
|
+ province_code=province_code, region_dup=region_dup)
|
|
|
+
|
|
|
+ if not key_name:
|
|
|
+ log_.info(f'region dup 更新列表中无视频, region_dup = {region_dup}')
|
|
|
+ recall_result = []
|
|
|
+ else:
|
|
|
+ recall_result = []
|
|
|
+
|
|
|
+ get_size = size * 5
|
|
|
+
|
|
|
+ freq = 0
|
|
|
+ while len(recall_result) < size:
|
|
|
+ freq += 1
|
|
|
+ if freq > config_.MAX_FREQ_FROM_ROV_POOL:
|
|
|
+ break
|
|
|
+
|
|
|
+ data = self.redis_helper.get_data_zset_with_index(key_name=key_name,
|
|
|
+ start=idx, end=idx + get_size - 1,
|
|
|
+ with_scores=True)
|
|
|
+ if not data:
|
|
|
+ log_.info(f'region dup 更新视频已取完, region_dup = {region_dup}')
|
|
|
+ break
|
|
|
+
|
|
|
+ video_ids = []
|
|
|
+ video_score = {}
|
|
|
+ for value in data:
|
|
|
+ video_id = int(value[0])
|
|
|
+ video_ids.append(video_id)
|
|
|
+ video_score[video_id] = value[1]
|
|
|
+
|
|
|
+ filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
|
|
|
+ ge = gevent.spawn(filter_.filter_videos)
|
|
|
+ ge.join()
|
|
|
+ filtered_result = ge.get()
|
|
|
+
|
|
|
+ if filtered_result:
|
|
|
+
|
|
|
+ temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
|
|
|
+ 'pushFrom': push_from, 'abCode': self.ab_code}
|
|
|
+ for item in filtered_result if video_score.get(int(item)) is not None]
|
|
|
+ recall_result.extend(temp_result)
|
|
|
+ else:
|
|
|
+
|
|
|
+ self.redis_helper.set_data_to_redis(key_name=last_region_dup_key, value=data[-1][0],
|
|
|
+ expire_time=expire_time)
|
|
|
+ idx += get_size
|
|
|
+
|
|
|
+ return recall_result[:size]
|