|
@@ -32,9 +32,19 @@ class PoolRecall(object):
|
|
|
"""从ROV召回池中获取视频"""
|
|
|
log_.info('====== rov pool recall')
|
|
|
|
|
|
+ # 获取生效中的置顶视频
|
|
|
+ top_video_ids, top_video_result = self.get_top_videos()
|
|
|
+ log_.info('top video result = {}'.format(top_video_ids))
|
|
|
+
|
|
|
# 获取修改过rov的视频
|
|
|
update_rov_video_ids, update_rov_result = self.get_update_rov_videos()
|
|
|
log_.info('update rov result = {}'.format(update_rov_video_ids))
|
|
|
+ # 与置顶视频去重
|
|
|
+ update_rov_video_ids_dup, update_rov_dup_result = [], []
|
|
|
+ for item in update_rov_result:
|
|
|
+ if item['videoId'] not in top_video_ids:
|
|
|
+ update_rov_video_ids_dup.append(item['videoId'])
|
|
|
+ update_rov_dup_result.append(item)
|
|
|
|
|
|
# 获取相关redis key, 用户上一次在rov召回池对应的位置
|
|
|
rov_pool_key, last_rov_recall_key, idx = self.get_video_last_idx()
|
|
@@ -68,18 +78,17 @@ class PoolRecall(object):
|
|
|
video_score = {}
|
|
|
for value in data:
|
|
|
video_id = int(value[0])
|
|
|
- if video_id in update_rov_video_ids:
|
|
|
+ # 视频id在 生效中的置顶视频 或 修改过rov的视频 中,跳过
|
|
|
+ if video_id in update_rov_video_ids_dup or video_id in top_video_ids:
|
|
|
continue
|
|
|
video_ids.append(video_id)
|
|
|
video_score[video_id] = value[1]
|
|
|
# 过滤
|
|
|
- debug_tm_b = time.time()
|
|
|
filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
|
|
|
ge = gevent.spawn(filter_.filter_videos)
|
|
|
ge.join()
|
|
|
filtered_result = ge.get()
|
|
|
# filtered_result = filter_.filter_videos()
|
|
|
- debug_tm_e = time.time()
|
|
|
|
|
|
if filtered_result:
|
|
|
# 添加视频源参数 pushFrom, abCode
|
|
@@ -92,10 +101,12 @@ class PoolRecall(object):
|
|
|
self.redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=data[-1][0])
|
|
|
idx += get_size
|
|
|
|
|
|
- # 被修改rov视频、rov召回池视频 归并排序
|
|
|
- if update_rov_result:
|
|
|
- rov_pool_recall_result.extend(update_rov_result)
|
|
|
- rov_pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
|
|
|
+ # 生效中的置顶视频、被修改rov视频、rov召回池视频 归并排序
|
|
|
+ if top_video_result:
|
|
|
+ rov_pool_recall_result.extend(top_video_result)
|
|
|
+ if update_rov_dup_result:
|
|
|
+ rov_pool_recall_result.extend(update_rov_dup_result)
|
|
|
+ rov_pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
|
|
|
return rov_pool_recall_result[:size]
|
|
|
|
|
|
def flow_pool_recall(self, size=10):
|
|
@@ -355,4 +366,45 @@ class PoolRecall(object):
|
|
|
log_.error(traceback.format_exc())
|
|
|
return [], []
|
|
|
|
|
|
+ def get_top_videos(self):
|
|
|
+ """
|
|
|
+ 获取置顶视频
|
|
|
+ :return: top_video_ids, top_video_result
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 获取生效中的置顶视频列表
|
|
|
+ redis_helper = RedisHelper()
|
|
|
+ data = redis_helper.get_data_from_redis(key_name=config_.TOP_VIDEO_LIST_KEY_NAME)
|
|
|
+ if data is None:
|
|
|
+ return [], []
|
|
|
+ # 获取视频id,并转换类型为int,将videoId和score做mapping,并存储为key-value{videoId: score}
|
|
|
+ video_ids = []
|
|
|
+ video_info = {}
|
|
|
+ for item in eval(data):
|
|
|
+ video_id = int(item['videoId'])
|
|
|
+ video_ids.append(video_id)
|
|
|
+ video_info[video_id] = {'score': item['score'], 'area': item['area']}
|
|
|
+
|
|
|
+ # 过滤
|
|
|
+ filter_ = FilterVideos(app_type=self.app_type, video_ids=video_ids, mid=self.mid, uid=self.uid)
|
|
|
+ ge = gevent.spawn(filter_.filter_videos)
|
|
|
+ ge.join()
|
|
|
+ filtered_result = ge.get()
|
|
|
+
|
|
|
+ # 添加视频源参数 pushFrom = 'op_manual', abCode
|
|
|
+ top_video_ids, top_video_result = [], []
|
|
|
+ if not filtered_result:
|
|
|
+ return top_video_ids, top_video_result
|
|
|
+ for item in filtered_result:
|
|
|
+ video_id = int(item)
|
|
|
+ item_info = video_info.get(video_id)
|
|
|
+ if item_info is None:
|
|
|
+ continue
|
|
|
+ top_video_ids.append(video_id)
|
|
|
+ top_video_result.append({'videoId': video_id, 'rovScore': item_info.get('score'),
|
|
|
+ 'pushFrom': 'op_manual', 'abCode': self.ab_code})
|
|
|
+ return top_video_ids, top_video_result
|
|
|
|
|
|
+ except Exception as e:
|
|
|
+ log_.error(traceback.format_exc())
|
|
|
+ return [], []
|