async_mysql_service.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. import asyncio
  2. import json
  3. import os
  4. import logging
  5. from typing import List, Optional, Dict, Any, Tuple
  6. from core.base.async_mysql_client import AsyncMySQLClient
  7. from core.utils.log.logger_manager import LoggerManager
  8. from config import settings
  9. logger = logging.getLogger(__name__)
  10. class AsyncMysqlService:
  11. """
  12. 异步业务数据库访问类(支持单例和async with)
  13. 功能特点:
  14. - 单例模式实现,相同配置共享连接池
  15. - 支持async with上下文管理,自动处理连接池生命周期
  16. - 封装业务相关的SQL操作
  17. - 完善的错误处理和日志记录
  18. """
  19. # 存储不同配置的单例实例,键为(platform, mode)元组
  20. _instances: Dict[Tuple[str, str], "AsyncMysqlService"] = {}
  21. def __new__(cls, platform: Optional[str] = None, mode: Optional[str] = None):
  22. """基于配置的单例模式,相同platform和mode共享同一个实例"""
  23. # 处理None值,设置默认值为"system"
  24. platform = platform or "system"
  25. mode = mode or "system"
  26. key = (platform, mode)
  27. if key not in cls._instances:
  28. instance = super().__new__(cls)
  29. instance._platform = platform
  30. instance._mode = mode
  31. instance._client = None
  32. instance._pool_initialized = False
  33. cls._instances[key] = instance
  34. return cls._instances[key]
  35. def __init__(self, platform: Optional[str] = None, mode: Optional[str] = None):
  36. """初始化数据库配置(仅在创建新实例时执行)"""
  37. # 避免重复初始化
  38. if self._client is not None:
  39. return
  40. # 处理None值,设置默认值为"system"
  41. platform = platform or "system"
  42. mode = mode or "system"
  43. self._platform = platform
  44. self._mode = mode
  45. # 加载环境变量配置
  46. db_config = {
  47. "host": settings.DB_HOST,
  48. "port": settings.DB_PORT,
  49. "user": settings.DB_USER,
  50. "password": settings.DB_PASSWORD,
  51. "db": settings.DB_NAME,
  52. "charset": settings.DB_CHARSET
  53. }
  54. self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
  55. self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
  56. # 创建数据库客户端
  57. self._client = AsyncMySQLClient(
  58. host=db_config["host"],
  59. port=db_config["port"],
  60. user=db_config["user"],
  61. password=db_config["password"],
  62. db=db_config["db"],
  63. charset=db_config["charset"],
  64. minsize=1,
  65. maxsize=10
  66. )
  67. self.logger.info(f"创建数据库服务实例: platform={platform}, mode={mode}")
  68. # 以下方法与原实现一致,未修改
  69. async def __aenter__(self):
  70. """支持async with上下文,初始化连接池"""
  71. if not self._pool_initialized:
  72. try:
  73. await self._client.init_pool()
  74. self._pool_initialized = True
  75. self.logger.info(f"连接池初始化成功: platform={self._platform}, mode={self._mode}")
  76. except Exception as e:
  77. self.logger.error(f"连接池初始化失败: {str(e)}")
  78. raise
  79. return self
  80. async def __aexit__(self, exc_type, exc_val, exc_tb):
  81. """支持async with上下文,关闭连接池"""
  82. if self._pool_initialized:
  83. try:
  84. await self._client.close()
  85. self._pool_initialized = False
  86. self.logger.info(f"连接池已关闭: platform={self._platform}, mode={self._mode}")
  87. except Exception as e:
  88. self.logger.warning(f"连接池关闭失败: {str(e)}")
  89. @property
  90. def platform(self) -> str:
  91. """获取服务关联的平台"""
  92. return self._platform
  93. @property
  94. def mode(self) -> str:
  95. """获取服务运行模式"""
  96. return self._mode
  97. async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
  98. """执行查询并返回多行结果"""
  99. try:
  100. return await self._client.fetch_all(sql, params or [])
  101. except Exception as e:
  102. self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
  103. raise
  104. async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
  105. """执行查询并返回单行结果"""
  106. try:
  107. return await self._client.fetch_one(sql, params or [])
  108. except Exception as e:
  109. self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
  110. raise
  111. async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
  112. """执行单条写操作(insert/update/delete)"""
  113. try:
  114. return await self._client.execute(sql, params or [])
  115. except Exception as e:
  116. self.logger.error(f"写操作失败 [SQL: {sql}]: {str(e)}")
  117. raise
  118. async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
  119. """批量执行写操作"""
  120. try:
  121. return await self._client.executemany(sql, params_list)
  122. except Exception as e:
  123. self.logger.error(f"批量写操作失败 [SQL: {sql}]: {str(e)}")
  124. raise
  125. # 业务相关方法保持不变...
  126. async def get_user_list(self, task_id: int) -> List[Dict[str, Any]]:
  127. sql = "SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = %s"
  128. return await self.fetch_all(sql, [task_id])
  129. async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]:
  130. sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s"
  131. row = await self.fetch_one(sql, [rule_id])
  132. if not row or "rule" not in row:
  133. self.logger.warning(f"未找到规则: rule_id={rule_id}")
  134. return None
  135. try:
  136. rule_data = json.loads(row["rule"])
  137. return {k: v for item in rule_data for k, v in item.items()}
  138. except json.JSONDecodeError as e:
  139. self.logger.error(f"规则解析失败 [rule_id={rule_id}]: {str(e)}")
  140. return None
  141. async def get_today_videos(self) -> int:
  142. sql = """
  143. SELECT COUNT(*) as cnt
  144. FROM crawler_video
  145. WHERE DATE(create_time) = CURDATE()
  146. AND platform = %s
  147. AND strategy = %s
  148. """
  149. self.logger.info(f"查询今日视频数量: platform={self.platform}, strategy={self.mode}")
  150. result = await self.fetch_one(sql, [self.platform, self.mode])
  151. return result["cnt"] if result else 0
  152. # 全局便捷访问函数(支持None参数)
  153. async def get_db_service(platform: Optional[str] = None, mode: Optional[str] = None) -> AsyncMysqlService:
  154. """获取数据库服务实例的便捷函数,支持platform/mode为None"""
  155. service = AsyncMysqlService(platform, mode)
  156. await service.__aenter__()
  157. return service
  158. # 示例用法
  159. async def demo_usage():
  160. # 方式一:platform和mode为None,使用默认值"system"
  161. async with AsyncMysqlService() as default_service:
  162. users = await default_service.get_user_list(8)
  163. print(f"系统配置用户数: {users}")
  164. # 方式二:显式传入None
  165. async with AsyncMysqlService(None, None) as system_service:
  166. rule = await system_service.get_rule_dict(18)
  167. print(f"自定义配置规则: {rule}")
  168. # 方式三:使用便捷函数
  169. service = await get_db_service("benshanzhufu", "recommend")
  170. try:
  171. count = await service.get_today_videos()
  172. print(f"默认配置今日视频数: {count}")
  173. finally:
  174. await service.__aexit__(None, None, None)
  175. if __name__ == '__main__':
  176. asyncio.run(demo_usage())