import asyncio import json import logging from typing import List, Optional, Dict, Any, Tuple from config import settings from core.base.async_mysql_client import AsyncMySQLClient from core.utils.log.logger_manager import LoggerManager logger = logging.getLogger(__name__) class AsyncMysqlService: """ 异步业务数据库访问类(支持单例和async with) 功能特点: - 单例模式实现,相同配置共享连接池 - 支持async with上下文管理,自动处理连接池生命周期 - 封装业务相关的SQL操作 - 完善的错误处理和日志记录 """ # 存储不同配置的单例实例,键为(classname, mode)元组 _instances: Dict[Tuple[str, str], "AsyncMysqlService"] = {} def __new__(cls, platform: Optional[str] = None, mode: Optional[str] = None): """基于配置的单例模式,相同platform和mode共享同一个实例""" # 处理None值,设置默认值为"system" platform = platform or "system" mode = mode or "system" key = (platform, mode) if key not in cls._instances: instance = super().__new__(cls) instance._platform = platform instance._mode = mode instance._client = None instance._pool_initialized = False cls._instances[key] = instance return cls._instances[key] def __init__(self, platform: Optional[str] = None, mode: Optional[str] = None): """初始化数据库配置(仅在创建新实例时执行)""" # 避免重复初始化 if self._client is not None: return # 处理None值,设置默认值为"system" platform = platform or "system" mode = mode or "system" self._platform = platform self._mode = mode # 加载环境变量配置 db_config = { "host": settings.DB_HOST, "port": settings.DB_PORT, "user": settings.DB_USER, "password": settings.DB_PASSWORD, "db": settings.DB_NAME, "charset": settings.DB_CHARSET } self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode) self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode) # 创建数据库客户端 self._client = AsyncMySQLClient( host=db_config["host"], port=db_config["port"], user=db_config["user"], password=db_config["password"], db=db_config["db"], charset=db_config["charset"], minsize=1, maxsize=10 ) self.logger.info(f"创建数据库服务实例: classname={platform}, mode={mode}") async def __aenter__(self): """支持async with上下文,初始化连接池""" if not self._pool_initialized: try: await self._client.init_pool() self._pool_initialized = True self.logger.info(f"连接池初始化成功: classname={self._platform}, mode={self._mode}") except Exception as e: self.logger.error(f"连接池初始化失败: {str(e)}") raise return self async def __aexit__(self, exc_type, exc_val, exc_tb): """支持async with上下文,关闭连接池""" if self._pool_initialized: try: await self._client.close() self._pool_initialized = False self.logger.info(f"连接池已关闭: classname={self._platform}, mode={self._mode}") except Exception as e: self.logger.warning(f"连接池关闭失败: {str(e)}") @property def platform(self) -> str: """获取服务关联的平台""" return self._platform @property def mode(self) -> str: """获取服务运行模式""" return self._mode async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]: """执行查询并返回多行结果""" try: return await self._client.fetch_all(sql, params or []) except Exception as e: self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}") raise async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]: """执行查询并返回单行结果""" try: return await self._client.fetch_one(sql, params or []) except Exception as e: self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}") raise async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int: """执行单条写操作(insert/update/delete)""" try: return await self._client.execute(sql, params or []) except Exception as e: self.logger.error(f"写操作失败 [SQL: {sql}]: {str(e)}") raise async def executemany(self, sql: str, params_list: List[List[Any]]) -> int: """批量执行写操作""" try: return await self._client.executemany(sql, params_list) except Exception as e: self.logger.error(f"批量写操作失败 [SQL: {sql}]: {str(e)}") raise # 业务相关方法保持不变... async def get_user_list(self, task_id: int) -> List[Dict[str, Any]]: sql = "SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = %s" return await self.fetch_all(sql, [task_id]) async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]: sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s" row = await self.fetch_one(sql, [rule_id]) if not row or "rule" not in row: self.logger.warning(f"未找到规则: rule_id={rule_id}") return None try: rule_data = json.loads(row["rule"]) return {k: v for item in rule_data for k, v in item.items()} except json.JSONDecodeError as e: self.logger.error(f"规则解析失败 [rule_id={rule_id}]: {str(e)}") return None async def get_today_videos(self) -> int: sql = """ SELECT COUNT(*) as cnt FROM crawler_video WHERE DATE (create_time) = CURDATE() AND classname = %s AND strategy = %s \ """ self.logger.info(f"查询今日视频数量: classname={self.platform}, strategy={self.mode}") result = await self.fetch_one(sql, [self.platform, self.mode]) return result["cnt"] if result else 0 # 全局便捷访问函数(支持None参数) async def get_db_service(platform: Optional[str] = None, mode: Optional[str] = None) -> AsyncMysqlService: """获取数据库服务实例的便捷函数,支持platform/mode为None""" service = AsyncMysqlService(platform, mode) await service.__aenter__() return service # 示例用法 async def demo_usage(): # 方式一:platform和mode为None,使用默认值"system" async with AsyncMysqlService() as default_service: users = await default_service.get_user_list(8) print(f"系统配置用户数: {users}") # 方式二:显式传入None async with AsyncMysqlService(None, None) as system_service: rule = await system_service.get_rule_dict(18) print(f"自定义配置规则: {rule}") # 方式三:使用便捷函数 service = await get_db_service("benshanzhufu", "recommend") try: count = await service.get_today_videos() print(f"默认配置今日视频数: {count}") finally: await service.__aexit__(None, None, None) if __name__ == '__main__': asyncio.run(demo_usage())