# sync_mysql_help.py
import logging
import os
from typing import Any, Dict, Literal, Optional, Sequence, Tuple

import pymysql
from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
from dbutils.steady_db import SteadyDBCursor
from dotenv import load_dotenv
from pymysql.cursors import DictCursor


  8. class SyncMySQLHelper(object):
  9. _pool: PooledDB = None
  10. _instance = None
  11. def __new__(cls, *args, **kwargs):
  12. """单例"""
  13. if cls._instance is None:
  14. cls._instance = super().__new__(cls, *args, **kwargs)
  15. return cls._instance
  16. def get_pool(self):
  17. if self._pool is None:
  18. # 加载环境变量,允许通过 .env 配置本机调试数据库
  19. load_dotenv()
  20. env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()
  21. logger.info(f"✅ env = {env}")
  22. host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
  23. port = int(os.getenv('DB_PORT', '3306'))
  24. user = os.getenv('DB_USER', 'content_rw')
  25. password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')
  26. database = os.getenv('DB_NAME', 'content-deconstruction-test' if env in ('local','dev','development') else 'content-deconstruction')
  27. self._pool = PooledDB(
  28. creator=pymysql,
  29. mincached=10,
  30. maxconnections=20,
  31. blocking=True,
  32. host=host,
  33. port=port,
  34. user=user,
  35. password=password,
  36. database=database)
  37. return self._pool
  38. def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
  39. pool = self.get_pool()
  40. with pool.connection() as conn:
  41. with conn.cursor(DictCursor) as cursor:
  42. cursor.execute(sql, data)
  43. result = cursor.fetchone()
  44. return result
  45. def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
  46. pool = self.get_pool()
  47. with pool.connection() as conn:
  48. with conn.cursor(DictCursor) as cursor:
  49. cursor.execute(sql, data)
  50. result = cursor.fetchall()
  51. return result
  52. def fetchmany(self,
  53. sql: str,
  54. data: Optional[Tuple[Any, ...]] = None,
  55. size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
  56. pool = self.get_pool()
  57. with pool.connection() as conn:
  58. with conn.cursor(DictCursor) as cursor:
  59. cursor.execute(sql, data)
  60. result = cursor.fetchmany(size=size)
  61. return result
  62. def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
  63. pool = self.get_pool()
  64. with pool.connection() as conn:
  65. with conn.cursor(DictCursor) as cursor:
  66. try:
  67. cursor.execute(sql, data)
  68. result = conn.commit()
  69. return result
  70. except pymysql.err.IntegrityError as e:
  71. if e.args[0] == 1062: # 重复值
  72. return None
  73. else:
  74. raise e
  75. except pymysql.err.OperationalError as e:
  76. if e.args[0] == 1205: # 死锁
  77. conn.rollback()
  78. return None
  79. else:
  80. raise e
  81. mysql = SyncMySQLHelper()