from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger from decode_task.decodeTask import decode_task_status_handler from loguru import logger class TaskScheduler: def __init__(self): self.scheduler = BackgroundScheduler() self.setup_jobs() def setup_jobs(self): """设置定时任务""" self.scheduler.add_job( func=decode_task_status_handler, trigger=IntervalTrigger(seconds=30), id='decode_task_handler', name='decode_task任务刷新', max_instances=1 # 防止重复执行 ) # self.scheduler.add_job( # func=script_task_status_handler, # trigger=IntervalTrigger(seconds=30), # id='script_task_status_handler', # name='script_task任务刷新', # max_instances=1 # 防止重复执行 # ) def start(self): """启动调度器""" logger.info("定时任务调度器启动") try: self.scheduler.start() except KeyboardInterrupt: logger.info("收到停止信号,正在关闭调度器...") self.shutdown() def shutdown(self): """关闭调度器""" self.scheduler.shutdown() logger.info("定时任务调度器已关闭")