123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- """
- @author: luojunhui
- """
- import datetime
- import aiomysql
- import asyncio
- from tasks import MatchTask3
class TaskMySQLClient(object):
    """
    Small async MySQL helper built around an ``aiomysql`` connection pool.

    An instance is bound to one named database configuration (``db_name``).
    The pool is NOT created by the constructor: ``await init_pool()`` must be
    called before ``async_select`` / ``async_insert`` are used.
    """

    # Per-database connection settings, keyed by the ``db_name`` passed to
    # the constructor.
    # NOTE(review): credentials are hard-coded in source. They are kept as-is
    # here to preserve behavior, but should be moved to environment variables
    # or a secret store.
    _POOL_SETTINGS = {
        'pq': {
            'host': 'rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
            'user': 'crawler',
            'password': 'crawler123456@',
            'db': 'piaoquan-crawler',
        },
        'denet': {
            'host': 'rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com',
            'user': 'crawler_admin',
            'password': 'cyber#crawler_2023',
            'db': 'aigc-admin-prod',
        },
        'long-article': {
            'host': 'rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
            'user': 'changwen_admin',
            'password': 'changwen@123456',
            'db': 'long_articles',
        },
    }

    def __init__(self, db_name):
        """
        :param db_name: configuration name; one of 'pq', 'denet', 'long-article'
        """
        self.db_name = db_name
        # aiomysql pool; stays None until init_pool() succeeds.
        self.mysql_pool = None

    async def init_pool(self):
        """
        Create the connection pool for ``self.db_name``.

        For an unknown ``db_name`` this only prints "mysql init fail" and
        leaves ``self.mysql_pool`` as None (no exception is raised).
        :return: None
        """
        settings = self._POOL_SETTINGS.get(self.db_name)
        if settings is None:
            print("mysql init fail")
            return
        # port / charset / connect_timeout are shared by every configuration.
        self.mysql_pool = await aiomysql.create_pool(
            port=3306,
            charset='utf8mb4',
            connect_timeout=120,
            **settings
        )
        print("mysql init successfully")

    async def close_pool(self):
        """
        Close the MySQL connection pool and wait until it is fully closed.

        Does nothing when the pool was never created (init_pool() not called,
        or called with an unknown ``db_name``).
        :return: None
        """
        if self.mysql_pool is None:
            return
        self.mysql_pool.close()
        await self.mysql_pool.wait_closed()

    async def async_select(self, sql, params=None):
        """
        Run a query and return every row of its result.

        :param sql: SQL statement to execute
        :param params: optional arguments for placeholders in ``sql``; prefer
            this over building the statement with string formatting
        :return: the result of ``cursor.fetchall()``
        """
        async with self.mysql_pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(sql, params)
                return await cursor.fetchall()

    async def async_insert(self, sql, params):
        """
        Run an insert/update statement and commit it.

        :param sql: SQL statement with placeholders
        :param params: arguments bound to the placeholders in ``sql``
        :return: None
        """
        async with self.mysql_pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(sql, params)
                await conn.commit()
async def main():
    """
    Run one pass of the matching job.

    Opens the 'pq', 'denet' and 'long-article' pools, runs ``MatchTask3.deal()``
    with them, prints a completion message and sleeps for one minute. The
    pools are always closed before returning, so repeated calls do not
    accumulate open connections.
    :return: None
    """
    pq_db_client = TaskMySQLClient("pq")
    denet_db_client = TaskMySQLClient("denet")
    long_article_db_client = TaskMySQLClient("long-article")
    db_clients = (pq_db_client, denet_db_client, long_article_db_client)
    try:
        for db_client in db_clients:
            await db_client.init_pool()
        history_task = MatchTask3(
            pq_client=pq_db_client,
            denet_client=denet_db_client,
            long_article_client=long_article_db_client
        )
        await history_task.deal()
        now_str = str(datetime.datetime.now())
        print("{} 请求执行完成, 等待1分钟".format(now_str))
        # The sleep stays inside the try so the pools remain open for the same
        # duration as before; they are released only when this pass is over.
        await asyncio.sleep(1 * 60)
    finally:
        # Release every pool that was actually created, even if init or the
        # task itself raised.
        for db_client in db_clients:
            if db_client.mysql_pool is not None:
                await db_client.close_pool()
if __name__ == "__main__":
    # Run the job forever. Each asyncio.run() call drives one main() pass on
    # a fresh event loop; an exception raised by main() propagates and ends
    # the loop.
    while True:
        asyncio.run(main())
|