task.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. import aiomysql
  6. from applications.deal import ProcessDeal
  7. class TaskMySQLClient(object):
  8. """
  9. Async MySQL
  10. """
  11. def __init__(self):
  12. self.mysql_pool = None
  13. async def init_pool(self):
  14. """
  15. 初始化连接
  16. :return:
  17. """
  18. self.mysql_pool = await aiomysql.create_pool(
  19. host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
  20. port=3306,
  21. user='crawler',
  22. password='crawler123456@',
  23. db='piaoquan-crawler',
  24. charset='utf8mb4',
  25. connect_timeout=120,
  26. )
  27. print("mysql init successfully")
  28. async def close_pool(self):
  29. """
  30. 关闭 mysql 连接
  31. :return:
  32. """
  33. self.mysql_pool.close()
  34. await self.mysql_pool.wait_closed()
  35. async def async_select(self, sql):
  36. """
  37. select method
  38. :param sql:
  39. :return:
  40. """
  41. async with self.mysql_pool.acquire() as conn:
  42. async with conn.cursor() as cursor:
  43. await cursor.execute(sql)
  44. result = await cursor.fetchall()
  45. return result
  46. async def async_insert(self, sql):
  47. """
  48. insert and update method
  49. :param sql:
  50. :return:
  51. """
  52. async with self.mysql_pool.acquire() as coon:
  53. async with coon.cursor() as cursor:
  54. await cursor.execute(sql)
  55. await coon.commit()
  56. async def main():
  57. """
  58. main job
  59. :return:
  60. """
  61. TMC = TaskMySQLClient()
  62. await TMC.init_pool()
  63. PD = ProcessDeal(TMC)
  64. await PD.deal()
  65. if __name__ == '__main__':
  66. asyncio.run(main())