historyTask.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import asyncio
  6. import datetime
  7. import traceback
  8. from concurrent.futures import ThreadPoolExecutor, as_completed
  9. from tasks.history_task import historyContentIdTask
  10. from applications.db import AsyncMySQLClient
  11. from applications.log import logging
  12. from applications.feishu import bot
  13. from applications.config.const import HistoryContentIdTaskConst
  14. async def main(publish_flag):
  15. """
  16. main job
  17. :return:
  18. """
  19. async_mysql_pool = AsyncMySQLClient()
  20. try:
  21. await async_mysql_pool.init_pool()
  22. logging(
  23. code="history0001",
  24. info="Init MySQL pool successfully",
  25. alg="historyContentIdTask",
  26. function="main"
  27. )
  28. except Exception as e:
  29. logging(
  30. code="history0002",
  31. info="Init MySQL pool failed",
  32. alg="historyContentIdTask",
  33. function="main",
  34. data={
  35. "error": str(e),
  36. "traceback": traceback.format_exc()
  37. }
  38. )
  39. bot(
  40. title="historyContentIdTask INIT MYSQL FAILS",
  41. detail={
  42. "error": str(e),
  43. "traceback": traceback.format_exc()
  44. }
  45. )
  46. try:
  47. history_content_id_task = historyContentIdTask(async_mysql_pool)
  48. except Exception as e:
  49. logging(
  50. code="history0003",
  51. info="Init historyContentIdTask failed",
  52. alg="historyContentIdTask",
  53. function="main",
  54. data={
  55. "error": str(e),
  56. "traceback": traceback.format_exc()
  57. }
  58. )
  59. return
  60. await history_content_id_task.deal(publish_flag)
  61. def run_thread(publish_flag, sleep_seconds):
  62. """
  63. 执行进程
  64. """
  65. while True:
  66. loop = asyncio.new_event_loop()
  67. asyncio.set_event_loop(loop)
  68. loop.run_until_complete(main(publish_flag=publish_flag))
  69. loop.close()
  70. now_str = datetime.datetime.now().__str__()
  71. logging(
  72. code="history0004",
  73. info="History task finished",
  74. alg="run_thread2"
  75. )
  76. print("{} 请求执行完成, 等待{}s".format(now_str, sleep_seconds))
  77. time.sleep(sleep_seconds)
  78. if __name__ == '__main__':
  79. const = HistoryContentIdTaskConst()
  80. with ThreadPoolExecutor(max_workers=2) as executor:
  81. futures = [
  82. executor.submit(run_thread, const.NEED_PUBLISH, const.NEED_PUBLISH_WAIT_TIME),
  83. executor.submit(run_thread, const.DO_NOT_NEED_PUBLISH, const.DO_NOT_NEED_PUBLISH_WAIT_TIME)
  84. ]
  85. try:
  86. for future in as_completed(futures):
  87. future.result()
  88. except KeyboardInterrupt:
  89. print("Stopping all threads...")