123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- """
- 文件功能:
- 异步 MySQL 客户端封装(基于 asyncmy):
- - 自动管理连接池
- - 支持 async with 上下文管理
- - 提供常见方法:fetch_all, fetch_one, execute, executemany
- - 内部单例复用,避免重复创建连接池
- 适用场景:
- - 高并发异步任务系统
- - 通用业务数据库访问组件
- """
- import asyncmy
- from typing import List, Dict, Any, Optional
- class AsyncMySQLClient:
- """
- 通用异步 MySQL 客户端,基于 asyncmy 实现
- """
- # 类变量用于单例连接池
- _instance: Optional["AsyncMySQLClient"] = None
- def __new__(cls, *args, **kwargs):
- """
- 单例模式,确保同一配置只创建一个连接池实例
- """
- if cls._instance is None:
- cls._instance = super().__new__(cls)
- return cls._instance
- def __init__(
- self,
- host: str,
- port: int,
- user: str,
- password: str,
- db: str,
- charset: str,
- minsize: int = 1,
- maxsize: int = 5,
- ):
- self._db_settings = {
- "host": host,
- "port": port,
- "user": user,
- "password": password,
- "db": db,
- "autocommit": True,
- "charset": charset,
- }
- self._minsize = minsize
- self._maxsize = maxsize
- self._pool: Optional[asyncmy.Pool] = None
- async def __aenter__(self):
- """支持 async with 上下文初始化连接池"""
- await self.init_pool()
- return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- """支持 async with 自动关闭连接池"""
- await self.close()
- async def init_pool(self):
- """
- 初始化连接池(如未初始化)
- """
- if not self._pool:
- self._pool = await asyncmy.create_pool(
- **self._db_settings,
- minsize=self._minsize,
- maxsize=self._maxsize,
- )
- async def close(self):
- """
- 关闭连接池
- """
- if self._pool:
- self._pool.close()
- await self._pool.wait_closed()
- self._pool = None
- async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
- """
- 查询多行数据,返回字典列表
- """
- async with self._pool.acquire() as conn:
- async with conn.cursor() as cur:
- await cur.execute(sql, params or [])
- rows = await cur.fetchall()
- columns = [desc[0] for desc in cur.description] # 获取字段名列表
- # 转换每一行为字典
- result = [dict(zip(columns, row)) for row in rows]
- return result
- async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
- """
- 查询单行数据,返回字典
- """
- async with self._pool.acquire() as conn:
- async with conn.cursor() as cur:
- await cur.execute(sql, params or [])
- row = await cur.fetchone()
- if row is None:
- return None
- columns = [desc[0] for desc in cur.description]
- return dict(zip(columns, row))
- async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
- """
- 执行单条写操作(insert/update/delete)
- :param sql: SQL 语句
- :param params: 参数列表
- :return: 影响行数
- """
- async with self._pool.acquire() as conn:
- async with conn.cursor() as cur:
- await cur.execute(sql, params or [])
- return cur.rowcount
- async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
- """
- 批量执行写操作
- :param sql: SQL 语句
- :param params_list: 多组参数
- :return: 总影响行数
- """
- async with self._pool.acquire() as conn:
- async with conn.cursor() as cur:
- await cur.executemany(sql, params_list)
- return cur.rowcount
|