__init__.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. """
  2. @author: luojunhui
  3. """
  4. import aiomysql
  5. from applications.config import denet_config, long_articles_config
  6. class AsyncMySQLClient(object):
  7. """
  8. 异步 mysql 连接池
  9. """
  10. def __init__(self, app=None, aigc=False):
  11. self.aigc = aigc
  12. if not app:
  13. self.mysql_pool = None
  14. else:
  15. self.mysql_pool = app
  16. async def init_pool(self):
  17. """
  18. 初始化连接
  19. :return:
  20. """
  21. if self.aigc:
  22. db_config = denet_config
  23. else:
  24. db_config = long_articles_config
  25. self.mysql_pool = await aiomysql.create_pool(
  26. host=db_config['host'],
  27. port=db_config['port'],
  28. user=db_config['user'],
  29. password=db_config['password'],
  30. db=db_config['db'],
  31. charset=db_config['charset'],
  32. connect_timeout=120,
  33. )
  34. print("{} mysql init successfully".format("Denet" if self.aigc else "长文"))
  35. async def close_pool(self):
  36. """
  37. 关闭 mysql 连接
  38. :return:
  39. """
  40. self.mysql_pool.close()
  41. await self.mysql_pool.wait_closed()
  42. async def async_select(self, sql):
  43. """
  44. select method
  45. :param sql:
  46. :return:
  47. """
  48. async with self.mysql_pool.acquire() as conn:
  49. async with conn.cursor() as cursor:
  50. await cursor.execute(sql)
  51. result = await cursor.fetchall()
  52. return result
  53. async def async_insert(self, sql, params):
  54. """
  55. insert and update method
  56. :param params:
  57. :param sql:
  58. :return:
  59. """
  60. async with self.mysql_pool.acquire() as conn:
  61. async with conn.cursor() as cursor:
  62. try:
  63. await cursor.execute(sql, params)
  64. affected_rows = cursor.rowcount
  65. await conn.commit()
  66. return affected_rows
  67. except Exception as e:
  68. await conn.rollback()
  69. raise
  70. async def async_insert_many(self, sql, params_list):
  71. """
  72. :param sql:
  73. :param params_list:
  74. :return:
  75. """
  76. async with self.mysql_pool.acquire() as conn:
  77. async with conn.cursor() as cursor:
  78. try:
  79. await cursor.executemany(sql, params_list)
  80. affected_rows = cursor.rowcount
  81. await conn.commit()
  82. return affected_rows
  83. except Exception as e:
  84. await conn.rollback()
  85. raise
  86. async def __aenter__(self):
  87. await self.init_pool()
  88. return self
  89. async def __aexit__(self, exc_type, exc_val, exc_tb):
  90. await self.close_pool()
  91. class TaskMySQLClient(object):
  92. """
  93. Async MySQL
  94. """
  95. def __init__(self):
  96. self.mysql_pool = None
  97. async def init_pool(self):
  98. """
  99. 初始化连接
  100. :return:
  101. """
  102. self.mysql_pool = await aiomysql.create_pool(
  103. host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
  104. port=3306,
  105. user='changwen_admin',
  106. password='changwen@123456',
  107. db='long_articles',
  108. charset='utf8mb4',
  109. connect_timeout=120
  110. )
  111. print("mysql init successfully")
  112. async def close_pool(self):
  113. """
  114. 关闭 mysql 连接
  115. :return:
  116. """
  117. self.mysql_pool.close()
  118. await self.mysql_pool.wait_closed()
  119. async def async_select(self, sql):
  120. """
  121. select method
  122. :param sql:
  123. :return:
  124. """
  125. async with self.mysql_pool.acquire() as conn:
  126. async with conn.cursor() as cursor:
  127. await cursor.execute(sql)
  128. result = await cursor.fetchall()
  129. return result
  130. async def async_insert(self, sql, params):
  131. """
  132. insert and update method
  133. :param params:
  134. :param sql:
  135. :return:
  136. """
  137. async with self.mysql_pool.acquire() as conn:
  138. async with conn.cursor() as cursor:
  139. await cursor.execute(sql, params)
  140. await conn.commit()