# task_schedule.py

  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. from apscheduler.triggers.interval import IntervalTrigger
  3. from decode_task.decodeTask import decode_task_status_handler
  4. from decode_task.scriptTask import script_task_status_handler
  5. from loguru import logger
  6. class TaskScheduler:
  7. def __init__(self):
  8. self.scheduler = BackgroundScheduler()
  9. self.setup_jobs()
  10. def setup_jobs(self):
  11. """设置定时任务"""
  12. self.scheduler.add_job(
  13. func=decode_task_status_handler,
  14. trigger=IntervalTrigger(seconds=30),
  15. id='decode_task_handler',
  16. name='decode_task任务刷新',
  17. max_instances=1 # 防止重复执行
  18. )
  19. # self.scheduler.add_job(
  20. # func=script_task_status_handler,
  21. # trigger=IntervalTrigger(seconds=30),
  22. # id='script_task_status_handler',
  23. # name='script_task任务刷新',
  24. # max_instances=1 # 防止重复执行
  25. # )
  26. def start(self):
  27. """启动调度器"""
  28. logger.info("定时任务调度器启动")
  29. try:
  30. self.scheduler.start()
  31. except KeyboardInterrupt:
  32. logger.info("收到停止信号,正在关闭调度器...")
  33. self.shutdown()
  34. def shutdown(self):
  35. """关闭调度器"""
  36. self.scheduler.shutdown()
  37. logger.info("定时任务调度器已关闭")