# -*- coding: utf-8 -*-
# @Author: luojunhui
# @Time: 2023/12/19
"""
Database connection and operation helpers (MySQL and Redis).
"""
import redis
import pymysql
import os
import sys

# Make the current working directory importable so that the
# ``application`` package below resolves when run from the project root.
sys.path.append(os.getcwd())

from application.common.log import Local
from application.config.mysql_config import env_dict


class MysqlHelper(object):
    """
    Thin wrapper around a single pymysql connection.

    The connection is opened in the constructor from the ``env_dict`` entry
    selected by ``env`` and stays open until :meth:`close` is called.
    """

    def __init__(self, env, mode, platform, action=''):
        """
        Open the MySQL connection.

        :param env: key into ``env_dict`` selecting the connection settings
        :param mode: first argument handed to ``Local.logger`` on errors
        :param platform: second argument handed to ``Local.logger`` on errors
        :param action: stored on the instance; not read by this class
        """
        mysql_config = env_dict[env]
        self.connection = pymysql.connect(
            host=mysql_config['host'],        # database host
            port=mysql_config['port'],        # port number
            user=mysql_config['user'],        # MySQL user name
            passwd=mysql_config['passwd'],    # MySQL password
            db=mysql_config['db'],            # database name
            charset=mysql_config['charset'],  # e.g. utf8 when the stored text is utf8-encoded
        )
        self.mode = mode
        self.platform = platform
        self.action = action

    def select(self, sql, params=None):
        """
        Run a query and return every row.

        :param sql: SQL statement; may contain pymysql ``%s`` placeholders
            when ``params`` is given
        :param params: optional sequence/mapping bound to the placeholders by
            pymysql. Prefer this over building ``sql`` with string formatting
            whenever the values come from outside the program.
        :return: the result of ``cursor.fetchall()``
        """
        # The context manager closes the cursor even if execute() raises.
        with self.connection.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchall()

    def update(self, sql, params=None):
        """
        Run a write statement and commit it.

        Any exception raised while executing or committing is logged and the
        transaction is rolled back; the exception is not re-raised.

        :param sql: SQL statement; may contain pymysql ``%s`` placeholders
            when ``params`` is given
        :param params: optional sequence/mapping bound to the placeholders
        :return: the value returned by ``cursor.execute`` on success,
            ``None`` when the statement failed and was rolled back
        """
        cursor = self.connection.cursor()
        try:
            affected = cursor.execute(sql, params)
            self.connection.commit()
            return affected
        except Exception as e:
            # Deliberate best-effort: callers detect failure through None.
            Local.logger(self.mode, self.platform).error(
                f"update_values异常,进行回滚操作:{e}\n"
            )
            self.connection.rollback()
            return None
        finally:
            cursor.close()

    def close(self):
        """Close the underlying MySQL connection."""
        self.connection.close()


class RedisHelper:
    """
    Class-level helpers for pushing to and popping from Redis lists.
    """

    # NOTE(review): hosts and passwords are hard-coded in source control.
    # They are kept as-is to preserve behaviour, but should be moved to
    # configuration / a secret store.
    _REDIS_SETTINGS = {
        'hk': {
            # Public address. Alternatives previously noted here:
            #   r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com    (internal)
            #   r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com  (test)
            'host': 'r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',
            'port': 6379,
            'db': 2,
            'password': 'Wqsd@2019',
        },
        'prod': {
            # Internal address. Public alternative:
            #   r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com
            'host': 'r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com',
            'port': 6379,
            'db': 2,
            'password': 'Wqsd@2019',
        },
        'default': {
            # Public address. Internal alternative:
            #   r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com
            'host': 'r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',
            'port': 6379,
            'db': 2,
            'password': 'Qingqu2019',
        },
    }

    # One client (and therefore one connection pool) per settings profile,
    # created lazily, so repeated push/pop calls reuse connections instead
    # of allocating a fresh pool each time.
    _clients = {}

    @classmethod
    def connect_redis(cls, env):
        """
        Return a Redis client for the given environment.

        :param env: ``'hk'`` or ``'prod'``; any other value selects the
            default settings
        :return: a ``redis.Redis`` client backed by a connection pool that is
            shared between calls for the same profile
        """
        profile = env if env in cls._REDIS_SETTINGS else 'default'
        client = cls._clients.get(profile)
        if client is None:
            pool = redis.ConnectionPool(**cls._REDIS_SETTINGS[profile])
            client = redis.Redis(connection_pool=pool)
            cls._clients[profile] = client
        return client

    @classmethod
    def redis_push(cls, env, task_key, data):
        """
        Push ``data`` onto the head of the list stored at ``task_key``.

        Together with :meth:`redis_pop`, which pops from the tail, the list
        behaves as a first-in-first-out queue.

        :param env: environment name, see :meth:`connect_redis`
        :param task_key: Redis key of the list
        :param data: value to push
        """
        redis_conn = cls.connect_redis(env)
        redis_conn.lpush(task_key, data)

    @classmethod
    def redis_pop(cls, env, task_key):
        """
        Pop one value from the tail of the list stored at ``task_key``.

        :param env: environment name, see :meth:`connect_redis`
        :param task_key: Redis key of the list
        :return: the popped value, or ``None`` when the list is empty or the
            key does not exist
        """
        # RPOP already yields None for an empty/missing list, so a separate
        # LLEN check would only add a round-trip and a race window.
        return cls.connect_redis(env).rpop(task_key)