jihuaqiang 2 週間 前
コミット
a424deea50
1 ファイル変更148 行追加18 行削除
  1. 148 18
      utils/google_ai_analyze.py

+ 148 - 18
utils/google_ai_analyze.py

@@ -2,11 +2,14 @@ import os
 import time
 import uuid
 from typing import  Optional
+import functools
 
 import google.generativeai as genai
 import orjson
 import requests
 from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
+from google.api_core import exceptions as google_exceptions
+from requests.exceptions import RequestException, Timeout, ConnectionError
 from loguru import logger
 from utils.coze_hook import CozeHook
 
@@ -19,6 +22,94 @@ CACHE_DIR = os.path.join(os.getcwd(), 'video_cache')
 # PROXY_ADDR = 'http://localhost:1081'
 # os.environ['http_proxy'] = PROXY_ADDR
 # os.environ['https_proxy'] = PROXY_ADDR
+
+def retry_on_error(max_retries: int = 3, backoff_factor: float = 1.0):
+    """
+    装饰器:在特定错误时重试
+    """
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            last_exception = None
+            
+            for attempt in range(max_retries + 1):
+                try:
+                    return func(*args, **kwargs)
+                except Exception as e:
+                    last_exception = e
+                    
+                    # 判断是否应该重试
+                    should_retry = False
+                    if isinstance(e, google_exceptions.GoogleAPIError):
+                        # 对于429(频率限制)、500(服务器错误)、503(服务不可用)进行重试
+                        should_retry = e.code in [429, 500, 503]
+                    elif isinstance(e, (Timeout, ConnectionError)):
+                        # 网络超时和连接错误进行重试
+                        should_retry = True
+                    elif "overloaded" in str(e).lower() or "timeout" in str(e).lower():
+                        # 服务器过载或超时进行重试
+                        should_retry = True
+                    
+                    if should_retry and attempt < max_retries:
+                        wait_time = backoff_factor * (2 ** attempt)
+                        logger.warning(f"[重试] 第{attempt + 1}次尝试失败,{wait_time}秒后重试: {str(e)}")
+                        time.sleep(wait_time)
+                        continue
+                    else:
+                        # 不应该重试或已达到最大重试次数
+                        break
+            
+            # 重试失败,抛出最后一次的异常
+            raise last_exception
+        
+        return wrapper
+    return decorator
+
+def handle_genai_error(error: Exception) -> str:
+    """
+    统一处理Google GenerativeAI相关的错误
+    返回用户友好的错误信息
+    """
+    error_type = type(error).__name__
+    error_msg = str(error)
+    
+    # Google API 相关错误
+    if isinstance(error, google_exceptions.GoogleAPIError):
+        if error.code == 400:
+            return f"请求参数错误: {error_msg}"
+        elif error.code == 401:
+            return f"API密钥无效或已过期: {error_msg}"
+        elif error.code == 403:
+            return f"权限不足或服务不可用: {error_msg}"
+        elif error.code == 404:
+            return f"模型或资源不存在: {error_msg}"
+        elif error.code == 429:
+            return f"请求频率超限,请稍后重试: {error_msg}"
+        elif error.code == 500:
+            return f"服务器内部错误: {error_msg}"
+        elif error.code == 503:
+            return f"服务暂时不可用: {error_msg}"
+        else:
+            return f"Google API错误 ({error.code}): {error_msg}"
+    
+    # 网络相关错误
+    elif isinstance(error, (RequestException, Timeout, ConnectionError)):
+        return f"网络连接错误: {error_msg}"
+    
+    # 通用错误处理
+    elif "API_KEY" in error_msg.upper() or "PERMISSION" in error_msg.upper():
+        return f"API密钥错误或权限不足: {error_msg}"
+    elif "MODEL" in error_msg.upper() and ("NOT_FOUND" in error_msg.upper() or "UNAVAILABLE" in error_msg.upper()):
+        return f"模型不可用或不存在: {error_msg}"
+    elif "QUOTA" in error_msg.upper() or "LIMIT" in error_msg.upper():
+        return f"配额超限或请求限制: {error_msg}"
+    elif "TIMEOUT" in error_msg.upper():
+        return f"请求超时: {error_msg}"
+    elif "OVERLOADED" in error_msg.upper():
+        return f"服务器负载过高,请稍后重试: {error_msg}"
+    else:
+        return f"创建GenerativeModel失败 ({error_type}): {error_msg}"
+
 def load_prompts():
     """从prompt.py加载Prompt"""
     try:
