import logging import pymysql import pymysql.cursors class MySQLHelper: def __init__(self, host, username, password, database, **kwargs): """ 初始化 MySQLHelper 实例。 :param host: 数据库地址 :param username: 数据库用户名 :param password: 数据库密码 :param database: 数据库名称 :param kwargs: 可选其他数据库参数(如端口、超时等) """ self.connection_args = { 'host': host, 'user': username, 'password': password, 'database': database, 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor, } self.connection_args.update(kwargs) self.conn = None self._connect() logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") def _connect(self): """建立数据库连接,支持断开后自动重连。""" try: if self.conn is None or not self.conn.open: self.conn = pymysql.connect(**self.connection_args) logging.info("成功连接到数据库。") except pymysql.MySQLError as e: logging.error(f"数据库连接失败: {e}") raise def execute_query(self, sql: str, params: tuple = None) -> list[dict]: """ 执行 SELECT 查询。 :param sql: 查询的 SQL 语句 :param params: 可选的 SQL 参数 :return: 查询结果的列表,每条记录为字典 """ self._connect() try: with self.conn.cursor() as cursor: cursor.execute(sql, params) results = cursor.fetchall() logging.info(f"成功执行查询: {sql}") return results except pymysql.MySQLError as e: logging.error(f"查询失败: {e} -> SQL: {sql}") raise def execute_update(self, sql: str, params: tuple = None) -> int: """ 执行 INSERT/UPDATE/DELETE 操作。 :param sql: 执行的 SQL 语句 :param params: 可选的 SQL 参数 :return: 受影响的行数 """ self._connect() try: with self.conn.cursor() as cursor: cursor.execute(sql, params) self.conn.commit() logging.info(f"成功执行更新: {sql}") return cursor.rowcount except pymysql.MySQLError as e: self.conn.rollback() logging.error(f"更新失败: {e} -> SQL: {sql}") raise def execute_many(self, sql: str, param_list: list[tuple]) -> int: """ 批量执行 SQL 语句。 :param sql: 执行的 SQL 语句 :param param_list: 参数列表,每个元素是参数元组 :return: 受影响的总行数 """ self._connect() try: with self.conn.cursor() as cursor: rowcount = cursor.executemany(sql, param_list) self.conn.commit() logging.info(f"成功批量执行 SQL,共影响 {rowcount} 行。") return rowcount except pymysql.MySQLError as e: self.conn.rollback() logging.error(f"批量操作失败: {e} -> SQL: {sql}") raise def close(self): """关闭数据库连接。""" if self.conn and self.conn.open: self.conn.close() logging.info("数据库连接已关闭。") def __del__(self): """析构时确保连接关闭。""" self.close()