# Demo script: runs a fixed list of simulated video-AI tasks on a ThreadPoolExecutor.
- import time
- from video_processing.video_processing import VideoProcessing
- from concurrent.futures import ThreadPoolExecutor, wait, as_completed
# Thread pool size; also the cap on how many task futures are kept in flight.
max_workers = 6
def video_ai_task_start():
    """Run the simulated video-AI task list on a bounded thread pool.

    At most ``max_workers`` tasks are kept in flight.  Whenever one finishes,
    the next pending task (if any) is submitted in its place.  Each
    submission is followed by a one-second pause.

    The function returns once every task in the list has been submitted and
    has finished, which also lets the executor shut down cleanly.

    Exceptions raised by the scheduling loop itself are printed and the loop
    resumes after a three-second pause.  Task results are never read here, so
    failures inside a task are not observed by this function.
    """
    redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend'] * 3  # simulated task list
    total_tasks = len(redis_task_list)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        task_index = 0

        def submit_next(label):
            """Submit the next pending task, log it under *label*, then pause 1s."""
            nonlocal task_index
            redis_task = redis_task_list[task_index]
            futures.append(executor.submit(process_video_ai, redis_task))
            print(f"{label}: {redis_task}")
            # Advance only after a successful submit so a failed submit is retried.
            task_index += 1
            time.sleep(1)

        # Keep going only while work is pending or still running; an
        # unconditional loop would spin forever once the list is exhausted.
        while task_index < total_tasks or futures:
            try:
                # Fill the pool up to its capacity.
                while len(futures) < max_workers and task_index < total_tasks:
                    submit_next("提交任务")

                # Iterate over a snapshot: futures submitted as replacements
                # below are awaited on the next pass of the outer loop.
                for future in as_completed(list(futures)):
                    futures.remove(future)
                    print(f"完成一个任务,当前运行中的任务数:{len(futures)}")
                    if task_index < total_tasks:
                        submit_next("补充新任务")
            except Exception as e:
                # Best-effort scheduler: report the problem, back off, retry.
                print(f"异常信息: {e}")
                time.sleep(3)
def process_video_ai(redis_task):
    """Process a single video-AI task and report progress on stdout.

    A fresh ``VideoProcessing`` object is created and its ``get_video``
    method is called with ``redis_task``.  Any ``Exception`` raised along the
    way is printed rather than propagated.  The function sleeps five seconds
    on both the success and the failure path, and returns ``None``.
    """
    try:
        print(f"开始执行任务{redis_task}")
        VideoProcessing().get_video(redis_task)
        print(f"执行完成{redis_task}")
        time.sleep(5)
    except Exception as exc:
        # Worker boundary: keep the pool thread alive whatever the task raised.
        print("处理任务时出现异常:", exc)
        time.sleep(5)
# Script entry point: start the scheduler only when this file is run directly.
if __name__ == "__main__":
    video_ai_task_start()
|