"""Video content analysis helpers built on Google Gemini.

The module downloads a video, uploads it through ``google.generativeai``,
runs the analysis prompt against it and forwards the structured result to
``CozeHook``. An alternative code path posts the video URL to an HTTP
gateway instead of using the SDK directly.
"""
import functools
import os
import time
import uuid
from typing import Optional

import google.generativeai as genai
import orjson
import requests
from google.api_core import exceptions as google_exceptions
from google.generativeai.types import HarmBlockThreshold, HarmCategory
from loguru import logger
# NOTE: this ConnectionError is the requests one and shadows the builtin.
from requests.exceptions import ConnectionError, RequestException, Timeout

from utils.coze_hook import CozeHook
from utils.google_ai_prompt import (
    VIDEO_ANALYSIS_PROMPT,
    VIDEO_SEGMENT_ANALYSIS_PROMPT,
    VIDEO_TOPIC_ANALYSIS_PROMPT,
)

# from utils.feishu_data import Material

# Directory where downloaded videos are stored until analysis finishes.
CACHE_DIR = os.path.join(os.getcwd(), 'video_cache')
# CACHE_DIR = '/Users/z/Downloads/'

# Optional local proxy configuration (disabled):
# PROXY_ADDR = 'http://localhost:1081'
# os.environ['http_proxy'] = PROXY_ADDR
# os.environ['https_proxy'] = PROXY_ADDR

# HTTP status codes considered transient: rate limit, server error,
# service unavailable.
_RETRYABLE_STATUS_CODES = (429, 500, 503)
# Substrings (lower-case) in an error message that mark it as transient.
_RETRYABLE_KEYWORDS = ("overloaded", "timeout")

# Message prefixes for the Google API status codes handled explicitly.
_API_ERROR_LABELS = {
    400: "请求参数错误",
    401: "API密钥无效或已过期",
    403: "权限不足或服务不可用",
    404: "模型或资源不存在",
    429: "请求频率超限,请稍后重试",
    500: "服务器内部错误",
    503: "服务暂时不可用",
}


def _is_retryable(error: BaseException) -> bool:
    """Return True when *error* or any exception it was raised from is transient.

    Callers commonly re-wrap SDK errors in a plain ``Exception``; following
    the ``__cause__`` chain lets the original Google API / network error
    still drive the retry decision.
    """
    visited = set()
    current = error
    while current is not None and id(current) not in visited:
        visited.add(id(current))
        if isinstance(current, google_exceptions.GoogleAPIError):
            # Not every GoogleAPIError subclass is guaranteed to expose
            # ``code``, hence getattr.
            if getattr(current, 'code', None) in _RETRYABLE_STATUS_CODES:
                return True
        elif isinstance(current, (Timeout, ConnectionError)):
            return True
        else:
            text = str(current).lower()
            if any(keyword in text for keyword in _RETRYABLE_KEYWORDS):
                return True
        current = current.__cause__
    return False


