task_schedule.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. from apscheduler.triggers.interval import IntervalTrigger
  3. from decode_task.decodeTask import decode_task_status_handler
  4. from loguru import logger
  5. class TaskScheduler:
  6. def __init__(self):
  7. self.scheduler = BackgroundScheduler()
  8. self.setup_jobs()
  9. def setup_jobs(self):
  10. """设置定时任务"""
  11. self.scheduler.add_job(
  12. func=decode_task_status_handler,
  13. trigger=IntervalTrigger(seconds=30),
  14. id='decode_task_handler',
  15. name='decode_task任务刷新',
  16. max_instances=1 # 防止重复执行
  17. )
  18. # self.scheduler.add_job(
  19. # func=script_task_status_handler,
  20. # trigger=IntervalTrigger(seconds=30),
  21. # id='script_task_status_handler',
  22. # name='script_task任务刷新',
  23. # max_instances=1 # 防止重复执行
  24. # )
  25. def start(self):
  26. """启动调度器"""
  27. logger.info("定时任务调度器启动")
  28. try:
  29. self.scheduler.start()
  30. except KeyboardInterrupt:
  31. logger.info("收到停止信号,正在关闭调度器...")
  32. self.shutdown()
  33. def shutdown(self):
  34. """关闭调度器"""
  35. self.scheduler.shutdown()
  36. logger.info("定时任务调度器已关闭")