job_video_processing.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. import time
  2. from video_processing.video_processing import VideoProcessing
  3. from concurrent.futures import ThreadPoolExecutor, wait, as_completed
  4. max_workers = 6
  5. def video_ai_task_start():
  6. redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend'] * 3 # 模拟任务列表
  7. with ThreadPoolExecutor( max_workers=max_workers ) as executor:
  8. futures = []
  9. task_index = 0
  10. total_tasks = len( redis_task_list )
  11. while True:
  12. try:
  13. while len( futures ) < max_workers and task_index < total_tasks:
  14. redis_task = redis_task_list[task_index]
  15. futures.append( executor.submit( process_video_ai, redis_task ) )
  16. print( f"提交任务: {redis_task}" )
  17. task_index += 1
  18. time.sleep( 1 )
  19. for future in as_completed( futures ):
  20. futures.remove( future )
  21. print( f"完成一个任务,当前运行中的任务数:{len( futures )}" )
  22. if task_index < total_tasks:
  23. redis_task = redis_task_list[task_index]
  24. futures.append( executor.submit( process_video_ai, redis_task ) )
  25. print( f"补充新任务: {redis_task}" )
  26. task_index += 1
  27. time.sleep( 1 )
  28. except Exception as e:
  29. print( f"异常信息: {e}" )
  30. time.sleep( 3 )
  31. continue
  32. def process_video_ai(redis_task):
  33. try:
  34. print(f"开始执行任务{redis_task}")
  35. video_processor = VideoProcessing()
  36. video_processor.get_video(redis_task)
  37. print(f"执行完成{redis_task}")
  38. time.sleep(5)
  39. except Exception as e:
  40. print("处理任务时出现异常:", e)
  41. time.sleep(5)
  42. if __name__ == '__main__':
  43. video_ai_task_start()