# sync_mysql_help.py (3.4 KB)
import os
from typing import Any, Dict, Literal, Optional, Sequence, Tuple

import pymysql
from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
from dbutils.steady_db import SteadyDBCursor
from dotenv import load_dotenv
from pymysql.cursors import DictCursor
  8. class SyncMySQLHelper(object):
  9. _pool: PooledDB = None
  10. _instance = None
  11. def __new__(cls, *args, **kwargs):
  12. """单例"""
  13. if cls._instance is None:
  14. cls._instance = super().__new__(cls, *args, **kwargs)
  15. return cls._instance
  16. def get_pool(self):
  17. if self._pool is None:
  18. # 加载环境变量,允许通过 .env 配置本机调试数据库
  19. load_dotenv()
  20. env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()
  21. host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
  22. port = int(os.getenv('DB_PORT', '3306'))
  23. user = os.getenv('DB_USER', 'content_rw')
  24. password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')
  25. database = os.getenv('DB_NAME', 'content-deconstruction-test' if env in ('local','dev','development') else 'content-deconstruction')
  26. self._pool = PooledDB(
  27. creator=pymysql,
  28. mincached=10,
  29. maxconnections=20,
  30. blocking=True,
  31. host=host,
  32. port=port,
  33. user=user,
  34. password=password,
  35. database=database)
  36. return self._pool
  37. def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
  38. pool = self.get_pool()
  39. with pool.connection() as conn:
  40. with conn.cursor(DictCursor) as cursor:
  41. cursor.execute(sql, data)
  42. result = cursor.fetchone()
  43. return result
  44. def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
  45. pool = self.get_pool()
  46. with pool.connection() as conn:
  47. with conn.cursor(DictCursor) as cursor:
  48. cursor.execute(sql, data)
  49. result = cursor.fetchall()
  50. return result
  51. def fetchmany(self,
  52. sql: str,
  53. data: Optional[Tuple[Any, ...]] = None,
  54. size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
  55. pool = self.get_pool()
  56. with pool.connection() as conn:
  57. with conn.cursor(DictCursor) as cursor:
  58. cursor.execute(sql, data)
  59. result = cursor.fetchmany(size=size)
  60. return result
  61. def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
  62. pool = self.get_pool()
  63. with pool.connection() as conn:
  64. with conn.cursor(DictCursor) as cursor:
  65. try:
  66. cursor.execute(sql, data)
  67. result = conn.commit()
  68. return result
  69. except pymysql.err.IntegrityError as e:
  70. if e.args[0] == 1062: # 重复值
  71. return None
  72. else:
  73. raise e
  74. except pymysql.err.OperationalError as e:
  75. if e.args[0] == 1205: # 死锁
  76. conn.rollback()
  77. return None
  78. else:
  79. raise e
# Module-level helper instance. SyncMySQLHelper hands out a single shared
# object, so later SyncMySQLHelper() calls return this same instance.
mysql = SyncMySQLHelper()