# matchVideoFromHistoryArticleASC.py (3.3 KB)
"""
@author: luojunhui
"""
import asyncio
import datetime

import aiomysql

from tasks import MatchTask3
  8. class TaskMySQLClient(object):
  9. """
  10. Async MySQL
  11. """
  12. def __init__(self, db_name):
  13. self.db_name = db_name
  14. self.mysql_pool = None
  15. async def init_pool(self):
  16. """
  17. 初始化连接
  18. :return:
  19. """
  20. match self.db_name:
  21. case "pq":
  22. self.mysql_pool = await aiomysql.create_pool(
  23. host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
  24. port=3306,
  25. user='crawler',
  26. password='crawler123456@',
  27. db='piaoquan-crawler',
  28. charset='utf8mb4',
  29. connect_timeout=120,
  30. )
  31. case "denet":
  32. self.mysql_pool = await aiomysql.create_pool(
  33. host='rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com',
  34. port=3306,
  35. user='crawler_admin',
  36. password='cyber#crawler_2023',
  37. db='aigc-admin-prod',
  38. charset='utf8mb4',
  39. connect_timeout=120,
  40. )
  41. case "long-article":
  42. self.mysql_pool = await aiomysql.create_pool(
  43. host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
  44. port=3306,
  45. user='changwen_admin',
  46. password='changwen@123456',
  47. db='long_articles',
  48. charset='utf8mb4',
  49. connect_timeout=120,
  50. )
  51. print("mysql init successfully")
  52. async def close_pool(self):
  53. """
  54. 关闭 mysql 连接
  55. :return:
  56. """
  57. self.mysql_pool.close()
  58. await self.mysql_pool.wait_closed()
  59. async def async_select(self, sql):
  60. """
  61. select method
  62. :param sql:
  63. :return:
  64. """
  65. async with self.mysql_pool.acquire() as conn:
  66. async with conn.cursor() as cursor:
  67. await cursor.execute(sql)
  68. result = await cursor.fetchall()
  69. return result
  70. async def async_insert(self, sql, params):
  71. """
  72. insert and update method
  73. :param params:
  74. :param sql:
  75. :return:
  76. """
  77. async with self.mysql_pool.acquire() as coon:
  78. async with coon.cursor() as cursor:
  79. await cursor.execute(sql, params)
  80. await coon.commit()
  81. async def main():
  82. """
  83. main job
  84. :return:
  85. """
  86. pq_db_client = TaskMySQLClient("pq")
  87. await pq_db_client.init_pool()
  88. denet_db_client = TaskMySQLClient("denet")
  89. await denet_db_client.init_pool()
  90. long_article_db_client = TaskMySQLClient("long-article")
  91. await long_article_db_client.init_pool()
  92. history_task = MatchTask3(
  93. pq_client=pq_db_client,
  94. denet_client=denet_db_client,
  95. long_article_client=long_article_db_client
  96. )
  97. await history_task.deal()
  98. now_str = datetime.datetime.now().__str__()
  99. print("{} 请求执行完成, 等待1分钟".format(now_str))
  100. await asyncio.sleep(1 * 60)
  101. if __name__ == '__main__':
  102. while True:
  103. asyncio.run(main())