|
@@ -31,11 +31,18 @@ class PoolRecall(object):
|
|
|
def rov_pool_recall(self, size=10):
|
|
|
"""从ROV召回池中获取视频"""
|
|
|
log_.info('====== rov pool recall')
|
|
|
+
|
|
|
+ # 获取修改过rov的视频
|
|
|
+ update_rov_video_ids, update_rov_result = self.get_update_rov_videos()
|
|
|
+ log_.info('update rov result = {}'.format(update_rov_video_ids))
|
|
|
+
|
|
|
# 获取相关redis key, 用户上一次在rov召回池对应的位置
|
|
|
rov_pool_key, last_rov_recall_key, idx = self.get_video_last_idx()
|
|
|
if not rov_pool_key:
|
|
|
log_.info('ROV召回池中无视频')
|
|
|
- return []
|
|
|
+ if not update_rov_result:
|
|
|
+ return []
|
|
|
+ return update_rov_result
|
|
|
rov_pool_recall_result = []
|
|
|
# 每次获取的视频数
|
|
|
get_size = size * 2
|
|
@@ -61,6 +68,8 @@ class PoolRecall(object):
|
|
|
video_score = {}
|
|
|
for value in data:
|
|
|
video_id = int(value[0])
|
|
|
+ if video_id in update_rov_video_ids:
|
|
|
+ continue
|
|
|
video_ids.append(video_id)
|
|
|
video_score[video_id] = value[1]
|
|
|
# 过滤
|
|
@@ -69,8 +78,9 @@ class PoolRecall(object):
|
|
|
ge = gevent.spawn(filter_.filter_videos)
|
|
|
ge.join()
|
|
|
filtered_result = ge.get()
|
|
|
- #filtered_result = filter_.filter_videos()
|
|
|
+ # filtered_result = filter_.filter_videos()
|
|
|
debug_tm_e = time.time()
|
|
|
+
|
|
|
if filtered_result:
|
|
|
# 添加视频源参数 pushFrom, abCode
|
|
|
temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
|
|
@@ -81,6 +91,11 @@ class PoolRecall(object):
|
|
|
# 将此次获取的末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
|
|
|
self.redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=data[-1][0])
|
|
|
idx += get_size
|
|
|
+
|
|
|
+ # 被修改rov视频、rov召回池视频 归并排序
|
|
|
+ if update_rov_result:
|
|
|
+ rov_pool_recall_result.extend(update_rov_result)
|
|
|
+ rov_pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
|
|
|
return rov_pool_recall_result[:size]
|
|
|
|
|
|
def flow_pool_recall(self, size=10):
|
|
@@ -296,3 +311,48 @@ class PoolRecall(object):
|
|
|
else:
|
|
|
idx = 0
|
|
|
return rov_pool_key, last_rov_recall_key, idx
|
|
|
+
|
|
|
+ def get_update_rov_videos(self):
|
|
|
+ """
|
|
|
+ 获取修改ROV的视频
|
|
|
+ :return: update_rov_video_ids, update_rov_result
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 获取修改过ROV的视频
|
|
|
+ redis_helper = RedisHelper()
|
|
|
+ data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME,
|
|
|
+ start=0, end=-1, with_scores=True)
|
|
|
+ # 获取视频id,并转换类型为int,将videoId和score做mapping,并存储为key-value{videoId: score}
|
|
|
+ if data is None:
|
|
|
+ return [], []
|
|
|
+ video_ids = []
|
|
|
+ video_score = {}
|
|
|
+ for value in data:
|
|
|
+ video_id = int(value[0])
|
|
|
+ video_ids.append(video_id)
|
|
|
+ video_score[video_id] = value[1]
|
|
|
+ # 过滤
|
|
|
+ filter_ = FilterVideos(app_type=self.app_type, video_ids=video_ids, mid=self.mid, uid=self.uid)
|
|
|
+ ge = gevent.spawn(filter_.filter_videos)
|
|
|
+ ge.join()
|
|
|
+ filtered_result = ge.get()
|
|
|
+
|
|
|
+ # 添加视频源参数 pushFrom, abCode
|
|
|
+ update_rov_video_ids, update_rov_result = [], []
|
|
|
+ if not filtered_result:
|
|
|
+ return update_rov_video_ids, update_rov_result
|
|
|
+ for item in filtered_result:
|
|
|
+ video_id = int(item)
|
|
|
+ rov_score = video_score.get(video_id)
|
|
|
+ if rov_score is None:
|
|
|
+ continue
|
|
|
+ update_rov_video_ids.append(video_id)
|
|
|
+ update_rov_result.append({'videoId': video_id, 'rovScore': rov_score,
|
|
|
+ 'pushFrom': 'recall_pool', 'abCode': self.ab_code})
|
|
|
+ return update_rov_video_ids, update_rov_result
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ log_.error(traceback.format_exc())
|
|
|
+ return [], []
|
|
|
+
|
|
|
+
|