|
@@ -1222,43 +1222,39 @@ class PoolRecall(object):
|
|
|
if region_code == '':
|
|
|
region_code = '-1'
|
|
|
|
|
|
- # if self.ab_code in [config_.AB_CODE['region_rank_by_h'].get('region_rule_rank5_appType_0_data1'),
|
|
|
- # config_.AB_CODE['region_rank_by_h'].get('abtest_082'),
|
|
|
- # config_.AB_CODE['region_rank_by_h'].get('abtest_112')]:
|
|
|
-
|
|
|
- if region_code == '-1':
|
|
|
- t = [
|
|
|
- # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
|
|
|
- gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
|
|
|
- gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
|
|
|
- ]
|
|
|
+ if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('abtest_139'):
|
|
|
+ if region_code == '-1':
|
|
|
+ t = [
|
|
|
+ gevent.spawn(self.recall_update_by_day, size, '30day'),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
|
|
|
+ ]
|
|
|
+ else:
|
|
|
+ t = [
|
|
|
+ gevent.spawn(self.recall_update_by_day, size, '30day'),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
|
|
|
+ ]
|
|
|
else:
|
|
|
- t = [
|
|
|
- # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
|
|
|
- # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
|
|
|
- # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
|
|
|
- gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
|
|
|
- gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
|
|
|
- gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
|
|
|
- gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
|
|
|
- ]
|
|
|
- # else:
|
|
|
- # if province_code == '-1':
|
|
|
- # # t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h')]
|
|
|
- # t = [gevent.spawn(self.recall_region_dup_24h, province_code, size, '24h_dup2', expire_time)]
|
|
|
- #
|
|
|
- # else:
|
|
|
- # t = [
|
|
|
- # # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
|
|
|
- # # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
|
|
|
- # # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
|
|
|
- # gevent.spawn(self.recall_region_dup_24h, province_code, size, 'region_h', expire_time),
|
|
|
- # gevent.spawn(self.recall_region_dup_24h, province_code, size, 'region_24h', expire_time),
|
|
|
- # gevent.spawn(self.recall_region_dup_24h, province_code, size, '24h_dup2', expire_time),
|
|
|
- #
|
|
|
- # # gevent.spawn(self.region_dup_recall, province_code, 1, size, expire_time),
|
|
|
- # # gevent.spawn(self.region_dup_recall, province_code, 2, size, expire_time)
|
|
|
- # ]
|
|
|
+ if region_code == '-1':
|
|
|
+ t = [
|
|
|
+ # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
|
|
|
+ ]
|
|
|
+ else:
|
|
|
+ t = [
|
|
|
+ # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
|
|
|
+ # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
|
|
|
+ # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
|
|
|
+ gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
|
|
|
+ ]
|
|
|
+
|
|
|
gevent.joinall(t)
|
|
|
region_recall_result_list = [i.get() for i in t]
|
|
|
# 将已获取到的视频按顺序去重合并
|
|
@@ -1648,7 +1644,6 @@ class PoolRecall(object):
|
|
|
|
|
|
return recall_result[:size]
|
|
|
|
|
|
-
|
|
|
def update_last_video_record(self, record_key, pool_key_prefix, province_code):
|
|
|
# 判断当前小时的小时级列表是否更新
|
|
|
now_date = datetime.today()
|
|
@@ -1863,3 +1858,147 @@ class PoolRecall(object):
|
|
|
# 'executeTime': (time.time() - start_time) * 1000
|
|
|
# })
|
|
|
return pool_recall_result[:size]
|
|
|
+
|
|
|
+ def update_last_video_record_by_day(self, record_key, pool_key_prefix, expire_time):
|
|
|
+ # 判断当前日期的小时级列表是否更新
|
|
|
+ now_date = datetime.today()
|
|
|
+ now_dt = datetime.strftime(now_date, '%Y%m%d')
|
|
|
+ now_pool_recall_key = f"{pool_key_prefix}:{self.data_key}:{self.rule_key}:{now_dt}"
|
|
|
+ if self.redis_helper.key_exists(key_name=now_pool_recall_key):
|
|
|
+ value = {'date': now_dt}
|
|
|
+ self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=expire_time)
|
|
|
+ else:
|
|
|
+ redis_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
|
|
|
+ now_pool_recall_key = f"{pool_key_prefix}:{self.data_key}:{self.rule_key}:{redis_dt}"
|
|
|
+ value = {'date': redis_dt}
|
|
|
+ self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=expire_time)
|
|
|
+ return now_pool_recall_key
|
|
|
+
|
|
|
+ def get_last_recommend_video_idx_by_day(self, record_key_prefix, pool_key_prefix, last_video_key_prefix, expire_time):
|
|
|
+ # 判断mid对应上一次视频位置 时间记录
|
|
|
+ record_key = f"{record_key_prefix}{self.app_type}:{self.mid}"
|
|
|
+ last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
|
|
|
+
|
|
|
+ if not self.redis_helper.key_exists(key_name=record_key):
|
|
|
+ # ###### 记录key不存在
|
|
|
+ self.redis_helper.del_keys(key_name=last_video_key)
|
|
|
+ idx = 0
|
|
|
+ pool_recall_key = self.update_last_video_record_by_day(record_key=record_key,
|
|
|
+ pool_key_prefix=pool_key_prefix,
|
|
|
+ expire_time=expire_time)
|
|
|
+ else:
|
|
|
+ # ###### 记录key存在,判断date
|
|
|
+ now_date = datetime.today()
|
|
|
+ # 获取记录的date
|
|
|
+ record = self.redis_helper.get_data_from_redis(key_name=record_key)
|
|
|
+ record_dt = eval(record).get('date')
|
|
|
+ now_dt = datetime.strftime(now_date, '%Y%m%d')
|
|
|
+ if record_dt == now_dt:
|
|
|
+ # 已获取当前日期数据
|
|
|
+ pool_recall_key = f"{pool_key_prefix}:{self.data_key}:{self.rule_key}:{now_dt}"
|
|
|
+ idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
|
|
|
+ elif record_dt == datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d'):
|
|
|
+ # 记录的dt - 当前dt = 1,判断当前h数据是否已更新
|
|
|
+ now_pool_recall_key = f"{pool_key_prefix}:{self.data_key}:{self.rule_key}:{now_dt}"
|
|
|
+ if self.redis_helper.key_exists(key_name=now_pool_recall_key):
|
|
|
+ new_record = {'date': now_dt}
|
|
|
+ self.redis_helper.set_data_to_redis(key_name=record_key,
|
|
|
+ value=str(new_record),
|
|
|
+ expire_time=expire_time)
|
|
|
+ idx = 0
|
|
|
+ self.redis_helper.del_keys(key_name=last_video_key)
|
|
|
+ pool_recall_key = now_pool_recall_key
|
|
|
+ else:
|
|
|
+ pool_recall_key = f"{pool_key_prefix}:{self.data_key}:{self.rule_key}:{record_dt}"
|
|
|
+ idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
|
|
|
+ else:
|
|
|
+ idx = 0
|
|
|
+ self.redis_helper.del_keys(key_name=last_video_key)
|
|
|
+ pool_recall_key = self.update_last_video_record_by_day(record_key=record_key,
|
|
|
+ pool_key_prefix=pool_key_prefix,
|
|
|
+ expire_time=expire_time)
|
|
|
+
|
|
|
+ return pool_recall_key, idx
|
|
|
+
|
|
|
+ def recall_update_by_day(self, size=4, key_flag='', expire_time=24*3600):
|
|
|
+ """
|
|
|
+ 从天级更新列表中获取视频
|
|
|
+ :param size: 获取视频个数
|
|
|
+ :param key_flag: 视频表标记
|
|
|
+ :param expire_time: 末位视频记录redis过期时间
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ if key_flag == '30day':
|
|
|
+ # 相对30天计算列表的筛选结果
|
|
|
+ # 视频列表
|
|
|
+ pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
|
|
|
+ # mid对应上一次视频位置 时间记录
|
|
|
+ record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_30DAY
|
|
|
+ # mid对应上一次视频记录
|
|
|
+ last_video_key_prefix = config_.LAST_VIDEO_FROM_30DAY_PREFIX
|
|
|
+ push_from = config_.PUSH_FROM['rov_recall_30day']
|
|
|
+ else:
|
|
|
+ return []
|
|
|
+ # 获取相关redis key, 用户上一次在rov召回池对应的位置
|
|
|
+ pool_key, idx = self.get_last_recommend_video_idx_by_day(record_key_prefix=record_key_prefix,
|
|
|
+ pool_key_prefix=pool_key_prefix,
|
|
|
+ last_video_key_prefix=last_video_key_prefix,
|
|
|
+ expire_time=expire_time)
|
|
|
+ if not pool_key:
|
|
|
+ return []
|
|
|
+ recall_data = []
|
|
|
+ pool_recall_result = []
|
|
|
+ # 每次获取的视频数
|
|
|
+ get_size = size * 5
|
|
|
+ # 记录获取频次
|
|
|
+ freq = 0
|
|
|
+ while len(pool_recall_result) < size:
|
|
|
+ freq += 1
|
|
|
+ if freq > config_.MAX_FREQ_FROM_ROV_POOL:
|
|
|
+ break
|
|
|
+ # 获取数据
|
|
|
+ data = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
|
|
|
+ start=idx, end=idx + get_size - 1,
|
|
|
+ with_scores=True)
|
|
|
+ if not data:
|
|
|
+ break
|
|
|
+ recall_data.extend(data)
|
|
|
+ # 获取视频id,并转换类型为int,并存储为key-value{videoId: score}
|
|
|
+ video_ids = []
|
|
|
+ video_score = {}
|
|
|
+ for value in data:
|
|
|
+ video_id = int(value[0])
|
|
|
+ video_ids.append(video_id)
|
|
|
+ video_score[video_id] = value[1]
|
|
|
+ # 过滤
|
|
|
+ filter_ = FilterVideos(request_id=self.request_id,
|
|
|
+ app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
|
|
|
+ ge = gevent.spawn(filter_.filter_videos)
|
|
|
+ ge.join()
|
|
|
+ filtered_result = ge.get()
|
|
|
+
|
|
|
+ if filtered_result:
|
|
|
+ # 添加视频源参数 pushFrom, abCode
|
|
|
+ temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
|
|
|
+ 'pushFrom': push_from, 'abCode': self.ab_code}
|
|
|
+ for item in filtered_result if video_score.get(int(item)) is not None]
|
|
|
+ pool_recall_result.extend(temp_result)
|
|
|
+
|
|
|
+ idx += get_size
|
|
|
+
|
|
|
+ pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
|
|
|
+
|
|
|
+ if len(recall_data) > 0 and len(pool_recall_result) == 0 and self.mid:
|
|
|
+ # 召回数据不为空 & 过滤后结果为空 & mid不为空时,更新召回获取的末位视频id记录到定位的key中
|
|
|
+ last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
|
|
|
+ self.redis_helper.set_data_to_redis(key_name=last_video_key, value=recall_data[-1][0],
|
|
|
+ expire_time=expire_time)
|
|
|
+
|
|
|
+ # log_.info({
|
|
|
+ # 'logTimestamp': int(time.time() * 1000),
|
|
|
+ # 'request_id': self.request_id,
|
|
|
+ # 'operation': push_from,
|
|
|
+ # 'pool_recall_result': pool_recall_result,
|
|
|
+ # 'executeTime': (time.time() - start_time) * 1000
|
|
|
+ # })
|
|
|
+ return pool_recall_result[:size]
|