import datetime
import time

import schedule

from common.redis import install_video_data, install_ad_video_data


def bot_video_ai_top():
    """Run the daily "top videos" job.

    Formats the current local time as ``YYYYMMDD`` and passes that stamp,
    the Redis task key and the source table name to ``install_video_data``.
    Any ``Exception`` raised along the way is printed and not re-raised.
    """
    task_key = 'task:video_ai_top'
    source_table = 'content_ai_tag_return_top_merge'
    try:
        dt = datetime.datetime.now().strftime('%Y%m%d')
        print(f"开始执行头部{dt}")
        install_video_data(dt, task_key, source_table)
    except Exception as e:
        # Best-effort job: report the failure and return normally.
        print(f"当日头部异常了{e}")


def bot_video_ai_pq():
    """Run the daily Piaoquan personalized-recommendation / tag-system job.

    Formats the current local time as ``YYYYMMDD`` and passes that stamp,
    the Redis task key and the source table name to ``install_video_data``.
    Any ``Exception`` raised along the way is printed and not re-raised.
    """
    task_key = 'task:video_ai_pq'
    source_table = 'history_top2000_add_aitags'
    try:
        dt = datetime.datetime.now().strftime('%Y%m%d')
        print(f"开始执行票圈推荐个性化{dt}")
        install_video_data(dt, task_key, source_table)
    except Exception as e:
        # Best-effort job: report the failure and return normally.
        print(f"票圈推荐个性化异常了{e}")


def bot_video_ai_recommend():
    """Run the "new recommendation" job.

    Unlike the daily jobs, the stamp here has hour resolution
    (``YYYYMMDDHH``). The stamp, the Redis task key and the source table
    name are passed to ``install_video_data``. Any ``Exception`` raised
    along the way is printed and not re-raised.
    """
    task_key = 'task:video_ai_recommend'
    source_table = 'content_ai_tag_recommend'
    try:
        dt = datetime.datetime.now().strftime('%Y%m%d%H')
        print(f"开始执行新推荐{dt}")
        install_video_data(dt, task_key, source_table)
    except Exception as e:
        # Best-effort job: report the failure and return normally.
        print(f"新推荐异常了{e}")


def ab_video_ai_recommend():
    """Run the ad-video job.

    Passes the ad Redis task key to ``install_ad_video_data``; no date
    stamp is involved. Any ``Exception`` raised is printed and not
    re-raised.
    """
    task_key = 'task:ad_video_recommend'
    try:
        print(f"开始执行广告")
        install_ad_video_data(task_key)
    except Exception as e:
        # Best-effort job: report the failure and return normally.
        print(f"广告异常了{e}")


def schedule_tasks():
    """Register all four jobs with the ``schedule`` library.

    The recommendation job is registered hourly at ``:22``; the other three
    are registered daily at fixed clock times. Nothing is executed here;
    jobs only run when ``schedule.run_pending()`` is called.
    """
    schedule.every().hour.at(":22").do(bot_video_ai_recommend)

    # (clock time, job) pairs, registered in this order.
    daily_jobs = (
        ("02:25", bot_video_ai_pq),
        ("01:25", bot_video_ai_top),
        ("23:25", ab_video_ai_recommend),
    )
    for clock_time, job in daily_jobs:
        schedule.every().day.at(clock_time).do(job)


if __name__ == "__main__":
    # For a one-off manual run, call a job function directly instead of
    # starting the loop below, e.g. ab_video_ai_recommend(),
    # bot_video_ai_top(), bot_video_ai_recommend() or bot_video_ai_pq().
    schedule_tasks()  # register the jobs with the scheduler
    while True:
        schedule.run_pending()
        time.sleep(1)  # check for due jobs once per second