zhangyong 6 tháng trước cách đây
mục cha
commit
62281dc619
1 tập tin đã thay đổi với 24 bổ sung24 xóa
  1. 24 24
      job_video_processing.py

+ 24 - 24
job_video_processing.py

@@ -7,36 +7,36 @@ max_workers = 6
 
 def video_ai_task_start():
     redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend'] * 3  # 模拟任务列表
-    task_index = 0
-    total_tasks = len(redis_task_list)  # 总任务数
 
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+    with ThreadPoolExecutor( max_workers=max_workers ) as executor:
         futures = []
+        task_index = 0
+        total_tasks = len( redis_task_list )
+
         while True:
-            while task_index < total_tasks:
-                try:
-                    # 如果当前正在执行的任务少于最大并发数,则继续提交新任务
-                    while len(futures) < max_workers and task_index < total_tasks:
+            try:
+                while len( futures ) < max_workers and task_index < total_tasks:
+                    redis_task = redis_task_list[task_index]
+                    futures.append( executor.submit( process_video_ai, redis_task ) )
+                    print( f"提交任务: {redis_task}" )
+                    task_index += 1
+                    time.sleep( 1 )
+
+                for future in as_completed( futures ):
+                    futures.remove( future )
+                    print( f"完成一个任务,当前运行中的任务数:{len( futures )}" )
+
+                    if task_index < total_tasks:
                         redis_task = redis_task_list[task_index]
-                        futures.append(executor.submit(process_video_ai, redis_task))
-                        print(f"提交任务: {redis_task}")
+                        futures.append( executor.submit( process_video_ai, redis_task ) )
+                        print( f"补充新任务: {redis_task}" )
                         task_index += 1
-                        time.sleep(1)  # 每秒提交一个任务
-
-                    # 检查已经完成的任务并移除
-                    for future in as_completed(futures):
-                        futures.remove(future)
-                        print(f"完成一个任务,当前运行中的任务数:{len(futures)}")
-                        break  # 每次移除一个完成的任务,继续提交新任务
-
-                except Exception as e:
-                    print(f"异常信息: {e}")
-                    time.sleep(3)
-                    continue  # 继续循环处理下一个任务
-            # 等待所有任务完成
-            wait(futures)
-            print("所有任务完成")
+                        time.sleep( 1 )
 
+            except Exception as e:
+                print( f"异常信息: {e}" )
+                time.sleep( 3 )
+                continue
 
 def process_video_ai(redis_task):
     try: