# sync_mysql_help.py
import os
from typing import Any, Dict, Literal, Optional, Sequence, Tuple

import pymysql
from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
from dbutils.steady_db import SteadyDBCursor
from dotenv import load_dotenv, find_dotenv
from loguru import logger
from pymysql.cursors import DictCursor
  9. class SyncMySQLHelper(object):
  10. _pool: PooledDB = None
  11. _instance = None
  12. def __new__(cls, *args, **kwargs):
  13. """单例"""
  14. if cls._instance is None:
  15. cls._instance = super().__new__(cls, *args, **kwargs)
  16. return cls._instance
  17. def get_pool(self):
  18. if self._pool is None:
  19. # 加载环境变量,允许通过 .env 配置本机调试数据库
  20. load_dotenv(find_dotenv(), override=False)
  21. env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()
  22. logger.info(f"✅ env = {env}")
  23. host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
  24. port = int(os.getenv('DB_PORT', '3306'))
  25. user = os.getenv('DB_USER', 'content_rw')
  26. password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')
  27. database = os.getenv('DB_NAME', 'content-deconstruction-test' if env in ('local','dev','development') else 'content-deconstruction')
  28. logger.info(f"✅ 当前使用数据库 : {database}")
  29. self._pool = PooledDB(
  30. creator=pymysql,
  31. mincached=10,
  32. maxconnections=20,
  33. blocking=True,
  34. host=host,
  35. port=port,
  36. user=user,
  37. password=password,
  38. database=database)
  39. return self._pool
  40. def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
  41. pool = self.get_pool()
  42. with pool.connection() as conn:
  43. with conn.cursor(DictCursor) as cursor:
  44. cursor.execute(sql, data)
  45. result = cursor.fetchone()
  46. return result
  47. def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
  48. pool = self.get_pool()
  49. with pool.connection() as conn:
  50. with conn.cursor(DictCursor) as cursor:
  51. cursor.execute(sql, data)
  52. result = cursor.fetchall()
  53. return result
  54. def fetchmany(self,
  55. sql: str,
  56. data: Optional[Tuple[Any, ...]] = None,
  57. size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
  58. pool = self.get_pool()
  59. with pool.connection() as conn:
  60. with conn.cursor(DictCursor) as cursor:
  61. cursor.execute(sql, data)
  62. result = cursor.fetchmany(size=size)
  63. return result
  64. def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
  65. pool = self.get_pool()
  66. with pool.connection() as conn:
  67. with conn.cursor(DictCursor) as cursor:
  68. try:
  69. cursor.execute(sql, data)
  70. result = conn.commit()
  71. return result
  72. except pymysql.err.IntegrityError as e:
  73. if e.args[0] == 1062: # 重复值
  74. return None
  75. else:
  76. raise e
  77. except pymysql.err.OperationalError as e:
  78. if e.args[0] == 1205: # 死锁
  79. conn.rollback()
  80. return None
  81. else:
  82. raise e
  83. mysql = SyncMySQLHelper()