import os import time from loguru import logger import pymysql from dotenv import load_dotenv, find_dotenv from typing import Tuple, Any, Dict, Literal, Optional from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection from dbutils.steady_db import SteadyDBCursor from pymysql.cursors import DictCursor class SyncMySQLHelper(object): _pool: PooledDB = None _instance = None def __new__(cls, *args, **kwargs): """单例""" if cls._instance is None: cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance def get_pool(self): if self._pool is None: # 加载环境变量,允许通过 .env 配置本机调试数据库 load_dotenv(find_dotenv(), override=False) env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower() logger.info(f"✅ env = {env}") host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com') port = int(os.getenv('DB_PORT', '3306')) user = os.getenv('DB_USER', 'content_rw') password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0') database = os.getenv('DB_NAME', 'content-deconstruction-supply') logger.info(f"✅ 当前使用数据库 : {database}") # 防止调度任务因网络/DB 抖动“无限阻塞” # - connect_timeout: 建连超时(秒) # - read_timeout/write_timeout: 单次读写超时(秒) # - blocking=False: 连接池耗尽时直接抛错,避免卡死整个 job connect_timeout = int(os.getenv("DB_CONNECT_TIMEOUT", "30")) read_timeout = int(os.getenv("DB_READ_TIMEOUT", "50")) write_timeout = int(os.getenv("DB_WRITE_TIMEOUT", "50")) # 注意:mincached 过大时会在初始化阶段“批量建连”,DB 抖动会直接把启动拖垮。 # 这里改为懒加载,按需建连。 mincached = int(os.getenv("DB_POOL_MINCACHED", "0")) maxconnections = int(os.getenv("DB_POOL_MAXCONN", "20")) last_exc: Optional[Exception] = None for attempt in range(2): try: self._pool = PooledDB( creator=pymysql, mincached=mincached, maxconnections=maxconnections, blocking=False, maxusage=1000, ping=1, host=host, port=port, user=user, password=password, database=database, connect_timeout=connect_timeout, read_timeout=read_timeout, write_timeout=write_timeout, charset="utf8mb4", ) last_exc = None break except Exception as exc: # 保持 _pool 为 None,下一轮调度可继续重试建池 self._pool = None last_exc = exc logger.exception(f"❌ 初始化数据库连接池失败(第{attempt + 1}次): {exc}") time.sleep(0.5) if last_exc is not None: raise last_exc return self._pool def _conn(self): pool = self.get_pool() try: # DBUtils 在 blocking=False 时,连接不足会直接抛异常 # 这里统一捕获并打印,避免外层任务“静默卡住” return pool.connection() except Exception as exc: logger.exception(f"❌ 获取数据库连接失败: {exc}") raise def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]: with self._conn() as conn: with conn.cursor(DictCursor) as cursor: cursor.execute(sql, data) result = cursor.fetchone() return result def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]: with self._conn() as conn: with conn.cursor(DictCursor) as cursor: cursor.execute(sql, data) result = cursor.fetchall() return result def fetchmany(self, sql: str, data: Optional[Tuple[Any, ...]] = None, size: Optional[int] = None) -> Tuple[Dict[str, Any]]: with self._conn() as conn: with conn.cursor(DictCursor) as cursor: cursor.execute(sql, data) result = cursor.fetchmany(size=size) return result def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None): with self._conn() as conn: with conn.cursor(DictCursor) as cursor: try: cursor.execute(sql, data) result = conn.commit() return result except pymysql.err.IntegrityError as e: if e.args[0] == 1062: # 重复值 return None else: raise e except pymysql.err.OperationalError as e: if e.args[0] == 1205: # 死锁 conn.rollback() return None else: raise e mysql = SyncMySQLHelper()