""" @author: luojunhui """ import pymysql from contextlib import contextmanager from applications.exception import QueryError, TransactionError class DatabaseConnector: """ 数据库连接器,使用 pymysql 进行 MySQL 数据库操作。 """ def __init__(self, db_config): """ 初始化数据库连接配置。 :param db_config: """ self.db_config = db_config self.connection = None def connect(self): """ 建立数据库连接。 :raises ConnectionError: 如果无法连接到数据库。 """ try: self.connection = pymysql.connect( host=self.db_config.get("host", "localhost"), user=self.db_config["user"], password=self.db_config["password"], db=self.db_config["db"], port=self.db_config.get("port", 3306), charset=self.db_config.get("charset", "utf8mb4"), ) except pymysql.MySQLError as e: raise ConnectionError(f"无法连接到数据库: {e}") def close(self): """ 关闭数据库连接。 """ if self.connection: self.connection.close() self.connection = None def fetch(self, query, cursor_type=None, params=None): """ 执行单条查询语句,并返回结果。 :param params: 查询传参 :param cursor_type: 输出的返回格式 :param query: 查询语句 :return: 查询结果列表 :raises QueryError: 如果执行查询时出错。 """ if not self.connection: self.connect() try: with self.connection.cursor(cursor_type) as cursor: if params: cursor.execute(query, params) else: cursor.execute(query) result = cursor.fetchall() return result except pymysql.MySQLError as e: self.rollback() raise QueryError(e, query) def save(self, query, params): """ 执行单条插入、更新语句 :param query: :param params: :return: """ if not self.connection: self.connect() try: with self.connection.cursor() as cursor: affected_rows = cursor.execute(query, params) if affected_rows: self.commit() return affected_rows else: self.rollback() return 0 except pymysql.MySQLError as e: self.rollback() raise QueryError(e, query) def save_many(self, query, params_list): """ 执行多条查询语句 :param query: SQL 查询语句。 :param params_list: 包含多个参数的列表。 :raises QueryError: 如果执行查询时出错。 """ if not self.connection: self.connect() try: with self.connection.cursor() as cursor: affected_rows = cursor.executemany(query, params_list) if affected_rows: self.commit() return affected_rows else: self.rollback() return 0 except pymysql.MySQLError as e: self.rollback() raise QueryError(e, query) def commit(self): """ 提交当前事务。 :raises TransactionError: 如果提交事务时出错。 """ if not self.connection: raise TransactionError("没有活动的数据库连接。") try: self.connection.commit() except pymysql.MySQLError as e: self.connection.rollback() raise TransactionError(f"提交事务失败: {e}") def rollback(self): """ 回滚当前事务。 :raises TransactionError: 如果回滚事务时出错。 """ if not self.connection: raise TransactionError("没有活动的数据库连接。") try: self.connection.rollback() except pymysql.MySQLError as e: raise TransactionError(f"回滚事务失败: {e}") @contextmanager def transaction(self): """ 上下文管理器,用于处理事务 """ try: yield self.commit() except Exception as e: self.rollback() raise e def __enter__(self): """ 支持 with 语句,进入上下文时建立连接。 """ self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): """ 支持 with 语句,退出上下文时关闭连接。 """ if exc_type: self.rollback() self.close()