| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- import os
- from loguru import logger
- import pymysql
- from dotenv import load_dotenv, find_dotenv
- from typing import Tuple, Any, Dict, Literal, Optional
- from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
- from dbutils.steady_db import SteadyDBCursor
- from pymysql.cursors import DictCursor
- class SyncMySQLHelper(object):
- _pool: PooledDB = None
- _instance = None
- def __new__(cls, *args, **kwargs):
- """单例"""
- if cls._instance is None:
- cls._instance = super().__new__(cls, *args, **kwargs)
- return cls._instance
- def get_pool(self):
- if self._pool is None:
- # 加载环境变量,允许通过 .env 配置本机调试数据库
- load_dotenv(find_dotenv(), override=False)
- env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()
- logger.info(f"✅ env = {env}")
- host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
- port = int(os.getenv('DB_PORT', '3306'))
- user = os.getenv('DB_USER', 'content_rw')
- password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')
- database = os.getenv('DB_NAME', 'content-deconstruction-test' if env in ('local','dev','development') else 'content-deconstruction')
- logger.info(f"✅ 当前使用数据库 : {database}")
- self._pool = PooledDB(
- creator=pymysql,
- mincached=10,
- maxconnections=20,
- blocking=True,
- host=host,
- port=port,
- user=user,
- password=password,
- database=database)
- return self._pool
- def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
- pool = self.get_pool()
- with pool.connection() as conn:
- with conn.cursor(DictCursor) as cursor:
- cursor.execute(sql, data)
- result = cursor.fetchone()
- return result
- def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
- pool = self.get_pool()
- with pool.connection() as conn:
- with conn.cursor(DictCursor) as cursor:
- cursor.execute(sql, data)
- result = cursor.fetchall()
- return result
- def fetchmany(self,
- sql: str,
- data: Optional[Tuple[Any, ...]] = None,
- size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
- pool = self.get_pool()
- with pool.connection() as conn:
- with conn.cursor(DictCursor) as cursor:
- cursor.execute(sql, data)
- result = cursor.fetchmany(size=size)
- return result
- def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
- pool = self.get_pool()
- with pool.connection() as conn:
- with conn.cursor(DictCursor) as cursor:
- try:
- cursor.execute(sql, data)
- result = conn.commit()
- return result
- except pymysql.err.IntegrityError as e:
- if e.args[0] == 1062: # 重复值
- return None
- else:
- raise e
- except pymysql.err.OperationalError as e:
- if e.args[0] == 1205: # 死锁
- conn.rollback()
- return None
- else:
- raise e
- mysql = SyncMySQLHelper()
|