import ast
import time
import traceback

import redis

from config import set_config
from log import Log

config_ = set_config()
log_ = Log()

# Process-wide Redis client, created lazily by RedisHelper.connect().
conn_redis = None


class RedisHelper(object):
    """Thin convenience wrapper around a shared redis-py client."""

    def __init__(self, params=None, redis_info=config_.REDIS_INFO):
        """
        Store the Redis connection settings.

        :param params: optional request context object; stored on the
            instance but not used by any method in this class
        :param redis_info: connection settings, a dict of the form
            {'host': '', 'port': '', 'password': ''}
        """
        self.redis_info = redis_info
        self.host = redis_info['host']
        self.port = redis_info['port']
        self.password = redis_info['password']
        self.params = params

    def connect(self):
        """
        Return the shared Redis client, creating it on first use.

        NOTE: the client is cached in the module-level ``conn_redis``, so
        once it exists the host/port/password of later instances are
        ignored and every helper shares the first connection.

        :return: redis.Redis client (responses decoded to str)
        """
        global conn_redis
        if conn_redis is None:
            pool = redis.ConnectionPool(
                host=self.host,
                port=self.port,
                password=self.password,
                decode_responses=True,
            )
            conn_redis = redis.Redis(connection_pool=pool)
        return conn_redis

    def key_exists(self, key_name):
        """
        Check whether a key exists.

        :param key_name: key
        :return: result of EXISTS; truthy if the key exists, falsy otherwise
        """
        conn = self.connect()
        return conn.exists(key_name)

    def del_keys(self, key_name):
        """
        Delete a key.

        :param key_name: key
        :return: None
        """
        conn = self.connect()
        conn.delete(key_name)

    def get_data_from_redis(self, key_name):
        """
        Read the string value stored at a key.

        :param key_name: key
        :return: the stored value, or None if the key does not exist
        """
        conn = self.connect()
        # GET already yields None for a missing key, so no separate EXISTS
        # round trip is needed.
        return conn.get(key_name)

    def set_data_to_redis(self, key_name, value, expire_time=24*3600):
        """
        Set a string value with an expiry.

        :param key_name: key
        :param value: value to store
        :param expire_time: TTL in seconds, default 1 day
        :return: None
        """
        conn = self.connect()
        conn.set(key_name, value, ex=int(expire_time))

    def add_data_with_zset(self, key_name, data, expire_time=7*24*3600):
        """
        Add members to a sorted set and (re)set the key's TTL.

        :param key_name: key
        :param data: members and their scores, dict {value: score}
        :param expire_time: TTL in seconds, default 7 days, type-int
        :return: None
        """
        conn = self.connect()
        conn.zadd(key_name, data)
        conn.expire(key_name, int(expire_time))

    def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False):
        """
        Fetch sorted-set members by rank range.

        :param key_name: key
        :param start: start index, inclusive
        :param end: end index, inclusive
        :param desc: order by score descending when True (default)
        :param with_scores: also return scores when True; default False
            returns members only
        :return: None if the key does not exist. Without scores, a list of
            members each parsed as a Python literal (e.g. '123' -> 123);
            with scores, the raw zrange result without conversion.
        """
        conn = self.connect()
        if not conn.exists(key_name):
            return None
        data = conn.zrange(key_name, start, end, desc=desc, withscores=with_scores)
        if with_scores:
            return data
        # SECURITY: members come from Redis and must not be passed to eval();
        # literal_eval gives the same result for literal values (ints, floats,
        # quoted strings) and raises instead of executing arbitrary code.
        return [ast.literal_eval(value) for value in data]

    def get_all_data_from_zset(self, key_name, desc=True, with_scores=False):
        """
        Fetch every member of a sorted set, reading 100 ranks per request.

        :param key_name: key
        :param desc: order by score descending when True (default)
        :param with_scores: also return scores when True; default False
            returns members only
        :return: None if the key does not exist, otherwise the concatenated
            zrange results (no type conversion is applied)
        """
        conn = self.connect()
        if not conn.exists(key_name):
            return None
        data = []
        start = 0
        step = 100
        while True:
            end = start + step - 1
            page = conn.zrange(key_name, start, end, desc=desc, withscores=with_scores)
            if not page:
                break
            data.extend(page)
            start += step
        return data

    def get_score_with_value(self, key_name, value):
        """
        Look up the score of a sorted-set member.

        :param key_name: key
        :param value: member
        :return: None if the key does not exist, otherwise the ZSCORE result
        """
        conn = self.connect()
        if not conn.exists(key_name):
            return None
        return conn.zscore(key_name, value)

    def get_rank_with_value(self, key_name, value, desc=False):
        """
        Look up the rank of a sorted-set member.

        :param key_name: key
        :param value: member
        :param desc: type-bool; True ranks by score descending, default
            False ranks by score ascending
        :return: None if the key does not exist, otherwise the
            ZREVRANK/ZRANK result
        """
        conn = self.connect()
        if not conn.exists(key_name):
            return None
        # Only the literal True selects the reverse rank, as before.
        if desc is True:
            return conn.zrevrank(key_name, value)
        return conn.zrank(key_name, value)

    def update_score_with_value(self, key_name, value, score, expire_time=24*3600):
        """
        Set the score of a sorted-set member.

        :param key_name: key
        :param value: member
        :param score: new score for the member
        :param expire_time: TTL in seconds, default 1 day, type-int; applied
            only when the key did not exist before this call
        """
        conn = self.connect()
        key_existed = conn.exists(key_name)
        conn.zadd(key_name, {value: score})
        if not key_existed:
            # A freshly created key needs a TTL; an existing key keeps its own.
            conn.expire(key_name, int(expire_time))

    def remove_value_from_zset(self, key_name, value):
        """
        Remove a member from a sorted set.

        :param key_name: key
        :param value: member
        :return: the ZREM result
        """
        conn = self.connect()
        return conn.zrem(key_name, value)

    def get_index_with_data(self, key_name, value):
        """
        Get a member's position in a sorted set, ordered by score descending.

        :param key_name: key
        :param value: member
        :return: the ZREVRANK result
        """
        conn = self.connect()
        return conn.zrevrank(key_name, value)

    def get_data_from_set(self, key_name):
        """
        Fetch all members of a set by iterating SSCAN.

        :param key_name: key
        :return: None if the key does not exist, otherwise a de-duplicated
            list of members (order is not defined)
        """
        conn = self.connect()
        if not conn.exists(key_name):
            return None
        data = []
        cursor = 0
        while True:
            cursor, page = conn.sscan(key_name, cursor=cursor, count=2000)
            data.extend(page)
            # A returned cursor of 0 marks the end of the scan.
            if cursor == 0:
                break
        # The scan results are de-duplicated before returning.
        return list(set(data))

    def add_data_with_set(self, key_name, values, expire_time=30*60):
        """
        Add members to a set and (re)set the key's TTL.

        :param key_name: key
        :param values: members to add, type-tuple
        :param expire_time: TTL in seconds, default 30 minutes, type-int
        :return: None
        """
        conn = self.connect()
        conn.sadd(key_name, *values)
        conn.expire(key_name, int(expire_time))

    def data_exists_with_set(self, key_name, value):
        """
        Check whether a value is a member of a set.

        :param key_name: key
        :param value: member to test
        :return: the SISMEMBER result; truthy if the value is a member
        """
        conn = self.connect()
        return conn.sismember(key_name, value)

    def get_data_with_count_from_set(self, key_name, count=1):
        """
        Pick random members from a set without removing them.

        :param key_name: key
        :param count: number of members to fetch, default 1
        :return: the SRANDMEMBER result
        """
        conn = self.connect()
        return conn.srandmember(name=key_name, number=count)

    def remove_value_from_set(self, key_name, values):
        """
        Remove members from a set.

        :param key_name: key
        :param values: members to remove, type-tuple
        :return: the SREM result
        """
        conn = self.connect()
        return conn.srem(key_name, *values)

    def decr_key(self, key_name, amount=1, expire_time=30*60):
        """
        Decrement a counter and (re)set the key's TTL.

        :param key_name: key
        :param amount: decrement, default 1, type-int
        :param expire_time: TTL in seconds, default 30 minutes, type-int
        :return: None
        """
        conn = self.connect()
        conn.decr(name=key_name, amount=amount)
        conn.expire(key_name, int(expire_time))

    def incr_key(self, key_name, amount=1, expire_time=30*60):
        """
        Increment a counter and (re)set the key's TTL.

        :param key_name: key
        :param amount: increment, default 1, type-int
        :param expire_time: TTL in seconds, default 30 minutes, type-int
        :return: None
        """
        conn = self.connect()
        conn.incr(name=key_name, amount=amount)
        conn.expire(key_name, int(expire_time))

    def setnx_key(self, key_name, value, expire_time=5*60):
        """
        Set a key only if it does not already exist, then set its TTL.

        NOTE: the TTL is applied unconditionally, so calling this on an
        existing key leaves its value untouched but refreshes its expiry.

        :param key_name: key
        :param value: value
        :param expire_time: TTL in seconds, default 5 minutes, type-int
        :return: None
        """
        conn = self.connect()
        conn.setnx(name=key_name, value=value)
        conn.expire(name=key_name, time=int(expire_time))

    def get_batch_key(self, name_list):
        """
        Fetch the values of several keys in one MGET call.

        :param name_list: keys to read
        :return: the MGET result
        """
        conn = self.connect()
        return conn.mget(name_list)

    def mget(self, keys):
        """
        Fetch the values of several keys in one MGET call.

        :param keys: keys to read
        :return: the MGET result
        """
        conn = self.connect()
        return conn.mget(keys=keys)


if __name__ == '__main__':
    # Manual smoke test against the configured Redis instance.
    redis_helper = RedisHelper()
    key_name = "previewed:videos:5:aan7"
    res = redis_helper.get_data_with_count_from_set(key_name=key_name, count=20)
    print(res)
    res1 = redis_helper.remove_value_from_set(key_name=key_name, values=('2881413',))
    print(res1)
    res = redis_helper.get_data_with_count_from_set(key_name=key_name, count=20)
    print(res)