async_mysql_client.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. """
  2. 文件功能:
  3. 异步 MySQL 客户端封装(基于 asyncmy):
  4. - 自动管理连接池
  5. - 支持 async with 上下文管理
  6. - 提供常见方法:fetch_all, fetch_one, execute, executemany
  7. - 内部单例复用,避免重复创建连接池
  8. 适用场景:
  9. - 高并发异步任务系统
  10. - 通用业务数据库访问组件
  11. """
  12. import asyncmy
  13. from typing import List, Dict, Any, Optional
  14. class AsyncMySQLClient:
  15. """
  16. 通用异步 MySQL 客户端,基于 asyncmy 实现
  17. """
  18. # 类变量用于单例连接池
  19. _instance: Optional["AsyncMySQLClient"] = None
  20. def __new__(cls, *args, **kwargs):
  21. """
  22. 单例模式,确保同一配置只创建一个连接池实例
  23. """
  24. if cls._instance is None:
  25. cls._instance = super().__new__(cls)
  26. return cls._instance
  27. def __init__(
  28. self,
  29. host: str,
  30. port: int,
  31. user: str,
  32. password: str,
  33. db: str,
  34. charset: str,
  35. minsize: int = 1,
  36. maxsize: int = 5,
  37. ):
  38. self._db_settings = {
  39. "host": host,
  40. "port": port,
  41. "user": user,
  42. "password": password,
  43. "db": db,
  44. "autocommit": True,
  45. "charset": charset,
  46. }
  47. self._minsize = minsize
  48. self._maxsize = maxsize
  49. self._pool: Optional[asyncmy.Pool] = None
  50. async def __aenter__(self):
  51. """支持 async with 上下文初始化连接池"""
  52. await self.init_pool()
  53. return self
  54. async def __aexit__(self, exc_type, exc_val, exc_tb):
  55. """支持 async with 自动关闭连接池"""
  56. await self.close()
  57. async def init_pool(self):
  58. """
  59. 初始化连接池(如未初始化)
  60. """
  61. if not self._pool:
  62. self._pool = await asyncmy.create_pool(
  63. **self._db_settings,
  64. minsize=self._minsize,
  65. maxsize=self._maxsize,
  66. )
  67. async def close(self):
  68. """
  69. 关闭连接池
  70. """
  71. if self._pool:
  72. self._pool.close()
  73. await self._pool.wait_closed()
  74. self._pool = None
  75. async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
  76. """
  77. 查询多行数据,返回字典列表
  78. """
  79. async with self._pool.acquire() as conn:
  80. async with conn.cursor() as cur:
  81. await cur.execute(sql, params or [])
  82. rows = await cur.fetchall()
  83. columns = [desc[0] for desc in cur.description] # 获取字段名列表
  84. # 转换每一行为字典
  85. result = [dict(zip(columns, row)) for row in rows]
  86. return result
  87. async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
  88. """
  89. 查询单行数据,返回字典
  90. """
  91. async with self._pool.acquire() as conn:
  92. async with conn.cursor() as cur:
  93. await cur.execute(sql, params or [])
  94. row = await cur.fetchone()
  95. if row is None:
  96. return None
  97. columns = [desc[0] for desc in cur.description]
  98. return dict(zip(columns, row))
  99. async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
  100. """
  101. 执行单条写操作(insert/update/delete)
  102. :param sql: SQL 语句
  103. :param params: 参数列表
  104. :return: 影响行数
  105. """
  106. async with self._pool.acquire() as conn:
  107. async with conn.cursor() as cur:
  108. await cur.execute(sql, params or [])
  109. return cur.rowcount
  110. async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
  111. """
  112. 批量执行写操作
  113. :param sql: SQL 语句
  114. :param params_list: 多组参数
  115. :return: 总影响行数
  116. """
  117. async with self._pool.acquire() as conn:
  118. async with conn.cursor() as cur:
  119. await cur.executemany(sql, params_list)
  120. return cur.rowcount