bootstrap.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from typing import Optional
  2. from datetime import datetime
  3. from zoneinfo import ZoneInfo
  4. from apscheduler.schedulers.background import BackgroundScheduler
  5. from apscheduler.triggers.cron import CronTrigger
  6. from scheduler.decode_dispatch_job import run_decode_dispatch_job
  7. from utils.scheduler_logger import get_scheduler_logger
  8. logger = get_scheduler_logger()
  9. _scheduler: Optional[BackgroundScheduler] = None
  10. def start_scheduler() -> None:
  11. global _scheduler
  12. if _scheduler and _scheduler.running:
  13. logger.info("调度器已在运行,跳过重复启动")
  14. return
  15. _scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Shanghai"))
  16. _scheduler.add_job(
  17. run_decode_dispatch_job,
  18. trigger=CronTrigger(minute="*/3"),
  19. id="decode_dispatch_every_3min",
  20. replace_existing=True,
  21. next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")),
  22. )
  23. _scheduler.start()
  24. logger.info("调度器已启动:立即触发一次,之后每3分钟执行一次")
  25. def stop_scheduler() -> None:
  26. global _scheduler
  27. if _scheduler and _scheduler.running:
  28. _scheduler.shutdown(wait=False)
  29. logger.info("调度器已停止")
  30. _scheduler = None
  31. def run_dispatch_once() -> None:
  32. logger.info("手动触发执行一次调度任务")
  33. run_decode_dispatch_job()