import time from video_processing.video_processing import VideoProcessing from concurrent.futures import ThreadPoolExecutor, wait, as_completed max_workers = 6 def video_ai_task_start(): redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend'] * 3 # 模拟任务列表 task_index = 0 total_tasks = len(redis_task_list) # 总任务数 with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] while True: while task_index < total_tasks: try: # 如果当前正在执行的任务少于最大并发数,则继续提交新任务 while len(futures) < max_workers and task_index < total_tasks: redis_task = redis_task_list[task_index] futures.append(executor.submit(process_video_ai, redis_task)) print(f"提交任务: {redis_task}") task_index += 1 time.sleep(1) # 每秒提交一个任务 # 检查已经完成的任务并移除 for future in as_completed(futures): futures.remove(future) print(f"完成一个任务,当前运行中的任务数:{len(futures)}") break # 每次移除一个完成的任务,继续提交新任务 except Exception as e: print(f"异常信息: {e}") time.sleep(3) continue # 继续循环处理下一个任务 # 等待所有任务完成 wait(futures) print("所有任务完成") def process_video_ai(redis_task): try: print(f"开始执行任务{redis_task}") video_processor = VideoProcessing() video_processor.get_video(redis_task) print(f"执行完成{redis_task}") time.sleep(5) except Exception as e: print("处理任务时出现异常:", e) time.sleep(5) if __name__ == '__main__': video_ai_task_start()