from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger

from decode_task.decodeTask import decode_task_status_handler
from decode_task.scriptTask import script_task_status_handler


class TaskScheduler:
    """Own a ``BackgroundScheduler`` and register the periodic status jobs.

    Constructing an instance creates the scheduler and registers both jobs;
    nothing runs until :meth:`start` is called.
    """

    def __init__(self) -> None:
        """Create the background scheduler and register the jobs on it."""
        self.scheduler = BackgroundScheduler()
        self.setup_jobs()

    def setup_jobs(self) -> None:
        """Register the two status-handler jobs, each on a 60-second interval."""
        self.scheduler.add_job(
            func=decode_task_status_handler,
            trigger=IntervalTrigger(seconds=60),
            id='decode_task_status_handler',
            name='任务状态刷新',
            # Up to 5 runs of this job may overlap.
            # NOTE(review): the previous comment claimed this prevents
            # duplicate execution, but max_instances is the cap on concurrent
            # instances; use 1 if overlap must be prevented -- confirm intent.
            max_instances=5,
        )

        self.scheduler.add_job(
            func=script_task_status_handler,
            trigger=IntervalTrigger(seconds=60),
            id='script_task_status_handler',
            name='script_task_status_handler',
            # Same concurrency cap as above (see the review note there).
            max_instances=5,
        )

    def start(self) -> None:
        """Start the scheduler.

        If a ``KeyboardInterrupt`` arrives while the scheduler is being
        started, the scheduler is shut down via :meth:`shutdown` and the
        interrupt is not re-raised.
        """
        logger.info("定时任务调度器启动")
        try:
            self.scheduler.start()
        except KeyboardInterrupt:
            logger.info("收到停止信号,正在关闭调度器...")
            self.shutdown()

    def shutdown(self) -> None:
        """Shut the scheduler down; safe to call when it is not running."""
        # APScheduler raises SchedulerNotRunningError when shutdown() is
        # called on a stopped scheduler (never started, or already shut
        # down), so only call it while the scheduler is actually running.
        if self.scheduler.running:
            self.scheduler.shutdown()
        logger.info("定时任务调度器已关闭")