""" @author: luojunhui 投流每日任务 """ import asyncio from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from dailyTasks import updateFromOdps async def asyncUpdatePQVideosTask(): """ 更新任务 :return: """ ufo = updateFromOdps() video_list = ufo.getVideoFromOdps() await ufo.insertIntoDB(data_list=video_list) if __name__ == '__main__': # 直接执行 # asyncio.run(asyncUpdatePQVideosTask()) # 定时执行 scheduler = AsyncIOScheduler() # 早上9点10分执行更新视频任务 trigger_update_videos = CronTrigger(hour=9, minute=10) scheduler.add_job(asyncUpdatePQVideosTask, trigger_update_videos) scheduler.start() loop = asyncio.get_event_loop() try: loop.run_forever() # 保持事件循环运行 except (KeyboardInterrupt, SystemExit): pass