mysql_pool.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. import pymysql
  2. import threading
  3. import time
  4. from queue import Queue, Empty, Full
  5. from contextlib import contextmanager
  6. from typing import Dict
  7. from loguru import logger
  8. from .db_config import db_config
  9. from .mysql_exceptions import MySQLConnectionError, MySQLPoolError
  10. class MySQLConnectionPool:
  11. """MySQL连接池管理类"""
  12. def __init__(self, min_connections: int = 5, max_connections: int = 20,
  13. max_idle_time: int = 300, check_interval: int = 60):
  14. """
  15. 初始化连接池
  16. Args:
  17. min_connections: 最小连接数
  18. max_connections: 最大连接数
  19. max_idle_time: 最大空闲时间(秒)
  20. check_interval: 连接检查间隔(秒)
  21. """
  22. self.min_connections = min_connections
  23. self.max_connections = max_connections
  24. self.max_idle_time = max_idle_time
  25. self.check_interval = check_interval
  26. self._pool = Queue(maxsize=max_connections)
  27. self._active_connections = 0
  28. self._lock = threading.RLock()
  29. self._connection_params = db_config.get_connection_params()
  30. # 添加DictCursor支持
  31. self._connection_params['cursorclass'] = pymysql.cursors.DictCursor
  32. # 初始化连接池
  33. self._initialize_pool()
  34. # 启动连接检查线程
  35. self._check_thread = threading.Thread(target=self._check_connections, daemon=True)
  36. self._check_thread.start()
  37. def _create_connection(self) -> pymysql.Connection:
  38. """创建新的数据库连接"""
  39. try:
  40. connection = pymysql.connect(**self._connection_params)
  41. connection.ping(reconnect=True)
  42. # 记录连接创建时间
  43. connection._created_time = time.time()
  44. connection._last_used_time = time.time()
  45. return connection
  46. except Exception as e:
  47. error_msg = f"创建数据库连接失败: {e}"
  48. logger.error(error_msg)
  49. raise MySQLConnectionError(error_msg, original_error=e)
  50. def _initialize_pool(self):
  51. """初始化连接池"""
  52. with self._lock:
  53. for _ in range(self.min_connections):
  54. try:
  55. connection = self._create_connection()
  56. self._pool.put(connection, block=False)
  57. self._active_connections += 1
  58. except Exception as e:
  59. error_msg = f"初始化连接池失败: {e}"
  60. logger.error(error_msg)
  61. raise MySQLPoolError(error_msg, original_error=e)
  62. def get_connection(self, timeout: int = 30) -> pymysql.Connection:
  63. """从连接池获取连接"""
  64. try:
  65. # 尝试从池中获取连接
  66. connection = self._pool.get(timeout=timeout)
  67. # 检查连接是否有效
  68. try:
  69. connection.ping(reconnect=True)
  70. connection._last_used_time = time.time()
  71. return connection
  72. except:
  73. # 连接无效,创建新连接
  74. with self._lock:
  75. self._active_connections -= 1
  76. return self._create_new_connection()
  77. except Empty:
  78. # 池中无可用连接,尝试创建新连接
  79. return self._create_new_connection()
  80. def _create_new_connection(self) -> pymysql.Connection:
  81. """创建新连接(当池中无可用连接时)"""
  82. with self._lock:
  83. if self._active_connections < self.max_connections:
  84. connection = self._create_connection()
  85. self._active_connections += 1
  86. return connection
  87. else:
  88. error_msg = "连接池已达到最大连接数限制"
  89. logger.error(error_msg)
  90. raise MySQLPoolError(error_msg)
  91. def return_connection(self, connection: pymysql.Connection):
  92. """将连接返回到连接池"""
  93. if connection is None:
  94. return
  95. try:
  96. # 检查连接是否有效
  97. connection.ping(reconnect=True)
  98. connection._last_used_time = time.time()
  99. # 确保连接处于自动提交模式
  100. if not connection.get_autocommit():
  101. connection.rollback()
  102. connection.autocommit(True)
  103. self._pool.put(connection, block=False)
  104. except (Full, Exception) as e:
  105. # 池已满或连接无效,关闭连接
  106. self._close_connection(connection)
  107. def _close_connection(self, connection: pymysql.Connection):
  108. """关闭连接"""
  109. try:
  110. connection.close()
  111. except:
  112. pass
  113. finally:
  114. with self._lock:
  115. self._active_connections -= 1
  116. def _check_connections(self):
  117. """定期检查连接池中的连接"""
  118. while True:
  119. try:
  120. time.sleep(self.check_interval)
  121. current_time = time.time()
  122. connections_to_remove = []
  123. # 检查空闲连接
  124. temp_connections = []
  125. while not self._pool.empty():
  126. try:
  127. connection = self._pool.get_nowait()
  128. # 检查连接是否超时
  129. if (current_time - connection._last_used_time) > self.max_idle_time:
  130. connections_to_remove.append(connection)
  131. else:
  132. temp_connections.append(connection)
  133. except Empty:
  134. break
  135. # 将有效连接放回池中
  136. for connection in temp_connections:
  137. try:
  138. self._pool.put_nowait(connection)
  139. except Full:
  140. connections_to_remove.append(connection)
  141. # 关闭超时连接
  142. for connection in connections_to_remove:
  143. self._close_connection(connection)
  144. # 确保最小连接数
  145. with self._lock:
  146. while (self._active_connections < self.min_connections and
  147. self._active_connections < self.max_connections):
  148. try:
  149. connection = self._create_connection()
  150. self._pool.put_nowait(connection)
  151. self._active_connections += 1
  152. except (Full, Exception):
  153. break
  154. except Exception as e:
  155. logger.error(f"连接池检查出错: {e}")
  156. @contextmanager
  157. def get_connection_context(self):
  158. """上下文管理器方式获取连接"""
  159. connection = None
  160. try:
  161. connection = self.get_connection()
  162. yield connection
  163. finally:
  164. if connection:
  165. self.return_connection(connection)
  166. def close_all(self):
  167. """关闭所有连接"""
  168. while not self._pool.empty():
  169. try:
  170. connection = self._pool.get_nowait()
  171. self._close_connection(connection)
  172. except Empty:
  173. break
  174. def get_pool_status(self) -> Dict[str, int]:
  175. """获取连接池状态"""
  176. with self._lock:
  177. return {
  178. 'active_connections': self._active_connections,
  179. 'pool_size': self._pool.qsize(),
  180. 'max_connections': self.max_connections,
  181. 'min_connections': self.min_connections
  182. }
  183. # 全局连接池实例
  184. mysql_pool = MySQLConnectionPool()