12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- import time
- from concurrent.futures import ThreadPoolExecutor, wait
- from common.redis import install_video_data
- from video_processing.video_processing import VideoProcessing
- def video_ai_task_start():
- max_workers = 10 # 最大线程数
- redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend']
- # 创建线程池
- with ThreadPoolExecutor( max_workers=max_workers ) as executor:
- futures = []
- task_index = 0
- while True:
- # 检查已经完成的任务
- futures = [f for f in futures if not f.done()]
- if len( futures ) < max_workers:
- try:
- # 提交任务并将 Future 对象添加到列表
- future = executor.submit( process_video_ai, redis_task_list[task_index] )
- futures.append( future )
- task_index += 1
- if task_index >= len( redis_task_list ):
- task_index = 0 # 重置索引
- time.sleep( 1 ) # 每秒提交一个任务
- except Exception as e:
- print( f"异常信息: {e}" )
- time.sleep( 3 ) # 等待3秒后重试
- def process_video_ai(redis_task):
- try:
- print(f"开始执行任务{redis_task}")
- video_processor = VideoProcessing()
- video_processor.get_video(redis_task)
- print(f"执行完成{redis_task}")
- time.sleep(5)
- except Exception as e:
- print("处理任务时出现异常:", e)
- time.sleep(5)
- if __name__ == '__main__':
- video_ai_task_start()
|