|
@@ -11,9 +11,10 @@ import gevent
|
|
|
import json
|
|
|
import sys
|
|
|
from parameter_update import param_update_expansion_factor
|
|
|
-from parameter_update import param_update_risk_filter_flag
|
|
|
+from parameter_update import param_update_filter_flags
|
|
|
from parameter_update import param_update_risk_rule
|
|
|
from parameter_update import param_update_risk_videos
|
|
|
+from parameter_update import param_update_rule
|
|
|
|
|
|
log_ = Log()
|
|
|
config_ = set_config()
|
|
@@ -52,14 +53,17 @@ class PoolRecall(object):
|
|
|
self.h_rule_key = h_rule_key
|
|
|
|
|
|
self.expansion_factor = param_update_expansion_factor()
|
|
|
- self.risk_filter_flag = param_update_risk_filter_flag()
|
|
|
+ [self.risk_filter_flag, self.tags_filter_flag] = param_update_filter_flags()
|
|
|
if self.risk_filter_flag:
|
|
|
self.app_region_filtered = param_update_risk_rule()
|
|
|
self.videos_with_risk = param_update_risk_videos()
|
|
|
else:
|
|
|
self.app_region_filtered = {}
|
|
|
self.videos_with_risk = []
|
|
|
-
|
|
|
+ if self.tags_filter_flag:
|
|
|
+ self.tags_filter_rule = param_update_rule(self.redis_helper)
|
|
|
+ else:
|
|
|
+ self.tags_filter_rule = {}
|
|
|
|
|
|
|
|
|
def copy_redis_zset_data(self, from_key_name, to_key_name):
|
|
@@ -3959,7 +3963,13 @@ class PoolRecall(object):
|
|
|
if len(data2_list) > 0:
|
|
|
data_for_filter.extend(
|
|
|
[data2_list[i:i + group_size] for i in range(0, len(data2_list), group_size)])
|
|
|
+ else:
|
|
|
+ data2_list = []
|
|
|
data_for_filter = [i for i in data_for_filter if len(i) > 0]
|
|
|
+
|
|
|
+ # 3.9 获取item的tag特征
|
|
|
+ video_tag_dict = self.get_video_tags(list(set(data1_list) | set(data2_list)))
|
|
|
+
|
|
|
# 4 视频过滤
|
|
|
filter_ = FilterVideos(request_id=self.request_id,
|
|
|
app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=None,
|
|
@@ -3969,7 +3979,7 @@ class PoolRecall(object):
|
|
|
videos_with_risk=self.videos_with_risk
|
|
|
)
|
|
|
region_code = self.get_region_code()
|
|
|
- t = [gevent.spawn(filter_.filter_videos_for_group, region_code, videos) for videos in data_for_filter]
|
|
|
+ t = [gevent.spawn(filter_.filter_videos_for_group, region_code, videos, video_tag_dict, self.tags_filter_rule) for videos in data_for_filter]
|
|
|
gevent.joinall(t)
|
|
|
result_list = [i.get() for i in t if i.get() is not None and len(i.get()) > 0]
|
|
|
# 5 返回结果
|
|
@@ -3977,6 +3987,7 @@ class PoolRecall(object):
|
|
|
for g in result_list:
|
|
|
for v in g:
|
|
|
results.append({
|
|
|
+ 'tags': video_tag_dict[v],
|
|
|
'videoId': v, 'flowPool': '',
|
|
|
'rovScore': 0.0, 'pushFrom': config_.PUSH_FROM['recall_strategy_trend_v1'],
|
|
|
'abCode': self.ab_code
|
|
@@ -3985,4 +3996,15 @@ class PoolRecall(object):
|
|
|
except Exception as e:
|
|
|
log_.error("error in recall_strategy_trend_v1:{}".format(e))
|
|
|
return []
|
|
|
+ def get_video_tags(self, video_ids) -> dict:
|
|
|
+ REDIS_PREFIX = "alg_recsys_video_tags_"
|
|
|
+ redis_keys = [REDIS_PREFIX + str(i) for i in video_ids]
|
|
|
+ video_tags = self.redis_helper.get_batch_key(redis_keys)
|
|
|
+ video_tag_dict = {}
|
|
|
+ for i, tags_str in enumerate(video_tags):
|
|
|
+ tags = []
|
|
|
+ if tags_str is not None and len(tags_str) != 0:
|
|
|
+ tags = str(tags_str).split(",")
|
|
|
+ video_tag_dict[video_ids[i]] = tags
|
|
|
+ return video_tag_dict
|
|
|
|