123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import logging
- import pymysql
- import pymysql.cursors
- class MySQLHelper:
- def __init__(self, host, username, password, database, **kwargs):
- """
- 初始化 MySQLHelper 实例。
- :param host: 数据库地址
- :param username: 数据库用户名
- :param password: 数据库密码
- :param database: 数据库名称
- :param kwargs: 可选其他数据库参数(如端口、超时等)
- """
- self.connection_args = {
- 'host': host,
- 'user': username,
- 'password': password,
- 'database': database,
- 'charset': 'utf8mb4',
- 'cursorclass': pymysql.cursors.DictCursor,
- }
- self.connection_args.update(kwargs)
- self.conn = None
- self._connect()
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
- def _connect(self):
- """建立数据库连接,支持断开后自动重连。"""
- try:
- if self.conn is None or not self.conn.open:
- self.conn = pymysql.connect(**self.connection_args)
- logging.info("成功连接到数据库。")
- except pymysql.MySQLError as e:
- logging.error(f"数据库连接失败: {e}")
- raise
- def execute_query(self, sql: str, params: tuple = None) -> list[dict]:
- """
- 执行 SELECT 查询。
- :param sql: 查询的 SQL 语句
- :param params: 可选的 SQL 参数
- :return: 查询结果的列表,每条记录为字典
- """
- self._connect()
- try:
- with self.conn.cursor() as cursor:
- cursor.execute(sql, params)
- results = cursor.fetchall()
- logging.info(f"成功执行查询: {sql}")
- return results
- except pymysql.MySQLError as e:
- logging.error(f"查询失败: {e} -> SQL: {sql}")
- raise
- def execute_update(self, sql: str, params: tuple = None) -> int:
- """
- 执行 INSERT/UPDATE/DELETE 操作。
- :param sql: 执行的 SQL 语句
- :param params: 可选的 SQL 参数
- :return: 受影响的行数
- """
- self._connect()
- try:
- with self.conn.cursor() as cursor:
- cursor.execute(sql, params)
- self.conn.commit()
- logging.info(f"成功执行更新: {sql}")
- return cursor.rowcount
- except pymysql.MySQLError as e:
- self.conn.rollback()
- logging.error(f"更新失败: {e} -> SQL: {sql}")
- raise
- def execute_many(self, sql: str, param_list: list[tuple]) -> int:
- """
- 批量执行 SQL 语句。
- :param sql: 执行的 SQL 语句
- :param param_list: 参数列表,每个元素是参数元组
- :return: 受影响的总行数
- """
- self._connect()
- try:
- with self.conn.cursor() as cursor:
- rowcount = cursor.executemany(sql, param_list)
- self.conn.commit()
- logging.info(f"成功批量执行 SQL,共影响 {rowcount} 行。")
- return rowcount
- except pymysql.MySQLError as e:
- self.conn.rollback()
- logging.error(f"批量操作失败: {e} -> SQL: {sql}")
- raise
- def close(self):
- """关闭数据库连接。"""
- if self.conn and self.conn.open:
- self.conn.close()
- logging.info("数据库连接已关闭。")
- def __del__(self):
- """析构时确保连接关闭。"""
- self.close()
|