database.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. #! /usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # vim:fenc=utf-8
  4. #
  5. # Copyright © 2024 StrayWarrior <i@straywarrior.com>
  6. import pymysql
  7. from logging_service import logger
  8. class MySQLManager:
  9. def __init__(self, config):
  10. self.config = config
  11. def select(self, sql, cursor_type=None, args=None):
  12. """
  13. sql: SQL to execute, string
  14. """
  15. conn = pymysql.connect(**self.config)
  16. cursor = conn.cursor(cursor_type)
  17. cursor.execute(sql, args)
  18. data = cursor.fetchall()
  19. # do not handle exception
  20. cursor.close()
  21. conn.close()
  22. return data
  23. def execute(self, sql, args=None):
  24. conn = pymysql.connect(**self.config)
  25. cursor = conn.cursor()
  26. try:
  27. cursor.execute(sql, args)
  28. affected_rows = cursor.rowcount
  29. conn.commit()
  30. return affected_rows
  31. except Exception as e:
  32. conn.rollback()
  33. raise e
  34. finally:
  35. conn.close()
  36. def batch_insert(self, table, data, columns=None, ignore=False):
  37. """
  38. table: table name, string
  39. data: data, list[tuple] or list[dict]
  40. columns: column names, list, required if data is list[tuple]
  41. """
  42. if data is None or len(data) == 0:
  43. return None
  44. conn = pymysql.connect(**self.config)
  45. try:
  46. if isinstance(data[0], dict):
  47. keys = data[0].keys()
  48. columns_str = ','.join(keys)
  49. placeholders_str = ','.join([f'%({key})s' for key in keys])
  50. else:
  51. if len(data[0]) != len(columns):
  52. raise Exception("data length != column length")
  53. columns_str = ','.join(columns)
  54. placeholders_str = ','.join(['%s'] * len(data[0]))
  55. ignore_keyword = 'IGNORE' if ignore else ''
  56. with conn.cursor() as cursor:
  57. sql_str = f"INSERT {ignore_keyword} INTO {table} ({columns_str}) VALUES ({placeholders_str})"
  58. rows = cursor.executemany(sql_str, data)
  59. conn.commit()
  60. return rows
  61. except pymysql.MySQLError as e:
  62. logger.error(f"Error in batch_insert: {e}")
  63. conn.rollback()
  64. raise e
  65. conn.close()