job_redis_data.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. import datetime
  2. import time
  3. import schedule
  4. from common.redis import install_video_data
  5. def bot_video_ai_top():
  6. """头部"""
  7. dt = datetime.datetime.now().strftime('%Y%m%d')
  8. print(f"开始执行头部{dt}")
  9. # dt = '20241014'
  10. redis_task = 'task:video_ai'
  11. table_name = 'content_ai_tag_return_top'
  12. install_video_data(dt, redis_task, table_name)
  13. def bot_video_ai_recommend():
  14. """新推荐"""
  15. dt = datetime.datetime.now().strftime('%Y%m%d%H')
  16. print(f"开始执行新推荐{dt}")
  17. # dt = '2024101514'
  18. redis_task = 'task:video_ai'
  19. table_name = 'content_ai_tag_recommend'
  20. install_video_data(dt, redis_task, table_name)
  21. def bot_video_ai_complex_mode():
  22. """复推"""
  23. dt = datetime.datetime.now().strftime('%Y%m%d%H')
  24. print(f"开始执行复推{dt}")
  25. # dt = '2024101514'
  26. redis_task = 'task:video_ai'
  27. table_name = 'content_ai_tag_reflowpool'
  28. install_video_data(dt, redis_task, table_name)
  29. def schedule_tasks():
  30. schedule.every().hour.at(":20").do(bot_video_ai_complex_mode)
  31. schedule.every().hour.at(":22").do(bot_video_ai_recommend)
  32. # 每天 00:10 执行
  33. schedule.every().day.at("01:25").do(bot_video_ai_top)
  34. if __name__ == "__main__":
  35. schedule_tasks() # 调用任务调度函数
  36. while True:
  37. schedule.run_pending()
  38. time.sleep(1) # 每秒钟检查一次
  39. # bot_video_ai_top()
  40. # bot_video_ai_recommend()
  41. # bot_video_ai_complex_mode()