@@ -95,21 +186,29 @@ class GoogleAI(object):
             return None
       
     @classmethod
+    @retry_on_error(max_retries=2, backoff_factor=1.5)
     def _analyze_content(cls, video, prompt):
         logger.info(f"[视频分析] 开始分析, 视频: {video}, 提示: {prompt}")
         """增强版内容分析"""
-        model = genai.GenerativeModel(
-            model_name='gemini-2.0-flash',
-            generation_config=genai.GenerationConfig(
-                response_mime_type='application/json',
-                temperature=0.3,
-                max_output_tokens=20480
-            ),
-            safety_settings={
-                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
-                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
-            }
-        )
+        
+        # 添加模型创建的错误处理
+        try:
+            model = genai.GenerativeModel(
+                model_name='gemini-2.0-flash',
+                generation_config=genai.GenerationConfig(
+                    response_mime_type='application/json',
+                    temperature=0.3,
+                    max_output_tokens=20480
+                ),
+                safety_settings={
+                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
+                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
+                }
+            )
+        except Exception as e:
+            error_msg = handle_genai_error(e)
+            logger.error(f"[视频分析] {error_msg}")
+            raise Exception(error_msg)
         
         try:
             response = model.generate_content(
@@ -128,10 +227,20 @@ class GoogleAI(object):
                 raise ValueError("响应格式错误:非字典结构")
                 
             return result
-        except orjson.JSONDecodeError:
-            raise Exception("响应解析失败,非JSON格式")
+        except orjson.JSONDecodeError as e:
+            error_msg = f"响应解析失败,非JSON格式: {str(e)}"
+            logger.error(f"[视频分析] {error_msg}")
+            raise Exception(error_msg)
         except Exception as e:
-            raise Exception(f"分析失败: {str(e)}")
+            # 如果是Google API相关错误,使用统一错误处理
+            if isinstance(e, (google_exceptions.GoogleAPIError, RequestException, Timeout, ConnectionError)):
+                error_msg = handle_genai_error(e)
+                logger.error(f"[视频分析] {error_msg}")
+                raise Exception(error_msg)
+            else:
+                error_msg = f"分析失败: {str(e)}"
+                logger.error(f"[视频分析] {error_msg}")
+                raise Exception(error_msg)
 
     @classmethod
     def run(cls, api_key, video_url):
@@ -294,8 +403,29 @@ class GoogleAI(object):
     
 
 if __name__ == '__main__':
-    ai = GoogleAI()
-    ai.run("AIzaSyDs7rd3qWV2ElnP4xtY_b0EiLUdt3yviRs",
-                 "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
+    # 使用示例:展示错误处理
+    try:
+        ai = GoogleAI()
+        result = ai.run("AIzaSyDs7rd3qWV2ElnP4xtY_b0EiLUdt3yviRs",
+                       "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
+        print(f"分析成功: {result}")
+    except Exception as e:
+        error_msg = str(e)
+        print(f"分析失败: {error_msg}")
+        
+        # 根据错误类型进行不同的处理
+        if "API密钥" in error_msg:
+            print("请检查API密钥是否正确配置")
+        elif "权限不足" in error_msg:
+            print("请检查API密钥权限或服务可用性")
+        elif "配额超限" in error_msg:
+            print("请检查API配额或稍后重试")
+        elif "网络" in error_msg:
+            print("请检查网络连接")
+        elif "服务器负载" in error_msg:
+            print("服务器繁忙,请稍后重试")
+        else:
+            print("未知错误,请联系技术支持")
+    
     # ai._analyze_content_with_api("http://rescdn.yishihui.com/longvideo/crawler_local/video/prod/20241206/5f98b0e4464d02d6c75907302793902d12277")