from typing import Optional

import redis.asyncio as aioredis

from core.utils.log.logger_manager import LoggerManager


class RedisManager:
    """Process-wide holder for one shared asyncio Redis client.

    All state lives on the class, so every caller in the process shares the
    same client object. Call ``init()`` once at startup, ``get_pool()``
    wherever the client is needed, and ``close()`` during graceful shutdown.
    """

    # Object returned by ``aioredis.from_url``; ``None`` before ``init()`` and
    # again after ``close()``.
    _pool: Optional[aioredis.Redis] = None
    logger = LoggerManager.get_logger()

    @classmethod
    async def init(
        cls,
        redis_url: str = "",
        max_connections: int = 20,
        encoding: str = "utf-8",
        decode_responses: bool = True,
    ) -> None:
        """Create the shared Redis client if it does not exist yet.

        The call is a no-op when a client is already stored on the class, so
        the arguments of any later call are ignored.

        Args:
            redis_url: Connection URL handed to ``aioredis.from_url``.
            max_connections: Forwarded to ``from_url`` unchanged.
            encoding: Forwarded to ``from_url`` unchanged.
            decode_responses: Forwarded to ``from_url`` unchanged.

        Raises:
            Exception: Whatever ``from_url`` (or awaiting its result) raises;
                the error is logged and then re-raised unchanged.
        """
        if cls._pool is not None:
            return

        try:
            client = await aioredis.from_url(
                redis_url,
                max_connections=max_connections,
                encoding=encoding,
                decode_responses=decode_responses,
                retry_on_timeout=True,
            )
        except Exception as e:
            cls.logger.error(f"[RedisManager] Redis 连接池初始化失败: {e}")
            raise
        else:
            cls._pool = client
            # NOTE(review): redis_url may embed credentials and is logged
            # verbatim here -- confirm debug logs are not shipped anywhere
            # sensitive, or redact the URL before logging.
            cls.logger.debug(f"[RedisManager] Redis 连接池初始化成功: {redis_url}")

    @classmethod
    def get_pool(cls) -> aioredis.Redis:
        """Return the shared client created by ``init()``.

        Raises:
            RuntimeError: If ``init()` has not been called yet, or ``close()``
                has already released the client.
        """
        if cls._pool is None:
            raise RuntimeError("[RedisManager] 未初始化,请先执行 RedisManager.init()")
        return cls._pool

    @classmethod
    async def close(cls) -> None:
        """Close the shared client; intended for graceful shutdown.

        Does nothing when no client is stored. The stored reference is always
        cleared, even if closing raises, so that ``get_pool()`` cannot hand
        out a closed client and a later ``init()`` can build a fresh one.
        """
        client = cls._pool
        if client is None:
            return

        # Newer redis-py releases expose ``aclose()`` and deprecate ``close()``;
        # fall back to ``close()`` for versions that predate ``aclose()``.
        closer = getattr(client, "aclose", None) or client.close
        try:
            await closer()
        finally:
            cls._pool = None
        cls.logger.debug("[RedisManager] Redis 连接池已关闭")