manager.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. from datetime import datetime
  2. from zoneinfo import ZoneInfo
  3. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  4. from apscheduler.triggers.cron import CronTrigger
  5. from apscheduler.triggers.date import DateTrigger
  6. from apscheduler.triggers.interval import IntervalTrigger
  7. from app.core.config import settings
  8. from app.scheduler.jobs import (
  9. demand_pool_daily_strategy_alert_job,
  10. demand_pool_today_incremental_sync_job,
  11. heartbeat_job,
  12. )
# Kept consistent with the scheduler's own time zone. In APScheduler 3.x, when a
# CronTrigger instance is passed to add_job() without an explicit timezone, its
# hour/minute fields are interpreted in a default time zone such as UTC, so a
# configured 9:40 actually fires at 9:40 UTC, which is observed as 17:40 Beijing
# time (+8 hours).
_CRON_TZ = "Asia/Shanghai"
# Module-level scheduler instance shared by the functions in this file.
scheduler = AsyncIOScheduler(timezone=_CRON_TZ)
  18. def setup_jobs() -> None:
  19. scheduler.add_job(
  20. heartbeat_job,
  21. trigger=IntervalTrigger(seconds=settings.scheduler_heartbeat_seconds),
  22. id="heartbeat_job",
  23. replace_existing=True,
  24. )
  25. if settings.demand_pool_hourly_sync_enabled:
  26. scheduler.add_job(
  27. demand_pool_today_incremental_sync_job,
  28. trigger=CronTrigger(
  29. minute=settings.demand_pool_hourly_sync_minute,
  30. timezone=_CRON_TZ,
  31. ),
  32. id="demand_pool_hourly_sync_job",
  33. replace_existing=True,
  34. max_instances=1,
  35. coalesce=True,
  36. )
  37. if settings.demand_pool_daily_strategy_alert_enabled:
  38. scheduler.add_job(
  39. demand_pool_daily_strategy_alert_job,
  40. trigger=CronTrigger(
  41. hour=settings.demand_pool_daily_strategy_alert_hour,
  42. minute=settings.demand_pool_daily_strategy_alert_minute,
  43. timezone=_CRON_TZ,
  44. ),
  45. id="demand_pool_daily_strategy_alert_job",
  46. replace_existing=True,
  47. max_instances=1,
  48. coalesce=True,
  49. )
  50. def start_scheduler() -> None:
  51. if scheduler.running:
  52. return
  53. setup_jobs()
  54. scheduler.start()
  55. if settings.demand_pool_hourly_sync_enabled:
  56. scheduler.add_job(
  57. demand_pool_today_incremental_sync_job,
  58. trigger=DateTrigger(
  59. run_date=datetime.now(ZoneInfo(_CRON_TZ)),
  60. timezone=_CRON_TZ,
  61. ),
  62. id="demand_pool_startup_sync_job",
  63. replace_existing=True,
  64. max_instances=1,
  65. coalesce=True,
  66. )
  67. def shutdown_scheduler() -> None:
  68. if not scheduler.running:
  69. return
  70. scheduler.shutdown(wait=False)