async_mysql_client.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. """
  2. AsyncMySQLClient (基于 asyncmy) + 项目统一日志正式上线版
  3. - 使用 LoggerManager 管理本地及阿里云日志,便于定位问题
  4. - 自动管理连接池,避免重复初始化和内存泄漏
  5. - 支持 async with 上下文管理
  6. - 高并发协程安全
  7. """
  8. from typing import List, Dict, Any, Optional, Tuple
  9. import asyncio
  10. import asyncmy
  11. from core.utils.log.logger_manager import LoggerManager
  12. class AsyncMySQLClient:
  13. # 类变量:同配置单例管理连接池实例
  14. _instances: Dict[Tuple, "AsyncMySQLClient"] = {}
  15. def __new__(cls,
  16. host: str,
  17. port: int,
  18. user: str,
  19. password: str,
  20. db: str,
  21. charset: str,
  22. minsize: int = 1,
  23. maxsize: int = 5):
  24. """
  25. 单例模式:同配置共享同一连接池实例
  26. """
  27. key = (host, port, user, db)
  28. if key not in cls._instances:
  29. instance = super().__new__(cls)
  30. cls._instances[key] = instance
  31. return cls._instances[key]
  32. def __init__(self,
  33. host: str,
  34. port: int,
  35. user: str,
  36. password: str,
  37. db: str,
  38. charset: str,
  39. minsize: int = 1,
  40. maxsize: int = 5):
  41. """
  42. 初始化配置,延迟创建连接池
  43. """
  44. self._db_settings = {
  45. "host": host,
  46. "port": port,
  47. "user": user,
  48. "password": password,
  49. "db": db,
  50. "autocommit": True,
  51. "charset": charset,
  52. "connect_timeout": 5,
  53. }
  54. self._minsize = minsize
  55. self._maxsize = maxsize
  56. self._pool: Optional[asyncmy.Pool] = None
  57. self._lock = asyncio.Lock() # 防止并发初始化
  58. # 引入统一日志体系
  59. self.logger = LoggerManager.get_logger(platform=db, mode="mysql")
  60. self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=db, mode="mysql")
  61. async def __aenter__(self):
  62. """支持 async with 自动初始化连接池"""
  63. await self.init_pool()
  64. return self
  65. async def __aexit__(self, exc_type, exc_val, exc_tb):
  66. """支持 async with 自动关闭连接池"""
  67. await self.close()
  68. async def init_pool(self):
  69. """
  70. 初始化连接池(懒加载 + 并发锁保护)
  71. """
  72. if self._pool:
  73. return
  74. async with self._lock:
  75. if self._pool:
  76. return
  77. try:
  78. self._pool = await asyncmy.create_pool(
  79. **self._db_settings,
  80. minsize=self._minsize,
  81. maxsize=self._maxsize,
  82. )
  83. msg = f"[AsyncMySQLClient] 连接池初始化成功: {self._db_settings['host']}:{self._db_settings['db']}"
  84. self.logger.info(msg)
  85. self.aliyun_logger.logging(code="2000", message=msg)
  86. except Exception as e:
  87. msg = f"[AsyncMySQLClient] 连接池初始化失败: {e}"
  88. self.logger.error(msg)
  89. self.aliyun_logger.logging(code="9001", message=msg, data={"error": str(e)})
  90. raise
  91. async def close(self):
  92. """
  93. 关闭连接池
  94. """
  95. if self._pool:
  96. self._pool.close()
  97. await self._pool.wait_closed()
  98. msg = "[AsyncMySQLClient] 连接池已关闭"
  99. self.logger.info(msg)
  100. self.aliyun_logger.logging(code="2001", message=msg)
  101. self._pool = None
  102. async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
  103. """
  104. 查询多行,返回字典列表
  105. """
  106. await self.init_pool()
  107. try:
  108. async with self._pool.acquire() as conn:
  109. async with conn.cursor() as cur:
  110. await cur.execute(sql, params or [])
  111. rows = await cur.fetchall()
  112. columns = [desc[0] for desc in cur.description]
  113. result = [dict(zip(columns, row)) for row in rows]
  114. self.logger.info(f"[AsyncMySQLClient] fetch_all 执行成功: {sql}")
  115. return result
  116. except Exception as e:
  117. msg = f"[AsyncMySQLClient] fetch_all 执行失败: {e} | SQL: {sql}"
  118. self.logger.error(msg)
  119. self.aliyun_logger.logging(code="9002", message=msg)
  120. raise
  121. async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
  122. """
  123. 查询单行,返回字典
  124. """
  125. await self.init_pool()
  126. try:
  127. async with self._pool.acquire() as conn:
  128. async with conn.cursor() as cur:
  129. await cur.execute(sql, params or [])
  130. row = await cur.fetchone()
  131. if row is None:
  132. return None
  133. columns = [desc[0] for desc in cur.description]
  134. result = dict(zip(columns, row))
  135. self.logger.info(f"[AsyncMySQLClient] fetch_one 执行成功: {sql}")
  136. return result
  137. except Exception as e:
  138. msg = f"[AsyncMySQLClient] fetch_one 执行失败: {e} | SQL: {sql}"
  139. self.logger.error(msg)
  140. self.aliyun_logger.logging(code="9003", message=msg)
  141. raise
  142. async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
  143. """
  144. 执行单条写操作
  145. """
  146. await self.init_pool()
  147. try:
  148. async with self._pool.acquire() as conn:
  149. async with conn.cursor() as cur:
  150. await cur.execute(sql, params or [])
  151. self.logger.info(f"[AsyncMySQLClient] execute 执行成功: {sql}")
  152. return cur.rowcount
  153. except Exception as e:
  154. msg = f"[AsyncMySQLClient] execute 执行失败: {e} | SQL: {sql}"
  155. self.logger.error(msg)
  156. self.aliyun_logger.logging(code="9004", message=msg)
  157. raise
  158. async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
  159. """
  160. 批量执行写操作
  161. """
  162. await self.init_pool()
  163. try:
  164. async with self._pool.acquire() as conn:
  165. async with conn.cursor() as cur:
  166. await cur.executemany(sql, params_list)
  167. self.logger.info(f"[AsyncMySQLClient] executemany 执行成功: {sql}")
  168. return cur.rowcount
  169. except Exception as e:
  170. msg = f"[AsyncMySQLClient] executemany 执行失败: {e} | SQL: {sql}"
  171. self.logger.error(msg)
  172. self.aliyun_logger.logging(code="9005", message=msg)
  173. raise