task.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import datetime
  6. import asyncio
  7. import aiomysql
  8. from applications.deal import ProcessDeal
  9. from applications.deal.process_deal_2 import ProcessDeal2
  10. class TaskMySQLClient(object):
  11. """
  12. Async MySQL
  13. """
  14. def __init__(self):
  15. self.mysql_pool = None
  16. async def init_pool(self):
  17. """
  18. 初始化连接
  19. :return:
  20. """
  21. self.mysql_pool = await aiomysql.create_pool(
  22. host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
  23. port=3306,
  24. user='crawler',
  25. password='crawler123456@',
  26. db='piaoquan-crawler',
  27. charset='utf8mb4',
  28. connect_timeout=120,
  29. )
  30. print("mysql init successfully")
  31. async def close_pool(self):
  32. """
  33. 关闭 mysql 连接
  34. :return:
  35. """
  36. self.mysql_pool.close()
  37. await self.mysql_pool.wait_closed()
  38. async def async_select(self, sql):
  39. """
  40. select method
  41. :param sql:
  42. :return:
  43. """
  44. async with self.mysql_pool.acquire() as conn:
  45. async with conn.cursor() as cursor:
  46. await cursor.execute(sql)
  47. result = await cursor.fetchall()
  48. return result
  49. async def async_insert(self, sql):
  50. """
  51. insert and update method
  52. :param sql:
  53. :return:
  54. """
  55. async with self.mysql_pool.acquire() as coon:
  56. async with coon.cursor() as cursor:
  57. await cursor.execute(sql)
  58. await coon.commit()
  59. async def main():
  60. """
  61. main job
  62. :return:
  63. """
  64. TMC = TaskMySQLClient()
  65. await TMC.init_pool()
  66. PD = ProcessDeal(TMC)
  67. await PD.deal()
  68. async def main2():
  69. """
  70. main2
  71. :return:
  72. """
  73. TMC = TaskMySQLClient()
  74. await TMC.init_pool()
  75. PD = ProcessDeal2(TMC)
  76. await PD.deal()
  77. if __name__ == '__main__':
  78. while True:
  79. asyncio.run(main())
  80. now_str = datetime.datetime.now().__str__()
  81. print("{} 请求执行完成, 等待60s".format(now_str))
  82. time.sleep(60)
  83. asyncio.run(main2())
  84. now_str = datetime.datetime.now().__str__()
  85. print("查找历史数据{} 请求执行完成, 等待60s".format(now_str))
  86. time.sleep(60)