match_video_task.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import datetime
  6. import asyncio
  7. import aiomysql
  8. from tasks import MatchTask1, MatchTask2
  9. class TaskMySQLClient(object):
  10. """
  11. Async MySQL
  12. """
  13. def __init__(self):
  14. self.mysql_pool = None
  15. async def init_pool(self):
  16. """
  17. 初始化连接
  18. :return:
  19. """
  20. self.mysql_pool = await aiomysql.create_pool(
  21. host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
  22. port=3306,
  23. user='crawler',
  24. password='crawler123456@',
  25. db='piaoquan-crawler',
  26. charset='utf8mb4',
  27. connect_timeout=120,
  28. )
  29. print("mysql init successfully")
  30. async def close_pool(self):
  31. """
  32. 关闭 mysql 连接
  33. :return:
  34. """
  35. self.mysql_pool.close()
  36. await self.mysql_pool.wait_closed()
  37. async def async_select(self, sql):
  38. """
  39. select method
  40. :param sql:
  41. :return:
  42. """
  43. async with self.mysql_pool.acquire() as conn:
  44. async with conn.cursor() as cursor:
  45. await cursor.execute(sql)
  46. result = await cursor.fetchall()
  47. return result
  48. async def async_insert(self, sql, params):
  49. """
  50. insert and update method
  51. :param params:
  52. :param sql:
  53. :return:
  54. """
  55. async with self.mysql_pool.acquire() as coon:
  56. async with coon.cursor() as cursor:
  57. await cursor.execute(sql, params)
  58. await coon.commit()
  59. async def main():
  60. """
  61. main job
  62. :return:
  63. """
  64. TMC = TaskMySQLClient()
  65. await TMC.init_pool()
  66. PD = MatchTask1(TMC)
  67. await PD.deal()
  68. async def main2():
  69. """
  70. main2
  71. :return:
  72. """
  73. TMC = TaskMySQLClient()
  74. await TMC.init_pool()
  75. PD = MatchTask2(TMC)
  76. await PD.deal()
  77. if __name__ == '__main__':
  78. # asyncio.run(main())
  79. while True:
  80. asyncio.run(main())
  81. now_str = datetime.datetime.now().__str__()
  82. print("{} 请求执行完成, 等待60s".format(now_str))
  83. time.sleep(60)
  84. asyncio.run(main2())
  85. now_str = datetime.datetime.now().__str__()
  86. print("查找历史数据{} 请求执行完成, 等待60s".format(now_str))
  87. time.sleep(60)