touliu_schedule_task.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. """
@author: luojunhui
Daily scheduled task for ad placement (touliu).
  4. """
  5. import asyncio
  6. from datetime import datetime, timedelta
  7. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  8. from apscheduler.triggers.cron import CronTrigger
  9. from dailyTasks import updateFromOdps
  10. def getYesterdayStr():
  11. """获取昨天的日期字符"""
  12. today = datetime.now()
  13. # 计算昨天的日期
  14. yesterday = today - timedelta(days=1)
  15. return yesterday.strftime('%Y%m%d')
  16. async def asyncUpdatePQVideosTask():
  17. """
  18. 更新任务
  19. :return:
  20. """
  21. date_info = getYesterdayStr()
  22. ufo = updateFromOdps()
  23. video_list = ufo.getVideoFromOdps(date_info)
  24. await ufo.insertIntoDB(data_list=video_list)
  25. if __name__ == '__main__':
  26. # 直接执行
  27. # asyncio.run(asyncUpdatePQVideosTask())
  28. # 定时执行
  29. scheduler = AsyncIOScheduler()
  30. # 早上9点10分执行更新视频任务
  31. trigger_update_videos = CronTrigger(hour=9, minute=10)
  32. scheduler.add_job(asyncUpdatePQVideosTask, trigger_update_videos)
  33. scheduler.start()
  34. loop = asyncio.get_event_loop()
  35. try:
  36. loop.run_forever() # 保持事件循环运行
  37. except (KeyboardInterrupt, SystemExit):
  38. pass