| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- import pymysql
- from typing import Tuple, Any, Dict, Literal, Optional
- from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
- from dbutils.steady_db import SteadyDBCursor
- from pymysql.cursors import DictCursor
class SyncMySQLHelper:
    """Synchronous MySQL helper backed by a lazily created ``PooledDB`` pool.

    The class is a singleton: every call to ``SyncMySQLHelper(...)`` returns
    the same instance. Constructor arguments are accepted for call-site
    compatibility but are ignored.
    """

    # MySQL server error numbers that ``execute`` swallows.
    _ER_DUP_ENTRY = 1062          # duplicate value for a unique/primary key
    _ER_LOCK_WAIT_TIMEOUT = 1205  # lock wait timeout exceeded

    # Connection pool; stays ``None`` until ``get_pool`` is first called.
    # The annotation is quoted so it is not evaluated at class-creation time.
    _pool: Optional["PooledDB"] = None
    # The single shared instance, created on first construction.
    _instance: Optional["SyncMySQLHelper"] = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use.

        ``object.__new__`` rejects extra arguments when ``__new__`` is
        overridden, so the arguments are deliberately not forwarded.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_pool(self):
        """Return the connection pool, building it on the first call.

        Returns:
            The ``PooledDB`` instance cached on this helper.
        """
        if self._pool is None:
            # NOTE(security): host and credentials are hard-coded in source.
            # They are kept unchanged here for backward compatibility, but
            # should be moved to configuration / a secret store and rotated.
            self._pool = PooledDB(
                creator=pymysql,
                mincached=10,
                maxconnections=20,
                blocking=True,
                host='rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com',
                port=3306,
                user='content_rw',
                password='bC1aH4bA1lB0',
                database='content-deconstruction',
            )
        return self._pool

    def _run_query(self, sql, data, fetch):
        """Execute ``sql`` on a pooled connection and return ``fetch(cursor)``.

        Args:
            sql: SQL statement, optionally containing parameter placeholders.
            data: Parameters bound to the placeholders, or ``None``.
            fetch: Callable that receives the open cursor and returns the
                rows to hand back to the caller.
        """
        with self.get_pool().connection() as conn:
            with conn.cursor(DictCursor) as cursor:
                cursor.execute(sql, data)
                # Fetch while the cursor is still open.
                return fetch(cursor)

    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Optional[Dict[str, Any]]:
        """Run a query and return the result of ``cursor.fetchone()``.

        Args:
            sql: SQL statement to execute.
            data: Parameters for the statement, or ``None``.

        Returns:
            One row from a ``DictCursor``, or ``None`` when the cursor has
            no row to return.
        """
        return self._run_query(sql, data, lambda cursor: cursor.fetchone())

    def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any], ...]:
        """Run a query and return the result of ``cursor.fetchall()``.

        Args:
            sql: SQL statement to execute.
            data: Parameters for the statement, or ``None``.

        Returns:
            All rows produced by the ``DictCursor``. The exact container type
            is whatever the driver returns — TODO confirm tuple vs. list.
        """
        return self._run_query(sql, data, lambda cursor: cursor.fetchall())

    def fetchmany(self,
                  sql: str,
                  data: Optional[Tuple[Any, ...]] = None,
                  size: Optional[int] = None) -> Tuple[Dict[str, Any], ...]:
        """Run a query and return the result of ``cursor.fetchmany(size=size)``.

        Args:
            sql: SQL statement to execute.
            data: Parameters for the statement, or ``None``.
            size: Passed through unchanged to ``cursor.fetchmany``; ``None``
                leaves the row count to the driver's default.

        Returns:
            The rows produced by the ``DictCursor`` for this call.
        """
        return self._run_query(sql, data, lambda cursor: cursor.fetchmany(size=size))

    def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> None:
        """Execute a statement and commit it.

        Two server errors are swallowed instead of raised:

        * ``IntegrityError`` 1062 (duplicate entry) — ignored.
        * ``OperationalError`` 1205 (lock wait timeout) — the transaction is
          rolled back and the error ignored.

        Args:
            sql: SQL statement to execute.
            data: Parameters for the statement, or ``None``.

        Returns:
            Always ``None``, both on success and when one of the errors above
            is swallowed.

        Raises:
            pymysql.err.IntegrityError: For any code other than 1062.
            pymysql.err.OperationalError: For any code other than 1205.
        """
        with self.get_pool().connection() as conn:
            with conn.cursor(DictCursor) as cursor:
                try:
                    cursor.execute(sql, data)
                    conn.commit()
                except pymysql.err.IntegrityError as exc:
                    if exc.args[0] != self._ER_DUP_ENTRY:
                        raise
                    # Duplicate value: treated as a no-op.
                except pymysql.err.OperationalError as exc:
                    # NOTE(review): the original comment labelled 1205 as a
                    # deadlock, but 1205 is the lock-wait timeout; MySQL
                    # reports deadlocks as 1213, which still propagates.
                    # Confirm whether 1213 should be handled here as well.
                    if exc.args[0] != self._ER_LOCK_WAIT_TIMEOUT:
                        raise
                    conn.rollback()
        return None
- mysql = SyncMySQLHelper()  # module-level helper instance created at import time; SyncMySQLHelper is a singleton, so later constructions return this same object
|