# job_video_processing.py (1.6 KB)
import time
from concurrent.futures import ThreadPoolExecutor

from loguru import logger

from video_processing.video_processing import VideoProcessing
  5. max_workers = 1 # 最大线程数
  6. def video_ai_task_start():
  7. redis_task_list = ['task:video_ai_recommend', 'task:video_ai_top']
  8. futures = [] # 用来存储任务的 Future 对象
  9. task_index = 0
  10. with ThreadPoolExecutor( max_workers=max_workers ) as executor:
  11. while True:
  12. futures = [f for f in futures if not f.done()]
  13. if len(futures) < max_workers:
  14. try:
  15. # 提交新任务
  16. future = executor.submit( process_video_ai, redis_task_list[task_index] )
  17. futures.append( future )
  18. task_index += 1
  19. if task_index >= len( redis_task_list ):
  20. task_index = 0
  21. time.sleep( 1 )
  22. except Exception as e:
  23. logger.error( f"异常信息: {e}" )
  24. time.sleep(1)
  25. else:
  26. time.sleep(1)
  27. def process_video_ai(redis_task):
  28. try:
  29. logger.info( f"开始执行任务: {redis_task}" )
  30. video_processor = VideoProcessing()
  31. video_processor.get_video( redis_task )
  32. logger.success( f"执行完成: {redis_task}" )
  33. time.sleep( 1 ) # 模拟处理时间
  34. except Exception as e:
  35. logger.error( f"处理任务时出现异常: {e}" )
  36. time.sleep( 1 ) # 出现异常时,等待5秒再处理
  37. if __name__ == '__main__':
  38. video_ai_task_start()