MySQLHelper.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import logging
  2. from typing import Dict, List, Any
  3. import pymysql
  4. import pymysql.cursors
  5. class MySQLHelper:
  6. def __init__(self, host, username, password, database, **kwargs):
  7. """
  8. 初始化 MySQLHelper 实例。
  9. :param host: 数据库地址
  10. :param username: 数据库用户名
  11. :param password: 数据库密码
  12. :param database: 数据库名称
  13. :param kwargs: 可选其他数据库参数(如端口、超时等)
  14. """
  15. self.connection_args = {
  16. 'host': host,
  17. 'user': username,
  18. 'password': password,
  19. 'database': database,
  20. 'charset': 'utf8mb4',
  21. 'cursorclass': pymysql.cursors.DictCursor,
  22. }
  23. self.connection_args.update(kwargs)
  24. self.conn = None
  25. self._connect()
  26. logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
  27. def _connect(self):
  28. """建立数据库连接,支持断开后自动重连。"""
  29. try:
  30. if self.conn is None or not self.conn.open:
  31. self.conn = pymysql.connect(**self.connection_args)
  32. logging.info("成功连接到数据库。")
  33. except pymysql.MySQLError as e:
  34. logging.error(f"数据库连接失败: {e}")
  35. raise
  36. def execute_query(self, sql: str, params: tuple = None) -> List[Dict[str, Any]]:
  37. """
  38. 执行 SELECT 查询。
  39. :param sql: 查询的 SQL 语句
  40. :param params: 可选的 SQL 参数
  41. :return: 查询结果的列表,每条记录为字典
  42. """
  43. self._connect()
  44. try:
  45. with self.conn.cursor() as cursor:
  46. cursor.execute(sql, params)
  47. results = cursor.fetchall()
  48. logging.info(f"成功执行查询: {sql}")
  49. return results
  50. except pymysql.MySQLError as e:
  51. logging.error(f"查询失败: {e} -> SQL: {sql}")
  52. raise
  53. def execute_update(self, sql: str, params: tuple = None) -> int:
  54. """
  55. 执行 INSERT/UPDATE/DELETE 操作。
  56. :param sql: 执行的 SQL 语句
  57. :param params: 可选的 SQL 参数
  58. :return: 受影响的行数
  59. """
  60. self._connect()
  61. try:
  62. with self.conn.cursor() as cursor:
  63. cursor.execute(sql, params)
  64. self.conn.commit()
  65. logging.info(f"成功执行更新: {sql}")
  66. return cursor.rowcount
  67. except pymysql.MySQLError as e:
  68. self.conn.rollback()
  69. logging.error(f"更新失败: {e} -> SQL: {sql}")
  70. raise
  71. def execute_many(self, sql: str, param_list: list[tuple]) -> int:
  72. """
  73. 批量执行 SQL 语句。
  74. :param sql: 执行的 SQL 语句
  75. :param param_list: 参数列表,每个元素是参数元组
  76. :return: 受影响的总行数
  77. """
  78. self._connect()
  79. try:
  80. with self.conn.cursor() as cursor:
  81. rowcount = cursor.executemany(sql, param_list)
  82. self.conn.commit()
  83. logging.info(f"成功批量执行 SQL,共影响 {rowcount} 行。")
  84. return rowcount
  85. except pymysql.MySQLError as e:
  86. self.conn.rollback()
  87. logging.error(f"批量操作失败: {e} -> SQL: {sql}")
  88. raise
  89. def close(self):
  90. """关闭数据库连接。"""
  91. if self.conn and self.conn.open:
  92. self.conn.close()
  93. logging.info("数据库连接已关闭。")
  94. def __del__(self):
  95. """析构时确保连接关闭。"""
  96. self.close()