| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- import os
- import time
- from loguru import logger
- import pymysql
- from dotenv import load_dotenv, find_dotenv
- from typing import Tuple, Any, Dict, Literal, Optional
- from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
- from dbutils.steady_db import SteadyDBCursor
- from pymysql.cursors import DictCursor
class SyncMySQLHelper:
    """Process-wide synchronous MySQL helper built on a DBUtils connection pool.

    The class is a singleton: every ``SyncMySQLHelper(...)`` call returns the
    same object.  The underlying ``PooledDB`` is created lazily on the first
    call to :meth:`get_pool`, so instantiating the helper performs no I/O.
    """

    # MySQL server error numbers that ``execute`` deliberately tolerates.
    _ER_DUP_ENTRY = 1062          # duplicate value for a unique/primary key
    _ER_LOCK_WAIT_TIMEOUT = 1205  # lock wait timeout (a deadlock is 1213)

    # Number of attempts made to build the pool before giving up.
    _POOL_INIT_ATTEMPTS = 2

    # Annotations that mention DBUtils types are quoted so they stay plain
    # forward references and are never evaluated when the class is created.
    _pool: "Optional[PooledDB]" = None
    _instance: "Optional[SyncMySQLHelper]" = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use.

        Extra positional/keyword arguments are accepted for call-site
        compatibility but ignored.  They must not be forwarded to
        ``object.__new__``, which raises ``TypeError`` when given any.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_pool(self) -> "PooledDB":
        """Return the connection pool, building it on the first call.

        Settings are read from environment variables (optionally loaded from
        a ``.env`` file).  Pool construction is attempted up to
        ``_POOL_INIT_ATTEMPTS`` times with a 0.5 s pause between attempts.

        Raises:
            Exception: whatever the last failed pool construction raised.
                ``self._pool`` is left as ``None`` so a later call retries.
        """
        if self._pool is not None:
            return self._pool

        # Load .env so a local debugging database can be configured;
        # variables that are already set in the environment win.
        load_dotenv(find_dotenv(), override=False)
        env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()
        logger.info(f"✅ env = {env}")

        # SECURITY NOTE(review): real-looking host/user/password defaults are
        # hard-coded below.  They are kept only so existing deployments keep
        # working; they should be moved to configuration and rotated.
        host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
        port = int(os.getenv('DB_PORT', '3306'))
        user = os.getenv('DB_USER', 'content_rw')
        password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')
        database = os.getenv('DB_NAME', 'content-deconstruction-supply')
        logger.info(f"✅ 当前使用数据库 : {database}")

        # Timeouts (seconds) keep scheduled jobs from blocking forever when
        # the network or the database misbehaves.
        connect_timeout = int(os.getenv("DB_CONNECT_TIMEOUT", "30"))
        read_timeout = int(os.getenv("DB_READ_TIMEOUT", "50"))
        write_timeout = int(os.getenv("DB_WRITE_TIMEOUT", "50"))

        # A large mincached opens that many connections while the pool is
        # being built, so a flaky database would stall start-up.  Default to
        # 0: connections are opened on demand.
        mincached = int(os.getenv("DB_POOL_MINCACHED", "0"))
        maxconnections = int(os.getenv("DB_POOL_MAXCONN", "20"))

        for attempt in range(self._POOL_INIT_ATTEMPTS):
            try:
                self._pool = PooledDB(
                    creator=pymysql,
                    mincached=mincached,
                    maxconnections=maxconnections,
                    # Fail fast when the pool is exhausted instead of
                    # blocking the whole job.
                    blocking=False,
                    maxusage=1000,
                    ping=1,
                    host=host,
                    port=port,
                    user=user,
                    password=password,
                    database=database,
                    connect_timeout=connect_timeout,
                    read_timeout=read_timeout,
                    write_timeout=write_timeout,
                    charset="utf8mb4",
                )
                break
            except Exception as exc:
                # Keep _pool as None so the next scheduled run can retry.
                self._pool = None
                logger.exception(f"❌ 初始化数据库连接池失败(第{attempt + 1}次): {exc}")
                if attempt + 1 >= self._POOL_INIT_ATTEMPTS:
                    # Out of attempts: re-raise immediately rather than
                    # sleeping once more before failing.
                    raise
                time.sleep(0.5)
        return self._pool

    def _conn(self):
        """Borrow a connection from the pool.

        With ``blocking=False`` the pool raises when no connection is
        available; the error is logged here and re-raised so the calling job
        never appears to hang silently.
        """
        pool = self.get_pool()
        try:
            return pool.connection()
        except Exception as exc:
            logger.exception(f"❌ 获取数据库连接失败: {exc}")
            raise

    def _run_query(self, sql: str, data: Optional[Tuple[Any, ...]], fetch):
        """Execute ``sql`` on a pooled connection and return ``fetch(cursor)``.

        Shared implementation of the ``fetch*`` methods.  Rows come back as
        dictionaries because a ``DictCursor`` is used.  Both the cursor and
        the connection are released when the ``with`` blocks exit.
        """
        with self._conn() as conn:
            with conn.cursor(DictCursor) as cursor:
                cursor.execute(sql, data)
                return fetch(cursor)

    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Optional[Dict[str, Any]]:
        """Run a query and return its first row as a dict.

        Args:
            sql: SQL statement, using driver placeholders for parameters.
            data: Parameters bound to the placeholders, or ``None``.

        Returns:
            The first row, or ``None`` when the query produced no rows.
        """
        return self._run_query(sql, data, lambda cursor: cursor.fetchone())

    def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any], ...]:
        """Run a query and return every row.

        Args:
            sql: SQL statement, using driver placeholders for parameters.
            data: Parameters bound to the placeholders, or ``None``.

        Returns:
            A sequence of row dicts (empty when nothing matched).  The exact
            container type is whatever the driver's cursor returns.
        """
        return self._run_query(sql, data, lambda cursor: cursor.fetchall())

    def fetchmany(self,
                  sql: str,
                  data: Optional[Tuple[Any, ...]] = None,
                  size: Optional[int] = None) -> Tuple[Dict[str, Any], ...]:
        """Run a query and return at most ``size`` rows.

        Args:
            sql: SQL statement, using driver placeholders for parameters.
            data: Parameters bound to the placeholders, or ``None``.
            size: Maximum number of rows; passed straight to the cursor's
                ``fetchmany``, so ``None`` means the cursor's own default.

        Returns:
            A sequence of row dicts, possibly empty.
        """
        return self._run_query(sql, data, lambda cursor: cursor.fetchmany(size=size))

    def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> None:
        """Execute a write statement and commit it.

        Two failures are tolerated on purpose and reported as a plain
        ``None`` return, exactly like a successful call:

        * duplicate-key violations (MySQL error 1062);
        * lock wait timeouts (MySQL error 1205), which are also logged.

        In both cases the transaction is rolled back before returning.
        Every other ``IntegrityError``/``OperationalError`` (including a
        deadlock, error 1213) and any other exception propagates unchanged.

        Args:
            sql: SQL statement, using driver placeholders for parameters.
            data: Parameters bound to the placeholders, or ``None``.

        Returns:
            Always ``None``.
        """
        with self._conn() as conn:
            with conn.cursor(DictCursor) as cursor:
                try:
                    cursor.execute(sql, data)
                    conn.commit()
                except pymysql.err.IntegrityError as exc:
                    if not (exc.args and exc.args[0] == self._ER_DUP_ENTRY):
                        raise
                    # Duplicate row: treated as "already written".  Roll back
                    # explicitly, matching the lock-timeout branch below.
                    conn.rollback()
                except pymysql.err.OperationalError as exc:
                    if not (exc.args and exc.args[0] == self._ER_LOCK_WAIT_TIMEOUT):
                        raise
                    conn.rollback()
                    # The write was NOT applied; leave a trace since the
                    # caller cannot tell from the return value.
                    logger.warning(f"lock wait timeout, statement rolled back: {exc}")
        return None


# Shared module-level helper; no connection is opened until first use.
mysql = SyncMySQLHelper()
|