123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- import os
- import time
- import uuid
- from typing import Optional
- import functools
- import google.generativeai as genai
- import orjson
- import requests
- from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
- from google.api_core import exceptions as google_exceptions
- from requests.exceptions import RequestException, Timeout, ConnectionError
- from loguru import logger
- from utils.coze_hook import CozeHook
- from utils.google_ai_prompt import VIDEO_TOPIC_ANALYSIS_PROMPT, VIDEO_SEGMENT_ANALYSIS_PROMPT, VIDEO_ANALYSIS_PROMPT
- # from utils.feishu_data import Material
# Directory where downloaded videos are cached before being uploaded.
# Defaults to "<working directory at import time>/video_cache"; set the
# VIDEO_CACHE_DIR environment variable to override it without editing code.
# To route outbound traffic through a proxy (e.g. http://localhost:1081), set
# the standard http_proxy / https_proxy environment variables before starting
# the process instead of hard-coding them here.
CACHE_DIR = os.environ.get('VIDEO_CACHE_DIR') or os.path.join(os.getcwd(), 'video_cache')
def retry_on_error(max_retries: int = 3, backoff_factor: float = 1.0):
    """Decorator factory: retry the wrapped call when it fails with a transient error.

    The wrapped function is attempted up to ``max_retries + 1`` times. Before
    retry number ``attempt + 1`` it sleeps ``backoff_factor * 2 ** attempt``
    seconds (exponential backoff). An error is considered transient when:

    * the error itself, or any exception in its ``__cause__`` / ``__context__``
      chain, is a Google API error with HTTP status 429, 500 or 503, or a
      requests ``Timeout`` / ``ConnectionError`` (the first such error found
      in the chain decides); or
    * no Google API / network error appears in the chain and the message
      contains "overloaded" or "timeout" (case-insensitive).

    The chain is inspected because callers may re-raise API failures as a
    plain ``Exception`` from inside an ``except`` block; Python records the
    original error as ``__context__`` (or ``__cause__`` with ``raise ... from``),
    so its status code is still available.

    Non-transient errors, and the error of the final attempt, are re-raised
    unchanged.

    Args:
        max_retries: Number of retries after the first attempt. Negative
            values are treated as 0.
        backoff_factor: Base delay in seconds for the exponential backoff.
    """
    max_retries = max(0, max_retries)

    def _is_transient(exc: BaseException) -> bool:
        """Return True if ``exc`` (or an error it wraps) is worth retrying."""
        seen = set()
        current = exc
        while current is not None and id(current) not in seen:
            seen.add(id(current))
            if isinstance(current, google_exceptions.GoogleAPIError):
                # Only GoogleAPICallError subclasses define `code`; others
                # (e.g. RetryError) do not, so read it defensively.
                return getattr(current, 'code', None) in (429, 500, 503)
            if isinstance(current, (Timeout, ConnectionError)):
                return True
            current = current.__cause__ or current.__context__
        message = str(exc).lower()
        return "overloaded" in message or "timeout" in message

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Out of attempts, or not a transient failure: propagate as-is.
                    if attempt >= max_retries or not _is_transient(e):
                        raise
                    wait_time = backoff_factor * (2 ** attempt)
                    logger.warning(f"[重试] 第{attempt + 1}次尝试失败,{wait_time}秒后重试: {str(e)}")
                    time.sleep(wait_time)
                    attempt += 1

        return wrapper

    return decorator
# Readable (Chinese) reason prefixes for common Google API HTTP status codes,
# used by handle_genai_error. Codes not listed get a generic message.
_GOOGLE_API_ERROR_PREFIXES = {
    400: "请求参数错误",
    401: "API密钥无效或已过期",
    403: "权限不足或服务不可用",
    404: "模型或资源不存在",
    429: "请求频率超限,请稍后重试",
    500: "服务器内部错误",
    503: "服务暂时不可用",
}


