zhangyong há 1 ano atrás
pai
commit
dfbd6ff3ef

+ 32 - 2
analyze_video.py

@@ -2,8 +2,10 @@ import uvicorn
 from fastapi import FastAPI
 from pydantic import BaseModel
 from google_ai.generativeai_video import main
+import itertools
 
 app = FastAPI()
+api_keys = itertools.cycle(["AIzaSyB2kjF2-S2B5cJiosx_LpApd227w33CVvs", "AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8"])
 
 class VideoRequest(BaseModel):
     video_path: str
@@ -12,6 +14,34 @@ class VideoRequest(BaseModel):
     sample_data: str
 
 
+@app.post("/process_video_test/")
+async def process_video_test(request: VideoRequest):
+    """处理视频请求"""
+    video_path = request.video_path
+    prompt = request.prompt
+    mark = request.mark
+    sample_data = request.sample_data
+
+    api_key = "AIzaSyB2kjF2-S2B5cJiosx_LpApd227w33CVvs"
+    try:
+        print("来一个请求,使用 API key:", api_key)
+        result, mark = await main(video_path, api_key, prompt, mark, sample_data)
+        return {
+            "code": 0,
+            "message": "视频处理成功",
+            "result": result,
+            "mark": mark
+        }
+    except Exception as e:
+        print(f"视频处理失败: {str(e)}")
+        return {
+            "code": 1,
+            "message": f"视频处理失败: {e}",
+            "result": f"视频处理失败: {e}",
+            "mark": f"视频处理失败: {e}"
+        }
+
+
 @app.post("/process_video/")
 async def process_video(request: VideoRequest):
     """处理视频请求"""
@@ -19,8 +49,8 @@ async def process_video(request: VideoRequest):
     prompt = request.prompt
     mark = request.mark
     sample_data = request.sample_data
-
-    api_key = "AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8"
+    # api_keys = ["AIzaSyB2kjF2-S2B5cJiosx_LpApd227w33CVvs","AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8"]
+    api_key = next(api_keys)
     try:
         print("来一个请求,使用 API key:", api_key)
         result, mark = await main(video_path, api_key, prompt, mark, sample_data)

+ 10 - 10
google_ai/generativeai_video.py

@@ -1,18 +1,12 @@
 import asyncio
 import os
 import time
-
 import cv2
-import aiohttp
 import requests
 import google.generativeai as genai
-
 import uuid
-
 from google.generativeai.types import HarmCategory, HarmBlockThreshold
-
 from common import Common
-from common.feishu_data import Material
 
 
 class VideoAnalyzer:
@@ -63,6 +57,12 @@ class VideoAnalyzer:
         print("达到最大重试次数,视频下载失败")
         return None
 
+    async def delete_video(self):
+        try:
+            self.video_file.delete()
+        except Exception as e:
+            Common.logger( "ai" ).info( f'视频删除异常{e}' )
+
     async def upload_video(self, save_path, mime_type = None):
         """上传视频文件并获取视频文件对象"""
         try:
@@ -70,7 +70,6 @@ class VideoAnalyzer:
             await self._wait_for_processing()
         except Exception as e:
             Common.logger("ai").info(f'上传视频文件并获取视频文件对象失败异常信息{e}')
-            self.video_file.delete()
             return f"上传视频文件并获取视频文件对象失败:{e}"
 
     async def _wait_for_processing(self):
@@ -100,7 +99,6 @@ class VideoAnalyzer:
             return model
         except Exception as e:
             Common.logger("ai").info(f'视频创建缓存内容,并返回生成模型异常信息{e}')
-            self.video_file.delete()
             print(f"视频创建缓存内容,并返回生成模型异常信息{e}")
             return f"视频创建缓存内容,并返回生成模型异常信息{e}"
 
@@ -116,11 +114,9 @@ class VideoAnalyzer:
                 ]
             }
             response = chat_session.send_message( message_content )
-            self.video_file.delete()
             return response
         except Exception as e:
             Common.logger("ai").info(f'视频处理请求失败:{e}')
-            self.video_file.delete()
             print( f"视频处理请求失败:{e}" )
             return f"视频处理请求失败:{e}"
 
@@ -158,10 +154,12 @@ async def main(video_path, api_key, prompt, mark, sample_data):
             # 上传并处理视频
             upload_response = await analyzer.upload_video( save_path )
             if upload_response:
+                await analyzer.delete_video()
                 return upload_response, mark
             # 创建缓存模型
             model =await analyzer.create_cache()
             if isinstance( model, str ):
+                await analyzer.delete_video()
                 return model, mark
             print("创建缓存模型成功")
             # sample_data = {
@@ -199,12 +197,14 @@ async def main(video_path, api_key, prompt, mark, sample_data):
             # }
             response =await analyzer.analyze_video( model, prompt, sample_data )
             if isinstance( response, str ):
+                await analyzer.delete_video()
                 return response, mark
             print( response.usage_metadata )
             print(response.text)
             if os.path.exists( save_path ):
                 os.remove( save_path )
                 print( f"文件已删除: {save_path}" )
+            await analyzer.delete_video()
             return response.text, mark
         except Exception as e:
             attempt += 1  # 增加尝试次数

+ 5 - 5
job_video_processing.py

@@ -6,7 +6,7 @@ max_workers = 5  # 最大线程数
 
 
 def video_ai_task_start():
-    redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend']
+    redis_task_list = ['task:video_ai_top', 'task:video_ai_top', 'task:video_ai_top', 'task:video_ai_recommend', 'task:video_ai_recommend']
     futures = []  # 用来存储任务的 Future 对象
     task_index = 0
 
@@ -14,7 +14,7 @@ def video_ai_task_start():
         while True:
             futures = [f for f in futures if not f.done()]
 
-            if len( futures ) < max_workers:
+            if len(futures) < max_workers:
                 try:
                     # 提交新任务
                     future = executor.submit( process_video_ai, redis_task_list[task_index] )
@@ -22,15 +22,15 @@ def video_ai_task_start():
 
                     task_index += 1
                     if task_index >= len( redis_task_list ):
-                        task_index = 0  # 重置索引
+                        task_index = 0
 
                     time.sleep( 1 )
 
                 except Exception as e:
                     print( f"异常信息: {e}" )
-                    time.sleep( 1 )
+                    time.sleep(1)
             else:
-                time.sleep( 5)  # 暂停1秒,避免忙等待
+                time.sleep(1)
 
 
 def process_video_ai(redis_task):

+ 1 - 2
video_processing/video_processing.py

@@ -72,11 +72,10 @@ class VideoProcessing:
         video_data = get_video_data(redis_task)
         if not video_data:
             print("没有获取到视频内容")
-            time.sleep(120)
+            time.sleep(5)
             return
         # 解码为字符串
         data_str = video_data.decode( 'utf-8' )
-
         # 解析为 JSON 对象
         data_json = json.loads( data_str )
         video_id = data_json['video_id']