""" 文件功能: 异步 MySQL 客户端封装(基于 asyncmy): - 自动管理连接池 - 支持 async with 上下文管理 - 提供常见方法:fetch_all, fetch_one, execute, executemany - 内部单例复用,避免重复创建连接池 适用场景: - 高并发异步任务系统 - 通用业务数据库访问组件 """ import asyncmy from typing import List, Dict, Any, Optional class AsyncMySQLClient: """ 通用异步 MySQL 客户端,基于 asyncmy 实现 """ # 类变量用于单例连接池 _instance: Optional["AsyncMySQLClient"] = None def __new__(cls, *args, **kwargs): """ 单例模式,确保同一配置只创建一个连接池实例 """ if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__( self, host: str, port: int, user: str, password: str, db: str, charset: str, minsize: int = 1, maxsize: int = 5, ): self._db_settings = { "host": host, "port": port, "user": user, "password": password, "db": db, "autocommit": True, "charset": charset, } self._minsize = minsize self._maxsize = maxsize self._pool: Optional[asyncmy.Pool] = None async def __aenter__(self): """支持 async with 上下文初始化连接池""" await self.init_pool() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """支持 async with 自动关闭连接池""" await self.close() async def init_pool(self): """ 初始化连接池(如未初始化) """ if not self._pool: self._pool = await asyncmy.create_pool( **self._db_settings, minsize=self._minsize, maxsize=self._maxsize, ) async def close(self): """ 关闭连接池 """ if self._pool: self._pool.close() await self._pool.wait_closed() self._pool = None async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]: """ 查询多行数据,返回字典列表 """ async with self._pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql, params or []) rows = await cur.fetchall() columns = [desc[0] for desc in cur.description] # 获取字段名列表 # 转换每一行为字典 result = [dict(zip(columns, row)) for row in rows] return result async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]: """ 查询单行数据,返回字典 """ async with self._pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql, params or []) row = await cur.fetchone() if row is None: return None columns = [desc[0] for desc in cur.description] return dict(zip(columns, row)) async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int: """ 执行单条写操作(insert/update/delete) :param sql: SQL 语句 :param params: 参数列表 :return: 影响行数 """ async with self._pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql, params or []) return cur.rowcount async def executemany(self, sql: str, params_list: List[List[Any]]) -> int: """ 批量执行写操作 :param sql: SQL 语句 :param params_list: 多组参数 :return: 总影响行数 """ async with self._pool.acquire() as conn: async with conn.cursor() as cur: await cur.executemany(sql, params_list) return cur.rowcount