zhangyong 1 anno fa
parent
commit
d9bb7a1585
2 ha cambiato i file con 47 aggiunte e 31 eliminazioni
  1. 45 29
      google_ai/generativeai_video.py
  2. 2 2
      job_video_processing.py

+ 45 - 29
google_ai/generativeai_video.py

@@ -1,5 +1,7 @@
 import asyncio
 import os
+import time
+
 import cv2
 import aiohttp
 import requests
@@ -36,18 +38,19 @@ class VideoAnalyzer:
             # 发送 GET 请求获取视频内容
             random_filename = f"{uuid.uuid4()}.mp4"
             save_path = os.path.join(save_directory, random_filename)
-            async with aiohttp.ClientSession() as session:
-                async with session.get( video_url ) as response:
-                    response.raise_for_status()  # 检查请求是否成功
-                    with open( save_path, 'wb' ) as video_file:
-                        while True:
-                            chunk = await response.content.read( 1024 * 1024 )  # 每次读取 1MB
-                            if not chunk:
-                                break
-                            video_file.write( chunk )
-
+            headers = {}
+            payload = {}
+            for i in range( 3 ):
+                response = requests.request("GET", video_url, headers=headers, data=payload)
+                if response.status_code == 200:
+                    # 以二进制写入模式打开文件
+                    with open(f"{save_path}", "wb") as file:
+                        # 将响应内容写入文件
+                        file.write(response.content)
+                        time.sleep(1)
                 print( f"视频已成功下载并保存到: {save_path}" )
                 return save_path
+            return None
         except requests.exceptions.RequestException as e:
             print( f"下载视频时出现错误: {e}" )
             return None
@@ -60,7 +63,7 @@ class VideoAnalyzer:
         except Exception as e:
             Common.logger("ai").info(f'上传视频文件并获取视频文件对象失败异常信息{e}')
             self.video_file.delete()
-            print( f"上传视频文件并获取视频文件对象失败:{e}" )
+            return f"上传视频文件并获取视频文件对象失败:{e}"
 
     async def _wait_for_processing(self):
         """等待视频文件处理完成"""
@@ -71,21 +74,28 @@ class VideoAnalyzer:
         print( f'视频处理完成: {self.video_file.uri}' )
 
     async def create_cache(self):
-        generation_config = {
-            "response_mime_type": "application/json"
-        }
-        """创建缓存内容,并返回生成模型"""
-        # 创建生成模型,使用 gemini-1.5-flash 模型
-        model = genai.GenerativeModel(
-            model_name="gemini-1.5-flash",
-            generation_config=generation_config,
-            safety_settings={
-                HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
+        try:
+            generation_config = {
+                "response_mime_type": "application/json"
             }
+            """创建缓存内容,并返回生成模型"""
+            # 创建生成模型,使用 gemini-1.5-flash 模型
+            model = genai.GenerativeModel(
+                model_name="gemini-1.5-flash",
+                generation_config=generation_config,
+                safety_settings={
+                    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
+                }
 
-        )
+            )
+
+            return model
+        except Exception as e:
+            Common.logger("ai").info(f'视频创建缓存内容,并返回生成模型异常信息{e}')
+            self.video_file.delete()
+            print(f"视频创建缓存内容,并返回生成模型异常信息{e}")
+            return f"视频创建缓存内容,并返回生成模型异常信息{e}"
 
-        return model
     async def analyze_video(self, model, questions, sample_data):
         try:
             chat_session = model.start_chat(history=[])
@@ -101,9 +111,10 @@ class VideoAnalyzer:
             self.video_file.delete()
             return response
         except Exception as e:
-            Common.logger("ai").info(f'创建缓存模型异常信息{e}')
+            Common.logger("ai").info(f'视频处理请求失败:{e}')
             self.video_file.delete()
-            print( f"视频处理失败:{e}" )
+            print( f"视频处理请求失败:{e}" )
+            return f"视频处理请求失败:{e}"
 
     def video_duration(self, filename):
         cap = cv2.VideoCapture( filename )
@@ -130,18 +141,21 @@ async def main(video_path, api_key, prompt, mark):
             if int( duration ) >= 600 or int( duration ) == 0:
                 return f"视频时长过长/视频时长为:{duration}秒"
             save_path = await analyzer.download_video(video_path)
-            if not save_path:
+            if not save_path or save_path == None:
                 if os.path.exists( save_path ):
                     os.remove( save_path )
                     print( f"文件已删除: {save_path}" )
                 print("视频下载失败")
                 return "视频下载失败"
             # 上传并处理视频
-            await analyzer.upload_video(save_path)
+            upload_response = await analyzer.upload_video( save_path )
+            if upload_response:
+                return upload_response, mark
             # 创建缓存模型
             model =await analyzer.create_cache()
-            print("创建缓存模型")
-
+            if isinstance( model, str ):
+                return model, mark
+            print("创建缓存模型成功")
             sample_data = {
                 "一、基础信息": {
                     "视觉/音乐/文字": "",
@@ -176,6 +190,8 @@ async def main(video_path, api_key, prompt, mark):
                 }
             }
             response =await analyzer.analyze_video( model, prompt, sample_data )
+            if isinstance( response, str ):
+                return response, mark
             print( response.usage_metadata )
             print(response.text)
             if os.path.exists( save_path ):

+ 2 - 2
job_video_processing.py

@@ -4,11 +4,11 @@ from common.redis import install_video_data
 from video_processing.video_processing import VideoProcessing
 
 
-max_workers = 6
+max_workers = 10
 
 def video_ai_task_start():
     with ThreadPoolExecutor( max_workers=max_workers) as executor:
-        redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend']
+        redis_task_list = ['task:video_ai_top', 'task:video_ai_recommend'] * 5
         # 任务索引
         task_index = 0
         while True: