123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- import asyncio
- import json
- import os
- import logging
- from typing import List, Optional, Dict, Any, Tuple
- from core.base.async_mysql_client import AsyncMySQLClient
- from core.utils.log.logger_manager import LoggerManager
- from config import settings
- logger = logging.getLogger(__name__)
- class AsyncMysqlService:
- """
- 异步业务数据库访问类(支持单例和async with)
- 功能特点:
- - 单例模式实现,相同配置共享连接池
- - 支持async with上下文管理,自动处理连接池生命周期
- - 封装业务相关的SQL操作
- - 完善的错误处理和日志记录
- """
- # 存储不同配置的单例实例,键为(platform, mode)元组
- _instances: Dict[Tuple[str, str], "AsyncMysqlService"] = {}
- def __new__(cls, platform: Optional[str] = None, mode: Optional[str] = None):
- """基于配置的单例模式,相同platform和mode共享同一个实例"""
- # 处理None值,设置默认值为"system"
- platform = platform or "system"
- mode = mode or "system"
- key = (platform, mode)
- if key not in cls._instances:
- instance = super().__new__(cls)
- instance._platform = platform
- instance._mode = mode
- instance._client = None
- instance._pool_initialized = False
- cls._instances[key] = instance
- return cls._instances[key]
- def __init__(self, platform: Optional[str] = None, mode: Optional[str] = None):
- """初始化数据库配置(仅在创建新实例时执行)"""
- # 避免重复初始化
- if self._client is not None:
- return
- # 处理None值,设置默认值为"system"
- platform = platform or "system"
- mode = mode or "system"
- self._platform = platform
- self._mode = mode
- # 加载环境变量配置
- db_config = {
- "host": settings.DB_HOST,
- "port": settings.DB_PORT,
- "user": settings.DB_USER,
- "password": settings.DB_PASSWORD,
- "db": settings.DB_NAME,
- "charset": settings.DB_CHARSET
- }
- self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
- self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
- # 创建数据库客户端
- self._client = AsyncMySQLClient(
- host=db_config["host"],
- port=db_config["port"],
- user=db_config["user"],
- password=db_config["password"],
- db=db_config["db"],
- charset=db_config["charset"],
- minsize=1,
- maxsize=10
- )
- self.logger.info(f"创建数据库服务实例: platform={platform}, mode={mode}")
- # 以下方法与原实现一致,未修改
- async def __aenter__(self):
- """支持async with上下文,初始化连接池"""
- if not self._pool_initialized:
- try:
- await self._client.init_pool()
- self._pool_initialized = True
- self.logger.info(f"连接池初始化成功: platform={self._platform}, mode={self._mode}")
- except Exception as e:
- self.logger.error(f"连接池初始化失败: {str(e)}")
- raise
- return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- """支持async with上下文,关闭连接池"""
- if self._pool_initialized:
- try:
- await self._client.close()
- self._pool_initialized = False
- self.logger.info(f"连接池已关闭: platform={self._platform}, mode={self._mode}")
- except Exception as e:
- self.logger.warning(f"连接池关闭失败: {str(e)}")
- @property
- def platform(self) -> str:
- """获取服务关联的平台"""
- return self._platform
- @property
- def mode(self) -> str:
- """获取服务运行模式"""
- return self._mode
- async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
- """执行查询并返回多行结果"""
- try:
- return await self._client.fetch_all(sql, params or [])
- except Exception as e:
- self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
- raise
- async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
- """执行查询并返回单行结果"""
- try:
- return await self._client.fetch_one(sql, params or [])
- except Exception as e:
- self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
- raise
- async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
- """执行单条写操作(insert/update/delete)"""
- try:
- return await self._client.execute(sql, params or [])
- except Exception as e:
- self.logger.error(f"写操作失败 [SQL: {sql}]: {str(e)}")
- raise
- async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
- """批量执行写操作"""
- try:
- return await self._client.executemany(sql, params_list)
- except Exception as e:
- self.logger.error(f"批量写操作失败 [SQL: {sql}]: {str(e)}")
- raise
- # 业务相关方法保持不变...
- async def get_user_list(self, task_id: int) -> List[Dict[str, Any]]:
- sql = "SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = %s"
- return await self.fetch_all(sql, [task_id])
- async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]:
- sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s"
- row = await self.fetch_one(sql, [rule_id])
- if not row or "rule" not in row:
- self.logger.warning(f"未找到规则: rule_id={rule_id}")
- return None
- try:
- rule_data = json.loads(row["rule"])
- return {k: v for item in rule_data for k, v in item.items()}
- except json.JSONDecodeError as e:
- self.logger.error(f"规则解析失败 [rule_id={rule_id}]: {str(e)}")
- return None
- async def get_today_videos(self) -> int:
- sql = """
- SELECT COUNT(*) as cnt
- FROM crawler_video
- WHERE DATE(create_time) = CURDATE()
- AND platform = %s
- AND strategy = %s
- """
- self.logger.info(f"查询今日视频数量: platform={self.platform}, strategy={self.mode}")
- result = await self.fetch_one(sql, [self.platform, self.mode])
- return result["cnt"] if result else 0
- # 全局便捷访问函数(支持None参数)
- async def get_db_service(platform: Optional[str] = None, mode: Optional[str] = None) -> AsyncMysqlService:
- """获取数据库服务实例的便捷函数,支持platform/mode为None"""
- service = AsyncMysqlService(platform, mode)
- await service.__aenter__()
- return service
- # 示例用法
- async def demo_usage():
- # 方式一:platform和mode为None,使用默认值"system"
- async with AsyncMysqlService() as default_service:
- users = await default_service.get_user_list(8)
- print(f"系统配置用户数: {users}")
- # 方式二:显式传入None
- async with AsyncMysqlService(None, None) as system_service:
- rule = await system_service.get_rule_dict(18)
- print(f"自定义配置规则: {rule}")
- # 方式三:使用便捷函数
- service = await get_db_service("benshanzhufu", "recommend")
- try:
- count = await service.get_today_videos()
- print(f"默认配置今日视频数: {count}")
- finally:
- await service.__aexit__(None, None, None)
- if __name__ == '__main__':
- asyncio.run(demo_usage())
|