from my_utils import parse_json_for_risk_rule from my_utils import parse_json_for_risk_videos from db_helper import RedisHelper from config import set_config import json from log import Log config_ = set_config() log_ = Log() RISK_SHIELD_FILTER_RULE_V1_JSON = "RISK_SHIELD_FILTER_RULE_V1_JSON" RISK_SHIELD_FILTER_VIDEO_V1_STR = "RISK_SHIELD_FILTER_VIDEO_V1_STR" RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT = "RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT" RISK_SHIELD_FILTER_FLAG_BOOL = "RISK_SHIELD_FILTER_FLAG_BOOL" TAGS_FILTER_FLAG_BOOL = "TAGS_FILTER_FLAG_BOOL" TAGS_FILTER_RULE_V1_JSON = "TAGS_FILTER_RULE_V1_JSON" ALG_RECSYS_RANK_DENSITY_RULES_JSON = "ALG_RECSYS_RANK_DENSITY_RULES_JSON" def param_update_risk_rule() -> dict: """ 定时更新风险过滤的规则 key=RISK_SHIELD_FILTER_RULE_V1_JSON value= "{\"0\": [\"110000\"]}" """ redis_helper = RedisHelper() tmp = redis_helper.get_data_from_redis(key_name=RISK_SHIELD_FILTER_RULE_V1_JSON) if tmp is not None: data = parse_json_for_risk_rule(tmp) return data return {} def param_update_risk_videos() -> list: """ 定时更新风险过滤的videos key = "RISK_SHIELD_FILTER_VIDEO_V1_STR" value = "7536230,1,2,3,4,5,6,7,8,9,10" """ redis_helper = RedisHelper() tmp = redis_helper.get_data_from_redis(key_name=RISK_SHIELD_FILTER_VIDEO_V1_STR) if tmp is not None: data = parse_json_for_risk_videos(tmp) return data return [] def param_update_expansion_factor() -> int: """ 定时更新扩大倍数 key = "RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT" value = "5" """ redis_helper = RedisHelper() tmp = redis_helper.get_data_from_redis(key_name=RISK_SHIELD_FILTER_EXPANSION_FACTOR_INT) data = 5 if tmp is not None: try: data = int(tmp) except Exception as e: data = 5 # 容灾 if data < 5: data = 5 elif data > 25: data = 25 return data def param_update_risk_filter_flag() -> bool: """ 定时更新 是否过滤 key = "RISK_SHIELD_FILTER_FLAG_BOOL" value = "False" """ redis_helper = RedisHelper() tmp = redis_helper.get_data_from_redis(key_name=RISK_SHIELD_FILTER_FLAG_BOOL) data = False if tmp is not None: try: data = True if tmp.lower() == "true" else False except Exception as e: data = False return data def param_update_filter_flags() -> [bool, bool]: """ 用getbatch的方式从redis中取所有filter的flag key1 = "RISK_SHIELD_FILTER_FLAG_BOOL" key2 = "TAGS_FILTER_FLAG_BOOL" value = "False" """ redis_helper = RedisHelper() key_list = [ RISK_SHIELD_FILTER_FLAG_BOOL, TAGS_FILTER_FLAG_BOOL ] tmp = redis_helper.get_batch_key(name_list=key_list) data1 = False data2 = False if tmp is not None: try: data1 = True if tmp[0].lower() == "true" else False except Exception as e: data1 = False if tmp is not None: try: data2 = True if tmp[1].lower() == "true" else False except Exception as e: data2 = False return [data1, data2] def param_update_rule(redis_helper: RedisHelper, rule_key_str: str) -> dict: tmp = redis_helper.get_data_from_redis(key_name=rule_key_str) if tmp is not None: try: data = json.loads(tmp) return data except Exception as e: log_.error("{}: parse json is wrong with in param_update_rule:{}".format(e, tmp)) return {} def param_update_density_rule(redis_helper: RedisHelper): tmp = redis_helper.get_data_from_redis(key_name=ALG_RECSYS_RANK_DENSITY_RULES_JSON) if tmp is not None: try: data = json.loads(tmp) return data except Exception as e: log_.error("{}: parse json is wrong with in param_update_density_rule:{}".format(e, tmp)) return {} if __name__ == '__main__': pass d1 = param_update_risk_rule() d2 = param_update_risk_videos() print(d1, type(d1)) print(d2, type(d2)) d3 = param_update_expansion_factor() d4 = param_update_risk_filter_flag() print(d3, type(d3)) print(d4, type(d4)) redis_helper = RedisHelper() d5 = param_update_rule(redis_helper) print(d5, type(d5))