job_video_processing.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. import time
  2. from concurrent.futures import ThreadPoolExecutor
  3. from video_processing.video_processing import VideoProcessing
  4. max_workers = 1 # 最大线程数
  5. def video_ai_task_start():
  6. redis_task_list = ['task:video_ai_recommend', 'task:video_ai_top']
  7. futures = [] # 用来存储任务的 Future 对象
  8. task_index = 0
  9. with ThreadPoolExecutor( max_workers=max_workers ) as executor:
  10. while True:
  11. futures = [f for f in futures if not f.done()]
  12. if len(futures) < max_workers:
  13. try:
  14. # 提交新任务
  15. future = executor.submit( process_video_ai, redis_task_list[task_index] )
  16. futures.append( future )
  17. task_index += 1
  18. if task_index >= len( redis_task_list ):
  19. task_index = 0
  20. time.sleep( 1 )
  21. except Exception as e:
  22. print( f"异常信息: {e}" )
  23. time.sleep(1)
  24. else:
  25. time.sleep(1)
  26. def process_video_ai(redis_task):
  27. try:
  28. print( f"开始执行任务: {redis_task}" )
  29. video_processor = VideoProcessing()
  30. video_processor.get_video( redis_task )
  31. print( f"执行完成: {redis_task}" )
  32. time.sleep( 1 ) # 模拟处理时间
  33. except Exception as e:
  34. print( f"处理任务时出现异常: {e}" )
  35. time.sleep( 1 ) # 出现异常时,等待5秒再处理
  36. if __name__ == '__main__':
  37. video_ai_task_start()