""" @author: luojunhui """ import aiomysql from aiomysql.cursors import Cursor from applications.config import denet_config, long_articles_config class AsyncMySQLClient(object): """ 异步 mysql 连接池 """ def __init__(self, app=None, aigc=False): self.aigc = aigc if not app: self.mysql_pool = None else: self.mysql_pool = app async def init_pool(self): """ 初始化连接 :return: """ if self.aigc: db_config = denet_config else: db_config = long_articles_config self.mysql_pool = await aiomysql.create_pool( host=db_config['host'], port=db_config['port'], user=db_config['user'], password=db_config['password'], db=db_config['db'], charset=db_config['charset'], connect_timeout=120, ) print("{} mysql init successfully".format("Denet" if self.aigc else "长文")) async def close_pool(self): """ 关闭 mysql 连接 :return: """ self.mysql_pool.close() await self.mysql_pool.wait_closed() async def async_select(self, sql, cursor_type=Cursor): """ select method :param sql: :param cursor_type: :return: """ async with self.mysql_pool.acquire() as conn: async with conn.cursor(cursor_type) as cursor: await cursor.execute(sql) result = await cursor.fetchall() return result async def async_insert(self, sql, params): """ insert and update method :param params: :param sql: :return: """ async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: try: await cursor.execute(sql, params) affected_rows = cursor.rowcount await conn.commit() return affected_rows except Exception as e: await conn.rollback() raise async def async_insert_many(self, sql, params_list): """ :param sql: :param params_list: :return: """ async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: try: await cursor.executemany(sql, params_list) affected_rows = cursor.rowcount await conn.commit() return affected_rows except Exception as e: await conn.rollback() raise async def __aenter__(self): await self.init_pool() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close_pool() class TaskMySQLClient(object): """ Async MySQL """ def __init__(self): self.mysql_pool = None async def init_pool(self): """ 初始化连接 :return: """ self.mysql_pool = await aiomysql.create_pool( host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com', port=3306, user='changwen_admin', password='changwen@123456', db='long_articles', charset='utf8mb4', connect_timeout=120 ) print("mysql init successfully") async def close_pool(self): """ 关闭 mysql 连接 :return: """ self.mysql_pool.close() await self.mysql_pool.wait_closed() async def async_select(self, sql): """ select method :param sql: :return: """ async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(sql) result = await cursor.fetchall() return result async def async_insert(self, sql, params): """ insert and update method :param params: :param sql: :return: """ async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(sql, params) await conn.commit()