bootstrap.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. from typing import Optional
  2. from datetime import datetime
  3. from zoneinfo import ZoneInfo
  4. from apscheduler.schedulers.background import BackgroundScheduler
  5. from apscheduler.triggers.cron import CronTrigger
  6. from scheduler.decode_dispatch_job import run_decode_dispatch_job
  7. from scheduler.decode_hourly_stats_job import run_decode_hourly_stats_job
  8. from utils.scheduler_logger import get_scheduler_logger
# Logger shared by every function in this module, obtained from
# utils.scheduler_logger.
logger = get_scheduler_logger()
# Module-level handle to the scheduler: None until start_scheduler() creates
# one, and reset to None by stop_scheduler().
_scheduler: Optional[BackgroundScheduler] = None
  11. def start_scheduler() -> None:
  12. global _scheduler
  13. if _scheduler and _scheduler.running:
  14. logger.info("调度器已在运行,跳过重复启动")
  15. return
  16. _scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Shanghai"))
  17. _scheduler.add_job(
  18. run_decode_dispatch_job,
  19. trigger=CronTrigger(minute="*/3"),
  20. id="decode_dispatch_every_3min",
  21. replace_existing=True,
  22. next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")),
  23. )
  24. _scheduler.add_job(
  25. run_decode_hourly_stats_job,
  26. trigger=CronTrigger(minute=6),
  27. id="decode_hourly_stats",
  28. replace_existing=True,
  29. )
  30. _scheduler.start()
  31. logger.info("调度器已启动:解码任务每3分钟执行,统计任务每小时整点执行")
  32. def stop_scheduler() -> None:
  33. global _scheduler
  34. if _scheduler and _scheduler.running:
  35. _scheduler.shutdown(wait=False)
  36. logger.info("调度器已停止")
  37. _scheduler = None
  38. def run_dispatch_once() -> None:
  39. logger.info("手动触发执行一次调度任务")
  40. run_decode_dispatch_job()