Pārlūkot izejas kodu

Merge branch 'feature/zhangbo_demand' of algorithm/rov-server into master

zhangbo 1 gadu atpakaļ
vecāks
revīzija
2f25b90c63
7 mainītis faili ar 289 papildinājumiem un 65 dzēšanām
  1. 1 0
      config.py
  2. 0 0
      parameter_dir/__init__.py
  3. 90 0
      parameter_dir/parameter_update.py
  4. 130 59
      utils.py
  5. 0 0
      utils_dir/__init__.py
  6. 36 0
      utils_dir/my_utils.py
  7. 32 6
      video_recall.py

+ 1 - 0
config.py

@@ -18,6 +18,7 @@ class BaseConfig(object):
         'JOURNEY': 22,  # 票圈足迹
         'BLESSING_YEAR': 3,  # 票圈福年
         'H5': 12,  # H5
+        'PIAO_QUAN_BLESSING': 2, # 票圈 | 祝福
     }
     # 白名单(影视,宗教)过滤Redis
     REDIS_INFO_FILTER = {

+ 0 - 0
parameter_dir/__init__.py


+ 90 - 0
parameter_dir/parameter_update.py

@@ -0,0 +1,90 @@
+import sys
+sys.path.append("../utils_dir/")
+sys.path.append("utils_dir/")
+from my_utils import parse_json_for_risk_rule
+from my_utils import parse_json_for_risk_videos
+from db_helper import RedisHelper
+from config import set_config
+
+
+config_ = set_config()
+
+RISK_SHIELD_FILTER_RULE_V1_JSON = "RISK_SHIELD_FILTER_RULE_V1_JSON"
+RISK_SHIELD_FILTER_VIDEO_V1_STR = "RISK_SHIELD_FILTER_VIDEO_V1_STR"
+RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT = "RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT"
+RISK_SHIELD_FILTER_FLAG_BOOL = "RISK_SHIELD_FILTER_FLAG_BOOL"
+
+def param_update_risk_rule() -> dict:
+    """
+    定时更新风险过滤的规则
+    key=RISK_SHIELD_FILTER_RULE_V1_JSON
+    value= "{\"0\": [\"110000\"]}"
+    """
+    redis_helper = RedisHelper()
+    tmp = redis_helper.get_data_from_redis(key_name=RISK_SHIELD_FILTER_RULE_V1_JSON)
+    if tmp is not None:
+        data = parse_json_for_risk_rule(tmp)
+        return data
+    return {}
+def param_update_risk_videos() -> list:
+    """
+    定时更新风险过滤的videos
+    key = "RISK_SHIELD_FILTER_VIDEO_V1_STR"
+    value = "7536230,1,2,3,4,5,6,7,8,9,10"
+    """
+    redis_helper = RedisHelper()
+    tmp = redis_helper.get_data_from_redis(key_name=RISK_SHIELD_FILTER_VIDEO_V1_STR)
+    if tmp is not None:
+        data = parse_json_for_risk_videos(tmp)
+        return data
+    return []
+
+def param_update_expansion_factor() -> int:
+    """
+    定时更新扩大倍数
+    key = "RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT"
+    value = "5"
+    """
+    redis_helper = RedisHelper()
+    tmp = redis_helper.get_data_from_redis(key_name=RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT)
+    data = 5
+    if tmp is not None:
+        try:
+            data = int(tmp)
+        except Exception as e:
+            data = 5
+    # 容灾
+    if data < 5:
+        data = 5
+    elif data > 15:
+        data = 5
+    return data
+
+def param_update_risk_filter_flag() -> bool:
+    """
+    定时更新 是否过滤
+    key = "RISK_SHIELD_FILTER_FLAG_BOOL"
+    value = "False"
+    """
+    redis_helper = RedisHelper()
+    tmp = redis_helper.get_data_from_redis(key_name=RISK_SHIELD_FILTER_FLAG_BOOL)
+    data = False
+    if tmp is not None:
+        try:
+            data = bool(tmp)
+        except Exception as e:
+            data = False
+    return data
+
+if __name__ == '__main__':
+    pass
+    d1 = param_update_risk_rule()
+    d2 = param_update_risk_videos()
+    print(d1, type(d1))
+    print(d2, type(d2))
+    d3 = param_update_expansion_factor()
+    d4 = param_update_risk_filter_flag()
+    print(d3, type(d3))
+    print(d4, type(d4))
+
+

+ 130 - 59
utils.py

@@ -6,13 +6,18 @@ import time
 import gevent
 import pandas as pd
 import random
-
 from datetime import datetime
 # from db_helper import HologresHelper, RedisHelper, MysqlHelper
 from db_helper import RedisHelper, MysqlHelper
 from config import set_config
 from log import Log
 
+import sys
+sys.path.append("parameter_dir/")
+from parameter_update import param_update_risk_rule
+from parameter_update import param_update_risk_videos
+from parameter_update import param_update_risk_filter_flag
+
 config_ = set_config()
 log_ = Log()
 
@@ -319,9 +324,24 @@ class FilterVideos(object):
 
     def filter_videos(self, pool_type='rov', region_code=None, shield_config=None):
         """视频过滤"""
+        # todo: 添加app和region的风险过滤。
+        st_viewed = time.time()
+        videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
+        # log_.info({
+        #     'logTimestamp': int(time.time() * 1000),
+        #     'pool_type': "zhangbo-filter-pool_type",
+        #     'request_id': self.request_id,
+        #     'app_type': self.app_type,
+        #     'mid': "zhangbo-filter_videos",
+        #     'uid': self.uid,
+        #     'operation': 'shield_filter',
+        #     'request_videos': self.video_ids,
+        #     'shield_filter_result': videos_filtered,
+        #     'executeTime': (time.time() - st_viewed) * 1000
+        # })
         # 预曝光过滤
         st_pre = time.time()
-        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        filtered_pre_result = self.filter_video_previewed(videos_filtered)
         # print("filtered_pre:", (time.time()-st_pre)*1000)
         # et_pre = time.time()
         # log_.info({
@@ -368,34 +388,36 @@ class FilterVideos(object):
         if not filtered_viewed_result:
             return None
         filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
-        if pool_type == 'flow' or pool_type=='normal':
-            # 流量池视频需过滤屏蔽视频
-            if region_code is None or shield_config is None:
-                return filtered_viewed_videos
-            else:
-                shield_key_name_list = shield_config.get(region_code, None)
-                if shield_key_name_list is not None:
-                    filtered_shield_video_ids = self.filter_shield_video(
-                        video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
-                    )
-                    log_.info({
-                        'logTimestamp': int(time.time() * 1000),
-                        'pool_type': pool_type,
-                        'request_id': self.request_id,
-                        'app_type': self.app_type,
-                        'mid': self.mid,
-                        'uid': self.uid,
-                        'operation': 'shield_filter',
-                        'request_videos': filtered_viewed_videos,
-                        'shield_filter_result': filtered_shield_video_ids,
-                        'executeTime': (time.time() - st_viewed) * 1000
-                    })
-                    # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
-                    return filtered_shield_video_ids
-                else:
-                    return filtered_viewed_videos
-        else:
-            return filtered_viewed_videos
+        return filtered_viewed_videos
+
+        # if pool_type == 'flow' or pool_type=='normal':
+        #     # 流量池视频需过滤屏蔽视频
+        #     if region_code is None or shield_config is None:
+        #         return filtered_viewed_videos
+        #     else:
+        #         shield_key_name_list = shield_config.get(region_code, None)
+        #         if shield_key_name_list is not None:
+        #             filtered_shield_video_ids = self.filter_shield_video(
+        #                 video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
+        #             )
+        #             log_.info({
+        #                 'logTimestamp': int(time.time() * 1000),
+        #                 'pool_type': pool_type,
+        #                 'request_id': self.request_id,
+        #                 'app_type': self.app_type,
+        #                 'mid': self.mid,
+        #                 'uid': self.uid,
+        #                 'operation': 'shield_filter',
+        #                 'request_videos': filtered_viewed_videos,
+        #                 'shield_filter_result': filtered_shield_video_ids,
+        #                 'executeTime': (time.time() - st_viewed) * 1000
+        #             })
+        #             # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
+        #             return filtered_shield_video_ids
+        #         else:
+        #             return filtered_viewed_videos
+        # else:
+        #     return filtered_viewed_videos
 
     def filter_video_previewed(self, video_ids):
         """
@@ -695,9 +717,24 @@ class FilterVideos(object):
 
     def filter_videos_status(self, pool_type='rov', region_code=None, shield_config=None):
         """视频过滤"""
+        # todo: 添加app和region的风险过滤。
+        st_viewed = time.time()
+        videos_filtered = self.filter_videos_with_risk_video(self.video_ids, self.app_type, region_code)
+        # log_.info({
+        #     'logTimestamp': int(time.time() * 1000),
+        #     'pool_type': "zhangbo-filter-pool_type",
+        #     'request_id': self.request_id,
+        #     'app_type': self.app_type,
+        #     'mid': "zhangbo-filter_videos_status",
+        #     'uid': self.uid,
+        #     'operation': 'shield_filter',
+        #     'request_videos': self.video_ids,
+        #     'shield_filter_result': videos_filtered,
+        #     'executeTime': (time.time() - st_viewed) * 1000
+        # })
         # 预曝光过滤
         st_pre = time.time()
-        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        filtered_pre_result = self.filter_video_previewed(videos_filtered)
         # print("filtered_pre:", (time.time()-st_pre)*1000)
         # et_pre = time.time()
         # log_.info({
@@ -744,34 +781,36 @@ class FilterVideos(object):
         if not filtered_viewed_result:
             return None
         filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
-        if pool_type == 'flow' or pool_type=='normal':
-            # 流量池视频需过滤屏蔽视频
-            if region_code is None or shield_config is None:
-                return filtered_viewed_videos
-            else:
-                shield_key_name_list = shield_config.get(region_code, None)
-                if shield_key_name_list is not None:
-                    filtered_shield_video_ids = self.filter_shield_video(
-                        video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
-                    )
-                    log_.info({
-                        'logTimestamp': int(time.time() * 1000),
-                        'pool_type': pool_type,
-                        'request_id': self.request_id,
-                        'app_type': self.app_type,
-                        'mid': self.mid,
-                        'uid': self.uid,
-                        'operation': 'shield_filter',
-                        'request_videos': filtered_viewed_videos,
-                        'shield_filter_result': filtered_shield_video_ids,
-                        'executeTime': (time.time() - st_viewed) * 1000
-                    })
-                    # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
-                    return filtered_shield_video_ids
-                else:
-                    return filtered_viewed_videos
-        else:
-            return filtered_viewed_videos
+        return filtered_viewed_videos
+
+        # if pool_type == 'flow' or pool_type=='normal':
+        #     # 流量池视频需过滤屏蔽视频
+        #     if region_code is None or shield_config is None:
+        #         return filtered_viewed_videos
+        #     else:
+        #         shield_key_name_list = shield_config.get(region_code, None)
+        #         if shield_key_name_list is not None:
+        #             filtered_shield_video_ids = self.filter_shield_video(
+        #                 video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
+        #             )
+        #             log_.info({
+        #                 'logTimestamp': int(time.time() * 1000),
+        #                 'pool_type': pool_type,
+        #                 'request_id': self.request_id,
+        #                 'app_type': self.app_type,
+        #                 'mid': self.mid,
+        #                 'uid': self.uid,
+        #                 'operation': 'shield_filter',
+        #                 'request_videos': filtered_viewed_videos,
+        #                 'shield_filter_result': filtered_shield_video_ids,
+        #                 'executeTime': (time.time() - st_viewed) * 1000
+        #             })
+        #             # print("filtered_pre flow:", (time.time() - st_viewed) * 1000)
+        #             return filtered_shield_video_ids
+        #         else:
+        #             return filtered_viewed_videos
+        # else:
+        #     return filtered_viewed_videos
     def filter_video_viewed_status(self, video_ids, types=(1, 6,)):
         """
                    调用后端接口过滤用户已观看视频
@@ -808,6 +847,38 @@ class FilterVideos(object):
         filtered_videos = result['data']
         return filtered_videos
 
+    def filter_videos_with_risk_video(self, video_ids, app_type, region_code):
+        # 0 用一个开关控制,是否过滤生效。 便于回滚功能。
+        flag = param_update_risk_filter_flag()
+        if not flag:
+            return video_ids[0: min(20, len(video_ids))]
+        # 1 判断是否过滤,不展示的app+区域列表,-1必须过滤
+        app_region_filtered = param_update_risk_rule()
+        if app_type in app_region_filtered.keys():
+            if_filtered = False
+            if region_code in app_region_filtered[app_type]:
+                if_filtered = True
+        else:
+            if_filtered = True
+        if region_code == -1:
+            if_filtered = True
+        if not if_filtered:
+            return video_ids[0: min(20, len(video_ids))]
+        # 2 确认过滤,获取风险video列表param_update_risk_videos
+        videos_with_risk = param_update_risk_videos()
+        # 3 过滤 返回结果
+        video_ids_new = [i for i in video_ids if i not in videos_with_risk]
+        # print(flag)
+        # print(app_region_filtered)
+        # print(video_ids)
+        # print(app_type)
+        # print(region_code)
+        # print(videos_with_risk)
+        # print(video_ids_new)
+        # print(len(video_ids))
+        # print(len(video_ids_new))
+        return video_ids_new[0: min(20, len(video_ids_new))]
+
 if __name__ == '__main__':
     user = [
         ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),

+ 0 - 0
utils_dir/__init__.py


+ 36 - 0
utils_dir/my_utils.py

@@ -0,0 +1,36 @@
+import json
+from log import Log
+
+log_ = Log()
+
+def parse_json_for_risk_rule(s: str) -> dict:
+    """
+    定时更新风险过滤的规则
+    key=RISK_SHIELD_FILTER_RULE_V1_JSON
+    value= "{\"VLOG\": [\"北京\"]}"
+    """
+    # 1 解析json格式,格式错误直接返回空字典。
+    try:
+        data = json.loads(s)
+        data_new = {}
+        for k, v in data.items():
+            data_new[int(k)] = v
+    except Exception as e:
+        log_.error("{}: parse json is wrong with in parse_json_for_risk_rule:{}".format(e, s))
+        return {}
+    return data_new
+
+def parse_json_for_risk_videos(s: str) -> list:
+    """
+    定时更新风险过滤的规则
+    key = "RISK_SHIELD_FILTER_VIDEO_V1_STR"
+    value = "7536230,1,2,3,4,5,6,7,8,9,10"
+    """
+    # 1 解析字符串,格式错误直接返回空list。
+    try:
+        data = s.split(",")
+        data_new = [int(i) for i in data]
+    except Exception as e:
+        log_.error("{}: parse str is wrong with in parse_json_for_risk_videos:{}".format(e, s))
+        return []
+    return data_new

+ 32 - 6
video_recall.py

@@ -8,7 +8,10 @@ from db_helper import RedisHelper
 from config import set_config
 from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count, send_msg_to_feishu
 import gevent
-import  json
+import json
+import sys
+sys.path.append("parameter_dir/")
+from parameter_update import param_update_expansion_factor
 
 log_ = Log()
 config_ = set_config()
@@ -46,6 +49,10 @@ class PoolRecall(object):
         self.h_data_key = data_key
         self.h_rule_key = h_rule_key
 
+        self.expansion_factor = param_update_expansion_factor()
+
+
+
     def copy_redis_zset_data(self, from_key_name, to_key_name):
         # 获取from_key_name中的数据
         records = self.redis_helper.get_data_zset_with_index(key_name=from_key_name, start=0, end=-1, with_scores=True)
@@ -2804,7 +2811,9 @@ class PoolRecall(object):
         recall_data = []
         pool_recall_result = []
         # 每次获取的视频数
-        get_size = size * 5
+        # get_size = size * 5
+        get_size = size * self.expansion_factor
+
         # 记录获取频次
         freq = 0
         while len(pool_recall_result) < size:
@@ -2828,7 +2837,7 @@ class PoolRecall(object):
             # 过滤
             filter_ = FilterVideos(request_id=self.request_id,
                                    app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-            ge = gevent.spawn(filter_.filter_videos)
+            ge = gevent.spawn(filter_.filter_videos, '', province_code, None)
             ge.join()
             filtered_result = ge.get()
 
@@ -3134,7 +3143,7 @@ class PoolRecall(object):
         #print(video_ids)
         filter_ = FilterVideos(request_id=self.request_id,
                                app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-        filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
+        filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
         if filtered_viewed_videos is None:
             return recall_result
         #print("filtered_viewed_videos:", filtered_viewed_videos)
@@ -3705,7 +3714,7 @@ class PoolRecall(object):
         #print(video_ids)
         filter_ = FilterVideos(request_id=self.request_id,
                                app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-        filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
+        filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
         if filtered_viewed_videos is None:
             return recall_result
         #print("filtered_viewed_videos:", filtered_viewed_videos)
@@ -3808,7 +3817,7 @@ class PoolRecall(object):
         #print(video_ids)
         filter_ = FilterVideos(request_id=self.request_id,
                                app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
-        filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal')
+        filtered_viewed_videos = filter_.filter_videos_status(pool_type='normal', region_code=self.get_region_code())
         if filtered_viewed_videos is None:
             return recall_result
         #print("filtered_viewed_videos:", filtered_viewed_videos)
@@ -3863,3 +3872,20 @@ class PoolRecall(object):
                 recall_result.append(recall_dict[vid])
         #print("u2i recall_result:", recall_result)
         return recall_result
+
+    def get_region_code(self):
+        # 获取存在城市分组数据的城市编码列表
+        city_code_list = [code for _, code in config_.CITY_CODE.items()]
+        # 获取provinceCode
+        province_code = self.client_info.get('provinceCode', '-1')
+        # 获取cityCode
+        city_code = self.client_info.get('cityCode', '-1')
+
+        if city_code in city_code_list:
+            # 分城市数据存在时,获取城市分组数据
+            region_code = city_code
+        else:
+            region_code = province_code
+        if region_code == '':
+            region_code = '-1'
+        return region_code