job_video_processing.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. import time
  2. from video_processing.video_processing import VideoProcessing
  3. from concurrent.futures import ThreadPoolExecutor, wait, as_completed
  4. max_workers = 6
  5. def video_ai_task_start():
  6. redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend'] * 3 # 模拟任务列表
  7. task_index = 0
  8. total_tasks = len(redis_task_list) # 总任务数
  9. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  10. futures = []
  11. while True:
  12. while task_index < total_tasks:
  13. try:
  14. # 如果当前正在执行的任务少于最大并发数,则继续提交新任务
  15. while len(futures) < max_workers and task_index < total_tasks:
  16. redis_task = redis_task_list[task_index]
  17. futures.append(executor.submit(process_video_ai, redis_task))
  18. print(f"提交任务: {redis_task}")
  19. task_index += 1
  20. time.sleep(1) # 每秒提交一个任务
  21. # 检查已经完成的任务并移除
  22. for future in as_completed(futures):
  23. futures.remove(future)
  24. print(f"完成一个任务,当前运行中的任务数:{len(futures)}")
  25. break # 每次移除一个完成的任务,继续提交新任务
  26. except Exception as e:
  27. print(f"异常信息: {e}")
  28. time.sleep(3)
  29. continue # 继续循环处理下一个任务
  30. # 等待所有任务完成
  31. wait(futures)
  32. print("所有任务完成")
  33. def process_video_ai(redis_task):
  34. try:
  35. print(f"开始执行任务{redis_task}")
  36. video_processor = VideoProcessing()
  37. video_processor.get_video(redis_task)
  38. print(f"执行完成{redis_task}")
  39. time.sleep(5)
  40. except Exception as e:
  41. print("处理任务时出现异常:", e)
  42. time.sleep(5)
  43. if __name__ == '__main__':
  44. video_ai_task_start()