| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- from typing import Optional
- from datetime import datetime
- from zoneinfo import ZoneInfo
- from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.triggers.cron import CronTrigger
- from scheduler.decode_dispatch_job import run_decode_dispatch_job
- from scheduler.decode_hourly_stats_job import run_decode_hourly_stats_job
- from utils.scheduler_logger import get_scheduler_logger
- logger = get_scheduler_logger()
- _scheduler: Optional[BackgroundScheduler] = None
- def start_scheduler() -> None:
- global _scheduler
- if _scheduler and _scheduler.running:
- logger.info("调度器已在运行,跳过重复启动")
- return
- _scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Shanghai"))
- _scheduler.add_job(
- run_decode_dispatch_job,
- trigger=CronTrigger(minute="*/3"),
- id="decode_dispatch_every_3min",
- replace_existing=True,
- next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")),
- )
- _scheduler.add_job(
- run_decode_hourly_stats_job,
- trigger=CronTrigger(minute=6),
- id="decode_hourly_stats",
- replace_existing=True,
- )
- _scheduler.start()
- logger.info("调度器已启动:解码任务每3分钟执行,统计任务每小时整点执行")
- def stop_scheduler() -> None:
- global _scheduler
- if _scheduler and _scheduler.running:
- _scheduler.shutdown(wait=False)
- logger.info("调度器已停止")
- _scheduler = None
- def run_dispatch_once() -> None:
- logger.info("手动触发执行一次调度任务")
- run_decode_dispatch_job()
|