job_video_processing.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. import time
  2. from concurrent.futures import ThreadPoolExecutor, wait
  3. from common.redis import install_video_data
  4. from video_processing.video_processing import VideoProcessing
  5. def video_ai_task_start():
  6. max_workers = 10 # 最大线程数
  7. redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend']
  8. # 创建线程池
  9. with ThreadPoolExecutor( max_workers=max_workers ) as executor:
  10. futures = []
  11. task_index = 0
  12. while True:
  13. # 检查已经完成的任务
  14. futures = [f for f in futures if not f.done()]
  15. if len( futures ) < max_workers:
  16. try:
  17. # 提交任务并将 Future 对象添加到列表
  18. future = executor.submit( process_video_ai, redis_task_list[task_index] )
  19. futures.append( future )
  20. task_index += 1
  21. if task_index >= len( redis_task_list ):
  22. task_index = 0 # 重置索引
  23. time.sleep( 1 ) # 每秒提交一个任务
  24. except Exception as e:
  25. print( f"异常信息: {e}" )
  26. time.sleep( 3 ) # 等待3秒后重试
  27. def process_video_ai(redis_task):
  28. try:
  29. print(f"开始执行任务{redis_task}")
  30. video_processor = VideoProcessing()
  31. video_processor.get_video(redis_task)
  32. print(f"执行完成{redis_task}")
  33. time.sleep(5)
  34. except Exception as e:
  35. print("处理任务时出现异常:", e)
  36. time.sleep(5)
  37. if __name__ == '__main__':
  38. video_ai_task_start()