# task_schedule.py

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger

from decode_task.decodeTask import decode_task_status_handler
from decode_task.scriptTask import script_task_status_handler
  6. class TaskScheduler:
  7. def __init__(self):
  8. self.scheduler = BackgroundScheduler()
  9. self.setup_jobs()
  10. def setup_jobs(self):
  11. """设置定时任务"""
  12. self.scheduler.add_job(
  13. func=decode_task_status_handler,
  14. trigger=IntervalTrigger(seconds=60),
  15. id='decode_task_status_handler',
  16. name='任务状态刷新',
  17. max_instances=5 # 防止重复执行
  18. )
  19. self.scheduler.add_job(
  20. func=script_task_status_handler,
  21. trigger=IntervalTrigger(seconds=60),
  22. id='script_task_status_handler',
  23. name='script_task_status_handler',
  24. max_instances=5 # 防止重复执行
  25. )
  26. def start(self):
  27. """启动调度器"""
  28. logger.info("定时任务调度器启动")
  29. try:
  30. self.scheduler.start()
  31. except KeyboardInterrupt:
  32. logger.info("收到停止信号,正在关闭调度器...")
  33. self.shutdown()
  34. def shutdown(self):
  35. """关闭调度器"""
  36. self.scheduler.shutdown()
  37. logger.info("定时任务调度器已关闭")