mysql_db.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732
  1. from __future__ import annotations
  2. import math
  3. from contextlib import contextmanager
  4. from contextvars import ContextVar
  5. from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Sequence, Tuple
  6. import pymysql
  7. from pymysql.cursors import DictCursor
  8. from .errors import (
  9. MySQLConnectionError,
  10. MySQLConfigError,
  11. MySQLQueryError,
  12. MySQLTransactionError,
  13. )
  14. from .mysql_manager import MySQLClientManager, get_global_manager
  15. from .mysql_client import MySQLClient
  16. _tx_connection_var: ContextVar[Optional[pymysql.connections.Connection]] = ContextVar(
  17. "examples_how_db_utils_tx_connection", default=None
  18. )
  19. def _normalize_where_params(where_params: Any) -> Optional[Tuple[Any, ...]]:
  20. if where_params is None:
  21. return None
  22. if isinstance(where_params, (list, tuple)):
  23. return tuple(where_params)
  24. if isinstance(where_params, dict):
  25. return tuple(where_params.values())
  26. # Fallback: keep as-is (pymysql supports sequences/tuples or mapping)
  27. return (where_params,)
  28. class MySQLDB:
  29. """
  30. High-level MySQL API (CRUD + advanced queries + transaction).
  31. Interface is aligned with `how_decode/utils/mysql/mysql_db` style.
  32. """
  33. def __init__(self, *, manager: MySQLClientManager, source: str = "default"):
  34. self._manager = manager
  35. self._source = source
  36. @property
  37. def source(self) -> str:
  38. return self._source
  39. def _client(self) -> MySQLClient:
  40. return self._manager.get_client(self._source)
  41. def _is_in_transaction(self) -> bool:
  42. return _tx_connection_var.get() is not None
  43. @contextmanager
  44. def _get_connection_and_cursor(
  45. self, connection: Optional[pymysql.connections.Connection] = None
  46. ) -> Iterator[Tuple[pymysql.connections.Connection, DictCursor, bool]]:
  47. """
  48. Returns (connection, cursor, should_close_connection).
  49. - If `connection` is provided: uses it and should_close_connection=False
  50. - Else if transaction connection exists: uses it and should_close_connection=False
  51. - Else opens a new connection: should_close_connection=True
  52. """
  53. client = self._client()
  54. tx_conn = _tx_connection_var.get()
  55. should_close = False
  56. actual_conn: Optional[pymysql.connections.Connection] = None
  57. if connection is not None:
  58. actual_conn = connection
  59. elif tx_conn is not None:
  60. actual_conn = tx_conn
  61. else:
  62. actual_conn = client.open_connection()
  63. should_close = True
  64. cursor = actual_conn.cursor(DictCursor)
  65. try:
  66. yield actual_conn, cursor, should_close
  67. finally:
  68. try:
  69. cursor.close()
  70. except Exception:
  71. pass
  72. if should_close:
  73. try:
  74. actual_conn.close()
  75. except Exception:
  76. pass
  77. @contextmanager
  78. def transaction(self, isolation_level: Optional[str] = None):
  79. """
  80. Transaction context manager.
  81. Important: when you call CRUD methods inside this context without passing `connection`,
  82. they will automatically reuse the same transaction connection (via ContextVar).
  83. """
  84. client = self._client()
  85. conn = None
  86. token = None
  87. try:
  88. conn = client.open_connection()
  89. # Ensure explicit transaction.
  90. conn.autocommit(False)
  91. if isolation_level:
  92. conn.execute(
  93. f"SET SESSION TRANSACTION ISOLATION LEVEL {isolation_level}"
  94. )
  95. conn.begin()
  96. token = _tx_connection_var.set(conn)
  97. yield conn
  98. conn.commit()
  99. except Exception as e:
  100. if conn is not None:
  101. try:
  102. conn.rollback()
  103. except Exception:
  104. pass
  105. raise MySQLTransactionError(
  106. message=f"transaction failed (source={self._source}): {e}",
  107. original_error=e,
  108. ) from e
  109. finally:
  110. if token is not None:
  111. _tx_connection_var.reset(token)
  112. if conn is not None:
  113. try:
  114. conn.close()
  115. except Exception:
  116. pass
  117. # -----------------------
  118. # Basic CRUD
  119. # -----------------------
  120. def select(
  121. self,
  122. table: str,
  123. columns: str = "*",
  124. where: str = "",
  125. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  126. order_by: str = "",
  127. limit: Optional[int] = None,
  128. connection: Optional[pymysql.connections.Connection] = None,
  129. ) -> List[Dict[str, Any]]:
  130. sql = f"SELECT {columns} FROM {table}"
  131. if where:
  132. sql += f" WHERE {where}"
  133. if order_by:
  134. sql += f" ORDER BY {order_by}"
  135. if limit is not None:
  136. sql += f" LIMIT {limit}"
  137. params = _normalize_where_params(where_params)
  138. try:
  139. with self._get_connection_and_cursor(connection) as (conn, cursor, should_close):
  140. cursor.execute(sql, params)
  141. return list(cursor.fetchall())
  142. except Exception as e:
  143. raise MySQLQueryError(
  144. message=f"select failed (source={self._source}): {e}",
  145. original_error=e,
  146. ) from e
  147. def select_one(
  148. self,
  149. table: str,
  150. columns: str = "*",
  151. where: str = "",
  152. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  153. connection: Optional[pymysql.connections.Connection] = None,
  154. ) -> Optional[Dict[str, Any]]:
  155. sql = f"SELECT {columns} FROM {table}"
  156. if where:
  157. sql += f" WHERE {where}"
  158. sql += " LIMIT 1"
  159. params = _normalize_where_params(where_params)
  160. try:
  161. with self._get_connection_and_cursor(connection) as (_conn, cursor, _should_close):
  162. cursor.execute(sql, params)
  163. return cursor.fetchone()
  164. except Exception as e:
  165. raise MySQLQueryError(
  166. message=f"select_one failed (source={self._source}): {e}",
  167. original_error=e,
  168. ) from e
  169. def insert(
  170. self,
  171. table: str,
  172. data: Dict[str, Any],
  173. connection: Optional[pymysql.connections.Connection] = None,
  174. ) -> int:
  175. if not data:
  176. raise ValueError("insert data must not be empty")
  177. columns = list(data.keys())
  178. placeholders = ", ".join(["%s"] * len(columns))
  179. sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
  180. params = tuple(data.values())
  181. conn: Optional[pymysql.connections.Connection] = None
  182. try:
  183. with self._get_connection_and_cursor(connection) as (conn, cursor, should_close):
  184. cursor.execute(sql, params)
  185. # If not inside transaction, auto commit.
  186. if not self._is_in_transaction() and should_close:
  187. conn.commit()
  188. return int(getattr(cursor, "lastrowid", 0) or 0)
  189. except Exception as e:
  190. # If we opened a connection ourselves, rollback to be safe.
  191. if conn is not None and connection is None and not self._is_in_transaction():
  192. try:
  193. conn.rollback()
  194. except Exception:
  195. pass
  196. raise MySQLQueryError(
  197. message=f"insert failed (source={self._source}): {e}",
  198. original_error=e,
  199. ) from e
  200. def insert_many(
  201. self,
  202. table: str,
  203. data_list: List[Dict[str, Any]],
  204. connection: Optional[pymysql.connections.Connection] = None,
  205. ) -> int:
  206. if not data_list:
  207. raise ValueError("insert_many data_list must not be empty")
  208. columns = list(data_list[0].keys())
  209. placeholders = ", ".join(["%s"] * len(columns))
  210. sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
  211. params_list = [tuple(d[col] for col in columns) for d in data_list]
  212. conn: Optional[pymysql.connections.Connection] = None
  213. try:
  214. with self._get_connection_and_cursor(connection) as (conn, cursor, should_close):
  215. cursor.executemany(sql, params_list)
  216. if not self._is_in_transaction() and should_close:
  217. conn.commit()
  218. return int(cursor.rowcount or 0)
  219. except Exception as e:
  220. if conn is not None and connection is None and not self._is_in_transaction():
  221. try:
  222. conn.rollback()
  223. except Exception:
  224. pass
  225. raise MySQLQueryError(
  226. message=f"insert_many failed (source={self._source}): {e}",
  227. original_error=e,
  228. ) from e
  229. def update(
  230. self,
  231. table: str,
  232. data: Dict[str, Any],
  233. where: str,
  234. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  235. connection: Optional[pymysql.connections.Connection] = None,
  236. ) -> int:
  237. if not data:
  238. raise ValueError("update data must not be empty")
  239. set_clause = ", ".join([f"{col}=%s" for col in data.keys()])
  240. sql = f"UPDATE {table} SET {set_clause} WHERE {where}"
  241. params: List[Any] = list(data.values())
  242. wp = _normalize_where_params(where_params)
  243. if wp is not None:
  244. params.extend(list(wp))
  245. conn: Optional[pymysql.connections.Connection] = None
  246. try:
  247. with self._get_connection_and_cursor(connection) as (conn, cursor, should_close):
  248. cursor.execute(sql, tuple(params))
  249. if not self._is_in_transaction() and should_close:
  250. conn.commit()
  251. return int(cursor.rowcount or 0)
  252. except Exception as e:
  253. if conn is not None and connection is None and not self._is_in_transaction():
  254. try:
  255. conn.rollback()
  256. except Exception:
  257. pass
  258. raise MySQLQueryError(
  259. message=f"update failed (source={self._source}): {e}",
  260. original_error=e,
  261. ) from e
  262. def delete(
  263. self,
  264. table: str,
  265. where: str,
  266. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  267. connection: Optional[pymysql.connections.Connection] = None,
  268. ) -> int:
  269. sql = f"DELETE FROM {table} WHERE {where}"
  270. params = _normalize_where_params(where_params)
  271. conn: Optional[pymysql.connections.Connection] = None
  272. try:
  273. with self._get_connection_and_cursor(connection) as (conn, cursor, should_close):
  274. cursor.execute(sql, params)
  275. if not self._is_in_transaction() and should_close:
  276. conn.commit()
  277. return int(cursor.rowcount or 0)
  278. except Exception as e:
  279. if conn is not None and connection is None and not self._is_in_transaction():
  280. try:
  281. conn.rollback()
  282. except Exception:
  283. pass
  284. raise MySQLQueryError(
  285. message=f"delete failed (source={self._source}): {e}",
  286. original_error=e,
  287. ) from e
  288. def execute_many(
  289. self,
  290. sql: str,
  291. params_list: List[Sequence[Any] | Mapping[str, Any]],
  292. connection: Optional[pymysql.connections.Connection] = None,
  293. ) -> int:
  294. params_seq = [_normalize_where_params(p) for p in params_list]
  295. conn: Optional[pymysql.connections.Connection] = None
  296. try:
  297. with self._get_connection_and_cursor(connection) as (conn, cursor, should_close):
  298. cursor.executemany(sql, params_seq)
  299. if not self._is_in_transaction() and should_close:
  300. conn.commit()
  301. return int(cursor.rowcount or 0)
  302. except Exception as e:
  303. if conn is not None and connection is None and not self._is_in_transaction():
  304. try:
  305. conn.rollback()
  306. except Exception:
  307. pass
  308. raise MySQLQueryError(
  309. message=f"execute_many failed (source={self._source}): {e}",
  310. original_error=e,
  311. ) from e
  312. # -----------------------
  313. # Query helpers
  314. # -----------------------
  315. def count(
  316. self,
  317. table: str,
  318. where: str = "",
  319. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  320. connection: Optional[pymysql.connections.Connection] = None,
  321. ) -> int:
  322. sql = f"SELECT COUNT(*) as count FROM {table}"
  323. if where:
  324. sql += f" WHERE {where}"
  325. try:
  326. params = _normalize_where_params(where_params)
  327. with self._get_connection_and_cursor(connection) as (_conn, cursor, _should_close):
  328. cursor.execute(sql, params)
  329. r = cursor.fetchone()
  330. if not r:
  331. return 0
  332. return int(r.get("count") or 0)
  333. except Exception as e:
  334. raise MySQLQueryError(
  335. message=f"count failed (source={self._source}): {e}",
  336. original_error=e,
  337. ) from e
  338. def exists(
  339. self,
  340. table: str,
  341. where: str,
  342. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  343. connection: Optional[pymysql.connections.Connection] = None,
  344. ) -> bool:
  345. return self.count(
  346. table, where=where, where_params=where_params, connection=connection
  347. ) > 0
  348. def paginate(
  349. self,
  350. table: str,
  351. page: int = 1,
  352. page_size: int = 20,
  353. columns: str = "*",
  354. where: str = "",
  355. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  356. order_by: str = "",
  357. connection: Optional[pymysql.connections.Connection] = None,
  358. ) -> Dict[str, Any]:
  359. if page < 1:
  360. page = 1
  361. if page_size < 1:
  362. page_size = 20
  363. total_count = self.count(
  364. table, where=where, where_params=where_params, connection=connection
  365. )
  366. total_pages = math.ceil(total_count / page_size) if total_count > 0 else 1
  367. offset = (page - 1) * page_size
  368. sql = f"SELECT {columns} FROM {table}"
  369. if where:
  370. sql += f" WHERE {where}"
  371. if order_by:
  372. sql += f" ORDER BY {order_by}"
  373. sql += f" LIMIT {page_size} OFFSET {offset}"
  374. params = _normalize_where_params(where_params)
  375. try:
  376. with self._get_connection_and_cursor(connection) as (_conn, cursor, _should_close):
  377. cursor.execute(sql, params)
  378. data = list(cursor.fetchall())
  379. except Exception as e:
  380. raise MySQLQueryError(
  381. message=f"paginate failed (source={self._source}): {e}",
  382. original_error=e,
  383. ) from e
  384. return {
  385. "data": data,
  386. "pagination": {
  387. "current_page": page,
  388. "page_size": page_size,
  389. "total_count": total_count,
  390. "total_pages": total_pages,
  391. "has_prev": page > 1,
  392. "has_next": page < total_pages,
  393. "prev_page": page - 1 if page > 1 else None,
  394. "next_page": page + 1 if page < total_pages else None,
  395. },
  396. }
  397. def select_with_sort(
  398. self,
  399. table: str,
  400. columns: str = "*",
  401. where: str = "",
  402. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  403. sort_field: str = "id",
  404. sort_order: str = "ASC",
  405. limit: Optional[int] = None,
  406. connection: Optional[pymysql.connections.Connection] = None,
  407. ) -> List[Dict[str, Any]]:
  408. sort_order = (sort_order or "").upper()
  409. if sort_order not in ["ASC", "DESC"]:
  410. sort_order = "ASC"
  411. order_by = f"{sort_field} {sort_order}"
  412. return self.select(
  413. table,
  414. columns=columns,
  415. where=where,
  416. where_params=where_params,
  417. order_by=order_by,
  418. limit=limit,
  419. connection=connection,
  420. )
  421. def select_with_multiple_sort(
  422. self,
  423. table: str,
  424. columns: str = "*",
  425. where: str = "",
  426. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  427. sort_fields: Optional[List[Tuple[str, str]]] = None,
  428. limit: Optional[int] = None,
  429. connection: Optional[pymysql.connections.Connection] = None,
  430. ) -> List[Dict[str, Any]]:
  431. order_by = ""
  432. if sort_fields:
  433. parts: List[str] = []
  434. for field, order in sort_fields:
  435. order_u = (order or "").upper()
  436. if order_u not in ["ASC", "DESC"]:
  437. order_u = "ASC"
  438. parts.append(f"{field} {order_u}")
  439. order_by = ", ".join(parts)
  440. return self.select(
  441. table,
  442. columns=columns,
  443. where=where,
  444. where_params=where_params,
  445. order_by=order_by,
  446. limit=limit,
  447. connection=connection,
  448. )
  449. def aggregate(
  450. self,
  451. table: str,
  452. agg_functions: Dict[str, str],
  453. where: str = "",
  454. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  455. group_by: str = "",
  456. having: str = "",
  457. having_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  458. connection: Optional[pymysql.connections.Connection] = None,
  459. ) -> List[Dict[str, Any]]:
  460. if not agg_functions:
  461. raise ValueError("agg_functions must not be empty")
  462. select_parts: List[str] = []
  463. if group_by:
  464. select_parts.append(group_by)
  465. for alias, func in agg_functions.items():
  466. select_parts.append(f"{func} AS {alias}")
  467. sql = f"SELECT {', '.join(select_parts)} FROM {table}"
  468. if where:
  469. sql += f" WHERE {where}"
  470. if group_by:
  471. sql += f" GROUP BY {group_by}"
  472. if having:
  473. sql += f" HAVING {having}"
  474. params: List[Any] = []
  475. wp = _normalize_where_params(where_params)
  476. if wp is not None:
  477. params.extend(list(wp))
  478. hp = _normalize_where_params(having_params)
  479. if hp is not None:
  480. params.extend(list(hp))
  481. try:
  482. with self._get_connection_and_cursor(connection) as (_conn, cursor, _should_close):
  483. cursor.execute(sql, tuple(params) if params else None)
  484. return list(cursor.fetchall())
  485. except Exception as e:
  486. raise MySQLQueryError(
  487. message=f"aggregate failed (source={self._source}): {e}",
  488. original_error=e,
  489. ) from e
  490. def sum(
  491. self,
  492. table: str,
  493. column: str,
  494. where: str = "",
  495. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  496. connection: Optional[pymysql.connections.Connection] = None,
  497. ) -> float:
  498. rows = self.aggregate(
  499. table=table,
  500. agg_functions={"sum_result": f"SUM({column})"},
  501. where=where,
  502. where_params=where_params,
  503. connection=connection,
  504. )
  505. v = rows[0].get("sum_result") if rows else None
  506. return float(v) if v is not None else 0.0
  507. def avg(
  508. self,
  509. table: str,
  510. column: str,
  511. where: str = "",
  512. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  513. connection: Optional[pymysql.connections.Connection] = None,
  514. ) -> float:
  515. rows = self.aggregate(
  516. table=table,
  517. agg_functions={"avg_result": f"AVG({column})"},
  518. where=where,
  519. where_params=where_params,
  520. connection=connection,
  521. )
  522. v = rows[0].get("avg_result") if rows else None
  523. return float(v) if v is not None else 0.0
  524. def max(
  525. self,
  526. table: str,
  527. column: str,
  528. where: str = "",
  529. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  530. connection: Optional[pymysql.connections.Connection] = None,
  531. ) -> Any:
  532. rows = self.aggregate(
  533. table=table,
  534. agg_functions={"max_result": f"MAX({column})"},
  535. where=where,
  536. where_params=where_params,
  537. connection=connection,
  538. )
  539. return rows[0].get("max_result") if rows and rows[0].get("max_result") is not None else None
  540. def min(
  541. self,
  542. table: str,
  543. column: str,
  544. where: str = "",
  545. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  546. connection: Optional[pymysql.connections.Connection] = None,
  547. ) -> Any:
  548. rows = self.aggregate(
  549. table=table,
  550. agg_functions={"min_result": f"MIN({column})"},
  551. where=where,
  552. where_params=where_params,
  553. connection=connection,
  554. )
  555. return rows[0].get("min_result") if rows and rows[0].get("min_result") is not None else None
  556. def group_count(
  557. self,
  558. table: str,
  559. group_column: str,
  560. where: str = "",
  561. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  562. order_by: str = "",
  563. limit: Optional[int] = None,
  564. connection: Optional[pymysql.connections.Connection] = None,
  565. ) -> List[Dict[str, Any]]:
  566. sql = f"SELECT {group_column}, COUNT(*) as count FROM {table}"
  567. if where:
  568. sql += f" WHERE {where}"
  569. sql += f" GROUP BY {group_column}"
  570. if order_by:
  571. sql += f" ORDER BY {order_by}"
  572. else:
  573. sql += " ORDER BY count DESC"
  574. if limit is not None:
  575. sql += f" LIMIT {limit}"
  576. params = _normalize_where_params(where_params)
  577. try:
  578. with self._get_connection_and_cursor(connection) as (_conn, cursor, _should_close):
  579. cursor.execute(sql, params)
  580. return list(cursor.fetchall())
  581. except Exception as e:
  582. raise MySQLQueryError(
  583. message=f"group_count failed (source={self._source}): {e}",
  584. original_error=e,
  585. ) from e
  586. def search(
  587. self,
  588. table: str,
  589. search_columns: List[str],
  590. keyword: str,
  591. columns: str = "*",
  592. where: str = "",
  593. where_params: Optional[Sequence[Any] | Mapping[str, Any]] = None,
  594. order_by: str = "",
  595. limit: Optional[int] = None,
  596. connection: Optional[pymysql.connections.Connection] = None,
  597. ) -> List[Dict[str, Any]]:
  598. if not search_columns or not keyword:
  599. return []
  600. search_conditions: List[str] = []
  601. search_params: List[Any] = []
  602. for col in search_columns:
  603. search_conditions.append(f"{col} LIKE %s")
  604. search_params.append(f"%{keyword}%")
  605. search_where = f"({' OR '.join(search_conditions)})"
  606. final_where = search_where
  607. final_params: List[Any] = list(search_params)
  608. if where:
  609. final_where = f"{search_where} AND ({where})"
  610. wp = _normalize_where_params(where_params)
  611. if wp is not None:
  612. final_params.extend(list(wp))
  613. return self.select(
  614. table,
  615. columns=columns,
  616. where=final_where,
  617. where_params=tuple(final_params),
  618. order_by=order_by,
  619. limit=limit,
  620. connection=connection,
  621. )
  622. # -----------------------
  623. # Functional transaction helpers (optional)
  624. # -----------------------
  625. def execute_in_transaction(
  626. self,
  627. func,
  628. *args,
  629. isolation_level: Optional[str] = None,
  630. **kwargs,
  631. ) -> Any:
  632. with self.transaction(isolation_level=isolation_level) as conn:
  633. return func(conn, *args, **kwargs)
  634. def batch_operations(
  635. self,
  636. operations: list,
  637. isolation_level: Optional[str] = None,
  638. ) -> list:
  639. results: list = []
  640. with self.transaction(isolation_level=isolation_level) as conn:
  641. for op in operations:
  642. method_name, args, op_kwargs = op
  643. op_kwargs = op_kwargs or {}
  644. op_kwargs["connection"] = conn
  645. method = getattr(self, method_name)
  646. results.append(method(*args, **op_kwargs))
  647. return results
  648. _GLOBAL_DB: Dict[str, MySQLDB] = {}
  649. def get_mysql_db(source: str = "default") -> MySQLDB:
  650. if source not in _GLOBAL_DB:
  651. mgr = get_global_manager()
  652. _GLOBAL_DB[source] = MySQLDB(manager=mgr, source=source)
  653. return _GLOBAL_DB[source]
  654. # For compatibility with how_decode/utils/mysql (global mysql_db)
  655. mysql_db = get_mysql_db("default")