from typing import Optional from datetime import datetime from zoneinfo import ZoneInfo from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from scheduler.decode_dispatch_job import run_decode_dispatch_job from scheduler.decode_hourly_stats_job import run_decode_hourly_stats_job from utils.scheduler_logger import get_scheduler_logger logger = get_scheduler_logger() _scheduler: Optional[BackgroundScheduler] = None def start_scheduler() -> None: global _scheduler if _scheduler and _scheduler.running: logger.info("调度器已在运行,跳过重复启动") return _scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Shanghai")) _scheduler.add_job( run_decode_dispatch_job, trigger=CronTrigger(minute="*/3"), id="decode_dispatch_every_3min", replace_existing=True, next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")), ) _scheduler.add_job( run_decode_hourly_stats_job, trigger=CronTrigger(minute=6), id="decode_hourly_stats", replace_existing=True, ) _scheduler.start() logger.info("调度器已启动:解码任务每3分钟执行,统计任务每小时整点执行") def stop_scheduler() -> None: global _scheduler if _scheduler and _scheduler.running: _scheduler.shutdown(wait=False) logger.info("调度器已停止") _scheduler = None def run_dispatch_once() -> None: logger.info("手动触发执行一次调度任务") run_decode_dispatch_job()