mysql_client.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. from __future__ import annotations
  2. from contextlib import contextmanager
  3. from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple
  4. import pymysql
  5. from pymysql.cursors import DictCursor
  6. from .errors import MySQLConnectionError, MySQLQueryError
  7. from .types import ExecResult, MySQLConfig, Params
  8. try:
  9. # Optional dependency for connection pooling.
  10. from dbutils.pooled_db import PooledDB # type: ignore
  11. _HAS_DBUTILS_POOL = True
  12. except Exception: # pragma: no cover
  13. PooledDB = None # type: ignore
  14. _HAS_DBUTILS_POOL = False
  15. def _normalize_params(params: Params) -> Any:
  16. if params is None:
  17. return None
  18. # pymysql supports Mapping (dict) and Sequence (tuple/list).
  19. if isinstance(params, (list, tuple)):
  20. return tuple(params)
  21. return params
  22. class MySQLClient:
  23. """
  24. Synchronous MySQL client based on `pymysql`.
  25. Main APIs:
  26. - fetchone / fetchall / fetchmany for SELECT queries
  27. - execute / executemany for INSERT/UPDATE/DELETE/DDL
  28. - transaction() context manager for multi-statement transactions
  29. """
  30. def __init__(self, config: MySQLConfig):
  31. self._config = config
  32. self._pool = None
  33. if self._config.use_pool and _HAS_DBUTILS_POOL:
  34. conn_kwargs: Dict[str, Any] = dict(
  35. host=self._config.host,
  36. port=self._config.port,
  37. user=self._config.user,
  38. password=self._config.password,
  39. database=self._config.database,
  40. charset=self._config.charset,
  41. connect_timeout=self._config.connect_timeout,
  42. read_timeout=self._config.read_timeout,
  43. write_timeout=self._config.write_timeout,
  44. autocommit=self._config.autocommit,
  45. )
  46. self._pool = PooledDB(
  47. creator=pymysql,
  48. mincached=self._config.pool_mincached,
  49. maxconnections=self._config.pool_maxconnections,
  50. blocking=True,
  51. **conn_kwargs,
  52. )
  53. @property
  54. def config(self) -> MySQLConfig:
  55. return self._config
  56. def open_connection(self) -> pymysql.connections.Connection:
  57. """
  58. Open a raw pymysql connection.
  59. Intended usage:
  60. - transaction() keep the same connection for multiple operations
  61. - other cases where you want manual connection lifecycle
  62. Note:
  63. - Caller must close the connection via `close_connection()`.
  64. """
  65. if self._pool is not None:
  66. return self._pool.connection()
  67. return self._connect()
  68. def close_connection(self, connection: pymysql.connections.Connection) -> None:
  69. """Close a previously opened connection (returns to pool when applicable)."""
  70. try:
  71. connection.close()
  72. except Exception:
  73. pass
  74. def _connect(self) -> pymysql.connections.Connection:
  75. try:
  76. return pymysql.connect(
  77. host=self._config.host,
  78. port=self._config.port,
  79. user=self._config.user,
  80. password=self._config.password,
  81. database=self._config.database,
  82. charset=self._config.charset,
  83. cursorclass=DictCursor,
  84. connect_timeout=self._config.connect_timeout,
  85. read_timeout=self._config.read_timeout,
  86. write_timeout=self._config.write_timeout,
  87. autocommit=self._config.autocommit,
  88. )
  89. except Exception as e: # pragma: no cover
  90. raise MySQLConnectionError(
  91. f"MySQL connection failed (source={self._config.source}, host={self._config.host}, db={self._config.database}): {e}"
  92. ) from e
  93. @contextmanager
  94. def _cursor(self) -> Iterator[Tuple[pymysql.connections.Connection, DictCursor]]:
  95. """
  96. Yield (connection, cursor) using DictCursor by default.
  97. Uses pool connection if configured; otherwise creates a fresh connection.
  98. """
  99. if self._pool is not None:
  100. # DBUtils pooled connection supports context manager.
  101. conn = self._pool.connection()
  102. else:
  103. conn = self._connect()
  104. try:
  105. cursor = conn.cursor(DictCursor)
  106. try:
  107. yield conn, cursor
  108. finally:
  109. cursor.close()
  110. finally:
  111. try:
  112. conn.close()
  113. except Exception:
  114. pass
  115. def transaction(self):
  116. """
  117. Transaction context manager.
  118. - Commits on success when autocommit is False.
  119. - Rolls back on exception.
  120. """
  121. @contextmanager
  122. def _tx():
  123. if self._pool is not None:
  124. conn = self._pool.connection()
  125. else:
  126. conn = self._connect()
  127. # If autocommit=False, commit/rollback controls are meaningful.
  128. tx_active = not self._config.autocommit
  129. try:
  130. cursor = conn.cursor(DictCursor)
  131. try:
  132. yield cursor
  133. finally:
  134. cursor.close()
  135. if tx_active:
  136. conn.commit()
  137. except Exception:
  138. if tx_active:
  139. try:
  140. conn.rollback()
  141. except Exception:
  142. pass
  143. raise
  144. finally:
  145. try:
  146. conn.close()
  147. except Exception:
  148. pass
  149. return _tx()
  150. def fetchone(self, sql: str, params: Params = None) -> Optional[Dict[str, Any]]:
  151. with self._cursor() as (conn, cursor):
  152. try:
  153. cursor.execute(sql, _normalize_params(params))
  154. return cursor.fetchone()
  155. except Exception as e:
  156. raise MySQLQueryError(f"fetchone failed: {e} | sql={sql}") from e
  157. def fetchall(
  158. self,
  159. sql: str,
  160. params: Params = None,
  161. *,
  162. max_rows: Optional[int] = None,
  163. ) -> List[Dict[str, Any]]:
  164. with self._cursor() as (conn, cursor):
  165. try:
  166. cursor.execute(sql, _normalize_params(params))
  167. if max_rows is None:
  168. return list(cursor.fetchall())
  169. # Cursor fetchall has no max; fallback to fetchmany.
  170. out: List[Dict[str, Any]] = []
  171. while True:
  172. batch = cursor.fetchmany(size=max_rows - len(out))
  173. if not batch:
  174. break
  175. out.extend(batch)
  176. if len(out) >= max_rows:
  177. break
  178. return out
  179. except Exception as e:
  180. raise MySQLQueryError(f"fetchall failed: {e} | sql={sql}") from e
  181. def fetchmany(
  182. self,
  183. sql: str,
  184. params: Params = None,
  185. *,
  186. size: int = 100,
  187. ) -> List[Dict[str, Any]]:
  188. with self._cursor() as (conn, cursor):
  189. try:
  190. cursor.execute(sql, _normalize_params(params))
  191. return list(cursor.fetchmany(size=size))
  192. except Exception as e:
  193. raise MySQLQueryError(f"fetchmany failed: {e} | sql={sql}") from e
  194. def execute(
  195. self,
  196. sql: str,
  197. params: Params = None,
  198. *,
  199. commit: Optional[bool] = None,
  200. ignore_duplicate: bool = False,
  201. ignore_deadlock: bool = False,
  202. ) -> ExecResult:
  203. """
  204. Execute a write statement.
  205. Args:
  206. commit:
  207. - None: follow config.autocommit (当 `autocommit=False` 时默认会 commit)
  208. - True/False: force commit/rollback behavior
  209. ignore_duplicate:
  210. If True, silently ignore MySQL duplicate-key errors (1062).
  211. ignore_deadlock:
  212. If True, rollback and silently ignore deadlocks (1205).
  213. """
  214. # Commit semantics:
  215. # - autocommit=True: no explicit commit is required/possible.
  216. # - autocommit=False:
  217. # - commit is None: commit by default
  218. # - commit=True: force commit
  219. # - commit=False: skip commit
  220. commit_enabled = (not self._config.autocommit) and (True if commit is None else bool(commit))
  221. with self._cursor() as (conn, cursor):
  222. try:
  223. cursor.execute(sql, _normalize_params(params))
  224. if commit_enabled:
  225. conn.commit()
  226. return ExecResult(
  227. rowcount=int(cursor.rowcount or 0),
  228. lastrowid=getattr(cursor, "lastrowid", None),
  229. )
  230. except pymysql.err.IntegrityError as e:
  231. if ignore_duplicate and getattr(e, "args", None) and e.args and e.args[0] == 1062:
  232. if not self._config.autocommit:
  233. conn.rollback()
  234. return ExecResult(rowcount=0, lastrowid=None)
  235. raise MySQLQueryError(f"execute failed (IntegrityError): {e} | sql={sql}") from e
  236. except pymysql.err.OperationalError as e:
  237. if ignore_deadlock and getattr(e, "args", None) and e.args and e.args[0] == 1205:
  238. if not self._config.autocommit:
  239. conn.rollback()
  240. return ExecResult(rowcount=0, lastrowid=None)
  241. raise MySQLQueryError(f"execute failed (OperationalError): {e} | sql={sql}") from e
  242. except Exception as e:
  243. if not self._config.autocommit:
  244. try:
  245. conn.rollback()
  246. except Exception:
  247. pass
  248. raise MySQLQueryError(f"execute failed: {e} | sql={sql}") from e
  249. def executemany(
  250. self,
  251. sql: str,
  252. params_seq: Sequence[Params],
  253. *,
  254. commit: Optional[bool] = None,
  255. ignore_duplicate: bool = False,
  256. ignore_deadlock: bool = False,
  257. ) -> ExecResult:
  258. """
  259. Execute a statement against multiple parameter sets.
  260. Note:
  261. If ignore_duplicate/ignore_deadlock are enabled, we fall back to per-row execution
  262. to emulate "ignore" semantics without terminating the whole batch.
  263. """
  264. commit_enabled = (not self._config.autocommit) and (True if commit is None else bool(commit))
  265. if (ignore_duplicate or ignore_deadlock) and self._config.autocommit is False:
  266. # Fallback: execute one-by-one so we can ignore specific errors.
  267. total = 0
  268. lastrowid: Optional[int] = None
  269. for p in params_seq:
  270. res = self.execute(
  271. sql,
  272. p,
  273. commit=commit,
  274. ignore_duplicate=ignore_duplicate,
  275. ignore_deadlock=ignore_deadlock,
  276. )
  277. total += res.rowcount
  278. lastrowid = res.lastrowid
  279. return ExecResult(rowcount=total, lastrowid=lastrowid)
  280. with self._cursor() as (conn, cursor):
  281. try:
  282. cursor.executemany(sql, [_normalize_params(p) for p in params_seq])
  283. if commit_enabled:
  284. conn.commit()
  285. return ExecResult(
  286. rowcount=int(cursor.rowcount or 0),
  287. lastrowid=getattr(cursor, "lastrowid", None),
  288. )
  289. except pymysql.err.IntegrityError as e:
  290. if ignore_duplicate and getattr(e, "args", None) and e.args and e.args[0] == 1062:
  291. if not self._config.autocommit:
  292. conn.rollback()
  293. return ExecResult(rowcount=0, lastrowid=None)
  294. raise MySQLQueryError(f"executemany failed (IntegrityError): {e} | sql={sql}") from e
  295. except pymysql.err.OperationalError as e:
  296. if ignore_deadlock and getattr(e, "args", None) and e.args and e.args[0] == 1205:
  297. if not self._config.autocommit:
  298. conn.rollback()
  299. return ExecResult(rowcount=0, lastrowid=None)
  300. raise MySQLQueryError(f"executemany failed (OperationalError): {e} | sql={sql}") from e
  301. except Exception as e:
  302. if not self._config.autocommit:
  303. try:
  304. conn.rollback()
  305. except Exception:
  306. pass
  307. raise MySQLQueryError(f"executemany failed: {e} | sql={sql}") from e