import asyncio from apscheduler.schedulers.asyncio import AsyncIOScheduler from app.services.collector import CollectorService from app.core.config import settings from app.core.logger import logger async def scheduled_task(): """定时任务包装器""" service = CollectorService() try: await service.run_pipeline() finally: await service.close() async def start_scheduler(): """启动调度器""" logger.info("微信指数采集初始化...") scheduler = AsyncIOScheduler() # 添加定时任务 scheduler.add_job( scheduled_task, 'cron', hour='9-23', # 不设置 hour 参数,代表每小时都跑 minute=settings.CRON_MINUTE ) # (可选) 启动时立即执行一次,方便测试,生产环境可注释 # scheduler.add_job(scheduled_task, 'date') scheduler.start() logger.info(f"⏰ 调度开始,每小时的 {settings.CRON_MINUTE:02d} 分执行") # 阻塞主线程,保持 Docker 容器运行 try: while True: await asyncio.sleep(1) except (KeyboardInterrupt, SystemExit): scheduler.shutdown() def main(): # 使用 asyncio.run() 来运行异步主函数 asyncio.run(start_scheduler()) if __name__ == "__main__": main() # print(settings.DATABASE_URL)