def handle_genai_error(error: Exception) -> str:
    """Describe a Google GenerativeAI / network error as a readable message.

    The result has the form "<reason>: <str(error)>", with the reason chosen
    in this order:

    1. Google API errors: by HTTP status code (see _GOOGLE_API_ERROR_PREFIXES);
       unlisted or missing codes give "Google API错误 (<code>)".
    2. requests network errors: "网络连接错误".
    3. Anything else: keyword matching on the upper-cased message (API key /
       permission, model not found / unavailable, quota / limit, timeout,
       overloaded), falling back to a message that names the exception type.

    Args:
        error: The exception to describe.

    Returns:
        A human-readable (Chinese) error string.
    """
    error_type = type(error).__name__
    error_msg = str(error)

    # Google API errors
    if isinstance(error, google_exceptions.GoogleAPIError):
        # Only GoogleAPICallError subclasses define `code`; others (e.g.
        # RetryError) do not, and must not crash the error handler itself.
        code = getattr(error, 'code', None)
        prefix = _GOOGLE_API_ERROR_PREFIXES.get(code)
        if prefix is not None:
            return f"{prefix}: {error_msg}"
        return f"Google API错误 ({code}): {error_msg}"

    # Network errors
    if isinstance(error, (RequestException, Timeout, ConnectionError)):
        return f"网络连接错误: {error_msg}"

    # Generic errors: classify by keywords in the message.
    upper_msg = error_msg.upper()
    if "API_KEY" in upper_msg or "PERMISSION" in upper_msg:
        return f"API密钥错误或权限不足: {error_msg}"
    if "MODEL" in upper_msg and ("NOT_FOUND" in upper_msg or "UNAVAILABLE" in upper_msg):
        return f"模型不可用或不存在: {error_msg}"
    if "QUOTA" in upper_msg or "LIMIT" in upper_msg:
        return f"配额超限或请求限制: {error_msg}"
    if "TIMEOUT" in upper_msg:
        return f"请求超时: {error_msg}"
    if "OVERLOADED" in upper_msg:
        return f"服务器负载过高,请稍后重试: {error_msg}"
    return f"创建GenerativeModel失败 ({error_type}): {error_msg}"
def load_prompts():
    """Return the analysis prompts to run against each video.

    Returns:
        A list of ``{"name": str, "content": str}`` dicts. ``name`` is a
        human-readable label and ``content`` is the prompt text imported from
        ``utils.google_ai_prompt``. Only the combined "视频内容分析" prompt is
        active; VIDEO_TOPIC_ANALYSIS_PROMPT and VIDEO_SEGMENT_ANALYSIS_PROMPT
        are imported but not used here.

    Raises:
        Exception: "加载Prompt失败: ..." if building the list fails; the
            original error is chained as ``__cause__``.
    """
    try:
        print("\n[初始化] 从prompt.py加载Prompt")

        prompts = [
            {
                "name": "视频内容分析",
                "content": VIDEO_ANALYSIS_PROMPT,
            },
        ]

        print(f"[成功] 加载 {len(prompts)} 个Prompt")
        return prompts

    except Exception as e:
        # Keep the original error as the cause so its traceback is not lost.
        raise Exception(f"加载Prompt失败: {str(e)}") from e
