"""
AsyncMySQLClient (built on asyncmy) with the project's unified logging.

- Manages the connection pool automatically, avoiding repeated
  initialisation and leaked pools.
- Supports ``async with`` context management.
- Safe to share between coroutines.
"""

import asyncio
import traceback
from typing import Any, Dict, List, Optional, Tuple

import asyncmy

from core.utils.log.logger_manager import LoggerManager


class AsyncMySQLClient:
    """Asynchronous MySQL client backed by a lazily created asyncmy pool.

    Instances are cached per ``(host, port, user, db)``: constructing the
    client again with the same key returns the same object, so all callers
    share one connection pool. Note that ``password``, ``charset`` and the
    pool sizes are not part of the cache key.
    """

    # Class-level cache: one client (and therefore one pool) per key.
    _instances: Dict[Tuple, "AsyncMySQLClient"] = {}

    def __new__(cls, host: str, port: int, user: str, password: str, db: str,
                charset: str, minsize: int = 1, maxsize: int = 5,
                *args, **kwargs):
        """Return the cached instance for this configuration, creating it once.

        ``*args`` / ``**kwargs`` absorb the logger arguments accepted by
        ``__init__`` so that they can be passed positionally or by keyword.
        """
        key = (host, port, user, db)
        if key not in cls._instances:
            cls._instances[key] = super().__new__(cls)
        return cls._instances[key]

    def __init__(self, host: str, port: int, user: str, password: str, db: str,
                 charset: str, minsize: int = 1, maxsize: int = 5,
                 logger: Optional[LoggerManager] = None,
                 aliyun_logr: Optional[LoggerManager] = None):
        """Store the connection settings; the pool itself is created lazily.

        Python calls ``__init__`` every time the class is called, even when
        ``__new__`` hands back a cached instance. The pool reference and its
        lock are therefore only created on the first initialisation, so that
        re-constructing the client never orphans a live pool.

        Args:
            host: MySQL server host.
            port: MySQL server port.
            user: Login user.
            password: Login password.
            db: Database name.
            charset: Connection character set.
            minsize: Minimum number of pooled connections.
            maxsize: Maximum number of pooled connections.
            logger: Optional logger; its ``error(msg)`` method is used.
            aliyun_logr: Optional logger; its
                ``logging(code=..., message=..., ...)`` method is used.
        """
        first_init = not getattr(self, "_initialized", False)

        # Settings are refreshed on every construction; a changed value only
        # takes effect the next time the pool is created.
        self._db_settings = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "db": db,
            "autocommit": True,
            "charset": charset,
            "connect_timeout": 5,
        }
        self._minsize = minsize
        self._maxsize = maxsize

        if first_init:
            self._pool: Optional[asyncmy.Pool] = None
            self._lock = asyncio.Lock()  # serialises pool creation
            self.logger = logger
            self.aliyun_logger = aliyun_logr
            self._initialized = True
        else:
            # Keep the existing pool and lock. Only replace a logger when a
            # new one is actually supplied, so a bare re-construction does
            # not silently drop logging.
            if logger is not None:
                self.logger = logger
            if aliyun_logr is not None:
                self.aliyun_logger = aliyun_logr

    def _report_error(self, code: str, message: str,
                      data: Optional[Dict[str, Any]] = None) -> None:
        """Send an error to whichever loggers are configured.

        Both loggers are optional; skipping a missing one keeps the original
        database exception from being masked by an ``AttributeError``.
        """
        if self.logger is not None:
            self.logger.error(message)
        if self.aliyun_logger is not None:
            if data is None:
                self.aliyun_logger.logging(code=code, message=message)
            else:
                self.aliyun_logger.logging(code=code, message=message, data=data)

    @staticmethod
    def _column_names(cursor) -> List[str]:
        """Return the column names of the cursor's last result set.

        ``description`` may be ``None`` when the statement produced no
        result set; that case yields an empty list.
        """
        return [desc[0] for desc in cursor.description or ()]

    async def __aenter__(self):
        """Create the pool (if needed) when entering ``async with``."""
        await self.init_pool()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the pool when leaving ``async with``."""
        await self.close()

    async def init_pool(self):
        """Create the connection pool if it does not exist yet.

        The unlocked check is a fast path; the check is repeated under the
        lock so that concurrent callers create the pool only once.

        Raises:
            Exception: whatever ``asyncmy.create_pool`` raised, after it has
                been reported to the configured loggers.
        """
        if self._pool is not None:
            return
        async with self._lock:
            if self._pool is not None:
                return
            try:
                self._pool = await asyncmy.create_pool(
                    **self._db_settings,
                    minsize=self._minsize,
                    maxsize=self._maxsize,
                )
            except Exception as e:
                tb = traceback.format_exc()
                msg = f"[AsyncMySQLClient] 连接池初始化失败: {e} \n {tb}"
                self._report_error("9001", msg, data={"error": f"{tb} \n {e}"})
                raise

    async def close(self):
        """Close the connection pool, if one exists.

        The pool reference is detached before awaiting the shutdown so that
        a coroutine running in the meantime creates a fresh pool instead of
        acquiring from one that is closing.
        """
        pool, self._pool = self._pool, None
        if pool is None:
            return
        pool.close()
        await pool.wait_closed()

    async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
        """Run a query and return every row as a ``{column: value}`` dict.

        Args:
            sql: SQL statement, using driver placeholders for parameters.
            params: Positional parameters; ``None`` means no parameters.

        Returns:
            A list with one dict per row (empty when there are no rows).

        Raises:
            Exception: any error from the driver, re-raised after reporting.
        """
        await self.init_pool()
        try:
            async with self._pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql, params or [])
                    rows = await cur.fetchall()
                    columns = self._column_names(cur)
                    return [dict(zip(columns, row)) for row in rows]
        except Exception as e:
            msg = f"[AsyncMySQLClient] fetch_all 执行失败: {e} | SQL: {sql}"
            self._report_error("9002", msg)
            raise

    async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
        """Run a query and return the first row as a dict.

        Args:
            sql: SQL statement, using driver placeholders for parameters.
            params: Positional parameters; ``None`` means no parameters.

        Returns:
            A ``{column: value}`` dict, or ``None`` when no row was returned.

        Raises:
            Exception: any error from the driver, re-raised after reporting.
        """
        await self.init_pool()
        try:
            async with self._pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql, params or [])
                    row = await cur.fetchone()
                    if row is None:
                        return None
                    return dict(zip(self._column_names(cur), row))
        except Exception as e:
            msg = f"[AsyncMySQLClient] fetch_one 执行失败: {e} | SQL: {sql}"
            self._report_error("9003", msg)
            raise

    async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
        """Execute a single write statement.

        Connections are opened with ``autocommit=True``, so no explicit
        commit is issued here.

        Args:
            sql: SQL statement, using driver placeholders for parameters.
            params: Positional parameters; ``None`` means no parameters.

        Returns:
            The cursor's ``rowcount`` after execution.

        Raises:
            Exception: any error from the driver, re-raised after reporting.
        """
        await self.init_pool()
        try:
            async with self._pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql, params or [])
                    return cur.rowcount
        except Exception as e:
            msg = f"[AsyncMySQLClient] execute 执行失败: {e} | SQL: {sql}"
            self._report_error("9004", msg)
            raise

    async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
        """Execute one write statement for each parameter set.

        Args:
            sql: SQL statement, using driver placeholders for parameters.
            params_list: One parameter list per execution.

        Returns:
            The cursor's ``rowcount`` after execution.

        Raises:
            Exception: any error from the driver, re-raised after reporting.
        """
        await self.init_pool()
        try:
            async with self._pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.executemany(sql, params_list)
                    return cur.rowcount
        except Exception as e:
            msg = f"[AsyncMySQLClient] executemany 执行失败: {e} | SQL: {sql}"
            self._report_error("9005", msg)
            raise