__init__.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. """
  2. @author: luojunhui
  3. """
  4. import aiomysql
  5. from aiomysql.cursors import Cursor
  6. from applications.config import denet_config, long_articles_config
  7. class AsyncMySQLClient(object):
  8. """
  9. 异步 mysql 连接池
  10. """
  11. def __init__(self, app=None, aigc=False):
  12. self.aigc = aigc
  13. if not app:
  14. self.mysql_pool = None
  15. else:
  16. self.mysql_pool = app
  17. async def init_pool(self):
  18. """
  19. 初始化连接
  20. :return:
  21. """
  22. if self.aigc:
  23. db_config = denet_config
  24. else:
  25. db_config = long_articles_config
  26. self.mysql_pool = await aiomysql.create_pool(
  27. host=db_config['host'],
  28. port=db_config['port'],
  29. user=db_config['user'],
  30. password=db_config['password'],
  31. db=db_config['db'],
  32. charset=db_config['charset'],
  33. connect_timeout=120,
  34. )
  35. print("{} mysql init successfully".format("Denet" if self.aigc else "长文"))
  36. async def close_pool(self):
  37. """
  38. 关闭 mysql 连接
  39. :return:
  40. """
  41. self.mysql_pool.close()
  42. await self.mysql_pool.wait_closed()
  43. async def async_select(self, sql, cursor_type=Cursor):
  44. """
  45. select method
  46. :param sql:
  47. :param cursor_type:
  48. :return:
  49. """
  50. async with self.mysql_pool.acquire() as conn:
  51. async with conn.cursor(cursor_type) as cursor:
  52. await cursor.execute(sql)
  53. result = await cursor.fetchall()
  54. return result
  55. async def async_insert(self, sql, params):
  56. """
  57. insert and update method
  58. :param params:
  59. :param sql:
  60. :return:
  61. """
  62. async with self.mysql_pool.acquire() as conn:
  63. async with conn.cursor() as cursor:
  64. try:
  65. await cursor.execute(sql, params)
  66. affected_rows = cursor.rowcount
  67. await conn.commit()
  68. return affected_rows
  69. except Exception as e:
  70. await conn.rollback()
  71. raise
  72. async def async_insert_many(self, sql, params_list):
  73. """
  74. :param sql:
  75. :param params_list:
  76. :return:
  77. """
  78. async with self.mysql_pool.acquire() as conn:
  79. async with conn.cursor() as cursor:
  80. try:
  81. await cursor.executemany(sql, params_list)
  82. affected_rows = cursor.rowcount
  83. await conn.commit()
  84. return affected_rows
  85. except Exception as e:
  86. await conn.rollback()
  87. raise
  88. async def __aenter__(self):
  89. await self.init_pool()
  90. return self
  91. async def __aexit__(self, exc_type, exc_val, exc_tb):
  92. await self.close_pool()
  93. class TaskMySQLClient(object):
  94. """
  95. Async MySQL
  96. """
  97. def __init__(self):
  98. self.mysql_pool = None
  99. async def init_pool(self):
  100. """
  101. 初始化连接
  102. :return:
  103. """
  104. self.mysql_pool = await aiomysql.create_pool(
  105. host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
  106. port=3306,
  107. user='changwen_admin',
  108. password='changwen@123456',
  109. db='long_articles',
  110. charset='utf8mb4',
  111. connect_timeout=120
  112. )
  113. print("mysql init successfully")
  114. async def close_pool(self):
  115. """
  116. 关闭 mysql 连接
  117. :return:
  118. """
  119. self.mysql_pool.close()
  120. await self.mysql_pool.wait_closed()
  121. async def async_select(self, sql):
  122. """
  123. select method
  124. :param sql:
  125. :return:
  126. """
  127. async with self.mysql_pool.acquire() as conn:
  128. async with conn.cursor() as cursor:
  129. await cursor.execute(sql)
  130. result = await cursor.fetchall()
  131. return result
  132. async def async_insert(self, sql, params):
  133. """
  134. insert and update method
  135. :param params:
  136. :param sql:
  137. :return:
  138. """
  139. async with self.mysql_pool.acquire() as conn:
  140. async with conn.cursor() as cursor:
  141. await cursor.execute(sql, params)
  142. await conn.commit()