import os
from typing import Any, Callable, Dict, Literal, Optional, Sequence, Tuple

import pymysql
from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
from dbutils.steady_db import SteadyDBCursor
from dotenv import load_dotenv
from pymysql.cursors import DictCursor


class SyncMySQLHelper(object):
    """Process-wide synchronous MySQL helper backed by a lazily built pool.

    The class is a singleton: every instantiation returns the same object.
    The connection pool is created on first use by :meth:`get_pool`, and all
    queries run through ``DictCursor`` so rows come back as dictionaries.
    """

    # Connection pool; stays None until get_pool() is called for the first time.
    _pool: Optional[PooledDB] = None
    # The single shared instance handed out by __new__.
    _instance: Optional["SyncMySQLHelper"] = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on the first call.

        Constructor arguments are accepted for backward compatibility but are
        not forwarded: ``object.__new__`` raises ``TypeError`` when it is
        given anything other than the class.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_pool(self):
        """Return the connection pool, building it on first use.

        Settings are read from the environment (after loading ``.env``):
        ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD`` and ``DB_NAME``.
        When ``DB_NAME`` is unset, the test database is used if ``APP_ENV``
        (or ``ENV``, defaulting to ``local``) is one of ``local``, ``dev`` or
        ``development``; otherwise the production database is used.

        Returns:
            The ``PooledDB`` instance (10 cached connections minimum, at most
            20 connections, blocking when the pool is exhausted).
        """
        # NOTE(review): pool creation is not guarded by a lock; confirm the
        # first call cannot race between threads.
        if self._pool is None:
            # Load environment variables so a local debug database can be
            # configured through a .env file.
            load_dotenv()
            env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()

            # NOTE(review): the host and password fallbacks below are real
            # credentials committed to source. They are kept only so existing
            # deployments keep working; move them to the environment / a
            # secret store and rotate the password.
            host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
            # `or` (not a getenv default) so an empty DB_PORT falls back to
            # 3306 instead of crashing in int('').
            port = int(os.getenv('DB_PORT') or '3306')
            user = os.getenv('DB_USER', 'content_rw')
            password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')

            if env in ('local', 'dev', 'development'):
                default_database = 'content-deconstruction-test'
            else:
                default_database = 'content-deconstruction'
            database = os.getenv('DB_NAME', default_database)

            self._pool = PooledDB(
                creator=pymysql,
                mincached=10,
                maxconnections=20,
                blocking=True,
                host=host,
                port=port,
                user=user,
                password=password,
                database=database,
            )
        return self._pool

    def _query(self, sql: str, data: Optional[Tuple[Any, ...]],
               fetch: Callable[[Any], Any]) -> Any:
        """Run *sql* on a pooled connection and return ``fetch(cursor)``.

        Shared by the ``fetch*`` methods; the connection and cursor are
        released by their context managers when this method returns or raises.
        """
        with self.get_pool().connection() as conn:
            with conn.cursor(DictCursor) as cursor:
                cursor.execute(sql, data)
                return fetch(cursor)

    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Optional[Dict[str, Any]]:
        """Execute *sql* and return the cursor's ``fetchone()`` result.

        Args:
            sql: Statement to execute; use ``%s`` placeholders for values.
            data: Parameters bound to the placeholders, or ``None``.

        Returns:
            The first row as a dict, or ``None`` when there is no row.
        """
        return self._query(sql, data, lambda cursor: cursor.fetchone())

    def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Sequence[Dict[str, Any]]:
        """Execute *sql* and return the cursor's ``fetchall()`` result.

        Args:
            sql: Statement to execute; use ``%s`` placeholders for values.
            data: Parameters bound to the placeholders, or ``None``.

        Returns:
            A sequence of row dicts (empty when nothing matched).
        """
        return self._query(sql, data, lambda cursor: cursor.fetchall())

    def fetchmany(self, sql: str, data: Optional[Tuple[Any, ...]] = None,
                  size: Optional[int] = None) -> Sequence[Dict[str, Any]]:
        """Execute *sql* and return the cursor's ``fetchmany(size=size)`` result.

        Args:
            sql: Statement to execute; use ``%s`` placeholders for values.
            data: Parameters bound to the placeholders, or ``None``.
            size: Row limit passed straight to the cursor; ``None`` leaves the
                choice to the cursor.

        Returns:
            A sequence of row dicts.
        """
        return self._query(sql, data, lambda cursor: cursor.fetchmany(size=size))

    def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
        """Execute a write statement and commit it.

        Two error codes are deliberately swallowed and reported as ``None``:
        1062 (duplicate entry) and 1205 (lock wait timeout, after an explicit
        rollback). Every other error propagates to the caller.

        Args:
            sql: Statement to execute; use ``%s`` placeholders for values.
            data: Parameters bound to the placeholders, or ``None``.

        Returns:
            Whatever ``conn.commit()`` returns on success, otherwise ``None``
            for the two swallowed errors.

        Raises:
            pymysql.err.IntegrityError: For integrity errors other than 1062.
            pymysql.err.OperationalError: For operational errors other than
                1205.
        """
        with self.get_pool().connection() as conn:
            with conn.cursor(DictCursor) as cursor:
                try:
                    cursor.execute(sql, data)
                    return conn.commit()
                except pymysql.err.IntegrityError as e:
                    if e.args[0] == 1062:  # ER_DUP_ENTRY: duplicate value
                        return None
                    raise  # bare raise keeps the original traceback
                except pymysql.err.OperationalError as e:
                    # 1205 is ER_LOCK_WAIT_TIMEOUT. A true deadlock is 1213
                    # (ER_LOCK_DEADLOCK) and is still re-raised, as before.
                    if e.args[0] == 1205:
                        conn.rollback()
                        return None
                    raise


# Shared module-level helper instance used by importers of this module.
mysql = SyncMySQLHelper()