# coding:utf-8 import redis import psycopg2 import pymysql from config import set_config from log import Log config_, _ = set_config() log = Log() class RedisHelper(object): def __init__(self): """ 初始化redis连接信息 redis_info: redis连接信息, 格式:dict, {'host': '', 'port': '', 'password': ''} """ redis_info = config_.REDIS_INFO self.host = redis_info['host'] self.port = redis_info['port'] self.password = redis_info['password'] def connect(self): """ 连接redis :return: conn """ pool = redis.ConnectionPool(host=self.host, port=self.port, password=self.password, decode_responses=True) conn = redis.Redis(connection_pool=pool) return conn def key_exists(self, key_name): """ 判断key是否存在 :param key_name: key :return: 存在-True, 不存在-False """ conn = self.connect() return conn.exists(key_name) def del_keys(self, key_name): """ 删除key :param key_name: key :return: None """ conn = self.connect() conn.delete(key_name) def get_data_from_redis(self, key_name): """ 读取redis中的数据 :param key_name: key :return: data """ conn = self.connect() if not conn.exists(key_name): # key不存在 return None data = conn.get(key_name) return data def set_data_to_redis(self, key_name, value, expire_time=24*3600): """ 新增数据 :param key_name: key :param value: 元素的值 videoId :param expire_time: 过期时间,单位:s,默认1天 :return: None """ conn = self.connect() conn.set(key_name, value, ex=expire_time) def add_data_with_zset(self, key_name, data, expire_time=7*24*3600): """ 新增数据,有序set :param key_name: key :param data: 元素的值及对应分数 type-dict {value: score} :param expire_time: 过期时间,单位:s,默认7天 :return: None """ conn = self.connect() conn.zadd(key_name, data) # 设置过期时间 conn.expire(key_name, expire_time) def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False): """ 根据索引位置获取元素的值 :param key_name: key :param start: 索引起始点 闭区间,包含start :param end: 索引结束点 闭区间,包含end :param desc: 分数排序方式,默认从大到小 :param with_scores: 是否获取元素的分数,默认 False,只获取元素的值 :return: data 元素值列表(不包含分数),value(videoId)类型转换为int, 包含分数时不进行类型转换 """ conn = self.connect() if not conn.exists(key_name): return None data = conn.zrange(key_name, start, end, desc, with_scores) return data # if with_scores: # return data # else: # return [eval(value) for value in data] def get_score_with_value(self, key_name, value): """ 在zset中,根据元素的value获取对应的score :param key_name: key :param value: 元素的值 :return: score value对应的score """ conn = self.connect() return conn.zscore(key_name, value) def update_score_with_value(self, key_name, value, score, expire_time=7*24*3600): """ 在zset中,修改元素value对应的score :param key_name: key :param value: 元素的值 :param score: value对应的score更新值 :param expire_time: 过期时间,单位:s,默认7天 """ conn = self.connect() if conn.exists(key_name): conn.zadd(key_name, {value: score}) else: # key不存在时,需设置过期时间 conn.zadd(key_name, {value: score}) conn.expire(key_name, expire_time) def remove_value_from_zset(self, key_name, value): """ 删除zset中的指定元素 :param key_name: key :param value: 元素的值 :return: None """ conn = self.connect() conn.zrem(key_name, *value) def remove_by_rank_from_zset(self, key_name, start, stop): """ 移除有序集中,指定排名(rank)区间内的所有成员 :param key_name: key :param start: 开始位 :param stop: 结束位 :return: None """ conn = self.connect() conn.zremrangebyrank(name=key_name, min=start, max=stop) def get_index_with_data(self, key_name, value): """ 根据元素的值获取在有序set中的位置,按照分数倒序(从大到小) :param key_name: key :param value: 元素的值 :return: idx 位置索引 """ conn = self.connect() return conn.zrevrank(key_name, value) def get_data_from_set(self, key_name): """ 获取set中的所有数据 :param key_name: key :return: data """ conn = self.connect() if not conn.exists(key_name): # key不存在 return None data = conn.sscan(key_name) return data[1] def add_data_with_set(self, key_name, values, expire_time=30*60): """ 新增数据,set :param key_name: key :param values: 要添加的元素 类型-set :param expire_time: 过期时间,单位:s,默认0.5小时 :return: None """ conn = self.connect() conn.sadd(key_name, *values) # 设置过期时间 conn.expire(key_name, expire_time) def data_exists_with_set(self, key_name, value): """ 判断元素value是否在集合key_name中 :param key_name: key :param value: 需判断的元素 :return: 存在-True, 不存在-False """ conn = self.connect() return conn.sismember(key_name, value) def remove_value_from_set(self, key_name, values): """ 删除set中的指定元素 :param key_name: key :param values: 元素的值, 类型-set :return: None """ conn = self.connect() conn.srem(key_name, *values) def persist_key(self, key_name): """ 移除key的过期时间,将其转换为永久状态 :param key_name: key :return: """ conn = self.connect() conn.persist(key_name) class HologresHelper(object): def __init__(self): """初始化hologres连接信息""" self.hologres_info = config_.HOLOGRES_INFO def get_data(self, sql): # 连接Hologres conn = psycopg2.connect(**self.hologres_info) # 创建游标 cur = conn.cursor() # 查询数据 cur.execute(sql) data = cur.fetchall() # 提交事务 conn.commit() # 释放资源 cur.close() conn.close() return data class MysqlHelper(object): def __init__(self): """ 初始化mysql连接信息 """ self.mysql_info = config_.MYSQL_INFO def get_data(self, sql): """ 查询数据 :param sql: sql语句 :return: data """ # 连接数据库 conn = pymysql.connect(**self.mysql_info) # 创建游标 cursor = conn.cursor() try: # 执行SQL语句 cursor.execute(sql) # 获取查询的所有记录 data = cursor.fetchall() except Exception as e: return None # 关闭游标对象 cursor.close() # 关闭数据库连接 conn.close() return data if __name__ == '__main__': redis_helper = RedisHelper() key = 'com.weiqu.video.hot.recommend.item.score.20210901' res = redis_helper.get_score_with_value(key, 90797) print(res)