matchVideoFromHistoryArticleASC.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. """
  2. @author: luojunhui
  3. """
  4. import datetime
  5. import aiomysql
  6. import asyncio
  7. from tasks import MatchTask3
  8. class TaskMySQLClient(object):
  9. """
  10. Async MySQL
  11. """
  12. def __init__(self, db_name):
  13. self.db_name = db_name
  14. self.mysql_pool = None
  15. async def init_pool(self):
  16. """
  17. 初始化连接
  18. :return:
  19. """
  20. if self.db_name == 'pq':
  21. self.mysql_pool = await aiomysql.create_pool(
  22. host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
  23. port=3306,
  24. user='crawler',
  25. password='crawler123456@',
  26. db='piaoquan-crawler',
  27. charset='utf8mb4',
  28. connect_timeout=120,
  29. )
  30. print("mysql init successfully")
  31. elif self.db_name == 'denet':
  32. self.mysql_pool = await aiomysql.create_pool(
  33. host='rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com',
  34. port=3306,
  35. user='crawler_admin',
  36. password='cyber#crawler_2023',
  37. db='aigc-admin-prod',
  38. charset='utf8mb4',
  39. connect_timeout=120,
  40. )
  41. print("mysql init successfully")
  42. elif self.db_name == 'long-article':
  43. self.mysql_pool = await aiomysql.create_pool(
  44. host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
  45. port=3306,
  46. user='changwen_admin',
  47. password='changwen@123456',
  48. db='long_articles',
  49. charset='utf8mb4',
  50. connect_timeout=120,
  51. )
  52. print("mysql init successfully")
  53. else:
  54. print("mysql init fail")
  55. async def close_pool(self):
  56. """
  57. 关闭 mysql 连接
  58. :return:
  59. """
  60. self.mysql_pool.close()
  61. await self.mysql_pool.wait_closed()
  62. async def async_select(self, sql):
  63. """
  64. select method
  65. :param sql:
  66. :return:
  67. """
  68. async with self.mysql_pool.acquire() as conn:
  69. async with conn.cursor() as cursor:
  70. await cursor.execute(sql)
  71. result = await cursor.fetchall()
  72. return result
  73. async def async_insert(self, sql, params):
  74. """
  75. insert and update method
  76. :param params:
  77. :param sql:
  78. :return:
  79. """
  80. async with self.mysql_pool.acquire() as coon:
  81. async with coon.cursor() as cursor:
  82. await cursor.execute(sql, params)
  83. await coon.commit()
  84. async def main():
  85. """
  86. main job
  87. :return:
  88. """
  89. pq_db_client = TaskMySQLClient("pq")
  90. await pq_db_client.init_pool()
  91. denet_db_client = TaskMySQLClient("denet")
  92. await denet_db_client.init_pool()
  93. long_article_db_client = TaskMySQLClient("long-article")
  94. await long_article_db_client.init_pool()
  95. history_task = MatchTask3(
  96. pq_client=pq_db_client,
  97. denet_client=denet_db_client,
  98. long_article_client=long_article_db_client
  99. )
  100. await history_task.deal()
  101. now_str = datetime.datetime.now().__str__()
  102. print("{} 请求执行完成, 等待1分钟".format(now_str))
  103. await asyncio.sleep(1 * 60)
  104. if __name__ == '__main__':
  105. while True:
  106. asyncio.run(main())