import threading
import time
from contextlib import contextmanager
from queue import Queue, Empty, Full
from typing import Dict

import pymysql
from loguru import logger

from .db_config import db_config
from .mysql_exceptions import MySQLConnectionError, MySQLPoolError


class MySQLConnectionPool:
    """Thread-safe pool of pymysql connections.

    Idle connections are kept in a bounded ``queue.Queue``.  The attribute
    ``_active_connections`` counts every connection the pool is responsible
    for (idle in the queue, checked out by a caller, or currently being
    opened) and is only modified while holding ``_lock``.

    A daemon thread wakes up every ``check_interval`` seconds, closes
    connections that have been idle for longer than ``max_idle_time`` and
    then tops the pool back up to ``min_connections``.
    """

    def __init__(self, min_connections: int = 5, max_connections: int = 20,
                 max_idle_time: int = 300, check_interval: int = 60):
        """Create the pool, open the initial connections and start the checker.

        Args:
            min_connections: Number of connections opened up front and
                restored by the background checker.
            max_connections: Upper bound on connections owned by the pool.
            max_idle_time: Seconds a pooled connection may stay unused before
                the checker closes it.
            check_interval: Seconds between two runs of the checker.

        Raises:
            MySQLPoolError: If the initial connections cannot be created.
        """
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.max_idle_time = max_idle_time
        self.check_interval = check_interval

        self._pool = Queue(maxsize=max_connections)
        self._active_connections = 0
        self._lock = threading.RLock()
        # Set by close_all(); stops the checker and disables re-pooling.
        self._closed = threading.Event()

        # Copy the parameters so that adding the cursor class below does not
        # modify the mapping owned by db_config.
        self._connection_params = dict(db_config.get_connection_params())
        # Rows are returned as dictionaries.
        self._connection_params['cursorclass'] = pymysql.cursors.DictCursor

        self._initialize_pool()

        # Background maintenance thread (daemon: never blocks interpreter exit).
        self._check_thread = threading.Thread(target=self._check_connections, daemon=True)
        self._check_thread.start()

    def _create_connection(self) -> pymysql.Connection:
        """Open a new database connection and stamp it with timestamps.

        Returns:
            A connected ``pymysql.Connection`` carrying ``_created_time`` and
            ``_last_used_time`` attributes (seconds since the epoch).

        Raises:
            MySQLConnectionError: If connecting or the initial ping fails.
        """
        connection = None
        try:
            connection = pymysql.connect(**self._connection_params)
            connection.ping(reconnect=True)
            now = time.time()
            connection._created_time = now
            connection._last_used_time = now
            return connection
        except Exception as e:
            # Do not leave a half-initialised socket behind.
            if connection is not None:
                try:
                    connection.close()
                except Exception:
                    pass
            error_msg = f"创建数据库连接失败: {e}"
            logger.error(error_msg)
            raise MySQLConnectionError(error_msg, original_error=e) from e

    def _initialize_pool(self):
        """Fill the pool with ``min_connections`` connections.

        Raises:
            MySQLPoolError: If any connection cannot be created or stored.
                Connections that were already pooled are closed first.
        """
        with self._lock:
            try:
                for _ in range(self.min_connections):
                    connection = self._create_connection()
                    self._active_connections += 1
                    try:
                        self._pool.put(connection, block=False)
                    except Full:
                        # Happens when min_connections > max_connections.
                        self._close_connection(connection)
                        raise
            except Exception as e:
                self._drain_pool()
                error_msg = f"初始化连接池失败: {e}"
                logger.error(error_msg)
                raise MySQLPoolError(error_msg, original_error=e) from e

    def _try_reserve_slot(self, limit: int) -> bool:
        """Atomically claim one connection slot if the count is below ``limit``.

        Reserving the slot before connecting lets the (slow) connect call run
        without holding the lock while still respecting the limit.
        """
        with self._lock:
            if self._active_connections >= limit:
                return False
            self._active_connections += 1
            return True

    def _connect_in_reserved_slot(self) -> pymysql.Connection:
        """Open a connection for an already reserved slot.

        The slot is released again if the connection cannot be created.
        """
        try:
            return self._create_connection()
        except Exception:
            with self._lock:
                self._active_connections -= 1
            raise

    def get_connection(self, timeout: int = 30) -> pymysql.Connection:
        """Check a connection out of the pool.

        An idle connection is used when one is available.  Otherwise the pool
        grows immediately if it is below ``max_connections``; only when the
        pool is at its limit does the call wait for a connection to be
        returned.

        Args:
            timeout: Maximum number of seconds to wait for a returned
                connection when the pool is at its limit.

        Returns:
            A connection that answered a ping.

        Raises:
            MySQLPoolError: If no connection became available within
                ``timeout`` and the pool is still at ``max_connections``.
            MySQLConnectionError: If a new connection had to be opened and
                that failed.
        """
        try:
            connection = self._pool.get_nowait()
        except Empty:
            # Nothing idle: grow right away instead of waiting out the timeout.
            if self._try_reserve_slot(self.max_connections):
                return self._connect_in_reserved_slot()
            try:
                connection = self._pool.get(timeout=timeout)
            except Empty:
                # A slot may have been freed while waiting; otherwise this
                # raises MySQLPoolError.
                return self._create_new_connection()

        try:
            connection.ping(reconnect=True)
        except Exception:
            # The pooled connection is dead: close it (which frees its slot)
            # and hand out a fresh one.
            self._close_connection(connection)
            return self._create_new_connection()

        connection._last_used_time = time.time()
        return connection

    def _create_new_connection(self) -> pymysql.Connection:
        """Open an additional connection if the pool is below its limit.

        Raises:
            MySQLPoolError: If ``max_connections`` has been reached.
            MySQLConnectionError: If the connection cannot be opened.
        """
        if not self._try_reserve_slot(self.max_connections):
            error_msg = "连接池已达到最大连接数限制"
            logger.error(error_msg)
            raise MySQLPoolError(error_msg)
        return self._connect_in_reserved_slot()

    def return_connection(self, connection: pymysql.Connection):
        """Give a connection back to the pool.

        The connection is pinged, any open transaction is rolled back and
        autocommit is switched back on before it is stored.  If the
        connection is unusable, the pool is full, or ``close_all`` has been
        called, the connection is closed instead.

        Args:
            connection: The connection to return; ``None`` is ignored.
        """
        if connection is None:
            return

        if self._closed.is_set():
            # The pool was shut down while this connection was checked out.
            self._close_connection(connection)
            return

        try:
            connection.ping(reconnect=True)
            connection._last_used_time = time.time()

            # Pooled connections are always handed out in autocommit mode.
            if not connection.get_autocommit():
                connection.rollback()
                connection.autocommit(True)

            self._pool.put(connection, block=False)
        except Exception:
            # queue.Full (pool already full) or a broken connection.
            self._close_connection(connection)

    def _close_connection(self, connection: pymysql.Connection):
        """Close ``connection`` and release its slot in the active count."""
        try:
            connection.close()
        except Exception:
            # Best effort: the connection may already be closed or broken.
            pass
        finally:
            with self._lock:
                self._active_connections -= 1

    def _drain_pool(self):
        """Close every connection currently idle in the pool."""
        while True:
            try:
                connection = self._pool.get_nowait()
            except Empty:
                break
            self._close_connection(connection)

    def _evict_idle_connections(self):
        """Close pooled connections idle for longer than ``max_idle_time``.

        Connections are inspected one at a time and put straight back, so the
        pool is never emptied as a whole while the check is running.
        """
        now = time.time()
        for _ in range(self._pool.qsize()):
            try:
                connection = self._pool.get_nowait()
            except Empty:
                break

            expired = (now - connection._last_used_time) > self.max_idle_time
            if expired or self._closed.is_set():
                self._close_connection(connection)
                continue

            try:
                self._pool.put_nowait(connection)
            except Full:
                self._close_connection(connection)

    def _replenish_min_connections(self):
        """Open connections until the pool owns ``min_connections`` again."""
        limit = min(self.min_connections, self.max_connections)
        while not self._closed.is_set() and self._try_reserve_slot(limit):
            try:
                connection = self._connect_in_reserved_slot()
            except Exception:
                # Database unreachable; try again on the next check.
                break
            try:
                self._pool.put_nowait(connection)
            except Full:
                self._close_connection(connection)
                break

    def _check_connections(self):
        """Background loop: evict idle connections, then refill the pool.

        Runs every ``check_interval`` seconds until ``close_all`` is called.
        """
        # Event.wait() doubles as the sleep and returns True once closed.
        while not self._closed.wait(self.check_interval):
            try:
                self._evict_idle_connections()
                self._replenish_min_connections()
            except Exception as e:
                logger.error(f"连接池检查出错: {e}")

    @contextmanager
    def get_connection_context(self):
        """Context manager that checks out a connection and always returns it.

        Yields:
            A connection obtained from ``get_connection()``.
        """
        connection = None
        try:
            connection = self.get_connection()
            yield connection
        finally:
            if connection is not None:
                self.return_connection(connection)

    def close_all(self):
        """Close all idle connections and stop background maintenance.

        After this call the checker thread no longer re-creates connections,
        and connections handed back through ``return_connection`` are closed
        instead of being pooled.
        """
        self._closed.set()
        self._drain_pool()

    def get_pool_status(self) -> Dict[str, int]:
        """Return a snapshot of the pool counters.

        Returns:
            A dict with the keys ``active_connections``, ``pool_size`` (idle
            connections in the queue), ``max_connections`` and
            ``min_connections``.
        """
        with self._lock:
            return {
                'active_connections': self._active_connections,
                'pool_size': self._pool.qsize(),
                'max_connections': self.max_connections,
                'min_connections': self.min_connections
            }


# Global connection pool instance
mysql_pool = MySQLConnectionPool()