Преглед изворног кода

Merge branch 'feature/zhangbo_demand' of algorithm/rov-server into master-dev

zhangbo пре 1 године
родитељ
комит
181e3d67f5
3 измењена фајла са 77 додатих и 25 уклоњених линија
  1. 3 4
      parameter_update.py
  2. 19 10
      utils.py
  3. 55 11
      video_recall.py

+ 3 - 4
parameter_update.py

@@ -1,4 +1,3 @@
-import sys
 from my_utils import parse_json_for_risk_rule
 from my_utils import parse_json_for_risk_videos
 from db_helper import RedisHelper
@@ -54,8 +53,8 @@ def param_update_expansion_factor() -> int:
     # 容灾
     if data < 5:
         data = 5
-    elif data > 15:
-        data = 5
+    elif data > 25:
+        data = 25
     return data
 
 def param_update_risk_filter_flag() -> bool:
@@ -69,7 +68,7 @@ def param_update_risk_filter_flag() -> bool:
     data = False
     if tmp is not None:
         try:
-            data = bool(tmp)
+            data = True if tmp.lower() == "true" else False
         except Exception as e:
             data = False
     return data

+ 19 - 10
utils.py

@@ -217,7 +217,11 @@ def update_video_w_h_rate(video_id, key_name):
 
 class FilterVideos(object):
     """视频过滤"""
