async_mysql_service.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import asyncio
  2. import json
  3. from typing import List, Optional, Dict, Any, Tuple
  4. from config import settings
  5. from core.base.async_mysql_client import AsyncMySQLClient
  6. from core.utils.log.logger_manager import LoggerManager
  7. class AsyncMysqlService:
  8. """
  9. 异步业务数据库访问类(支持单例和async with)
  10. 功能特点:
  11. - 单例模式实现,相同配置共享连接池
  12. - 支持async with上下文管理,自动处理连接池生命周期
  13. - 封装业务相关的SQL操作
  14. - 完善的错误处理和日志记录
  15. """
  16. # 存储不同配置的单例实例,键为(platform, mode)元组
  17. _instances: Dict[Tuple[str, str], "AsyncMysqlService"] = {}
  18. def __new__(cls, platform: Optional[str] = None, mode: Optional[str] = None):
  19. """基于配置的单例模式,相同platform和mode共享同一个实例"""
  20. # 处理None值,设置默认值为"system"
  21. platform = platform or "system"
  22. mode = mode or "crawler"
  23. key = (platform, mode)
  24. if key not in cls._instances:
  25. instance = super().__new__(cls)
  26. instance._platform = platform
  27. instance._mode = mode
  28. instance._client = None
  29. instance._pool_initialized = False
  30. cls._instances[key] = instance
  31. return cls._instances[key]
  32. def __init__(self, platform: Optional[str] = None, mode: Optional[str] = None,trace_id: Optional[str] = None):
  33. """初始化数据库配置(仅在创建新实例时执行)"""
  34. # 避免重复初始化
  35. if self._client is not None:
  36. return
  37. self.platform = platform or "system"
  38. self.mode = mode or "crawler"
  39. self.trace_id = trace_id
  40. # 加载环境变量配置
  41. db_config = {
  42. "host": settings.DB_HOST,
  43. "port": settings.DB_PORT,
  44. "user": settings.DB_USER,
  45. "password": settings.DB_PASSWORD,
  46. "db": settings.DB_NAME,
  47. "charset": settings.DB_CHARSET
  48. }
  49. self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
  50. self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
  51. # 创建数据库客户端
  52. self._client = AsyncMySQLClient(
  53. host=db_config["host"],
  54. port=db_config["port"],
  55. user=db_config["user"],
  56. password=db_config["password"],
  57. db=db_config["db"],
  58. charset=db_config["charset"],
  59. minsize=1,
  60. maxsize=10,
  61. logger=self.logger,
  62. aliyun_logr=self.aliyun_logr
  63. )
  64. async def __aenter__(self):
  65. """支持async with上下文,初始化连接池"""
  66. if not self._pool_initialized:
  67. try:
  68. await self._client.init_pool()
  69. self._pool_initialized = True
  70. self.logger.info(f"连接池初始化成功: classname={self._platform}, mode={self._mode}")
  71. except Exception as e:
  72. self.logger.error(f"连接池初始化失败: {str(e)}")
  73. raise
  74. return self
  75. async def __aexit__(self, exc_type, exc_val, exc_tb):
  76. """支持async with上下文,关闭连接池"""
  77. if self._pool_initialized:
  78. try:
  79. await self._client.close()
  80. self._pool_initialized = False
  81. self.logger.info(f"连接池已关闭: classname={self._platform}, mode={self._mode}")
  82. except Exception as e:
  83. self.logger.warning(f"连接池关闭失败: {str(e)}")
  84. async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
  85. """执行查询并返回多行结果"""
  86. try:
  87. return await self._client.fetch_all(sql, params or [])
  88. except Exception as e:
  89. self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
  90. raise
  91. async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
  92. """执行查询并返回单行结果"""
  93. try:
  94. return await self._client.fetch_one(sql, params or [])
  95. except Exception as e:
  96. self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
  97. raise
  98. async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
  99. """执行单条写操作(insert/update/delete)"""
  100. try:
  101. return await self._client.execute(sql, params or [])
  102. except Exception as e:
  103. self.logger.error(f"写操作失败 [SQL: {sql}]: {str(e)}")
  104. raise
  105. async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
  106. """批量执行写操作"""
  107. try:
  108. return await self._client.executemany(sql, params_list)
  109. except Exception as e:
  110. self.logger.error(f"批量写操作失败 [SQL: {sql}]: {str(e)}")
  111. raise
  112. # 业务相关方法保持不变...
  113. async def get_user_list(self, task_id: int) -> List[Dict[str, Any]]:
  114. sql = "SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = %s"
  115. return await self.fetch_all(sql, [task_id])
  116. async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]:
  117. sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s"
  118. row = await self.fetch_one(sql, [rule_id])
  119. if not row or "rule" not in row:
  120. self.logger.warning(f"未找到规则: rule_id={rule_id}")
  121. return None
  122. try:
  123. rule_data = json.loads(row["rule"])
  124. return {k: v for item in rule_data for k, v in item.items()}
  125. except json.JSONDecodeError as e:
  126. self.logger.error(f"规则解析失败 [rule_id={rule_id}]: {str(e)}")
  127. return None
  128. async def get_today_videos(self) -> int:
  129. sql = """
  130. SELECT COUNT(*) as cnt
  131. FROM crawler_video
  132. WHERE DATE (create_time) = CURDATE()
  133. AND platform = %s
  134. AND strategy = %s \
  135. """
  136. result = await self.fetch_one(sql, [self.platform, self.mode])
  137. return result["cnt"] if result else 0
  138. # 全局便捷访问函数(支持None参数)
  139. async def get_db_service(platform: Optional[str] = None, mode: Optional[str] = None) -> AsyncMysqlService:
  140. """获取数据库服务实例的便捷函数,支持platform/mode为None"""
  141. service = AsyncMysqlService(platform, mode)
  142. await service.__aenter__()
  143. return service
  144. # 示例用法
  145. async def demo_usage():
  146. # 方式一:platform和mode为None,使用默认值"system"
  147. async with AsyncMysqlService() as default_service:
  148. users = await default_service.get_user_list(8)
  149. print(f"系统配置用户数: {users}")
  150. # 方式二:显式传入None
  151. async with AsyncMysqlService(None, None) as system_service:
  152. rule = await system_service.get_rule_dict(18)
  153. print(f"自定义配置规则: {rule}")
  154. # 方式三:使用便捷函数
  155. service = await get_db_service("benshanzhufu", "recommend")
  156. try:
  157. count = await service.get_today_videos()
  158. print(f"默认配置今日视频数: {count}")
  159. finally:
  160. await service.__aexit__(None, None, None)
  161. if __name__ == '__main__':
  162. asyncio.run(demo_usage())