12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- """
- @author: luojunhui
- 投流每日任务
- """
- import asyncio
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.cron import CronTrigger
- from dailyTasks import updateFromOdps
async def asyncUpdatePQVideosTask():
    """
    Run a single update pass.

    Creates an ``updateFromOdps`` helper, obtains the video list from its
    ``getVideoFromOdps()`` method, and awaits ``insertIntoDB`` with that
    list passed as ``data_list``.

    :return: None
    """
    updater = updateFromOdps()
    # The fetch is a plain (non-awaited) call; only the insert is awaited.
    fetched_videos = updater.getVideoFromOdps()
    await updater.insertIntoDB(data_list=fetched_videos)
def asyncScheduleJob(hour=10, minute=26):
    """
    Register ``asyncUpdatePQVideosTask`` on a cron trigger and start the scheduler.

    :param hour: value for the ``hour`` field of the ``CronTrigger``;
        defaults to 10, the value that used to be hard-coded here.
    :param minute: value for the ``minute`` field of the ``CronTrigger``;
        defaults to 26, the value that used to be hard-coded here.
    :return: the started ``AsyncIOScheduler`` instance, so the caller can
        hold on to it (for example to shut it down later). Callers that
        ignore the return value behave exactly as before.
    """
    scheduler = AsyncIOScheduler()
    # Set up a cron trigger: ``hour`` is the hour of the day, ``minute`` the minute.
    trigger = CronTrigger(hour=hour, minute=minute)
    scheduler.add_job(asyncUpdatePQVideosTask, trigger)
    scheduler.start()
    return scheduler
if __name__ == '__main__':
    # One-shot mode: run the update task once, then exit.
    one_shot_run = asyncUpdatePQVideosTask()
    asyncio.run(one_shot_run)

    # Scheduled mode (currently disabled): start the cron job and keep the
    # event loop alive until the process is interrupted.
    # loop = asyncio.get_event_loop()
    # asyncScheduleJob()
    # try:
    #     loop.run_forever()  # keep the event loop running
    # except (KeyboardInterrupt, SystemExit):
    #     pass
|