async_mysql_client.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. """
  2. AsyncMySQLClient (基于 asyncmy) + 项目统一日志正式上线版
  3. - 自动管理连接池,避免重复初始化和内存泄漏
  4. - 支持 async with 上下文管理
  5. - 高并发协程安全
  6. """
  7. import traceback
  8. from typing import List, Dict, Any, Optional, Tuple
  9. import asyncio
  10. import asyncmy
  11. from core.utils.log.logger_manager import LoggerManager
  12. class AsyncMySQLClient:
  13. # 类变量:同配置单例管理连接池实例
  14. _instances: Dict[Tuple, "AsyncMySQLClient"] = {}
  15. def __new__(cls,
  16. host: str,
  17. port: int,
  18. user: str,
  19. password: str,
  20. db: str,
  21. charset: str,
  22. minsize: int = 1,
  23. maxsize: int = 5):
  24. """
  25. 单例模式:同配置共享同一连接池实例
  26. """
  27. key = (host, port, user, db)
  28. if key not in cls._instances:
  29. instance = super().__new__(cls)
  30. cls._instances[key] = instance
  31. return cls._instances[key]
  32. def __init__(self,
  33. host: str,
  34. port: int,
  35. user: str,
  36. password: str,
  37. db: str,
  38. charset: str,
  39. minsize: int = 1,
  40. maxsize: int = 5,
  41. logger: Optional[LoggerManager] = None,
  42. aliyun_logr:Optional[LoggerManager] = None,):
  43. """
  44. 初始化配置,延迟创建连接池
  45. """
  46. self._db_settings = {
  47. "host": host,
  48. "port": port,
  49. "user": user,
  50. "password": password,
  51. "db": db,
  52. "autocommit": True,
  53. "charset": charset,
  54. "connect_timeout": 5,
  55. }
  56. self._minsize = minsize
  57. self._maxsize = maxsize
  58. self._pool: Optional[asyncmy.Pool] = None
  59. self._lock = asyncio.Lock() # 防止并发初始化
  60. self.logger = logger
  61. self.aliyun_logger = aliyun_logr
  62. async def __aenter__(self):
  63. """支持 async with 自动初始化连接池"""
  64. await self.init_pool()
  65. return self
  66. async def __aexit__(self, exc_type, exc_val, exc_tb):
  67. """支持 async with 自动关闭连接池"""
  68. await self.close()
  69. async def init_pool(self):
  70. """
  71. 初始化连接池(懒加载 + 并发锁保护)
  72. """
  73. if self._pool:
  74. return
  75. async with self._lock:
  76. if self._pool:
  77. return
  78. try:
  79. self._pool = await asyncmy.create_pool(
  80. **self._db_settings,
  81. minsize=self._minsize,
  82. maxsize=self._maxsize,
  83. )
  84. except Exception as e:
  85. msg = f"[AsyncMySQLClient] 连接池初始化失败: {e} \n {traceback.format_exc()}"
  86. self.logger.error(msg)
  87. self.aliyun_logger.logging(code="9001", message=msg, data={f"error: f{traceback.format_exc()} \n {e}"})
  88. raise
  89. async def close(self):
  90. """
  91. 关闭连接池
  92. """
  93. if self._pool:
  94. self._pool.close()
  95. await self._pool.wait_closed()
  96. self._pool = None
  97. async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
  98. """
  99. 查询多行,返回字典列表
  100. """
  101. await self.init_pool()
  102. try:
  103. async with self._pool.acquire() as conn:
  104. async with conn.cursor() as cur:
  105. await cur.execute(sql, params or [])
  106. rows = await cur.fetchall()
  107. columns = [desc[0] for desc in cur.description]
  108. result = [dict(zip(columns, row)) for row in rows]
  109. return result
  110. except Exception as e:
  111. msg = f"[AsyncMySQLClient] fetch_all 执行失败: {e} | SQL: {sql}"
  112. self.logger.error(msg)
  113. self.aliyun_logger.logging(code="9002", message=msg)
  114. raise
  115. async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
  116. """
  117. 查询单行,返回字典
  118. """
  119. await self.init_pool()
  120. try:
  121. async with self._pool.acquire() as conn:
  122. async with conn.cursor() as cur:
  123. await cur.execute(sql, params or [])
  124. row = await cur.fetchone()
  125. if row is None:
  126. return None
  127. columns = [desc[0] for desc in cur.description]
  128. result = dict(zip(columns, row))
  129. return result
  130. except Exception as e:
  131. msg = f"[AsyncMySQLClient] fetch_one 执行失败: {e} | SQL: {sql}"
  132. self.logger.error(msg)
  133. self.aliyun_logger.logging(code="9003", message=msg)
  134. raise
  135. async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
  136. """
  137. 执行单条写操作
  138. """
  139. await self.init_pool()
  140. try:
  141. async with self._pool.acquire() as conn:
  142. async with conn.cursor() as cur:
  143. await cur.execute(sql, params or [])
  144. return cur.rowcount
  145. except Exception as e:
  146. msg = f"[AsyncMySQLClient] execute 执行失败: {e} | SQL: {sql}"
  147. self.logger.error(msg)
  148. self.aliyun_logger.logging(code="9004", message=msg)
  149. raise
  150. async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
  151. """
  152. 批量执行写操作
  153. """
  154. await self.init_pool()
  155. try:
  156. async with self._pool.acquire() as conn:
  157. async with conn.cursor() as cur:
  158. await cur.executemany(sql, params_list)
  159. return cur.rowcount
  160. except Exception as e:
  161. msg = f"[AsyncMySQLClient] executemany 执行失败: {e} | SQL: {sql}"
  162. self.logger.error(msg)
  163. self.aliyun_logger.logging(code="9005", message=msg)
  164. raise