zhangyong 6 months ago
parent
commit
28c6a14cde
1 changed files with 23 additions and 18 deletions
  1. 23 18
      job_video_processing.py

+ 23 - 18
job_video_processing.py

@@ -4,27 +4,32 @@ from common.redis import install_video_data
 from video_processing.video_processing import VideoProcessing
 
 
-max_workers = 10
-
 def video_ai_task_start():
-    with ThreadPoolExecutor( max_workers=max_workers) as executor:
-        redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend']
-        # 任务索引
+    max_workers = 10  # 最大线程数
+    redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend']
+    # 创建线程池
+    with ThreadPoolExecutor( max_workers=max_workers ) as executor:
+        futures = []
         task_index = 0
+
         while True:
-            try:
-                # 提交任务
-                executor.submit( process_video_ai, redis_task_list[task_index] )
-
-                task_index += 1
-                if task_index >= len( redis_task_list ):
-                    task_index = 0  # 重置索引
-
-                time.sleep( 1 )  # 每秒提交一个任务
-            except Exception as e:
-                print( f"异常信息: {e}" )
-                time.sleep( 3 )  # 等待3秒后重试
-                continue
+            # 检查已经完成的任务
+            futures = [f for f in futures if not f.done()]
+            if len( futures ) < max_workers:
+                try:
+                    # 提交任务并将 Future 对象添加到列表
+                    future = executor.submit( process_video_ai, redis_task_list[task_index] )
+                    futures.append( future )
+
+                    task_index += 1
+                    if task_index >= len( redis_task_list ):
+                        task_index = 0  # 重置索引
+
+                    time.sleep( 1 )  # 每秒提交一个任务
+
+                except Exception as e:
+                    print( f"异常信息: {e}" )
+                    time.sleep( 3 )  # 等待3秒后重试
 
 def process_video_ai(redis_task):
     try: