from typing import Optional from datetime import datetime from zoneinfo import ZoneInfo from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from scheduler.decode_dispatch_job import run_decode_dispatch_job from utils.scheduler_logger import get_scheduler_logger logger = get_scheduler_logger() _scheduler: Optional[BackgroundScheduler] = None def start_scheduler() -> None: global _scheduler if _scheduler and _scheduler.running: logger.info("调度器已在运行,跳过重复启动") return _scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Shanghai")) _scheduler.add_job( run_decode_dispatch_job, trigger=CronTrigger(minute="*/3"), id="decode_dispatch_every_3min", replace_existing=True, next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")), ) _scheduler.start() logger.info("调度器已启动:立即触发一次,之后每3分钟执行一次") def stop_scheduler() -> None: global _scheduler if _scheduler and _scheduler.running: _scheduler.shutdown(wait=False) logger.info("调度器已停止") _scheduler = None def run_dispatch_once() -> None: logger.info("手动触发执行一次调度任务") run_decode_dispatch_job()