""" @author: luojunhui """ import datetime import aiomysql import asyncio from tasks import MatchTask3 class TaskMySQLClient(object): """ Async MySQL """ def __init__(self, db_name): self.db_name = db_name self.mysql_pool = None async def init_pool(self): """ 初始化连接 :return: """ if self.db_name == 'pq': self.mysql_pool = await aiomysql.create_pool( host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com', port=3306, user='crawler', password='crawler123456@', db='piaoquan-crawler', charset='utf8mb4', connect_timeout=120, ) print("mysql init successfully") elif self.db_name == 'denet': self.mysql_pool = await aiomysql.create_pool( host='rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com', port=3306, user='crawler_admin', password='cyber#crawler_2023', db='aigc-admin-prod', charset='utf8mb4', connect_timeout=120, ) print("mysql init successfully") elif self.db_name == 'long-article': self.mysql_pool = await aiomysql.create_pool( host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com', port=3306, user='changwen_admin', password='changwen@123456', db='long_articles', charset='utf8mb4', connect_timeout=120, ) print("mysql init successfully") else: print("mysql init fail") async def close_pool(self): """ 关闭 mysql 连接 :return: """ self.mysql_pool.close() await self.mysql_pool.wait_closed() async def async_select(self, sql): """ select method :param sql: :return: """ async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(sql) result = await cursor.fetchall() return result async def async_insert(self, sql, params): """ insert and update method :param params: :param sql: :return: """ async with self.mysql_pool.acquire() as coon: async with coon.cursor() as cursor: await cursor.execute(sql, params) await coon.commit() async def main(): """ main job :return: """ pq_db_client = TaskMySQLClient("pq") await pq_db_client.init_pool() denet_db_client = TaskMySQLClient("denet") await denet_db_client.init_pool() long_article_db_client = TaskMySQLClient("long-article") await long_article_db_client.init_pool() history_task = MatchTask3( pq_client=pq_db_client, denet_client=denet_db_client, long_article_client=long_article_db_client ) await history_task.deal() now_str = datetime.datetime.now().__str__() print("{} 请求执行完成, 等待1分钟".format(now_str)) await asyncio.sleep(1 * 60) if __name__ == '__main__': while True: asyncio.run(main())