""" @author: luojunhui 投流每日任务 """ import asyncio from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from dailyTasks import updateFromOdps async def asyncUpdatePQVideosTask(): """ 更新任务 :return: """ ufo = updateFromOdps() video_list = ufo.getVideoFromOdps() await ufo.insertIntoDB(data_list=video_list) def asyncScheduleJob(): """ 更新代码 :return: """ scheduler = AsyncIOScheduler() # 设置一个cron触发器,hour是整时,minute是分钟 trigger = CronTrigger(hour=10, minute=26) scheduler.add_job(asyncUpdatePQVideosTask, trigger) scheduler.start() if __name__ == '__main__': # 直接执行 asyncio.run(asyncUpdatePQVideosTask()) # 定时执行 # loop = asyncio.get_event_loop() # asyncScheduleJob() # try: # loop.run_forever() # 保持事件循环运行 # except (KeyboardInterrupt, SystemExit): # pass