async_mysql_client.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. """
  2. AsyncMySQLClient (基于 asyncmy) + 项目统一日志正式上线版
  3. - 自动管理连接池,避免重复初始化和内存泄漏
  4. - 支持 async with 上下文管理
  5. - 高并发协程安全
  6. """
  7. import traceback
  8. from typing import List, Dict, Any, Optional, Tuple
  9. import asyncio
  10. import asyncmy
  11. from core.utils.log.logger_manager import LoggerManager
  12. class AsyncMySQLClient:
  13. # 类变量:同配置单例管理连接池实例
  14. _instances: Dict[Tuple, "AsyncMySQLClient"] = {}
  15. def __new__(cls,
  16. host: str,
  17. port: int,
  18. user: str,
  19. password: str,
  20. db: str,
  21. charset: str,
  22. minsize: int = 1,
  23. maxsize: int = 5,
  24. **kwargs):
  25. """
  26. 单例模式:同配置共享同一连接池实例
  27. """
  28. key = (host, port, user, db)
  29. if key not in cls._instances:
  30. instance = super().__new__(cls)
  31. cls._instances[key] = instance
  32. return cls._instances[key]
  33. def __init__(self,
  34. host: str,
  35. port: int,
  36. user: str,
  37. password: str,
  38. db: str,
  39. charset: str,
  40. minsize: int = 1,
  41. maxsize: int = 5,
  42. logger: Optional[LoggerManager] = None,
  43. aliyun_logr:Optional[LoggerManager] = None,):
  44. """
  45. 初始化配置,延迟创建连接池
  46. """
  47. self._db_settings = {
  48. "host": host,
  49. "port": port,
  50. "user": user,
  51. "password": password,
  52. "db": db,
  53. "autocommit": True,
  54. "charset": charset,
  55. "connect_timeout": 5,
  56. }
  57. self._minsize = minsize
  58. self._maxsize = maxsize
  59. self._pool: Optional[asyncmy.Pool] = None
  60. self._lock = asyncio.Lock() # 防止并发初始化
  61. self.logger = logger
  62. self.aliyun_logger = aliyun_logr
  63. async def __aenter__(self):
  64. """支持 async with 自动初始化连接池"""
  65. await self.init_pool()
  66. return self
  67. async def __aexit__(self, exc_type, exc_val, exc_tb):
  68. """支持 async with 自动关闭连接池"""
  69. await self.close()
  70. async def init_pool(self):
  71. """
  72. 初始化连接池(懒加载 + 并发锁保护)
  73. """
  74. if self._pool:
  75. return
  76. async with self._lock:
  77. if self._pool:
  78. return
  79. try:
  80. self._pool = await asyncmy.create_pool(
  81. **self._db_settings,
  82. minsize=self._minsize,
  83. maxsize=self._maxsize,
  84. )
  85. except Exception as e:
  86. msg = f"[AsyncMySQLClient] 连接池初始化失败: {e} \n {traceback.format_exc()}"
  87. self.logger.error(msg)
  88. self.aliyun_logger.logging(code="9001", message=msg, data={f"error: f{traceback.format_exc()} \n {e}"})
  89. raise
  90. async def close(self):
  91. """
  92. 关闭连接池
  93. """
  94. if self._pool:
  95. self._pool.close()
  96. await self._pool.wait_closed()
  97. self._pool = None
  98. async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
  99. """
  100. 查询多行,返回字典列表
  101. """
  102. await self.init_pool()
  103. try:
  104. async with self._pool.acquire() as conn:
  105. async with conn.cursor() as cur:
  106. await cur.execute(sql, params or [])
  107. rows = await cur.fetchall()
  108. columns = [desc[0] for desc in cur.description]
  109. result = [dict(zip(columns, row)) for row in rows]
  110. return result
  111. except Exception as e:
  112. msg = f"[AsyncMySQLClient] fetch_all 执行失败: {e} | SQL: {sql}"
  113. self.logger.error(msg)
  114. self.aliyun_logger.logging(code="9002", message=msg)
  115. raise
  116. async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
  117. """
  118. 查询单行,返回字典
  119. """
  120. await self.init_pool()
  121. try:
  122. async with self._pool.acquire() as conn:
  123. async with conn.cursor() as cur:
  124. await cur.execute(sql, params or [])
  125. row = await cur.fetchone()
  126. if row is None:
  127. return None
  128. columns = [desc[0] for desc in cur.description]
  129. result = dict(zip(columns, row))
  130. return result
  131. except Exception as e:
  132. msg = f"[AsyncMySQLClient] fetch_one 执行失败: {e} | SQL: {sql}"
  133. self.logger.error(msg)
  134. self.aliyun_logger.logging(code="9003", message=msg)
  135. raise
  136. async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
  137. """
  138. 执行单条写操作
  139. """
  140. await self.init_pool()
  141. try:
  142. async with self._pool.acquire() as conn:
  143. async with conn.cursor() as cur:
  144. await cur.execute(sql, params or [])
  145. return cur.rowcount
  146. except Exception as e:
  147. msg = f"[AsyncMySQLClient] execute 执行失败: {e} | SQL: {sql}"
  148. self.logger.error(msg)
  149. self.aliyun_logger.logging(code="9004", message=msg)
  150. raise
  151. async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
  152. """
  153. 批量执行写操作
  154. """
  155. await self.init_pool()
  156. try:
  157. async with self._pool.acquire() as conn:
  158. async with conn.cursor() as cur:
  159. await cur.executemany(sql, params_list)
  160. return cur.rowcount
  161. except Exception as e:
  162. msg = f"[AsyncMySQLClient] executemany 执行失败: {e} | SQL: {sql}"
  163. self.logger.error(msg)
  164. self.aliyun_logger.logging(code="9005", message=msg)
  165. raise