-
class GoogleAI(object):
    """Video content analysis with Google Gemini, post-processed by CozeHook.

    Two entry paths are provided:

    * ``run``: downloads the video into CACHE_DIR, uploads it through the
      ``google.generativeai`` File API and calls Gemini directly.
    * ``_analyze_content_with_api``: posts the video URL and prompt to the
      internal aigc-server Gemini gateway instead.

    Both build the same ``analysis_data`` structure (``_build_analysis_data``)
    and hand its two sections to ``CozeHook.run``.
    """

    # Number of download attempts made by download_video.
    _DOWNLOAD_ATTEMPTS = 3
    # Maximum seconds run() waits for an uploaded file to leave the PROCESSING
    # state; without a bound a stuck upload would block the worker forever.
    FILE_PROCESSING_TIMEOUT = 600

    @staticmethod
    def _mask_secret(secret) -> str:
        """Return ``secret`` with all but its last 4 characters replaced by '*'."""
        text = '' if secret is None else str(secret)
        if len(text) <= 4:
            return '*' * len(text)
        return '*' * (len(text) - 4) + text[-4:]

    @staticmethod
    def _remove_local_file(path) -> None:
        """Best-effort delete of a local file; ``None``/empty and missing paths are ignored."""
        if not path:
            return
        try:
            os.remove(path)
            logger.info(f"[内容分析] 文件已删除: {path}")
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"[内容分析] 删除本地文件失败: {path}, {e}")

    @staticmethod
    def _delete_remote_file(video) -> None:
        """Best-effort delete of a file uploaded via ``genai.upload_file``; ``None`` is ignored."""
        if video is None:
            return
        try:
            genai.delete_file(name=video.name)
        except Exception as e:
            logger.warning(f"[内容分析] 删除远端文件失败: {getattr(video, 'name', video)}, {e}")

    @staticmethod
    def _build_analysis_data(result: dict) -> dict:
        """Split a Gemini analysis result dict into the two sections CozeHook consumes.

        Missing fields default to '' (简介/内容类型), 1 (段落类型相似度) and [] (内容分段).
        """
        return {
            '视频选题与要点理解': {
                "视频简介": result.get('视频简介', ''),
                "视频内容类型": result.get('视频内容类型', ''),
                "段落类型相似度": result.get('段落类型相似度', 1),
            },
            '视频分段与时间点分析': {
                "内容分段": result.get('内容分段', []),
            },
        }

    @classmethod
    def download_video(cls, video_link: str) -> Optional[str]:
        """Download ``video_link`` into CACHE_DIR as ``<random uuid>.mp4``.

        The HTTP GET (60 s timeout) is attempted up to ``_DOWNLOAD_ATTEMPTS``
        times, pausing 1 s between attempts after a request error or a
        non-200 response.

        Returns:
            The local file path on success, otherwise ``None``. Errors are
            logged, never raised.
        """
        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')

        # Make sure the cache directory exists.
        try:
            os.makedirs(CACHE_DIR, exist_ok=True)
        except Exception as e:
            parent_dir = os.path.dirname(CACHE_DIR)
            try:
                dir_permissions = oct(os.stat(parent_dir).st_mode)[-3:] if os.path.exists(parent_dir) else "N/A"
            except OSError:
                dir_permissions = "N/A"
            error_info = {
                "error_type": type(e).__name__,
                "error_message": str(e),
                "cache_dir": CACHE_DIR,
                "current_dir": os.getcwd(),
                "dir_exists": os.path.exists(CACHE_DIR),
                "dir_permissions": dir_permissions,
            }
            error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
            logger.error(f'[内容分析] 创建缓存目录失败: {error_json}')
            return None

        for attempt in range(1, cls._DOWNLOAD_ATTEMPTS + 1):
            if attempt > 1:
                time.sleep(1)
            try:
                response = requests.get(url=video_link, timeout=60)
            except Exception as e:
                logger.warning(f'[内容分析] 视频下载请求异常(第{attempt}次): {video_link}, {e}')
                continue
            if response.status_code != 200:
                logger.warning(f'[内容分析] 视频下载失败, 状态码: {response.status_code}(第{attempt}次): {video_link}')
                continue

            try:
                with open(file_path, 'wb') as f:
                    f.write(response.content)
            except Exception as e:
                # Do not leave a truncated file behind in the cache.
                cls._remove_local_file(file_path)
                error_info = {
                    "error_type": type(e).__name__,
                    "error_message": str(e),
                    "file_path": file_path,
                    "content_length": len(response.content) if response.content else 0,
                }
                error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
                logger.error(f'[内容分析] 视频保存失败: {error_json}')
                return None

            logger.info(f'[内容分析] 视频链接: {video_link}, 存储地址: {file_path}')
            return file_path

        logger.error(f'[内容分析] 视频下载失败, 已尝试{cls._DOWNLOAD_ATTEMPTS}次: {video_link}')
        return None

    @classmethod
    @retry_on_error(max_retries=2, backoff_factor=1.5)
    def _analyze_content(cls, video, prompt):
        """Run one Gemini ``generate_content`` call on an uploaded video.

        The call uses model gemini-2.0-flash with JSON output, temperature 0.3,
        20480 max output tokens and a 300 s request timeout. The method is
        wrapped by ``retry_on_error(max_retries=2, backoff_factor=1.5)``.

        Args:
            video: File handle obtained from ``genai.upload_file`` / ``genai.get_file``.
            prompt: Prompt text.

        Returns:
            The model's JSON response parsed into a dict.

        Raises:
            Exception: with a readable message on model-creation, API,
                JSON-parsing or response-shape errors; the underlying error is
                chained as ``__cause__``.
        """
        try:
            model = genai.GenerativeModel(
                model_name='gemini-2.0-flash',
                generation_config=genai.GenerationConfig(
                    response_mime_type='application/json',
                    temperature=0.3,
                    max_output_tokens=20480
                ),
                safety_settings={
                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                }
            )
        except Exception as e:
            error_msg = handle_genai_error(e)
            logger.error(f"[视频分析] {error_msg}")
            raise Exception(error_msg) from e

        try:
            response = model.generate_content(
                contents=[video, prompt],
                request_options={'timeout': 300}
            )

            if hasattr(response, '_error') and response._error:
                raise Exception(f"生成错误: {response._error}")

            result = orjson.loads(response.text.strip())
            logger.info(f"[视频分析] 响应: {result}")
            if not isinstance(result, dict):
                raise ValueError("响应格式错误:非字典结构")

            return result
        except orjson.JSONDecodeError as e:
            error_msg = f"响应解析失败,非JSON格式: {str(e)}"
            logger.error(f"[视频分析] {error_msg}")
            raise Exception(error_msg) from e
        except (google_exceptions.GoogleAPIError, RequestException, Timeout, ConnectionError) as e:
            # API / network failures get the shared, readable classification.
            error_msg = handle_genai_error(e)
            logger.error(f"[视频分析] {error_msg}")
            raise Exception(error_msg) from e
        except Exception as e:
            error_msg = f"分析失败: {str(e)}"
            logger.error(f"[视频分析] {error_msg}")
            raise Exception(error_msg) from e

    @classmethod
    def run(cls, api_key, video_url):
        """Download, upload and analyse one video with Gemini, then run CozeHook.

        Args:
            api_key: Google GenerativeAI API key (passed to ``genai.configure``).
            video_url: URL of the video to analyse.

        Returns:
            ``(analysis_data, demand_list)`` on success, where ``analysis_data``
            has the keys "视频选题与要点理解" and "视频分段与时间点分析".
            On any failure returns ``("[异常] <reason>", "")``.

        The local download and the uploaded remote file are removed in every
        case (best-effort).
        """
        # Never echo the full API key: stdout is not a safe place for secrets.
        print(f"api_key:{cls._mask_secret(api_key)},video_url:{video_url}")
        video_path = None
        video = None
        try:
            genai.configure(api_key=api_key)
            video_path = cls.download_video(video_link=video_url)
            if not video_path:
                logger.error(f'[内容分析] 视频下载失败, 跳过任务')
                return "[异常] 视频下载失败", ""

            video = genai.upload_file(path=video_path, mime_type='video/mp4')
            deadline = time.monotonic() + cls.FILE_PROCESSING_TIMEOUT
            while video.state.name == 'PROCESSING':
                if time.monotonic() > deadline:
                    logger.error(f'[内容分析] 视频处理超时({cls.FILE_PROCESSING_TIMEOUT}秒): {video.name}')
                    return "[异常] 上传视频失败", ""
                time.sleep(1)
                video = genai.get_file(name=video.name)
            if video.state.name != 'ACTIVE':
                logger.error(f'[内容分析] 视频上传后状态异常: {video.state.name}')
                return "[异常] 上传视频失败", ""
            logger.info(f"[内容分析] 文件下载完成: {video_path}")

            analysis_data = {}
            for prompt in load_prompts()[:3]:
                try:
                    result = cls._analyze_content(video, prompt['content'])
                    analysis_data.update(cls._build_analysis_data(result))
                except Exception as e:
                    # Record the failure under the prompt's name and keep going.
                    analysis_data[prompt['name']] = {
                        "error": str(e),
                        "error_type": type(e).__name__
                    }

            topic = analysis_data.get('视频选题与要点理解')
            segments = analysis_data.get('视频分段与时间点分析')
            if topic is None or segments is None:
                # No prompt produced a result; report the recorded failures
                # instead of an opaque KeyError.
                failures = "; ".join(
                    f"{name}: {info.get('error')}"
                    for name, info in analysis_data.items()
                    if isinstance(info, dict) and 'error' in info
                ) or "分析结果为空"
                logger.error(f"[内容分析] 视频分析失败: {failures}")
                return f"[异常] {failures}", ""

            coze_hook = CozeHook()
            demand_list = coze_hook.run(topic, segments)
            return analysis_data, demand_list
        except Exception as e:
            logger.error(f"[内容分析] 处理异常,异常信息{e}")
            return f"[异常] {e}", ""
        finally:
            # Always release both copies of the video, whatever happened above.
            cls._delete_remote_file(video)
            cls._remove_local_file(video_path)

    @staticmethod
    def _detect_video_mime_type(video_url: str) -> str:
        """Best-effort MIME type of ``video_url`` from response headers; defaults to 'video/mp4'.

        A HEAD request (following redirects) is tried first; if it does not
        report a video type, a streamed GET is used just to read its headers.
        """
        try:
            response = requests.head(video_url, timeout=10, allow_redirects=True)
            content_type = response.headers.get('content-type', '')
            if not content_type or 'video' not in content_type.lower():
                # stream=True avoids downloading the body; the context manager
                # releases the connection once the headers have been read.
                with requests.get(video_url, stream=True, timeout=10) as get_response:
                    content_type = get_response.headers.get('content-type', '')
            if not content_type or 'video' not in content_type.lower():
                content_type = 'video/mp4'
            return content_type
        except Exception as e:
            logger.warning(f"[内容分析] 获取视频MIME类型失败: {str(e)}, 使用默认类型video/mp4")
            return 'video/mp4'

    @staticmethod
    def _extract_api_error_message(result: dict):
        """Pull a readable error message out of a failed gateway response.

        ``result['msg']`` is used as-is, unless it is a JSON string shaped like
        ``{"error": {"message": ...}}``, in which case the inner message is
        returned prefixed with "API错误: ".
        """
        error_msg = result.get('msg', '未知错误')
        if isinstance(error_msg, str) and 'data' in error_msg and 'error' in error_msg:
            try:
                error_data = orjson.loads(error_msg)
                if isinstance(error_data, dict) and 'error' in error_data:
                    error_msg = f"API错误: {error_data['error'].get('message', error_msg)}"
            except (orjson.JSONDecodeError, AttributeError, TypeError):
                # Not the expected JSON shape; keep the raw message.
                pass
        return error_msg

    @classmethod
    def _analyze_content_with_api(cls, video_url):
        """Analyse ``video_url`` through the aigc-server Gemini gateway, then run CozeHook.

        Returns:
            ``(analysis_data, demand_list)`` on success, or
            ``("[异常] <reason>", None)`` on any failure (the reason is logged).
        """
        try:
            # Reject obviously invalid URLs early.
            if not video_url or not video_url.startswith('http'):
                raise Exception("无效的视频URL")

            content_type = cls._detect_video_mime_type(video_url)

            response = requests.post(
                'http://ai-api.piaoquantv.com/aigc-server/gemini/generateContent',
                json={
                    "mediaUrl": video_url,
                    "type": 2,
                    "prompt": VIDEO_ANALYSIS_PROMPT,
                    "model": "gemini-2.0-flash",
                    "temperature": "0.3",
                    "mimeType": content_type,
                },
                timeout=300,
            )
            response.raise_for_status()
            result = response.json()

            if not result:
                raise Exception("API返回结果为空")
            if result.get('code') != 0:
                raise Exception(f"API返回错误: {cls._extract_api_error_message(result)}")
            if not result.get('data') or not result['data'].get('result'):
                raise Exception("API返回数据格式错误: 缺少result字段")

            try:
                # The gateway returns the model output as a JSON string.
                analysis_result = orjson.loads(result['data']['result'])
                if not isinstance(analysis_result, dict):
                    raise ValueError("API返回的result不是有效的JSON对象")

                analysis_data = cls._build_analysis_data(analysis_result)

                coze_hook = CozeHook()
                demand_list = coze_hook.run(
                    analysis_data["视频选题与要点理解"],
                    analysis_data["视频分段与时间点分析"],
                )
                if not demand_list:
                    raise Exception("CozeHook处理结果为空")

                return analysis_data, demand_list
            except orjson.JSONDecodeError as e:
                raise Exception(f"解析API返回的JSON失败: {str(e)}") from e
            except Exception as e:
                raise Exception(f"处理API返回数据时出错: {str(e)}") from e

        except requests.exceptions.RequestException as e:
            error_msg = f"API请求失败: {str(e)}"
            logger.error(f"[内容分析] {error_msg}")
            return f"[异常] {error_msg}", None
        except Exception as e:
            error_msg = f"API分析失败: {str(e)}"
            logger.error(f"[内容分析] {error_msg}")
            return f"[异常] {error_msg}", None
