""" @author: luojunhui 投流每日任务 """ import asyncio from datetime import datetime, timedelta from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from dailyTasks import updateFromOdps def getYesterdayStr(): """获取昨天的日期字符""" today = datetime.now() # 计算昨天的日期 yesterday = today - timedelta(days=1) return yesterday.strftime('%Y%m%d') async def asyncUpdatePQVideosTask(): """ 更新任务 :return: """ date_info = getYesterdayStr() ufo = updateFromOdps() video_list = ufo.getVideoFromOdps(date_info) await ufo.insertIntoDB(data_list=video_list) if __name__ == '__main__': # 直接执行 # asyncio.run(asyncUpdatePQVideosTask()) # 定时执行 scheduler = AsyncIOScheduler() # 早上9点10分执行更新视频任务 trigger_update_videos = CronTrigger(hour=9, minute=10) scheduler.add_job(asyncUpdatePQVideosTask, trigger_update_videos) scheduler.start() loop = asyncio.get_event_loop() try: loop.run_forever() # 保持事件循环运行 except (KeyboardInterrupt, SystemExit): pass