historyTask.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import asyncio
  6. import datetime
  7. import traceback
  8. import threading
  9. from concurrent.futures import ThreadPoolExecutor, as_completed
  10. from tasks.history_task import historyContentIdTask
  11. from applications.db import AsyncMySQLClient
  12. from applications.log import logging
  13. from applications.feishu import bot
  14. from applications.config.const import HistoryContentIdTaskConst
  15. async def do_job(publish_flag):
  16. """
  17. main job
  18. :return:
  19. """
  20. async_mysql_pool = AsyncMySQLClient()
  21. try:
  22. await async_mysql_pool.init_pool()
  23. logging(
  24. code="history0001",
  25. info="Init MySQL pool successfully",
  26. alg="historyContentIdTask",
  27. function="main"
  28. )
  29. except Exception as e:
  30. logging(
  31. code="history0002",
  32. info="Init MySQL pool failed",
  33. alg="historyContentIdTask",
  34. function="main",
  35. data={
  36. "error": str(e),
  37. "traceback": traceback.format_exc()
  38. }
  39. )
  40. bot(
  41. title="historyContentIdTask INIT MYSQL FAILS",
  42. detail={
  43. "error": str(e),
  44. "traceback": traceback.format_exc()
  45. }
  46. )
  47. try:
  48. history_content_id_task = historyContentIdTask(async_mysql_pool, publish_flag)
  49. except Exception as e:
  50. logging(
  51. code="history0003",
  52. info="Init historyContentIdTask failed",
  53. alg="historyContentIdTask",
  54. function="main",
  55. data={
  56. "error": str(e),
  57. "traceback": traceback.format_exc()
  58. }
  59. )
  60. return
  61. await history_content_id_task.deal()
  62. def run_thread(publish_flag, sleep_seconds, stop_flag):
  63. """
  64. 执行进程
  65. """
  66. while not stop_flag.is_set():
  67. loop = asyncio.new_event_loop()
  68. asyncio.set_event_loop(loop)
  69. loop.run_until_complete(do_job(publish_flag=publish_flag))
  70. loop.close()
  71. now_str = datetime.datetime.now().__str__()
  72. logging(
  73. code="history0004",
  74. info="History task finished",
  75. function="run_thread"
  76. )
  77. print("Task{}--{} 请求执行完成, 等待{}s".format(publish_flag, now_str, sleep_seconds))
  78. time.sleep(sleep_seconds)
  79. def main():
  80. """
  81. main function
  82. """
  83. const = HistoryContentIdTaskConst()
  84. stop_event = threading.Event()
  85. # 启动两个线程,分别执行两个函数
  86. with ThreadPoolExecutor(max_workers=2) as executor:
  87. futures = [
  88. executor.submit(run_thread, const.NEED_PUBLISH, const.NEED_PUBLISH_WAIT_TIME, stop_event),
  89. executor.submit(run_thread, const.DO_NOT_NEED_PUBLISH, const.DO_NOT_NEED_PUBLISH_WAIT_TIME, stop_event)
  90. ]
  91. try:
  92. for future in as_completed(futures):
  93. future.result()
  94. except KeyboardInterrupt:
  95. print("Stopping all threads...")
  96. stop_event.set()
  97. if __name__ == '__main__':
  98. main()