main.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import asyncio
  2. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  3. from app.services.collector import CollectorService
  4. from app.core.config import settings
  5. from app.core.logger import logger
  6. async def scheduled_task():
  7. """定时任务包装器"""
  8. service = CollectorService()
  9. try:
  10. await service.run_pipeline()
  11. finally:
  12. await service.close()
  13. async def start_scheduler():
  14. """启动调度器"""
  15. logger.info("微信指数采集初始化...")
  16. scheduler = AsyncIOScheduler()
  17. # 添加定时任务
  18. scheduler.add_job(
  19. scheduled_task,
  20. 'cron',
  21. hour='9-23', # 不设置 hour 参数,代表每小时都跑
  22. minute=settings.CRON_MINUTE
  23. )
  24. # (可选) 启动时立即执行一次,方便测试,生产环境可注释
  25. # scheduler.add_job(scheduled_task, 'date')
  26. scheduler.start()
  27. logger.info(f"⏰ 调度开始,每小时的 {settings.CRON_MINUTE:02d} 分执行")
  28. # 阻塞主线程,保持 Docker 容器运行
  29. try:
  30. while True:
  31. await asyncio.sleep(1)
  32. except (KeyboardInterrupt, SystemExit):
  33. scheduler.shutdown()
  34. def main():
  35. # 使用 asyncio.run() 来运行异步主函数
  36. asyncio.run(start_scheduler())
  37. if __name__ == "__main__":
  38. main()
  39. # print(settings.DATABASE_URL)