| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- import pymysql
- import threading
- import time
- from queue import Queue, Empty, Full
- from contextlib import contextmanager
- from typing import Dict
- from loguru import logger
- from .db_config import db_config
- from .mysql_exceptions import MySQLConnectionError, MySQLPoolError
class MySQLConnectionPool:
    """Pool of PyMySQL connections with idle eviction and a minimum size.

    Idle connections wait in a bounded ``Queue``. ``_active_connections``
    counts every connection the pool has opened and not yet closed (idle or
    checked out) and is only modified while holding ``_lock``. A daemon
    thread periodically closes connections that have been idle for too long
    and tops the pool back up to ``min_connections``.
    """

    def __init__(self, min_connections: int = 5, max_connections: int = 20,
                 max_idle_time: int = 300, check_interval: int = 60):
        """Open the initial connections and start the maintenance thread.

        Args:
            min_connections: Number of connections opened up front and
                restored by the maintenance thread.
            max_connections: Upper bound on connections open at once; also
                the capacity of the idle queue.
            max_idle_time: Seconds a pooled connection may sit unused before
                the maintenance thread closes it.
            check_interval: Seconds between maintenance passes.

        Raises:
            MySQLPoolError: If the initial connections cannot be opened.
        """
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.max_idle_time = max_idle_time
        self.check_interval = check_interval
        self._pool = Queue(maxsize=max_connections)
        self._active_connections = 0
        self._lock = threading.RLock()
        # Work on a copy so adding ``cursorclass`` does not mutate whatever
        # mapping db_config hands out.
        self._connection_params = dict(db_config.get_connection_params())
        # Rows come back as dicts instead of tuples.
        self._connection_params['cursorclass'] = pymysql.cursors.DictCursor
        self._initialize_pool()
        # Daemon thread: it must not keep the interpreter alive at exit.
        self._check_thread = threading.Thread(target=self._check_connections, daemon=True)
        self._check_thread.start()

    def _create_connection(self) -> "pymysql.Connection":
        """Open, ping and timestamp a new database connection.

        Returns:
            The new connection, carrying ``_created_time`` and
            ``_last_used_time`` attributes used for idle tracking.

        Raises:
            MySQLConnectionError: If connecting or the initial ping fails.
        """
        try:
            connection = pymysql.connect(**self._connection_params)
            connection.ping(reconnect=True)
            now = time.time()
            connection._created_time = now
            connection._last_used_time = now
            return connection
        except Exception as e:
            error_msg = f"创建数据库连接失败: {e}"
            logger.error(error_msg)
            raise MySQLConnectionError(error_msg, original_error=e) from e

    @staticmethod
    def _discard(connection) -> None:
        """Close ``connection`` best-effort without touching the counter."""
        try:
            connection.close()
        except Exception:
            # The connection is being thrown away; a failing close() on an
            # already-broken socket is not actionable.
            pass

    def _initialize_pool(self):
        """Fill the pool with ``min_connections`` connections.

        Raises:
            MySQLPoolError: If any connection cannot be created or queued.
                Connections opened before the failure are closed first so
                that a failed construction does not leak them.
        """
        with self._lock:
            try:
                for _ in range(self.min_connections):
                    connection = self._create_connection()
                    try:
                        self._pool.put(connection, block=False)
                    except Full:
                        # Not counted yet, so close it without decrementing.
                        self._discard(connection)
                        raise
                    self._active_connections += 1
            except Exception as e:
                self.close_all()
                error_msg = f"初始化连接池失败: {e}"
                logger.error(error_msg)
                raise MySQLPoolError(error_msg, original_error=e) from e

    def get_connection(self, timeout: int = 30) -> "pymysql.Connection":
        """Check a connection out of the pool.

        An idle connection is used when one is available. Otherwise a new
        connection is opened immediately if the pool is below
        ``max_connections``; only when the pool is at its limit does the call
        wait, up to ``timeout`` seconds, for another caller to return one.

        Args:
            timeout: Maximum seconds to wait for a returned connection when
                the pool is at ``max_connections``.

        Returns:
            A connection that answered a ping (or was just created).

        Raises:
            MySQLPoolError: If the pool is at its limit and nothing was
                returned within ``timeout``.
            MySQLConnectionError: If a new connection had to be opened and
                that failed.
        """
        try:
            connection = self._pool.get_nowait()
        except Empty:
            # Nothing idle: grow instead of sleeping while capacity remains.
            connection = self._try_create_connection()
            if connection is not None:
                return connection
            try:
                connection = self._pool.get(timeout=timeout)
            except Empty:
                # Last attempt; raises MySQLPoolError if still at the limit.
                return self._create_new_connection()

        try:
            connection.ping(reconnect=True)
        except Exception as e:
            # Dead and could not reconnect: close it (which also releases its
            # slot in the counter) and replace it.
            logger.warning(f"连接池中的连接已失效,已丢弃并重新创建: {e}")
            self._close_connection(connection)
            return self._create_new_connection()
        connection._last_used_time = time.time()
        return connection

    def _try_create_connection(self):
        """Open and count a new connection if the pool is below its limit.

        Returns:
            The new connection, or ``None`` when ``max_connections`` has
            already been reached.

        Raises:
            MySQLConnectionError: If opening the connection fails.
        """
        with self._lock:
            if self._active_connections >= self.max_connections:
                return None
            connection = self._create_connection()
            self._active_connections += 1
            return connection

    def _create_new_connection(self) -> "pymysql.Connection":
        """Open and count a new connection, failing if the pool is full.

        Raises:
            MySQLPoolError: If ``max_connections`` has been reached.
            MySQLConnectionError: If opening the connection fails.
        """
        connection = self._try_create_connection()
        if connection is None:
            error_msg = "连接池已达到最大连接数限制"
            logger.error(error_msg)
            raise MySQLPoolError(error_msg)
        return connection

    def return_connection(self, connection: "pymysql.Connection"):
        """Hand a checked-out connection back to the pool.

        The connection is pinged, any non-autocommit session is rolled back
        and switched to autocommit, and the connection is queued for reuse.
        If the idle queue is full, or any of those steps fails, the
        connection is closed instead. ``None`` is ignored.
        """
        if connection is None:
            return
        try:
            connection.ping(reconnect=True)
            connection._last_used_time = time.time()
            # Do not let an unfinished transaction leak to the next borrower.
            if not connection.get_autocommit():
                connection.rollback()
                connection.autocommit(True)
            self._pool.put(connection, block=False)
        except Full:
            # More connections than the queue can hold: drop the surplus.
            self._close_connection(connection)
        except Exception as e:
            logger.warning(f"归还连接时发生错误,连接已关闭: {e}")
            self._close_connection(connection)

    def _close_connection(self, connection: "pymysql.Connection"):
        """Close a counted connection and release its slot in the counter."""
        try:
            self._discard(connection)
        finally:
            with self._lock:
                self._active_connections -= 1

    def _check_connections(self):
        """Maintenance loop run by the daemon thread; never returns.

        Every ``check_interval`` seconds it evicts idle connections and then
        restores the minimum pool size. Errors are logged and the loop
        continues.
        """
        while True:
            try:
                time.sleep(self.check_interval)
                self._evict_idle_connections()
                self._ensure_min_connections()
            except Exception as e:
                logger.error(f"连接池检查出错: {e}")

    def _evict_idle_connections(self):
        """Close pooled connections unused for more than ``max_idle_time``.

        Connections are inspected one at a time and re-queued immediately, so
        the pool is never emptied wholesale while callers may be asking for
        connections.
        """
        now = time.time()
        # The qsize() snapshot bounds the pass: connections re-queued below
        # land at the back and are not visited a second time.
        for _ in range(self._pool.qsize()):
            try:
                connection = self._pool.get_nowait()
            except Empty:
                break
            if (now - connection._last_used_time) > self.max_idle_time:
                self._close_connection(connection)
                continue
            try:
                self._pool.put_nowait(connection)
            except Full:
                # Other threads refilled the queue in the meantime.
                self._close_connection(connection)

    def _ensure_min_connections(self):
        """Open connections until ``min_connections`` are alive again.

        Stops early, leaving the rest for the next pass, if a connection
        cannot be created or queued.
        """
        with self._lock:
            target = min(self.min_connections, self.max_connections)
            while self._active_connections < target:
                try:
                    connection = self._create_connection()
                except Exception:
                    # Already logged by _create_connection.
                    break
                try:
                    self._pool.put_nowait(connection)
                except Full:
                    # Never counted, so close it without decrementing.
                    self._discard(connection)
                    break
                self._active_connections += 1

    @contextmanager
    def get_connection_context(self):
        """Context manager that checks a connection out and always returns it.

        Yields:
            A connection obtained from ``get_connection()`` with its default
            timeout.
        """
        connection = self.get_connection()
        try:
            yield connection
        finally:
            self.return_connection(connection)

    def close_all(self):
        """Close every connection currently idle in the pool.

        Connections that are checked out are not affected, and the
        maintenance thread keeps running, so it will open connections again
        on its next pass if the pool is below ``min_connections``.
        """
        while True:
            try:
                connection = self._pool.get_nowait()
            except Empty:
                break
            self._close_connection(connection)

    def get_pool_status(self) -> Dict[str, int]:
        """Return a snapshot of the pool counters.

        Returns:
            A dict with ``active_connections`` (opened and not yet closed),
            ``pool_size`` (currently idle in the queue), ``max_connections``
            and ``min_connections``.
        """
        with self._lock:
            return {
                'active_connections': self._active_connections,
                'pool_size': self._pool.qsize(),
                'max_connections': self.max_connections,
                'min_connections': self.min_connections,
            }
- # Global connection pool instance.
- # NOTE: the constructor runs when this module is imported; with the default arguments it
- # opens min_connections (5) database connections and starts the daemon checker thread.
- mysql_pool = MySQLConnectionPool()
|