123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- """
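- AsyncMySQLClient (built on asyncmy) + project-wide unified logging, production release
- - Uses LoggerManager to manage local and Aliyun logs, making problems easier to locate
- - Manages the connection pool automatically to avoid repeated initialization and memory leaks
- - Supports `async with` context management
- - Coroutine-safe under high concurrency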
- """
- from typing import List, Dict, Any, Optional, Tuple
- import asyncio
- import asyncmy
- from core.utils.log.logger_manager import LoggerManager
class AsyncMySQLClient:
    """Async MySQL client wrapping a lazily created ``asyncmy`` connection pool.

    Instances are cached per connection configuration: constructing the class
    again with the same ``(host, port, user, password, db, charset)`` returns
    the existing object, including its pool. ``minsize`` / ``maxsize`` are
    taken from the first construction for a given configuration; values
    passed on later constructions are ignored.

    The pool is created on the first query, or explicitly through
    ``init_pool()`` / ``async with``.

    NOTE(review): because instances are shared, leaving an ``async with``
    block closes the pool for every holder of the same instance. The next
    query re-creates it, but confirm this is what callers expect.
    """

    # Class-level cache: connection configuration -> shared client instance.
    _instances: Dict[Tuple, "AsyncMySQLClient"] = {}

    def __new__(cls,
                host: str,
                port: int,
                user: str,
                password: str,
                db: str,
                charset: str,
                minsize: int = 1,
                maxsize: int = 5):
        """Return the cached instance for this configuration, creating it if needed."""
        # password and charset are part of the key so that two callers with
        # different connection settings never silently share one pool.
        key = (host, port, user, password, db, charset)
        instance = cls._instances.get(key)
        if instance is None:
            instance = super().__new__(cls)
            # Read by __init__ to tell a fresh object from a cached one.
            instance._initialized = False
            cls._instances[key] = instance
        return instance

    def __init__(self,
                 host: str,
                 port: int,
                 user: str,
                 password: str,
                 db: str,
                 charset: str,
                 minsize: int = 1,
                 maxsize: int = 5):
        """Store the connection settings; the pool itself is created lazily.

        Args:
            host: MySQL server host.
            port: MySQL server port.
            user: Login user.
            password: Login password.
            db: Database name; also passed as ``platform`` to the loggers.
            charset: Connection character set.
            minsize: Passed to ``asyncmy.create_pool`` as ``minsize``.
            maxsize: Passed to ``asyncmy.create_pool`` as ``maxsize``.
        """
        # Python calls __init__ on whatever __new__ returns, so this also runs
        # for cached instances. Re-running the setup would overwrite ``_pool``
        # with None and orphan a live pool without closing it.
        if getattr(self, "_initialized", False):
            return

        self._db_settings = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "db": db,
            "autocommit": True,
            "charset": charset,
            "connect_timeout": 5,
        }
        self._minsize = minsize
        self._maxsize = maxsize
        self._pool: Optional[asyncmy.Pool] = None
        self._lock = asyncio.Lock()  # serializes concurrent pool creation

        # Project-wide logging: local logger plus Aliyun logger.
        self.logger = LoggerManager.get_logger(platform=db, mode="mysql")
        self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=db, mode="mysql")

        # Set last so a failed setup is retried on the next construction.
        self._initialized = True

    async def __aenter__(self):
        """Create the pool if necessary and return the client."""
        await self.init_pool()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the pool when the ``async with`` block exits."""
        await self.close()

    async def init_pool(self):
        """Create the connection pool if it does not exist yet.

        The pool is checked once before taking the lock and once again while
        holding it, so concurrent callers create at most one pool.

        Raises:
            Exception: whatever ``asyncmy.create_pool`` raises; it is logged
                and re-raised unchanged.
        """
        if self._pool is not None:
            return
        async with self._lock:
            # Another coroutine may have created the pool while we waited.
            if self._pool is not None:
                return
            try:
                self._pool = await asyncmy.create_pool(
                    **self._db_settings,
                    minsize=self._minsize,
                    maxsize=self._maxsize,
                )
                msg = f"[AsyncMySQLClient] 连接池初始化成功: {self._db_settings['host']}:{self._db_settings['db']}"
                self.logger.info(msg)
                self.aliyun_logger.logging(code="2000", message=msg)
            except Exception as e:
                msg = f"[AsyncMySQLClient] 连接池初始化失败: {e}"
                self.logger.error(msg)
                self.aliyun_logger.logging(code="9001", message=msg, data={"error": str(e)})
                raise

    async def close(self):
        """Close the connection pool; does nothing when no pool exists."""
        # Detach the pool before awaiting: the attribute is cleared even if
        # wait_closed() raises, and an overlapping close() call sees None
        # instead of closing the same pool a second time.
        pool, self._pool = self._pool, None
        if pool is None:
            return
        pool.close()
        await pool.wait_closed()
        msg = "[AsyncMySQLClient] 连接池已关闭"
        self.logger.info(msg)
        self.aliyun_logger.logging(code="2001", message=msg)

    def _log_failure(self, operation: str, code: str, error: Exception, sql: str) -> None:
        """Report a failed statement to both the local and the Aliyun logger."""
        msg = f"[AsyncMySQLClient] {operation} 执行失败: {error} | SQL: {sql}"
        self.logger.error(msg)
        self.aliyun_logger.logging(code=code, message=msg)

    async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
        """Run a query and return every row as a ``{column: value}`` dict.

        Args:
            sql: Statement text, with placeholders for ``params``.
            params: Values bound to the placeholders; ``None`` means none.

        Returns:
            One dict per row; an empty list when there are no rows.

        Raises:
            Exception: any error from the pool or cursor, logged and re-raised.
        """
        await self.init_pool()
        try:
            async with self._pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql, params or [])
                    rows = await cur.fetchall()
                    # DB-API allows description to be None for statements
                    # that produce no result set; treat that as "no columns".
                    columns = [col[0] for col in cur.description or ()]
                    result = [dict(zip(columns, row)) for row in rows]
                    self.logger.info(f"[AsyncMySQLClient] fetch_all 执行成功: {sql}")
                    return result
        except Exception as e:
            self._log_failure("fetch_all", "9002", e, sql)
            raise

    async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
        """Run a query and return the first row as a ``{column: value}`` dict.

        Args:
            sql: Statement text, with placeholders for ``params``.
            params: Values bound to the placeholders; ``None`` means none.

        Returns:
            The first row as a dict, or ``None`` when the query has no rows.

        Raises:
            Exception: any error from the pool or cursor, logged and re-raised.
        """
        await self.init_pool()
        try:
            async with self._pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql, params or [])
                    row = await cur.fetchone()
                    if row is None:
                        return None
                    columns = [col[0] for col in cur.description]
                    result = dict(zip(columns, row))
                    self.logger.info(f"[AsyncMySQLClient] fetch_one 执行成功: {sql}")
                    return result
        except Exception as e:
            self._log_failure("fetch_one", "9003", e, sql)
            raise

    async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
        """Run a single write statement.

        Connections are opened with ``autocommit=True``, so no explicit
        commit is issued here.

        Args:
            sql: Statement text, with placeholders for ``params``.
            params: Values bound to the placeholders; ``None`` means none.

        Returns:
            The cursor's ``rowcount`` after execution.

        Raises:
            Exception: any error from the pool or cursor, logged and re-raised.
        """
        await self.init_pool()
        try:
            async with self._pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql, params or [])
                    self.logger.info(f"[AsyncMySQLClient] execute 执行成功: {sql}")
                    return cur.rowcount
        except Exception as e:
            self._log_failure("execute", "9004", e, sql)
            raise

    async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
        """Run one write statement once per parameter set.

        Args:
            sql: Statement text, with placeholders.
            params_list: One parameter list per execution.

        Returns:
            The cursor's ``rowcount`` after execution.

        Raises:
            Exception: any error from the pool or cursor, logged and re-raised.
        """
        await self.init_pool()
        try:
            async with self._pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.executemany(sql, params_list)
                    self.logger.info(f"[AsyncMySQLClient] executemany 执行成功: {sql}")
                    return cur.rowcount
        except Exception as e:
            self._log_failure("executemany", "9005", e, sql)
            raise
|