job_redis_data.py 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. import datetime
  2. import time
  3. import schedule
  4. from common.redis import install_video_data
  5. def bot_video_ai_top():
  6. """当日头部"""
  7. dt = datetime.datetime.now().strftime('%Y%m%d')
  8. print(f"开始执行头部{dt}")
  9. redis_task = 'task:video_ai_top'
  10. table_name = 'content_ai_tag_return_top_merge'
  11. install_video_data(dt, redis_task, table_name)
  12. def bot_video_ai_recommend():
  13. """新推荐"""
  14. dt = datetime.datetime.now().strftime('%Y%m%d%H')
  15. print(f"开始执行新推荐{dt}")
  16. redis_task = 'task:video_ai_recommend'
  17. table_name = 'content_ai_tag_recommend'
  18. install_video_data(dt, redis_task, table_name)
  19. def schedule_tasks():
  20. schedule.every().hour.at(":22").do(bot_video_ai_recommend)
  21. schedule.every().day.at("01:25").do(bot_video_ai_top)
  22. if __name__ == "__main__":
  23. schedule_tasks() # 调用任务调度函数
  24. while True:
  25. schedule.run_pending()
  26. time.sleep(1) # 每秒钟检查一次
  27. # bot_video_ai_top()
  28. # bot_video_ai_recommend()