sync_mysql_help.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import os
  2. import time
  3. from loguru import logger
  4. import pymysql
  5. from dotenv import load_dotenv, find_dotenv
  6. from typing import Tuple, Any, Dict, Literal, Optional
  7. from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
  8. from dbutils.steady_db import SteadyDBCursor
  9. from pymysql.cursors import DictCursor
  10. class SyncMySQLHelper(object):
  11. _pool: PooledDB = None
  12. _instance = None
  13. def __new__(cls, *args, **kwargs):
  14. """单例"""
  15. if cls._instance is None:
  16. cls._instance = super().__new__(cls, *args, **kwargs)
  17. return cls._instance
  18. def get_pool(self):
  19. if self._pool is None:
  20. # 加载环境变量,允许通过 .env 配置本机调试数据库
  21. load_dotenv(find_dotenv(), override=False)
  22. env = (os.getenv('APP_ENV') or os.getenv('ENV') or 'local').lower()
  23. logger.info(f"✅ env = {env}")
  24. host = os.getenv('DB_HOST', 'rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com')
  25. port = int(os.getenv('DB_PORT', '3306'))
  26. user = os.getenv('DB_USER', 'content_rw')
  27. password = os.getenv('DB_PASSWORD', 'bC1aH4bA1lB0')
  28. database = os.getenv('DB_NAME', 'content-deconstruction-supply')
  29. logger.info(f"✅ 当前使用数据库 : {database}")
  30. # 防止调度任务因网络/DB 抖动“无限阻塞”
  31. # - connect_timeout: 建连超时(秒)
  32. # - read_timeout/write_timeout: 单次读写超时(秒)
  33. # - blocking=False: 连接池耗尽时直接抛错,避免卡死整个 job
  34. connect_timeout = int(os.getenv("DB_CONNECT_TIMEOUT", "30"))
  35. read_timeout = int(os.getenv("DB_READ_TIMEOUT", "50"))
  36. write_timeout = int(os.getenv("DB_WRITE_TIMEOUT", "50"))
  37. # 注意:mincached 过大时会在初始化阶段“批量建连”,DB 抖动会直接把启动拖垮。
  38. # 这里改为懒加载,按需建连。
  39. mincached = int(os.getenv("DB_POOL_MINCACHED", "0"))
  40. maxconnections = int(os.getenv("DB_POOL_MAXCONN", "20"))
  41. last_exc: Optional[Exception] = None
  42. for attempt in range(2):
  43. try:
  44. self._pool = PooledDB(
  45. creator=pymysql,
  46. mincached=mincached,
  47. maxconnections=maxconnections,
  48. blocking=False,
  49. maxusage=1000,
  50. ping=1,
  51. host=host,
  52. port=port,
  53. user=user,
  54. password=password,
  55. database=database,
  56. connect_timeout=connect_timeout,
  57. read_timeout=read_timeout,
  58. write_timeout=write_timeout,
  59. charset="utf8mb4",
  60. )
  61. last_exc = None
  62. break
  63. except Exception as exc:
  64. # 保持 _pool 为 None,下一轮调度可继续重试建池
  65. self._pool = None
  66. last_exc = exc
  67. logger.exception(f"❌ 初始化数据库连接池失败(第{attempt + 1}次): {exc}")
  68. time.sleep(0.5)
  69. if last_exc is not None:
  70. raise last_exc
  71. return self._pool
  72. def _conn(self):
  73. pool = self.get_pool()
  74. try:
  75. # DBUtils 在 blocking=False 时,连接不足会直接抛异常
  76. # 这里统一捕获并打印,避免外层任务“静默卡住”
  77. return pool.connection()
  78. except Exception as exc:
  79. logger.exception(f"❌ 获取数据库连接失败: {exc}")
  80. raise
  81. def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
  82. with self._conn() as conn:
  83. with conn.cursor(DictCursor) as cursor:
  84. cursor.execute(sql, data)
  85. result = cursor.fetchone()
  86. return result
  87. def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
  88. with self._conn() as conn:
  89. with conn.cursor(DictCursor) as cursor:
  90. cursor.execute(sql, data)
  91. result = cursor.fetchall()
  92. return result
  93. def fetchmany(self,
  94. sql: str,
  95. data: Optional[Tuple[Any, ...]] = None,
  96. size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
  97. with self._conn() as conn:
  98. with conn.cursor(DictCursor) as cursor:
  99. cursor.execute(sql, data)
  100. result = cursor.fetchmany(size=size)
  101. return result
  102. def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
  103. with self._conn() as conn:
  104. with conn.cursor(DictCursor) as cursor:
  105. try:
  106. cursor.execute(sql, data)
  107. result = conn.commit()
  108. return result
  109. except pymysql.err.IntegrityError as e:
  110. if e.args[0] == 1062: # 重复值
  111. return None
  112. else:
  113. raise e
  114. except pymysql.err.OperationalError as e:
  115. if e.args[0] == 1205: # 死锁
  116. conn.rollback()
  117. return None
  118. else:
  119. raise e
  120. mysql = SyncMySQLHelper()