sync_mysql_help.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
import os
from typing import Any, Dict, Literal, Optional, Sequence, Tuple

import pymysql
from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
from dbutils.steady_db import SteadyDBCursor
from dotenv import load_dotenv
from loguru import logger
from pymysql.cursors import DictCursor
  9. class SyncMySQLHelper(object):
  10. _pool: PooledDB = None
  11. _instance = None
  12. def __new__(cls, *args, **kwargs):
  13. """单例"""
  14. if cls._instance is None:
  15. cls._instance = super().__new__(cls, *args, **kwargs)
  16. return cls._instance
  17. def get_pool(self):
  18. if self._pool is None:
  19. # 加载环境变量,允许通过 .env 配置本机调试数据库
  20. load_dotenv()
  21. env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()
  22. logger.info(f"✅ env = {env}")
  23. host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
  24. port = int(os.getenv('DB_PORT', '3306'))
  25. user = os.getenv('DB_USER', 'content_rw')
  26. password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')
  27. database = os.getenv('DB_NAME', 'content-deconstruction-test' if env in ('local','dev','development') else 'content-deconstruction')
  28. self._pool = PooledDB(
  29. creator=pymysql,
  30. mincached=10,
  31. maxconnections=20,
  32. blocking=True,
  33. host=host,
  34. port=port,
  35. user=user,
  36. password=password,
  37. database=database)
  38. return self._pool
  39. def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
  40. pool = self.get_pool()
  41. with pool.connection() as conn:
  42. with conn.cursor(DictCursor) as cursor:
  43. cursor.execute(sql, data)
  44. result = cursor.fetchone()
  45. return result
  46. def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
  47. pool = self.get_pool()
  48. with pool.connection() as conn:
  49. with conn.cursor(DictCursor) as cursor:
  50. cursor.execute(sql, data)
  51. result = cursor.fetchall()
  52. return result
  53. def fetchmany(self,
  54. sql: str,
  55. data: Optional[Tuple[Any, ...]] = None,
  56. size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
  57. pool = self.get_pool()
  58. with pool.connection() as conn:
  59. with conn.cursor(DictCursor) as cursor:
  60. cursor.execute(sql, data)
  61. result = cursor.fetchmany(size=size)
  62. return result
  63. def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
  64. pool = self.get_pool()
  65. with pool.connection() as conn:
  66. with conn.cursor(DictCursor) as cursor:
  67. try:
  68. cursor.execute(sql, data)
  69. result = conn.commit()
  70. return result
  71. except pymysql.err.IntegrityError as e:
  72. if e.args[0] == 1062: # 重复值
  73. return None
  74. else:
  75. raise e
  76. except pymysql.err.OperationalError as e:
  77. if e.args[0] == 1205: # 死锁
  78. conn.rollback()
  79. return None
  80. else:
  81. raise e
  82. mysql = SyncMySQLHelper()