|
@@ -301,11 +301,13 @@ class PoolRecall(object):
|
|
continue
|
|
continue
|
|
return recall_result[:size]
|
|
return recall_result[:size]
|
|
|
|
|
|
- def rov_pool_recall(self, size=10, expire_time=24*3600):
|
|
|
|
|
|
+ def rov_pool_recall(self, size=10, expire_time=24*3600, video_type='', push_from=config_.PUSH_FROM['rov_recall']):
|
|
"""
|
|
"""
|
|
从ROV召回池中获取视频
|
|
从ROV召回池中获取视频
|
|
:param size: 获取视频个数
|
|
:param size: 获取视频个数
|
|
:param expire_time: 末位视频记录redis过期时间
|
|
:param expire_time: 末位视频记录redis过期时间
|
|
|
|
+ :param video_type: 视频列表类别
|
|
|
|
+ :param push_from: 视频来源标记
|
|
:return:
|
|
:return:
|
|
"""
|
|
"""
|
|
# log_.info('====== rov pool recall')
|
|
# log_.info('====== rov pool recall')
|
|
@@ -334,7 +336,7 @@ class PoolRecall(object):
|
|
update_rov_dup_result.append(item)
|
|
update_rov_dup_result.append(item)
|
|
|
|
|
|
# 获取相关redis key, 用户上一次在rov召回池对应的位置
|
|
# 获取相关redis key, 用户上一次在rov召回池对应的位置
|
|
- rov_pool_key, last_rov_recall_key, idx = self.get_video_last_idx()
|
|
|
|
|
|
+ rov_pool_key, last_rov_recall_key, idx = self.get_video_last_idx(video_type=video_type)
|
|
if not rov_pool_key:
|
|
if not rov_pool_key:
|
|
log_.info('ROV召回池中无视频')
|
|
log_.info('ROV召回池中无视频')
|
|
if (not update_rov_dup_result) and (not top_video_result):
|
|
if (not update_rov_dup_result) and (not top_video_result):
|
|
@@ -382,7 +384,7 @@ class PoolRecall(object):
|
|
if filtered_result:
|
|
if filtered_result:
|
|
# 添加视频源参数 pushFrom, abCode
|
|
# 添加视频源参数 pushFrom, abCode
|
|
temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
|
|
temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
|
|
- 'pushFrom': config_.PUSH_FROM['rov_recall'], 'abCode': self.ab_code}
|
|
|
|
|
|
+ 'pushFrom': push_from, 'abCode': self.ab_code}
|
|
for item in filtered_result if video_score.get(int(item)) is not None]
|
|
for item in filtered_result if video_score.get(int(item)) is not None]
|
|
rov_pool_recall_result.extend(temp_result)
|
|
rov_pool_recall_result.extend(temp_result)
|
|
else:
|
|
else:
|
|
@@ -596,7 +598,7 @@ class PoolRecall(object):
|
|
log_.error('pool type error')
|
|
log_.error('pool type error')
|
|
return None, None
|
|
return None, None
|
|
|
|
|
|
- def get_video_last_idx(self):
|
|
|
|
|
|
+ def get_video_last_idx(self, video_type=''):
|
|
"""获取用户上一次在rov召回池对应的位置"""
|
|
"""获取用户上一次在rov召回池对应的位置"""
|
|
# if self.ab_code in [config_.AB_CODE['rank_by_h']] or self.app_type == config_.APP_TYPE['APP']:
|
|
# if self.ab_code in [config_.AB_CODE['rank_by_h']] or self.app_type == config_.APP_TYPE['APP']:
|
|
# abCode = 30001 # 老好看视频 / 票圈最惊奇 首页/相关推荐逻辑更新实验
|
|
# abCode = 30001 # 老好看视频 / 票圈最惊奇 首页/相关推荐逻辑更新实验
|
|
@@ -605,7 +607,8 @@ class PoolRecall(object):
|
|
[config_.AB_CODE['rov_rank_appType_18_19'], config_.AB_CODE['rov_rank_appType_19'],
|
|
[config_.AB_CODE['rov_rank_appType_18_19'], config_.AB_CODE['rov_rank_appType_19'],
|
|
config_.AB_CODE['top_video_relevant_appType_19']] + \
|
|
config_.AB_CODE['top_video_relevant_appType_19']] + \
|
|
[code for _, code in config_.AB_CODE['rank_by_24h'].items()] or \
|
|
[code for _, code in config_.AB_CODE['rank_by_24h'].items()] or \
|
|
- self.app_type == config_.APP_TYPE['APP']:
|
|
|
|
|
|
+ self.app_type == config_.APP_TYPE['APP'] or \
|
|
|
|
+ video_type == 'whole_movies':
|
|
rov_pool_key, redis_date = self.get_pool_redis_key_with_h('rov')
|
|
rov_pool_key, redis_date = self.get_pool_redis_key_with_h('rov')
|
|
|
|
|
|
elif self.ab_code in [code for _, code in config_.AB_CODE['rank_by_day'].items()]:
|
|
elif self.ab_code in [code for _, code in config_.AB_CODE['rank_by_day'].items()]:
|
|
@@ -622,6 +625,8 @@ class PoolRecall(object):
|
|
last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PRE_PREFIX
|
|
last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PRE_PREFIX
|
|
else:
|
|
else:
|
|
last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_NOW_PREFIX
|
|
last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_NOW_PREFIX
|
|
|
|
+ elif video_type == 'whole_movies':
|
|
|
|
+ last_key_prefix = config_.LAST_VIDEO_FROM_WHOLE_MOVIES_PREFIX
|
|
else:
|
|
else:
|
|
last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PREFIX
|
|
last_key_prefix = config_.LAST_VIDEO_FROM_ROV_POOL_PREFIX
|
|
last_rov_recall_key = f'{last_key_prefix}{self.app_type}.{self.mid}.{redis_date}'
|
|
last_rov_recall_key = f'{last_key_prefix}{self.app_type}.{self.mid}.{redis_date}'
|
|
@@ -749,10 +754,11 @@ class PoolRecall(object):
|
|
log_.error(traceback.format_exc())
|
|
log_.error(traceback.format_exc())
|
|
return [], []
|
|
return [], []
|
|
|
|
|
|
- def get_pool_redis_key_with_h(self, pool_type):
|
|
|
|
|
|
+ def get_pool_redis_key_with_h(self, pool_type, video_type=''):
|
|
"""
|
|
"""
|
|
拼接key,获取以小时级别更新的视频列表
|
|
拼接key,获取以小时级别更新的视频列表
|
|
:param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
|
|
:param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
|
|
|
|
+ :param video_type: 视频列表区分 whole_movies - 完整影视资源
|
|
:return: key_name
|
|
:return: key_name
|
|
"""
|
|
"""
|
|
if pool_type == 'rov':
|
|
if pool_type == 'rov':
|
|
@@ -818,6 +824,29 @@ class PoolRecall(object):
|
|
send_msg_to_feishu(feishu_text)
|
|
send_msg_to_feishu(feishu_text)
|
|
return key_name, redis_h
|
|
return key_name, redis_h
|
|
|
|
|
|
|
|
+ # 完整影视资源
|
|
|
|
+ elif video_type == 'whole_movies':
|
|
|
|
+ # 判断完整影视资源列表是否更新,未更新则使用前一小时的热度列表
|
|
|
|
+ key_name_prefix = f"{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}"
|
|
|
|
+ key_name = f"{key_name_prefix}{now_date}.{h}"
|
|
|
|
+ if self.redis_helper.key_exists(key_name):
|
|
|
|
+ return key_name, h
|
|
|
|
+ else:
|
|
|
|
+ if h == 0:
|
|
|
|
+ redis_h = 23
|
|
|
|
+ redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
|
|
|
|
+ else:
|
|
|
|
+ redis_h = h - 1
|
|
|
|
+ redis_date = now_date
|
|
|
|
+ key_name = f"{key_name_prefix}{redis_date}.{redis_h}"
|
|
|
|
+ # 判断当前时间是否晚于数据正常更新时间,发送消息到飞书
|
|
|
|
+ now_m = datetime.now().minute
|
|
|
|
+ feishu_text = '{} —— appType = {}, h = {} 完整影视资源数据未按时更新,请及时查看解决。'.format(
|
|
|
|
+ config_.ENV_TEXT, self.app_type, h)
|
|
|
|
+ if now_m > config_.ROV_H_UPDATE_MINUTE:
|
|
|
|
+ send_msg_to_feishu(feishu_text)
|
|
|
|
+ return key_name, redis_h
|
|
|
|
+
|
|
else:
|
|
else:
|
|
# 判断热度列表是否更新,未更新则使用前一小时的热度列表
|
|
# 判断热度列表是否更新,未更新则使用前一小时的热度列表
|
|
if self.ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
|
|
if self.ab_code in [code for _, code in config_.AB_CODE['region_rank_by_h'].items()]:
|
|
@@ -859,7 +888,7 @@ class PoolRecall(object):
|
|
log_.error('pool type error')
|
|
log_.error('pool type error')
|
|
return None, None
|
|
return None, None
|
|
|
|
|
|
- def flow_pool_recall_18_19(self, size=10):
|
|
|
|
|
|
+ def flow_pool_recall_18_19(self, size=4, push_from=config_.PUSH_FROM['flow_recall']):
|
|
"""从流量池中获取视频"""
|
|
"""从流量池中获取视频"""
|
|
# log_.info('====== flow pool recall')
|
|
# log_.info('====== flow pool recall')
|
|
flow_pool_key = self.get_pool_redis_key('flow')
|
|
flow_pool_key = self.get_pool_redis_key('flow')
|
|
@@ -900,7 +929,7 @@ class PoolRecall(object):
|
|
if filtered_result:
|
|
if filtered_result:
|
|
# 添加视频源参数 pushFrom, abCode
|
|
# 添加视频源参数 pushFrom, abCode
|
|
temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
|
|
temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
|
|
- 'pushFrom': config_.PUSH_FROM['flow_recall'], 'abCode': self.ab_code}
|
|
|
|
|
|
+ 'pushFrom': push_from, 'abCode': self.ab_code}
|
|
for item in filtered_result if video_score.get(int(item)) is not None]
|
|
for item in filtered_result if video_score.get(int(item)) is not None]
|
|
flow_pool_recall_result.extend(temp_result)
|
|
flow_pool_recall_result.extend(temp_result)
|
|
idx += get_size
|
|
idx += get_size
|
|
@@ -1399,3 +1428,44 @@ class PoolRecall(object):
|
|
now_video_ids.append(video_id)
|
|
now_video_ids.append(video_id)
|
|
return recall_result[:size]
|
|
return recall_result[:size]
|
|
|
|
|
|
|
|
def rov_pool_recall_19(self, size=4, expire_time=24*3600):
    """
    Recall videos by merging the whole-movies ROV recall with the flow pool recall.

    Two recalls run concurrently as gevent greenlets:
    ``rov_pool_recall`` with ``video_type='whole_movies'`` (tagged with
    ``PUSH_FROM['whole_movies']``) and ``flow_pool_recall_18_19`` (tagged
    with ``PUSH_FROM['talk_videos']``). Their results are merged in that
    order, keeping only the first occurrence of each ``videoId``. If fewer
    than ``size`` videos were collected, the default ``rov_pool_recall`` is
    called once more to top the list up.

    :param size: maximum number of videos to return
    :param expire_time: redis expiry for the last-position record; only
        forwarded to the fallback recall (the whole-movies recall is always
        spawned with ``expire_time=3600``)
    :return: list of at most ``size`` video items, first-seen order preserved
    """
    seen_ids = set()
    merged = []

    def _absorb(videos):
        """Append not-yet-seen videos from *videos* until ``size`` is reached."""
        for video in videos:
            if len(merged) >= size:
                return
            video_id = video.get('videoId')
            if video_id not in seen_ids:
                seen_ids.add(video_id)
                merged.append(video)

    # Run both recalls concurrently and wait for both to finish.
    tasks = [
        gevent.spawn(self.rov_pool_recall, size, expire_time=3600, video_type='whole_movies',
                     push_from=config_.PUSH_FROM['whole_movies']),
        gevent.spawn(self.flow_pool_recall_18_19, size, push_from=config_.PUSH_FROM['talk_videos']),
    ]
    gevent.joinall(tasks)

    # Merge in spawn order so whole-movies results take precedence.
    for task in tasks:
        _absorb(task.get())

    # Not enough videos: top up from the default ROV recall pool.
    if len(merged) < size:
        _absorb(self.rov_pool_recall(size=size, expire_time=expire_time))

    return merged[:size]
|
|
|
|
+
|