zhangyong 6 months ago
parent
commit
64febdffdf
2 changed files with 35 additions and 40 deletions
  1. 32 32
      analyze_video.py
  2. 3 8
      job_video_processing.py

+ 32 - 32
analyze_video.py

@@ -1,43 +1,43 @@
 import uvicorn
+import asyncio
 from fastapi import FastAPI
 from pydantic import BaseModel
 from google_ai.generativeai_video import main
 
 app = FastAPI()
-api_keys = ["AIzaSyB2kjF2-S2B5cJiosx_LpApd227w33CVvs", "AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8"]
-api_key_index = 0  # 初始化索引
+api_keys = [
+    {'key': 'AIzaSyB2kjF2-S2B5cJiosx_LpApd227w33CVvs', 'locked': False},
+    {'key': 'AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8', 'locked': False},
+]
+lock = asyncio.Lock()
+
 class VideoRequest(BaseModel):
     video_path: str
     prompt: str
     mark: str
     sample_data: str
-#
-# @app.post("/process_video_test/")
-# async def process_video_test(request: VideoRequest):
-#     """处理视频请求"""
-#     video_path = request.video_path
-#     prompt = request.prompt
-#     mark = request.mark
-#     sample_data = request.sample_data
-#
-#     api_key = "AIzaSyB2kjF2-S2B5cJiosx_LpApd227w33CVvs"
-#     try:
-#         print("来一个请求,使用 API key:", api_key)
-#         result = await main(video_path, api_key, prompt, mark, sample_data)
-#         return {
-#             "code": 0,
-#             "message": "视频处理成功",
-#             "result": result,
-#             "mark": mark
-#         }
-#     except Exception as e:
-#         print(f"视频处理失败: {str(e)}")
-#         return {
-#             "code": 1,
-#             "message": f"处理失败: {e}",
-#             "result": f"处理失败: {e}",
-#             "mark": f"处理失败: {e}"
-#         }
+
+async def get_available_api_key():
+    """获取一个未锁定的 API key,如果没有可用的则等待 5 秒后重试"""
+    while True:
+        async with lock:
+            for key_data in api_keys:
+                if not key_data['locked']:
+                    key_data['locked'] = True  # 锁定该 key
+                    return key_data['key']
+        print( "没有可用的 API key,等待 5 秒后重试..." )
+        await asyncio.sleep( 5 )
+
+async def release_api_key(api_key):
+    """释放已锁定的 API key,并将其放到列表末尾"""
+    async with lock:
+        for i, key_data in enumerate( api_keys ):
+            if key_data['key'] == api_key:
+                key_data['locked'] = False  # 释放该 key
+                # 将释放的 key 移动到列表末尾
+                api_keys.append( api_keys.pop( i ) )
+                break
+
 
 @app.post("/process_video/")
 async def process_video(request: VideoRequest):
@@ -47,8 +47,7 @@ async def process_video(request: VideoRequest):
     prompt = request.prompt
     mark = request.mark
     sample_data = request.sample_data
-    api_key = api_keys[api_key_index]
-    api_key_index = (api_key_index + 1) % len(api_keys)
+    api_key = await get_available_api_key()
     try:
         print("来一个请求,使用 API key:", api_key)
         result = await main(video_path, api_key, prompt, sample_data)
@@ -66,7 +65,8 @@ async def process_video(request: VideoRequest):
             "result": f"处理失败: {e}",
             "mark": f"处理失败: {e}"
         }
-
+    finally:
+        await release_api_key( api_key )
 
 
 if __name__ == "__main__":

+ 3 - 8
job_video_processing.py

@@ -6,26 +6,21 @@ max_workers = 2  # 最大线程数
 
 
 def video_ai_task_start():
-    redis_task_list = ['task:video_ai_recommend', 'task:video_ai_recommend', 'task:video_ai_recommend', 'task:video_ai_top', 'task:video_ai_top']
+    redis_task_list = ['task:video_ai_recommend', 'task:video_ai_top']
     futures = []  # 用来存储任务的 Future 对象
     task_index = 0
-
     with ThreadPoolExecutor( max_workers=max_workers ) as executor:
         while True:
             futures = [f for f in futures if not f.done()]
-
             if len(futures) < max_workers:
                 try:
                     # 提交新任务
                     future = executor.submit( process_video_ai, redis_task_list[task_index] )
                     futures.append( future )
-
                     task_index += 1
                     if task_index >= len( redis_task_list ):
                         task_index = 0
-
                     time.sleep( 1 )
-
                 except Exception as e:
                     print( f"异常信息: {e}" )
                     time.sleep(1)
@@ -39,10 +34,10 @@ def process_video_ai(redis_task):
         video_processor = VideoProcessing()
         video_processor.get_video( redis_task )
         print( f"执行完成: {redis_task}" )
-        time.sleep( 5 )  # 模拟处理时间
+        time.sleep( 1 )  # 模拟处理时间
     except Exception as e:
         print( f"处理任务时出现异常: {e}" )
-        time.sleep( 5 )  # 出现异常时,等待5秒再处理
+        time.sleep( 1 )  # 出现异常时,等待5秒再处理
 
 
 if __name__ == '__main__':