123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- """
- @author: luojunhui
- """
- import aiomysql
- from aiomysql.cursors import Cursor
- from applications.config import denet_config, long_articles_config
- class AsyncMySQLClient(object):
- """
- 异步 mysql 连接池
- """
- def __init__(self, app=None, aigc=False):
- self.aigc = aigc
- if not app:
- self.mysql_pool = None
- else:
- self.mysql_pool = app
- async def init_pool(self):
- """
- 初始化连接
- :return:
- """
- if self.aigc:
- db_config = denet_config
- else:
- db_config = long_articles_config
- self.mysql_pool = await aiomysql.create_pool(
- host=db_config['host'],
- port=db_config['port'],
- user=db_config['user'],
- password=db_config['password'],
- db=db_config['db'],
- charset=db_config['charset'],
- connect_timeout=120,
- )
- print("{} mysql init successfully".format("Denet" if self.aigc else "长文"))
- async def close_pool(self):
- """
- 关闭 mysql 连接
- :return:
- """
- self.mysql_pool.close()
- await self.mysql_pool.wait_closed()
- async def async_select(self, sql, cursor_type=Cursor):
- """
- select method
- :param sql:
- :param cursor_type:
- :return:
- """
- async with self.mysql_pool.acquire() as conn:
- async with conn.cursor(cursor_type) as cursor:
- await cursor.execute(sql)
- result = await cursor.fetchall()
- return result
- async def async_insert(self, sql, params):
- """
- insert and update method
- :param params:
- :param sql:
- :return:
- """
- async with self.mysql_pool.acquire() as conn:
- async with conn.cursor() as cursor:
- try:
- await cursor.execute(sql, params)
- affected_rows = cursor.rowcount
- await conn.commit()
- return affected_rows
- except Exception as e:
- await conn.rollback()
- raise
- async def async_insert_many(self, sql, params_list):
- """
- :param sql:
- :param params_list:
- :return:
- """
- async with self.mysql_pool.acquire() as conn:
- async with conn.cursor() as cursor:
- try:
- await cursor.executemany(sql, params_list)
- affected_rows = cursor.rowcount
- await conn.commit()
- return affected_rows
- except Exception as e:
- await conn.rollback()
- raise
- async def __aenter__(self):
- await self.init_pool()
- return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self.close_pool()
- class TaskMySQLClient(object):
- """
- Async MySQL
- """
- def __init__(self):
- self.mysql_pool = None
- async def init_pool(self):
- """
- 初始化连接
- :return:
- """
- self.mysql_pool = await aiomysql.create_pool(
- host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
- port=3306,
- user='changwen_admin',
- password='changwen@123456',
- db='long_articles',
- charset='utf8mb4',
- connect_timeout=120
- )
- print("mysql init successfully")
- async def close_pool(self):
- """
- 关闭 mysql 连接
- :return:
- """
- self.mysql_pool.close()
- await self.mysql_pool.wait_closed()
- async def async_select(self, sql):
- """
- select method
- :param sql:
- :return:
- """
- async with self.mysql_pool.acquire() as conn:
- async with conn.cursor() as cursor:
- await cursor.execute(sql)
- result = await cursor.fetchall()
- return result
- async def async_insert(self, sql, params):
- """
- insert and update method
- :param params:
- :param sql:
- :return:
- """
- async with self.mysql_pool.acquire() as conn:
- async with conn.cursor() as cursor:
- await cursor.execute(sql, params)
- await conn.commit()
|