123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- """
- @author: luojunhui
- """
- import pymysql
- from contextlib import contextmanager
- from applications.exception import QueryError, TransactionError
- class DatabaseConnector:
- """
- 数据库连接器,使用 pymysql 进行 MySQL 数据库操作。
- """
- def __init__(self, db_config):
- """
- 初始化数据库连接配置。
- :param db_config:
- """
- self.db_config = db_config
- self.connection = None
- def connect(self):
- """
- 建立数据库连接。
- :raises ConnectionError: 如果无法连接到数据库。
- """
- try:
- self.connection = pymysql.connect(
- host=self.db_config.get("host", "localhost"),
- user=self.db_config["user"],
- password=self.db_config["password"],
- db=self.db_config["db"],
- port=self.db_config.get("port", 3306),
- charset=self.db_config.get("charset", "utf8mb4"),
- )
- except pymysql.MySQLError as e:
- raise ConnectionError(f"无法连接到数据库: {e}")
- def close(self):
- """
- 关闭数据库连接。
- """
- if self.connection:
- self.connection.close()
- self.connection = None
- def fetch(self, query, cursor_type=None, params=None):
- """
- 执行单条查询语句,并返回结果。
- :param params: 查询传参
- :param cursor_type: 输出的返回格式
- :param query: 查询语句
- :return: 查询结果列表
- :raises QueryError: 如果执行查询时出错。
- """
- if not self.connection:
- self.connect()
- try:
- with self.connection.cursor(cursor_type) as cursor:
- if params:
- cursor.execute(query, params)
- else:
- cursor.execute(query)
- result = cursor.fetchall()
- return result
- except pymysql.MySQLError as e:
- self.rollback()
- raise QueryError(e, query)
- def save(self, query, params):
- """
- 执行单条插入、更新语句
- :param query:
- :param params:
- :return:
- """
- if not self.connection:
- self.connect()
- try:
- with self.connection.cursor() as cursor:
- affected_rows = cursor.execute(query, params)
- if affected_rows:
- self.commit()
- return affected_rows
- else:
- self.rollback()
- return 0
- except pymysql.MySQLError as e:
- self.rollback()
- raise QueryError(e, query)
- def save_many(self, query, params_list):
- """
- 执行多条查询语句
- :param query: SQL 查询语句。
- :param params_list: 包含多个参数的列表。
- :raises QueryError: 如果执行查询时出错。
- """
- if not self.connection:
- self.connect()
- try:
- with self.connection.cursor() as cursor:
- affected_rows = cursor.executemany(query, params_list)
- if affected_rows:
- self.commit()
- return affected_rows
- else:
- self.rollback()
- return 0
- except pymysql.MySQLError as e:
- self.rollback()
- raise QueryError(e, query)
- def commit(self):
- """
- 提交当前事务。
- :raises TransactionError: 如果提交事务时出错。
- """
- if not self.connection:
- raise TransactionError("没有活动的数据库连接。")
- try:
- self.connection.commit()
- except pymysql.MySQLError as e:
- self.connection.rollback()
- raise TransactionError(f"提交事务失败: {e}")
- def rollback(self):
- """
- 回滚当前事务。
- :raises TransactionError: 如果回滚事务时出错。
- """
- if not self.connection:
- raise TransactionError("没有活动的数据库连接。")
- try:
- self.connection.rollback()
- except pymysql.MySQLError as e:
- raise TransactionError(f"回滚事务失败: {e}")
- @contextmanager
- def transaction(self):
- """
- 上下文管理器,用于处理事务
- """
- try:
- yield self.commit()
- except Exception as e:
- self.rollback()
- raise e
- def __enter__(self):
- """
- 支持 with 语句,进入上下文时建立连接。
- """
- self.connect()
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- """
- 支持 with 语句,退出上下文时关闭连接。
- """
- if exc_type:
- self.rollback()
- self.close()
|