# sync_mysql_help.py (2.8 KB)
from typing import Any, Dict, Literal, Optional, Sequence, Tuple

import pymysql
from dbutils.pooled_db import PooledDB, PooledDedicatedDBConnection
from dbutils.steady_db import SteadyDBCursor
from pymysql.cursors import DictCursor
  6. class SyncMySQLHelper(object):
  7. _pool: PooledDB = None
  8. _instance = None
  9. def __new__(cls, *args, **kwargs):
  10. """单例"""
  11. if cls._instance is None:
  12. cls._instance = super().__new__(cls, *args, **kwargs)
  13. return cls._instance
  14. def get_pool(self):
  15. if self._pool is None:
  16. self._pool = PooledDB(
  17. creator=pymysql,
  18. mincached=10,
  19. maxconnections=20,
  20. blocking=True,
  21. host='rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com',
  22. port=3306,
  23. user='content_rw',
  24. password='bC1aH4bA1lB0',
  25. database='content-deconstruction')
  26. return self._pool
  27. def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
  28. pool = self.get_pool()
  29. with pool.connection() as conn:
  30. with conn.cursor(DictCursor) as cursor:
  31. cursor.execute(sql, data)
  32. result = cursor.fetchone()
  33. return result
  34. def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
  35. pool = self.get_pool()
  36. with pool.connection() as conn:
  37. with conn.cursor(DictCursor) as cursor:
  38. cursor.execute(sql, data)
  39. result = cursor.fetchall()
  40. return result
  41. def fetchmany(self,
  42. sql: str,
  43. data: Optional[Tuple[Any, ...]] = None,
  44. size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
  45. pool = self.get_pool()
  46. with pool.connection() as conn:
  47. with conn.cursor(DictCursor) as cursor:
  48. cursor.execute(sql, data)
  49. result = cursor.fetchmany(size=size)
  50. return result
  51. def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
  52. pool = self.get_pool()
  53. with pool.connection() as conn:
  54. with conn.cursor(DictCursor) as cursor:
  55. try:
  56. cursor.execute(sql, data)
  57. result = conn.commit()
  58. return result
  59. except pymysql.err.IntegrityError as e:
  60. if e.args[0] == 1062: # 重复值
  61. return None
  62. else:
  63. raise e
  64. except pymysql.err.OperationalError as e:
  65. if e.args[0] == 1205: # 死锁
  66. conn.rollback()
  67. return None
  68. else:
  69. raise e
  70. mysql = SyncMySQLHelper()