def retry_on_error(max_retries: int = 3, backoff_factor: float = 1.0):
    """Build a decorator that retries the wrapped call on transient errors.

    Args:
        max_retries: Number of additional attempts after the first failure.
        backoff_factor: Base delay in seconds; attempt ``i`` (0-based) waits
            ``backoff_factor * 2 ** i`` before the next try.

    Returns:
        A decorator. The decorated function re-raises the last exception
        unchanged once it is not retryable or the retries are exhausted.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= max_retries or not _is_retryable(e):
                        raise
                    wait_time = backoff_factor * (2 ** attempt)
                    logger.warning(f"[重试] 第{attempt + 1}次尝试失败,{wait_time}秒后重试: {str(e)}")
                    time.sleep(wait_time)
                    attempt += 1
        return wrapper
    return decorator


def handle_genai_error(error: Exception) -> str:
    """Translate a Google GenerativeAI related error into a user-facing message.

    Args:
        error: The exception raised by the SDK, the network layer or anything
            else in the call path.

    Returns:
        A message string chosen from the error's type, status code or text.
    """
    error_type = type(error).__name__
    error_msg = str(error)

    # Google API errors: pick the message from the status code.
    if isinstance(error, google_exceptions.GoogleAPIError):
        code = getattr(error, 'code', None)
        label = _API_ERROR_LABELS.get(code)
        if label is not None:
            return f"{label}: {error_msg}"
        return f"Google API错误 ({code}): {error_msg}"

    # Network level errors.
    if isinstance(error, (RequestException, Timeout, ConnectionError)):
        return f"网络连接错误: {error_msg}"

    # Fallback: classify by keywords found in the message text.
    upper_msg = error_msg.upper()
    if "API_KEY" in upper_msg or "PERMISSION" in upper_msg:
        return f"API密钥错误或权限不足: {error_msg}"
    if "MODEL" in upper_msg and ("NOT_FOUND" in upper_msg or "UNAVAILABLE" in upper_msg):
        return f"模型不可用或不存在: {error_msg}"
    if "QUOTA" in upper_msg or "LIMIT" in upper_msg:
        return f"配额超限或请求限制: {error_msg}"
    if "TIMEOUT" in upper_msg:
        return f"请求超时: {error_msg}"
    if "OVERLOADED" in upper_msg:
        return f"服务器负载过高,请稍后重试: {error_msg}"
    return f"创建GenerativeModel失败 ({error_type}): {error_msg}"


def load_prompts():
    """Return the list of prompts to run, each a dict with ``name`` and ``content``.

    Only the combined video analysis prompt is active; the separate topic
    and segment prompts (VIDEO_TOPIC_ANALYSIS_PROMPT,
    VIDEO_SEGMENT_ANALYSIS_PROMPT) are currently not included.

    Raises:
        Exception: If building the prompt list fails.
    """
    try:
        print("\n[初始化] 从prompt.py加载Prompt")
        prompts = [
            {
                "name": "视频内容分析",
                "content": VIDEO_ANALYSIS_PROMPT
            }
        ]
        print(f"[成功] 加载 {len(prompts)} 个Prompt")
        return prompts
    except Exception as e:
        raise Exception(f"加载Prompt失败: {str(e)}") from e


class GoogleAI(object):
    """Video analysis entry points (Gemini SDK and HTTP gateway variants)."""

    # Keys of the two sections handed to CozeHook.
    _TOPIC_KEY = '视频选题与要点理解'
    _SEGMENT_KEY = '视频分段与时间点分析'

    @staticmethod
    def _remove_local_file(path: Optional[str]) -> None:
        """Best-effort removal of a local file; never raises."""
        if not path:
            return
        try:
            os.remove(path)
            logger.info(f"[内容分析] 文件已删除: {path}")
        except FileNotFoundError:
            pass  # nothing to clean up
        except OSError as e:
            logger.warning(f"[内容分析] 删除本地文件失败: {path}, {e}")

    @staticmethod
    def _delete_remote_file(video) -> None:
        """Best-effort deletion of an uploaded Gemini file; never raises."""
        if video is None:
            return
        try:
            genai.delete_file(name=video.name)
        except Exception as e:
            logger.warning(f"[内容分析] 删除远端文件失败: {e}")

    @classmethod
    def _build_analysis_data(cls, result: dict) -> dict:
        """Map the model's JSON result onto the two sections CozeHook consumes."""
        return {
            cls._TOPIC_KEY: {
                "视频简介": result.get('视频简介', ''),
                "视频内容类型": result.get('视频内容类型', ''),
                "段落类型相似度": result.get('段落类型相似度', 1)
            },
            cls._SEGMENT_KEY: {
                "内容分段": result.get('内容分段', [])
            }
        }

    @classmethod
    def download_video(cls, video_link: str) -> Optional[str]:
        """Download *video_link* into ``CACHE_DIR``.

        Up to three attempts are made. The whole response body is read into
        memory before it is written to disk.

        Args:
            video_link: URL of the video to fetch.

        Returns:
            The path of the saved ``.mp4`` file, or ``None`` when the cache
            directory cannot be created, the file cannot be written, or no
            attempt returned HTTP 200.
        """
        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
        try:
            # Make sure the cache directory exists.
            try:
                os.makedirs(CACHE_DIR, exist_ok=True)
            except Exception as e:
                parent_dir = os.path.dirname(CACHE_DIR)
                error_info = {
                    "error_type": type(e).__name__,
                    "error_message": str(e),
                    "cache_dir": CACHE_DIR,
                    "current_dir": os.getcwd(),
                    "dir_exists": os.path.exists(CACHE_DIR),
                    "dir_permissions": oct(os.stat(parent_dir).st_mode)[-3:]
                    if os.path.exists(parent_dir) else "N/A"
                }
                error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
                logger.error(f'[内容分析] 创建缓存目录失败: {error_json}')
                return None

            for attempt in range(3):
                try:
                    response = requests.get(url=video_link, timeout=60)
                except Exception as e:
                    logger.warning(f'[内容分析] 第{attempt + 1}次下载失败: {e}')
                    time.sleep(1)
                    continue

                print(f"response content: {file_path}")
                if response.status_code != 200:
                    logger.warning(
                        f'[内容分析] 第{attempt + 1}次下载返回状态码: {response.status_code}'
                    )
                    continue

                try:
                    with open(file_path, 'wb') as f:
                        f.write(response.content)
                    logger.info(f'[内容分析] 视频链接: {video_link}, 存储地址: {file_path}')
                except Exception as e:
                    error_info = {
                        "error_type": type(e).__name__,
                        "error_message": str(e),
                        "file_path": file_path,
                        "content_length": len(response.content) if response.content else 0
                    }
                    error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
                    logger.error(f'[内容分析] 视频保存失败: {error_json}')
                    # Do not leave a truncated file behind in the cache.
                    cls._remove_local_file(file_path)
                    return None
                return file_path

            logger.error(f'[内容分析] 视频下载失败, 链接: {video_link}')
            return None
        except Exception as e:
            logger.error(f'[内容分析] 视频下载异常: {e}')
            return None

    @classmethod
    @retry_on_error(max_retries=2, backoff_factor=1.5)
    def _analyze_content(cls, video, prompt):
        """Run *prompt* against the uploaded *video* and return the parsed JSON dict.

        Args:
            video: File handle returned by ``genai.upload_file`` / ``genai.get_file``.
            prompt: Prompt text sent together with the video.

        Returns:
            The model response parsed as a ``dict``.

        Raises:
            Exception: On model creation failure, API/network failure, a
                non-JSON response or a JSON response that is not an object.
                The original error is attached as ``__cause__`` so the retry
                decorator can recognise transient failures.
        """
        # Creating the model can fail on its own (bad key, unknown model...).
        try:
            model = genai.GenerativeModel(
                model_name='gemini-2.0-flash',
                generation_config=genai.GenerationConfig(
                    response_mime_type='application/json',
                    temperature=0.3,
                    max_output_tokens=20480
                ),
                safety_settings={
                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                }
            )
        except Exception as e:
            error_msg = handle_genai_error(e)
            logger.error(f"[视频分析] {error_msg}")
            raise Exception(error_msg) from e

        try:
            response = model.generate_content(
                contents=[video, prompt],
                request_options={'timeout': 300}
            )

            if hasattr(response, '_error') and response._error:
                raise Exception(f"生成错误: {response._error}")

            result = orjson.loads(response.text.strip())
            logger.info(f"[视频分析] 响应: {result}")
            print(f"[视频分析] 响应: {result}")

            if not isinstance(result, dict):
                raise ValueError("响应格式错误:非字典结构")

            return result
        except orjson.JSONDecodeError as e:
            error_msg = f"响应解析失败,非JSON格式: {str(e)}"
            logger.error(f"[视频分析] {error_msg}")
            raise Exception(error_msg) from e
        except Exception as e:
            # API and network errors go through the shared message mapping.
            if isinstance(e, (google_exceptions.GoogleAPIError, RequestException, Timeout, ConnectionError)):
                error_msg = handle_genai_error(e)
            else:
                error_msg = f"分析失败: {str(e)}"
            logger.error(f"[视频分析] {error_msg}")
            raise Exception(error_msg) from e

    @classmethod
    def run(cls, api_key, video_url):
        """Download, upload and analyse a video, then pass the result to CozeHook.

        Args:
            api_key: Google Generative AI API key.
            video_url: URL of the video to analyse.

        Returns:
            ``(analysis_data, demand_list)`` on success, or
            ``("[异常] ...", "")`` when any step fails. This method does not
            raise; the local file and the uploaded file are cleaned up on
            every path.
        """
        # The API key is deliberately not printed: it is a secret.
        print(f"video_url:{video_url}")
        video_path = None
        video = None
        try:
            genai.configure(api_key=api_key)
            video_path = cls.download_video(video_link=video_url)
            if not video_path:
                logger.error('[内容分析] 视频下载失败, 跳过任务')
                return "[异常] 视频下载失败", ""

            video = genai.upload_file(path=video_path, mime_type='video/mp4')
            # Poll until the upload leaves the PROCESSING state.
            while video.state.name == 'PROCESSING':
                time.sleep(1)
                video = genai.get_file(name=video.name)
            if video.state.name != 'ACTIVE':
                return "[异常] 上传视频失败", ""

            logger.info(f"[内容分析] 文件下载完成: {video_path}")
            analysis_data = {}
            last_error = None
            for prompt in load_prompts()[:3]:
                try:
                    result = cls._analyze_content(video, prompt['content'])
                    analysis_data.update(cls._build_analysis_data(result))
                except Exception as e:
                    last_error = e
                    analysis_data[prompt['name']] = {
                        "error": str(e),
                        "error_type": type(e).__name__
                    }

            topic = analysis_data.get(cls._TOPIC_KEY)
            segments = analysis_data.get(cls._SEGMENT_KEY)
            if topic is None or segments is None:
                # Every prompt failed: report the analysis error itself
                # rather than a KeyError on the missing sections.
                reason = last_error if last_error is not None else "视频分析未产生结果"
                logger.error(f"[内容分析] 处理异常,异常信息{reason}")
                return f"[异常] {reason}", ""

            demand_list = CozeHook().run(topic, segments)
            return analysis_data, demand_list
        except Exception as e:
            logger.error(f"[内容分析] 处理异常,异常信息{e}")
            return f"[异常] {e}", ""
        finally:
            cls._delete_remote_file(video)
            cls._remove_local_file(video_path)

    @staticmethod
    def _detect_mime_type(video_url: str) -> str:
        """Return the video's ``content-type`` header, or ``video/mp4`` as fallback.

        A HEAD request is tried first; if its content type is missing or not
        a video type, a streamed GET is issued only to read the headers.
        """
        try:
            response = requests.head(video_url, timeout=10)
            content_type = response.headers.get('content-type', '')
            if not content_type or 'video' not in content_type.lower():
                response = requests.get(video_url, stream=True, timeout=10)
                try:
                    content_type = response.headers.get('content-type', '')
                finally:
                    # The body is never read; release the connection.
                    response.close()
                if not content_type or 'video' not in content_type.lower():
                    content_type = 'video/mp4'
            return content_type
        except Exception as e:
            logger.warning(f"[内容分析] 获取视频MIME类型失败: {str(e)}, 使用默认类型video/mp4")
            return 'video/mp4'

    @classmethod
    def _analyze_content_with_api(cls, video_url):
        """Analyse a video through the HTTP gateway instead of the SDK.

        Args:
            video_url: Publicly reachable URL of the video; must start with ``http``.

        Returns:
            ``(analysis_data, demand_list)`` on success, or
            ``("[异常] ...", None)`` on any failure. This method does not raise.
        """
        try:
            if not video_url or not video_url.startswith('http'):
                raise Exception("无效的视频URL")

            content_type = cls._detect_mime_type(video_url)

            response = requests.post(
                'http://ai-api.piaoquantv.com/aigc-server/gemini/generateContent',
                json={
                    "mediaUrl": video_url,
                    "type": 2,
                    "prompt": VIDEO_ANALYSIS_PROMPT,
                    "model": "gemini-2.0-flash",
                    "temperature": "0.3",
                    "mimeType": content_type
                },
                timeout=300
            )
            response.raise_for_status()
            result = response.json()

            if not result:
                raise Exception("API返回结果为空")

            if result.get('code') != 0:
                error_msg = result.get('msg', '未知错误')
                # The gateway may embed a JSON error document in ``msg``;
                # surface its inner message when it parses.
                if isinstance(error_msg, str) and 'data' in error_msg and 'error' in error_msg:
                    try:
                        error_data = orjson.loads(error_msg)
                        if isinstance(error_data, dict) and 'error' in error_data:
                            error_msg = f"API错误: {error_data['error'].get('message', error_msg)}"
                    except (orjson.JSONDecodeError, AttributeError, TypeError):
                        pass  # not structured JSON: report msg verbatim
                raise Exception(f"API返回错误: {error_msg}")

            if not result.get('data') or not result['data'].get('result'):
                raise Exception("API返回数据格式错误: 缺少result字段")

            try:
                # ``result`` is a JSON document encoded as a string.
                analysis_result = orjson.loads(result['data']['result'])
                if not isinstance(analysis_result, dict):
                    raise ValueError("API返回的result不是有效的JSON对象")

                analysis_data = cls._build_analysis_data(analysis_result)

                demand_list = CozeHook().run(
                    analysis_data[cls._TOPIC_KEY],
                    analysis_data[cls._SEGMENT_KEY]
                )
                if not demand_list:
                    raise Exception("CozeHook处理结果为空")

                return analysis_data, demand_list
            except orjson.JSONDecodeError as e:
                raise Exception(f"解析API返回的JSON失败: {str(e)}") from e
            except Exception as e:
                raise Exception(f"处理API返回数据时出错: {str(e)}") from e

        except requests.exceptions.RequestException as e:
            error_msg = f"API请求失败: {str(e)}"
            logger.error(f"[内容分析] {error_msg}")
            return f"[异常] {error_msg}", None
        except Exception as e:
            error_msg = f"API分析失败: {str(e)}"
            logger.error(f"[内容分析] {error_msg}")
            return f"[异常] {error_msg}", None


if __name__ == '__main__':
    # Usage example demonstrating the error handling.
    # NOTE(review): a literal API key used to be committed here. It must be
    # treated as leaked and rotated; the key is now read from the environment.
    try:
        ai = GoogleAI()
        result = ai.run(
            os.environ.get("GOOGLE_API_KEY", ""),
            "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213"
        )
        print(f"分析成功: {result}")
    except Exception as e:
        error_msg = str(e)
        print(f"分析失败: {error_msg}")

        # Pick a hint based on the kind of failure.
        if "API密钥" in error_msg:
            print("请检查API密钥是否正确配置")
        elif "权限不足" in error_msg:
            print("请检查API密钥权限或服务可用性")
        elif "配额超限" in error_msg:
            print("请检查API配额或稍后重试")
        elif "网络" in error_msg:
            print("请检查网络连接")
        elif "服务器负载" in error_msg:
            print("服务器繁忙,请稍后重试")
        else:
            print("未知错误,请联系技术支持")

    # ai._analyze_content_with_api("http://rescdn.yishihui.com/longvideo/crawler_local/video/prod/20241206/5f98b0e4464d02d6c75907302793902d12277")