-    def __init__(self, request_id, app_type, video_ids, mid='', uid=''):
+    def __init__(self, request_id, app_type, video_ids, mid='', uid='',
+                 expansion_factor=None,
+                 risk_filter_flag=None,
+                 app_region_filtered=None,
+                 videos_with_risk=None):
         """
         初始化
         :param request_id: request_id
@@ -231,6 +235,10 @@ class FilterVideos(object):
         self.mid = mid
         self.uid = uid
         self.video_ids = video_ids
+        self.expansion_factor = expansion_factor
+        self.risk_filter_flag = risk_filter_flag
+        self.app_region_filtered = app_region_filtered
+        self.videos_with_risk = videos_with_risk
 
     def filter_video_status_h(self, video_ids, rule_key, data_key, ab_code, province_code, key_flag=''):
         """召回小时级更新的视频状态过滤"""
@@ -324,7 +332,8 @@ class FilterVideos(object):
         """视频过滤"""
         # todo: 添加app和region的风险过滤。
         st_viewed = time.time()
-        # videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
+        videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
+
         # log_.info({
         #     'logTimestamp': int(time.time() * 1000),
         #     'pool_type': "zhangbo-filter-pool_type",
@@ -339,7 +348,7 @@ class FilterVideos(object):
         # })
         # 预曝光过滤
         st_pre = time.time()
-        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        filtered_pre_result = self.filter_video_previewed(videos_filtered)
         # print("filtered_pre:", (time.time()-st_pre)*1000)
         # et_pre = time.time()
         # log_.info({
@@ -717,7 +726,7 @@ class FilterVideos(object):
         """视频过滤"""
         # todo: 添加app和region的风险过滤。
         st_viewed = time.time()
-        # videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
+        videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
         # log_.info({
         #     'logTimestamp': int(time.time() * 1000),
         #     'pool_type': "zhangbo-filter-pool_type",
@@ -732,7 +741,7 @@ class FilterVideos(object):
         # })
         # 预曝光过滤
         st_pre = time.time()
-        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        filtered_pre_result = self.filter_video_previewed(videos_filtered)
         # print("filtered_pre:", (time.time()-st_pre)*1000)
         # et_pre = time.time()
         # log_.info({
@@ -847,11 +856,11 @@ class FilterVideos(object):
 
     def filter_videos_with_risk_video(self, video_ids, app_type, region_code):
         # 0 用一个开关控制,是否过滤生效。 便于回滚功能。
-        flag = param_update_risk_filter_flag()
-        if not flag:
+        risk_filter_flag = self.risk_filter_flag
+        if not risk_filter_flag:
             return video_ids[0: min(20, len(video_ids))]
         # 1 判断是否过滤,不展示的app+区域列表,-1必须过滤
-        app_region_filtered = param_update_risk_rule()
+        app_region_filtered = self.app_region_filtered
         if app_type in app_region_filtered.keys():
             if_filtered = False
             if region_code in app_region_filtered[app_type]:
@@ -863,10 +872,10 @@ class FilterVideos(object):
         if not if_filtered:
             return video_ids[0: min(20, len(video_ids))]
         # 2 确认过滤,获取风险video列表param_update_risk_videos
-        videos_with_risk = param_update_risk_videos()
+        videos_with_risk = self.videos_with_risk
         # 3 过滤 返回结果
         video_ids_new = [i for i in video_ids if i not in videos_with_risk]
-        # print(flag)
+        # print(risk_filter_flag)
         # print(app_region_filtered)
         # print(video_ids)
         # print(app_type)

+ 55 - 11
video_recall.py

@@ -11,6 +11,9 @@ import gevent
 import json
 import sys
 from parameter_update import param_update_expansion_factor
+from parameter_update import param_update_risk_filter_flag
+from parameter_update import param_update_risk_rule
+from parameter_update import param_update_risk_videos
 
 log_ = Log()
 config_ = set_config()
@@ -48,7 +51,14 @@ class PoolRecall(object):
         self.h_data_key = data_key
         self.h_rule_key = h_rule_key
 
-        # self.expansion_factor = param_update_expansion_factor()
+        self.expansion_factor = param_update_expansion_factor()
+        self.risk_filter_flag = param_update_risk_filter_flag()
+        if self.risk_filter_flag:
+            self.app_region_filtered = param_update_risk_rule()
+            self.videos_with_risk = param_update_risk_videos()
+        else:
+            self.app_region_filtered = {}
+            self.videos_with_risk = []
 
 
 
@@ -522,7 +532,12 @@ class PoolRecall(object):
                     video_mapping[video_id].append(flow_pool)
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
-                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
+                                   expansion_factor=self.expansion_factor,
+                                   risk_filter_flag=self.risk_filter_flag,
+                                   app_region_filtered=self.app_region_filtered,
+                                   videos_with_risk=self.videos_with_risk
+                                   )
             ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                               region_code=region_code, shield_config=self.shield_config)
             ge.join()
@@ -778,7 +793,12 @@ class PoolRecall(object):
                     video_mapping[video_id].append(flow_pool)
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
-                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
+                                   expansion_factor=self.expansion_factor,
+                                   risk_filter_flag=self.risk_filter_flag,
+                                   app_region_filtered=self.app_region_filtered,
+                                   videos_with_risk=self.videos_with_risk
+                                   )
             ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                               region_code=region_code, shield_config=self.shield_config)
             ge.join()
@@ -1068,7 +1088,12 @@ class PoolRecall(object):
             to_filter_videos = [item[0] for item in check_result_items[:get_size]]
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
-                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=to_filter_videos)
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=to_filter_videos,
+                                   expansion_factor=self.expansion_factor,
+                                   risk_filter_flag=self.risk_filter_flag,
+                                   app_region_filtered=self.app_region_filtered,
+                                   videos_with_risk=self.videos_with_risk
+                                   )
             ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
                               region_code=region_code, shield_config=self.shield_config)
             ge.join()
@@ -2810,9 +2835,8 @@ class PoolRecall(object):
         recall_data = []
         pool_recall_result = []
         # 每次获取的视频数
-        get_size = size * 5
-        # get_size = size * self.expansion_factor
-
+        # get_size = size * 5
+        get_size = size * self.expansion_factor
         # 记录获取频次
         freq = 0
         while len(pool_recall_result) < size:
@@ -2835,7 +2859,12 @@ class PoolRecall(object):
                 video_score[video_id] = value[1]
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
-                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
+                                   expansion_factor=self.expansion_factor,
+                                    risk_filter_flag=self.risk_filter_flag,
+                                    app_region_filtered=self.app_region_filtered,
+                                    videos_with_risk=self.videos_with_risk
+                                   )
             ge = gevent.spawn(filter_.filter_videos, '', province_code, None)
             ge.join()
             filtered_result = ge.get()
@@ -3141,7 +3170,12 @@ class PoolRecall(object):
         video_ids = video_ids[:recall_num]
         #print(video_ids)
         filter_ = FilterVideos(request_id=self.request_id,
-                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
+                               expansion_factor=self.expansion_factor,
+                               risk_filter_flag=self.risk_filter_flag,
+                               app_region_filtered=self.app_region_filtered,
+                               videos_with_risk=self.videos_with_risk
+                               )
         filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
         if filtered_viewed_videos is None:
             return recall_result
@@ -3712,7 +3746,12 @@ class PoolRecall(object):
         video_ids = video_ids[:recall_num]
         #print(video_ids)
         filter_ = FilterVideos(request_id=self.request_id,
-                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
+                               expansion_factor=self.expansion_factor,
+                               risk_filter_flag=self.risk_filter_flag,
+                               app_region_filtered=self.app_region_filtered,
+                               videos_with_risk=self.videos_with_risk
+                               )
         filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
         if filtered_viewed_videos is None:
             return recall_result
@@ -3815,7 +3854,12 @@ class PoolRecall(object):
         video_ids = video_ids[:recall_num]
         #print(video_ids)
         filter_ = FilterVideos(request_id=self.request_id,
-                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+                               app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids,
+                               expansion_factor=self.expansion_factor,
+                               risk_filter_flag=self.risk_filter_flag,
+                               app_region_filtered=self.app_region_filtered,
+                               videos_with_risk=self.videos_with_risk
+                               )
         filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
         if filtered_viewed_videos is None:
             return recall_result