-
-
if __name__ == '__main__':
    # Manual smoke test for GoogleAI.run.
    # The API key is read from the environment (GOOGLE_API_KEY, falling back to
    # GEMINI_API_KEY) instead of being hard-coded in source.
    # NOTE(review): a key used to be committed here; treat it as leaked and rotate it.
    # Usage: GOOGLE_API_KEY=... python <this file> [video_url]
    import sys

    default_video_url = "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213"
    # (keyword found in the error message, what the operator should check)
    error_hints = (
        ("API密钥", "请检查API密钥是否正确配置"),
        ("权限不足", "请检查API密钥权限或服务可用性"),
        ("配额超限", "请检查API配额或稍后重试"),
        ("网络", "请检查网络连接"),
        ("服务器负载", "服务器繁忙,请稍后重试"),
    )

    api_key = os.environ.get('GOOGLE_API_KEY') or os.environ.get('GEMINI_API_KEY')
    if not api_key:
        print("请设置环境变量 GOOGLE_API_KEY 或 GEMINI_API_KEY")
        sys.exit(1)
    video_url = sys.argv[1] if len(sys.argv) > 1 else default_video_url

    # GoogleAI.run reports failures by returning ("[异常] ...", "") rather than
    # raising, so inspect the returned value as well as catching exceptions.
    try:
        result = GoogleAI.run(api_key, video_url)
        first = result[0]
        error_msg = first if isinstance(first, str) and first.startswith("[异常]") else None
    except Exception as e:
        result, error_msg = None, str(e)

    if error_msg is None:
        print(f"分析成功: {result}")
    else:
        print(f"分析失败: {error_msg}")
        print(next((hint for keyword, hint in error_hints if keyword in error_msg), "未知错误,请联系技术支持"))

    # Alternative path through the HTTP gateway:
    # GoogleAI._analyze_content_with_api("http://rescdn.yishihui.com/longvideo/crawler_local/video/prod/20241206/5f98b0e4464d02d6c75907302793902d12277")
|