"""
@author: luojunhui
Daily scheduled task for ad-delivery (paid traffic) video updates.
"""
- import asyncio
- from datetime import datetime, timedelta
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.cron import CronTrigger
- from dailyTasks import updateFromOdps
def getYesterdayStr():
    """Return yesterday's date formatted as a ``YYYYMMDD`` string.

    The date is computed from the local system clock via the naive
    ``datetime.now()``; no timezone handling is applied.

    :return: yesterday's date, e.g. ``'20240131'``
    """
    one_day = timedelta(days=1)
    return (datetime.now() - one_day).strftime('%Y%m%d')
async def asyncUpdatePQVideosTask():
    """Run one update pass for yesterday's videos.

    Builds yesterday's date string, creates an ``updateFromOdps`` object,
    calls its ``getVideoFromOdps`` with that date, and awaits
    ``insertIntoDB`` with the returned list passed as ``data_list``.

    :return: None
    """
    yesterday = getYesterdayStr()
    odps_updater = updateFromOdps()
    # NOTE(review): getVideoFromOdps is called without ``await``, so it is
    # presumably a synchronous call -- confirm against dailyTasks.
    fetched_videos = odps_updater.getVideoFromOdps(yesterday)
    await odps_updater.insertIntoDB(data_list=fetched_videos)
if __name__ == '__main__':
    # To run the update once, immediately, instead of on a schedule:
    #     asyncio.run(asyncUpdatePQVideosTask())

    # Scheduled mode: register the video update job on a cron trigger that
    # fires at 09:10 (hour=9, minute=10).
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        asyncUpdatePQVideosTask,
        CronTrigger(hour=9, minute=10),
    )
    scheduler.start()

    # Keep the event loop alive so the scheduler can keep firing jobs;
    # Ctrl-C or an interpreter exit request ends the loop quietly.
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
|