"""Periodic jobs that hand video-AI tag tables to ``install_video_data``.

Three jobs are registered with the ``schedule`` library and then polled
forever when this file is run as a script:

* re-push pool      -- every hour at minute 20
* new recommendation -- every hour at minute 22
* top videos        -- every day at 01:25
"""
import datetime
import time

import schedule

from common.redis import install_video_data

# Redis task key passed to install_video_data by every job in this file.
_REDIS_TASK = 'task:video_ai'


def _run_job(label: str, dt_format: str, table_name: str) -> None:
    """Stamp the current local time, log the start, and run the install.

    Args:
        label: Job name inserted into the start-up log line.
        dt_format: ``strftime`` pattern used to build the ``dt`` argument
            ('%Y%m%d' for daily jobs, '%Y%m%d%H' for hourly jobs).
        table_name: Source table name forwarded to ``install_video_data``.
    """
    # Naive local time; presumably matches the partition naming of the
    # source tables -- TODO confirm against install_video_data.
    dt = datetime.datetime.now().strftime(dt_format)
    print(f"开始执行{label}{dt}")
    # For a manual backfill, replace dt with a fixed stamp such as
    # '20241014' (daily) or '2024101514' (hourly).
    install_video_data(dt, _REDIS_TASK, table_name)


def bot_video_ai_top() -> None:
    """Run the "top videos" job for the current day (dt = YYYYMMDD)."""
    _run_job('头部', '%Y%m%d', 'content_ai_tag_return_top')


def bot_video_ai_recommend() -> None:
    """Run the "new recommendation" job for the current hour (dt = YYYYMMDDHH)."""
    _run_job('新推荐', '%Y%m%d%H', 'content_ai_tag_recommend')


def bot_video_ai_complex_mode() -> None:
    """Run the "re-push" job for the current hour (dt = YYYYMMDDHH)."""
    _run_job('复推', '%Y%m%d%H', 'content_ai_tag_reflowpool')


def schedule_tasks() -> None:
    """Register all three jobs on the default ``schedule`` scheduler."""
    schedule.every().hour.at(":20").do(bot_video_ai_complex_mode)
    schedule.every().hour.at(":22").do(bot_video_ai_recommend)
    # Runs once a day at 01:25.
    schedule.every().day.at("01:25").do(bot_video_ai_top)


if __name__ == "__main__":
    schedule_tasks()  # register the jobs before entering the polling loop
    # NOTE(review): an exception raised inside a job propagates out of
    # run_pending() and terminates this loop -- confirm that an external
    # supervisor restarts the process, or wrap the jobs if it does not.
    while True:
        schedule.run_pending()
        time.sleep(1)  # poll for due jobs once per second
    # To run a job once by hand instead of scheduling, call
    # bot_video_ai_top(), bot_video_ai_recommend() or
    # bot_video_ai_complex_mode() directly.