| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- import asyncio
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from app.services.collector import CollectorService
- from app.core.config import settings
- from app.core.logger import logger
- async def scheduled_task():
- """定时任务包装器"""
- service = CollectorService()
- try:
- await service.run_pipeline()
- finally:
- await service.close()
- async def start_scheduler():
- """启动调度器"""
- logger.info("微信指数采集初始化...")
- scheduler = AsyncIOScheduler()
- # 添加定时任务
- scheduler.add_job(
- scheduled_task,
- 'cron',
- hour='9-23', # 不设置 hour 参数,代表每小时都跑
- minute=settings.CRON_MINUTE
- )
- # (可选) 启动时立即执行一次,方便测试,生产环境可注释
- # scheduler.add_job(scheduled_task, 'date')
- scheduler.start()
- logger.info(f"⏰ 调度开始,每小时的 {settings.CRON_MINUTE:02d} 分执行")
- # 阻塞主线程,保持 Docker 容器运行
- try:
- while True:
- await asyncio.sleep(1)
- except (KeyboardInterrupt, SystemExit):
- scheduler.shutdown()
- def main():
- # 使用 asyncio.run() 来运行异步主函数
- asyncio.run(start_scheduler())
- if __name__ == "__main__":
- main()
- # print(settings.DATABASE_URL)
|