async_mysql_service.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import asyncio
  2. import json
  3. import logging
  4. from typing import List, Optional, Dict, Any, Tuple
  5. from config import settings
  6. from core.base.async_mysql_client import AsyncMySQLClient
  7. from core.utils.log.logger_manager import LoggerManager
  8. logger = logging.getLogger(__name__)
  9. class AsyncMysqlService:
  10. """
  11. 异步业务数据库访问类(支持单例和async with)
  12. 功能特点:
  13. - 单例模式实现,相同配置共享连接池
  14. - 支持async with上下文管理,自动处理连接池生命周期
  15. - 封装业务相关的SQL操作
  16. - 完善的错误处理和日志记录
  17. """
  18. # 存储不同配置的单例实例,键为(classname, mode)元组
  19. _instances: Dict[Tuple[str, str], "AsyncMysqlService"] = {}
  20. def __new__(cls, platform: Optional[str] = None, mode: Optional[str] = None):
  21. """基于配置的单例模式,相同platform和mode共享同一个实例"""
  22. # 处理None值,设置默认值为"system"
  23. platform = platform or "system"
  24. mode = mode or "crawler"
  25. key = (platform, mode)
  26. if key not in cls._instances:
  27. instance = super().__new__(cls)
  28. instance._platform = platform
  29. instance._mode = mode
  30. instance._client = None
  31. instance._pool_initialized = False
  32. cls._instances[key] = instance
  33. return cls._instances[key]
  34. def __init__(self, platform: Optional[str] = None, mode: Optional[str] = None):
  35. """初始化数据库配置(仅在创建新实例时执行)"""
  36. # 避免重复初始化
  37. if self._client is not None:
  38. return
  39. # 处理None值,设置默认值为"system"
  40. platform = platform or "system"
  41. mode = mode or "crawler"
  42. self._platform = platform
  43. self._mode = mode
  44. # 加载环境变量配置
  45. db_config = {
  46. "host": settings.DB_HOST,
  47. "port": settings.DB_PORT,
  48. "user": settings.DB_USER,
  49. "password": settings.DB_PASSWORD,
  50. "db": settings.DB_NAME,
  51. "charset": settings.DB_CHARSET
  52. }
  53. self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
  54. self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
  55. # 创建数据库客户端
  56. self._client = AsyncMySQLClient(
  57. host=db_config["host"],
  58. port=db_config["port"],
  59. user=db_config["user"],
  60. password=db_config["password"],
  61. db=db_config["db"],
  62. charset=db_config["charset"],
  63. minsize=1,
  64. maxsize=10
  65. )
  66. self.logger.info(f"创建数据库服务实例: classname={platform}, mode={mode}")
  67. async def __aenter__(self):
  68. """支持async with上下文,初始化连接池"""
  69. if not self._pool_initialized:
  70. try:
  71. await self._client.init_pool()
  72. self._pool_initialized = True
  73. self.logger.info(f"连接池初始化成功: classname={self._platform}, mode={self._mode}")
  74. except Exception as e:
  75. self.logger.error(f"连接池初始化失败: {str(e)}")
  76. raise
  77. return self
  78. async def __aexit__(self, exc_type, exc_val, exc_tb):
  79. """支持async with上下文,关闭连接池"""
  80. if self._pool_initialized:
  81. try:
  82. await self._client.close()
  83. self._pool_initialized = False
  84. self.logger.info(f"连接池已关闭: classname={self._platform}, mode={self._mode}")
  85. except Exception as e:
  86. self.logger.warning(f"连接池关闭失败: {str(e)}")
  87. @property
  88. def platform(self) -> str:
  89. """获取服务关联的平台"""
  90. return self._platform
  91. @property
  92. def mode(self) -> str:
  93. """获取服务运行模式"""
  94. return self._mode
  95. async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
  96. """执行查询并返回多行结果"""
  97. try:
  98. return await self._client.fetch_all(sql, params or [])
  99. except Exception as e:
  100. self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
  101. raise
  102. async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
  103. """执行查询并返回单行结果"""
  104. try:
  105. return await self._client.fetch_one(sql, params or [])
  106. except Exception as e:
  107. self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
  108. raise
  109. async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
  110. """执行单条写操作(insert/update/delete)"""
  111. try:
  112. return await self._client.execute(sql, params or [])
  113. except Exception as e:
  114. self.logger.error(f"写操作失败 [SQL: {sql}]: {str(e)}")
  115. raise
  116. async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
  117. """批量执行写操作"""
  118. try:
  119. return await self._client.executemany(sql, params_list)
  120. except Exception as e:
  121. self.logger.error(f"批量写操作失败 [SQL: {sql}]: {str(e)}")
  122. raise
  123. # 业务相关方法保持不变...
  124. async def get_user_list(self, task_id: int) -> List[Dict[str, Any]]:
  125. sql = "SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = %s"
  126. return await self.fetch_all(sql, [task_id])
  127. async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]:
  128. sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s"
  129. row = await self.fetch_one(sql, [rule_id])
  130. if not row or "rule" not in row:
  131. self.logger.warning(f"未找到规则: rule_id={rule_id}")
  132. return None
  133. try:
  134. rule_data = json.loads(row["rule"])
  135. return {k: v for item in rule_data for k, v in item.items()}
  136. except json.JSONDecodeError as e:
  137. self.logger.error(f"规则解析失败 [rule_id={rule_id}]: {str(e)}")
  138. return None
  139. async def get_today_videos(self) -> int:
  140. sql = """
  141. SELECT COUNT(*) as cnt
  142. FROM crawler_video
  143. WHERE DATE (create_time) = CURDATE()
  144. AND platform = %s
  145. AND strategy = %s \
  146. """
  147. self.logger.info(f"查询今日视频数量: platform={self.platform}, strategy={self.mode}")
  148. result = await self.fetch_one(sql, [self.platform, self.mode])
  149. return result["cnt"] if result else 0
  150. # 全局便捷访问函数(支持None参数)
  151. async def get_db_service(platform: Optional[str] = None, mode: Optional[str] = None) -> AsyncMysqlService:
  152. """获取数据库服务实例的便捷函数,支持platform/mode为None"""
  153. service = AsyncMysqlService(platform, mode)
  154. await service.__aenter__()
  155. return service
  156. # 示例用法
  157. async def demo_usage():
  158. # 方式一:platform和mode为None,使用默认值"system"
  159. async with AsyncMysqlService() as default_service:
  160. users = await default_service.get_user_list(8)
  161. print(f"系统配置用户数: {users}")
  162. # 方式二:显式传入None
  163. async with AsyncMysqlService(None, None) as system_service:
  164. rule = await system_service.get_rule_dict(18)
  165. print(f"自定义配置规则: {rule}")
  166. # 方式三:使用便捷函数
  167. service = await get_db_service("benshanzhufu", "recommend")
  168. try:
  169. count = await service.get_today_videos()
  170. print(f"默认配置今日视频数: {count}")
  171. finally:
  172. await service.__aexit__(None, None, None)
  173. if __name__ == '__main__':
  174. asyncio.run(demo_usage())