"""
@author: luojunhui

Daily scheduled task for the paid-traffic ("投流") pipeline: once a day,
fetch a video list via ``updateFromOdps`` and insert it into the database.
"""
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from dailyTasks import updateFromOdps


async def asyncUpdatePQVideosTask():
    """
    Run one update cycle.

    Creates an ``updateFromOdps`` helper, fetches the video list with
    ``getVideoFromOdps()`` and awaits ``insertIntoDB(data_list=...)`` to
    store it.

    :return: None
    """
    ufo = updateFromOdps()
    # NOTE(review): getVideoFromOdps() is called synchronously; if it performs
    # blocking I/O it stalls the event loop while it runs. Acceptable while
    # this is the only job on the loop -- confirm before adding more jobs.
    video_list = ufo.getVideoFromOdps()
    await ufo.insertIntoDB(data_list=video_list)


def asyncScheduleJob():
    """
    Register the daily update job on an ``AsyncIOScheduler`` and start it.

    The scheduler is started without an explicit event loop, so a current
    event loop must already be set when this function is called (the
    ``__main__`` guard below takes care of that).

    :return: None
    """
    scheduler = AsyncIOScheduler()
    # Cron trigger firing every day at 10:00.
    # NOTE(review): the original comment said 9 o'clock while the code uses
    # hour=10; the code is left unchanged -- confirm the intended hour.
    trigger = CronTrigger(hour=10, minute=0)
    scheduler.add_job(asyncUpdatePQVideosTask, trigger)
    scheduler.start()


if __name__ == '__main__':
    # Create and install the loop explicitly: asyncio.get_event_loop() with no
    # current loop is deprecated and raises RuntimeError on newer Pythons.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    asyncScheduleJob()
    try:
        loop.run_forever()  # keep the event loop alive so the scheduler can fire
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        # Release the loop's resources on shutdown.
        loop.close()