| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 | """@author: luojunhui"""import pymysqlfrom contextlib import contextmanagerfrom .exceptions import QueryError, TransactionErrorclass DatabaseConnector:    """    数据库连接器,使用 pymysql 进行 MySQL 数据库操作。    """    def __init__(self, db_config):        """        初始化数据库连接配置。        :param db_config:        """        self.db_config = db_config        self.connection = None    def connect(self):        """        建立数据库连接。        :raises ConnectionError: 如果无法连接到数据库。        """        try:            self.connection = pymysql.connect(                host=self.db_config.get('host', 'localhost'),                user=self.db_config['user'],                password=self.db_config['password'],                db=self.db_config['db'],                port=self.db_config.get('port', 3306),                charset=self.db_config.get('charset', 'utf8mb4')            )        except pymysql.MySQLError as e:            raise ConnectionError(f"无法连接到数据库: {e}")    def close(self):        """        关闭数据库连接。        """        if self.connection:            self.connection.close()            self.connection = None    def execute_query(self, query, params=None):        """        执行单条查询语句,并返回结果。        :param query: SQL 查询语句。        :param params: 可选的参数,用于参数化查询。        :return: 查询结果列表。        :raises QueryError: 如果执行查询时出错。        """        if not self.connection:            self.connect()        try:            with self.connection.cursor() as cursor:                cursor.execute(query, params)                result = cursor.fetchall()                return result        except pymysql.MySQLError as e:            self.connection.rollback()            raise QueryError(f"查询执行失败: {e}")    def execute_many(self, query, params_list):        """        执行多条查询语句。        :param query: SQL 查询语句。        :param params_list: 包含多个参数的列表。        :raises QueryError: 如果执行查询时出错。        """        if not self.connection:            self.connect()        try:            with self.connection.cursor() as cursor:                cursor.executemany(query, params_list)        except pymysql.MySQLError as e:            self.connection.rollback()            raise QueryError(f"批量查询执行失败: {e}")    def commit(self):        """        提交当前事务。        :raises TransactionError: 如果提交事务时出错。        """        if not self.connection:            raise TransactionError("没有活动的数据库连接。")        try:            self.connection.commit()            print("事务提交成功。")        except pymysql.MySQLError as e:            self.connection.rollback()            raise TransactionError(f"提交事务失败: {e}")    def rollback(self):        """        回滚当前事务。        :raises TransactionError: 如果回滚事务时出错。        """        if not self.connection:            raise TransactionError("没有活动的数据库连接。")        try:            self.connection.rollback()            print("事务已回滚。")        except pymysql.MySQLError as e:            raise TransactionError(f"回滚事务失败: {e}")    @contextmanager    def transaction(self):        """        上下文管理器,用于处理事务。        使用示例:            with db.transaction():                db.execute_query("INSERT INTO ...")                db.execute_query("UPDATE ...")        """        try:            yield            self.commit()        except Exception as e:            self.rollback()            raise e    def __enter__(self):        """        支持 with 语句,进入上下文时建立连接。        """        self.connect()        return self    def __exit__(self, exc_type, exc_val, exc_tb):        """        支持 with 语句,退出上下文时关闭连接。        """        if exc_type:            self.rollback()        self.close()
 |