MySQLHelper.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import logging
  2. import pymysql
  3. import pymysql.cursors
  4. class MySQLHelper:
  5. def __init__(self, host, username, password, database, **kwargs):
  6. """
  7. 初始化 MySQLHelper 实例。
  8. :param host: 数据库地址
  9. :param username: 数据库用户名
  10. :param password: 数据库密码
  11. :param database: 数据库名称
  12. :param kwargs: 可选其他数据库参数(如端口、超时等)
  13. """
  14. self.connection_args = {
  15. 'host': host,
  16. 'user': username,
  17. 'password': password,
  18. 'database': database,
  19. 'charset': 'utf8mb4',
  20. 'cursorclass': pymysql.cursors.DictCursor,
  21. }
  22. self.connection_args.update(kwargs)
  23. self.conn = None
  24. self._connect()
  25. logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
  26. def _connect(self):
  27. """建立数据库连接,支持断开后自动重连。"""
  28. try:
  29. if self.conn is None or not self.conn.open:
  30. self.conn = pymysql.connect(**self.connection_args)
  31. logging.info("成功连接到数据库。")
  32. except pymysql.MySQLError as e:
  33. logging.error(f"数据库连接失败: {e}")
  34. raise
  35. def execute_query(self, sql: str, params: tuple = None) -> list[dict]:
  36. """
  37. 执行 SELECT 查询。
  38. :param sql: 查询的 SQL 语句
  39. :param params: 可选的 SQL 参数
  40. :return: 查询结果的列表,每条记录为字典
  41. """
  42. self._connect()
  43. try:
  44. with self.conn.cursor() as cursor:
  45. cursor.execute(sql, params)
  46. results = cursor.fetchall()
  47. logging.info(f"成功执行查询: {sql}")
  48. return results
  49. except pymysql.MySQLError as e:
  50. logging.error(f"查询失败: {e} -> SQL: {sql}")
  51. raise
  52. def execute_update(self, sql: str, params: tuple = None) -> int:
  53. """
  54. 执行 INSERT/UPDATE/DELETE 操作。
  55. :param sql: 执行的 SQL 语句
  56. :param params: 可选的 SQL 参数
  57. :return: 受影响的行数
  58. """
  59. self._connect()
  60. try:
  61. with self.conn.cursor() as cursor:
  62. cursor.execute(sql, params)
  63. self.conn.commit()
  64. logging.info(f"成功执行更新: {sql}")
  65. return cursor.rowcount
  66. except pymysql.MySQLError as e:
  67. self.conn.rollback()
  68. logging.error(f"更新失败: {e} -> SQL: {sql}")
  69. raise
  70. def execute_many(self, sql: str, param_list: list[tuple]) -> int:
  71. """
  72. 批量执行 SQL 语句。
  73. :param sql: 执行的 SQL 语句
  74. :param param_list: 参数列表,每个元素是参数元组
  75. :return: 受影响的总行数
  76. """
  77. self._connect()
  78. try:
  79. with self.conn.cursor() as cursor:
  80. rowcount = cursor.executemany(sql, param_list)
  81. self.conn.commit()
  82. logging.info(f"成功批量执行 SQL,共影响 {rowcount} 行。")
  83. return rowcount
  84. except pymysql.MySQLError as e:
  85. self.conn.rollback()
  86. logging.error(f"批量操作失败: {e} -> SQL: {sql}")
  87. raise
  88. def close(self):
  89. """关闭数据库连接。"""
  90. if self.conn and self.conn.open:
  91. self.conn.close()
  92. logging.info("数据库连接已关闭。")
  93. def __del__(self):
  94. """析构时确保连接关闭。"""
  95. self.close()