import pymysql from typing import Tuple, Any, Dict, Literal, Optional from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection from dbutils.steady_db import SteadyDBCursor from pymysql.cursors import DictCursor class SyncMySQLHelper(object): _pool: PooledDB = None _instance = None def __new__(cls, *args, **kwargs): """单例""" if cls._instance is None: cls._instance = super().__new__(cls, *args, **kwargs) return cls._instance def get_pool(self): if self._pool is None: self._pool = PooledDB( creator=pymysql, mincached=10, maxconnections=20, blocking=True, host='rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com', port=3306, user='content_rw', password='bC1aH4bA1lB0', database='content-deconstruction') return self._pool def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]: pool = self.get_pool() with pool.connection() as conn: with conn.cursor(DictCursor) as cursor: cursor.execute(sql, data) result = cursor.fetchone() return result def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]: pool = self.get_pool() with pool.connection() as conn: with conn.cursor(DictCursor) as cursor: cursor.execute(sql, data) result = cursor.fetchall() return result def fetchmany(self, sql: str, data: Optional[Tuple[Any, ...]] = None, size: Optional[int] = None) -> Tuple[Dict[str, Any]]: pool = self.get_pool() with pool.connection() as conn: with conn.cursor(DictCursor) as cursor: cursor.execute(sql, data) result = cursor.fetchmany(size=size) return result def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None): pool = self.get_pool() with pool.connection() as conn: with conn.cursor(DictCursor) as cursor: try: cursor.execute(sql, data) result = conn.commit() return result except pymysql.err.IntegrityError as e: if e.args[0] == 1062: # 重复值 return None else: raise e except pymysql.err.OperationalError as e: if e.args[0] == 1205: # 死锁 conn.rollback() return None else: raise e mysql = SyncMySQLHelper()