zhangyong 6 kuukautta sitten
vanhempi
commit
96f76e50a6
1 muutettua tiedostoa jossa 32 lisäystä ja 21 poistoa
  1. 32 21
      job_video_processing.py

+ 32 - 21
job_video_processing.py

@@ -1,31 +1,42 @@
 import time
-from concurrent.futures import ThreadPoolExecutor, wait
-from common.redis import install_video_data
 from video_processing.video_processing import VideoProcessing
+from concurrent.futures import ThreadPoolExecutor, wait, as_completed
 
 
-max_workers = 10
+max_workers = 6
 
 def video_ai_task_start():
-    with ThreadPoolExecutor( max_workers=max_workers) as executor:
+    redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend'] * 3  # 模拟任务列表
+    task_index = 0
+    total_tasks = len(redis_task_list)  # 总任务数
+
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = []
         while True:
-            try:
-                redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend'] * 3
-                futures = []
-                start_time = time.time()  # 记录开始时间
-
-                for redis_task in redis_task_list:
-                    futures.append(executor.submit( process_video_ai, redis_task))
-                    time.sleep(1)  # 每秒提交一个任务
-                wait( futures )  # 等待所有任务完成
-                end_time = time.time()  # 记录结束时间
-
-                total_time = end_time - start_time  # 计算总耗时
-                print( f"6个线程完成任务总耗时:{total_time:.2f}秒" )
-            except Exception as e:
-                print(f"异常信息{e}")
-                time.sleep(3)
-                continue
+            while task_index < total_tasks:
+                try:
+                    # 如果当前正在执行的任务少于最大并发数,则继续提交新任务
+                    while len(futures) < max_workers and task_index < total_tasks:
+                        redis_task = redis_task_list[task_index]
+                        futures.append(executor.submit(process_video_ai, redis_task))
+                        print(f"提交任务: {redis_task}")
+                        task_index += 1
+                        time.sleep(1)  # 每秒提交一个任务
+
+                    # 检查已经完成的任务并移除
+                    for future in as_completed(futures):
+                        futures.remove(future)
+                        print(f"完成一个任务,当前运行中的任务数:{len(futures)}")
+                        break  # 每次移除一个完成的任务,继续提交新任务
+
+                except Exception as e:
+                    print(f"异常信息: {e}")
+                    time.sleep(3)
+                    continue  # 继续循环处理下一个任务
+            # 等待所有任务完成
+            wait(futures)
+            print("所有任务完成")
+
 
 def process_video_ai(redis_task):
     try: