google_ai_analyze.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. import os
  2. import time
  3. import uuid
  4. from typing import Optional
  5. import functools
  6. import google.generativeai as genai
  7. import orjson
  8. import requests
  9. from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
  10. from google.api_core import exceptions as google_exceptions
  11. from requests.exceptions import RequestException, Timeout, ConnectionError
  12. from loguru import logger
  13. from utils.coze_hook import CozeHook
  14. from utils.google_ai_prompt import VIDEO_TOPIC_ANALYSIS_PROMPT, VIDEO_SEGMENT_ANALYSIS_PROMPT, VIDEO_ANALYSIS_PROMPT
  15. # from utils.feishu_data import Material
  16. CACHE_DIR = os.path.join(os.getcwd(), 'video_cache')
  17. # CACHE_DIR = '/Users/z/Downloads/'
  18. # PROXY_ADDR = 'http://localhost:1081'
  19. # os.environ['http_proxy'] = PROXY_ADDR
  20. # os.environ['https_proxy'] = PROXY_ADDR
  21. def retry_on_error(max_retries: int = 3, backoff_factor: float = 1.0):
  22. """
  23. 装饰器:在特定错误时重试
  24. """
  25. def decorator(func):
  26. @functools.wraps(func)
  27. def wrapper(*args, **kwargs):
  28. last_exception = None
  29. for attempt in range(max_retries + 1):
  30. try:
  31. return func(*args, **kwargs)
  32. except Exception as e:
  33. last_exception = e
  34. # 判断是否应该重试
  35. should_retry = False
  36. if isinstance(e, google_exceptions.GoogleAPIError):
  37. # 对于429(频率限制)、500(服务器错误)、503(服务不可用)进行重试
  38. should_retry = e.code in [429, 500, 503]
  39. elif isinstance(e, (Timeout, ConnectionError)):
  40. # 网络超时和连接错误进行重试
  41. should_retry = True
  42. elif "overloaded" in str(e).lower() or "timeout" in str(e).lower():
  43. # 服务器过载或超时进行重试
  44. should_retry = True
  45. if should_retry and attempt < max_retries:
  46. wait_time = backoff_factor * (2 ** attempt)
  47. logger.warning(f"[重试] 第{attempt + 1}次尝试失败,{wait_time}秒后重试: {str(e)}")
  48. time.sleep(wait_time)
  49. continue
  50. else:
  51. # 不应该重试或已达到最大重试次数
  52. break
  53. # 重试失败,抛出最后一次的异常
  54. raise last_exception
  55. return wrapper
  56. return decorator
  57. def handle_genai_error(error: Exception) -> str:
  58. """
  59. 统一处理Google GenerativeAI相关的错误
  60. 返回用户友好的错误信息
  61. """
  62. error_type = type(error).__name__
  63. error_msg = str(error)
  64. # Google API 相关错误
  65. if isinstance(error, google_exceptions.GoogleAPIError):
  66. if error.code == 400:
  67. return f"请求参数错误: {error_msg}"
  68. elif error.code == 401:
  69. return f"API密钥无效或已过期: {error_msg}"
  70. elif error.code == 403:
  71. return f"权限不足或服务不可用: {error_msg}"
  72. elif error.code == 404:
  73. return f"模型或资源不存在: {error_msg}"
  74. elif error.code == 429:
  75. return f"请求频率超限,请稍后重试: {error_msg}"
  76. elif error.code == 500:
  77. return f"服务器内部错误: {error_msg}"
  78. elif error.code == 503:
  79. return f"服务暂时不可用: {error_msg}"
  80. else:
  81. return f"Google API错误 ({error.code}): {error_msg}"
  82. # 网络相关错误
  83. elif isinstance(error, (RequestException, Timeout, ConnectionError)):
  84. return f"网络连接错误: {error_msg}"
  85. # 通用错误处理
  86. elif "API_KEY" in error_msg.upper() or "PERMISSION" in error_msg.upper():
  87. return f"API密钥错误或权限不足: {error_msg}"
  88. elif "MODEL" in error_msg.upper() and ("NOT_FOUND" in error_msg.upper() or "UNAVAILABLE" in error_msg.upper()):
  89. return f"模型不可用或不存在: {error_msg}"
  90. elif "QUOTA" in error_msg.upper() or "LIMIT" in error_msg.upper():
  91. return f"配额超限或请求限制: {error_msg}"
  92. elif "TIMEOUT" in error_msg.upper():
  93. return f"请求超时: {error_msg}"
  94. elif "OVERLOADED" in error_msg.upper():
  95. return f"服务器负载过高,请稍后重试: {error_msg}"
  96. else:
  97. return f"创建GenerativeModel失败 ({error_type}): {error_msg}"
  98. def load_prompts():
  99. """从prompt.py加载Prompt"""
  100. try:
  101. print("\n[初始化] 从prompt.py加载Prompt")
  102. prompts = [
  103. # {
  104. # "name": "视频选题与要点理解",
  105. # "content": VIDEO_TOPIC_ANALYSIS_PROMPT
  106. # },
  107. # {
  108. # "name": "视频分段与时间点分析",
  109. # "content": VIDEO_SEGMENT_ANALYSIS_PROMPT
  110. # }
  111. {
  112. "name": "视频内容分析",
  113. "content": VIDEO_ANALYSIS_PROMPT
  114. }
  115. ]
  116. print(f"[成功] 加载 {len(prompts)} 个Prompt")
  117. return prompts
  118. except Exception as e:
  119. raise Exception(f"加载Prompt失败: {str(e)}")
  120. class GoogleAI(object):
  121. @classmethod
  122. def download_video(cls, video_link: str) -> Optional[str]:
  123. file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
  124. try:
  125. # 确保缓存目录存在
  126. try:
  127. os.makedirs(CACHE_DIR, exist_ok=True)
  128. except Exception as e:
  129. error_info = {
  130. "error_type": type(e).__name__,
  131. "error_message": str(e),
  132. "cache_dir": CACHE_DIR,
  133. "current_dir": os.getcwd(),
  134. "dir_exists": os.path.exists(CACHE_DIR),
  135. "dir_permissions": oct(os.stat(os.path.dirname(CACHE_DIR)).st_mode)[-3:] if os.path.exists(os.path.dirname(CACHE_DIR)) else "N/A"
  136. }
  137. error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
  138. logger.error(f'[内容分析] 创建缓存目录失败: {error_json}')
  139. return None
  140. for _ in range(3):
  141. try:
  142. response = requests.get(url=video_link, timeout=60)
  143. print(f"response content: {file_path}")
  144. if response.status_code == 200:
  145. try:
  146. with open(file_path, 'wb') as f:
  147. f.write(response.content)
  148. logger.info(f'[内容分析] 视频链接: {video_link}, 存储地址: {file_path}')
  149. except Exception as e:
  150. error_info = {
  151. "error_type": type(e).__name__,
  152. "error_message": str(e),
  153. "file_path": file_path,
  154. "content_length": len(response.content) if response.content else 0
  155. }
  156. error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
  157. logger.error(f'[内容分析] 视频保存失败: {error_json}')
  158. return None
  159. return file_path
  160. except Exception:
  161. time.sleep(1)
  162. continue
  163. except Exception:
  164. logger.error(f'[内容分析] 创建缓存目录失败')
  165. return None
  166. @classmethod
  167. @retry_on_error(max_retries=2, backoff_factor=1.5)
  168. def _analyze_content(cls, video, prompt):
  169. logger.info(f"[视频分析] 开始分析, 视频: {video}, 提示: {prompt}")
  170. """增强版内容分析"""
  171. # 添加模型创建的错误处理
  172. try:
  173. model = genai.GenerativeModel(
  174. model_name='gemini-2.0-flash',
  175. generation_config=genai.GenerationConfig(
  176. response_mime_type='application/json',
  177. temperature=0.3,
  178. max_output_tokens=20480
  179. ),
  180. safety_settings={
  181. HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
  182. HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
  183. }
  184. )
  185. except Exception as e:
  186. error_msg = handle_genai_error(e)
  187. logger.error(f"[视频分析] {error_msg}")
  188. raise Exception(error_msg)
  189. try:
  190. response = model.generate_content(
  191. contents=[video, prompt],
  192. request_options={'timeout': 300}
  193. )
  194. if hasattr(response, '_error') and response._error:
  195. raise Exception(f"生成错误: {response._error}")
  196. result = orjson.loads(response.text.strip())
  197. logger.info(f"[视频分析] 响应: {result}")
  198. print(f"[视频分析] 响应: {result}")
  199. if not isinstance(result, dict):
  200. raise ValueError("响应格式错误:非字典结构")
  201. return result
  202. except orjson.JSONDecodeError as e:
  203. error_msg = f"响应解析失败,非JSON格式: {str(e)}"
  204. logger.error(f"[视频分析] {error_msg}")
  205. raise Exception(error_msg)
  206. except Exception as e:
  207. # 如果是Google API相关错误,使用统一错误处理
  208. if isinstance(e, (google_exceptions.GoogleAPIError, RequestException, Timeout, ConnectionError)):
  209. error_msg = handle_genai_error(e)
  210. logger.error(f"[视频分析] {error_msg}")
  211. raise Exception(error_msg)
  212. else:
  213. error_msg = f"分析失败: {str(e)}"
  214. logger.error(f"[视频分析] {error_msg}")
  215. raise Exception(error_msg)
  216. @classmethod
  217. def run(cls, api_key, video_url):
  218. print(f"api_key:{api_key},video_url:{video_url}")
  219. video_path = None
  220. try:
  221. genai.configure(api_key=api_key)
  222. video_path = cls.download_video(video_link=video_url)
  223. if not video_path:
  224. logger.error(f'[内容分析] 视频下载失败, 跳过任务')
  225. os.remove(video_path)
  226. logger.info(f"[内容分析] 文件已删除: {video_path}")
  227. return "[异常] 视频下载失败",""
  228. video = genai.upload_file(path=video_path, mime_type='video/mp4')
  229. while video.state.name == 'PROCESSING':
  230. time.sleep(1)
  231. video = genai.get_file(name=video.name)
  232. if video.state.name != 'ACTIVE':
  233. genai.delete_file(name=video.name)
  234. os.remove(video_path)
  235. return "[异常] 上传视频失败", ""
  236. logger.info(f"[内容分析] 文件下载完成: {video_path}")
  237. prompts = load_prompts()
  238. analysis_data = {}
  239. for prompt in prompts[:3]:
  240. # print(f"[分析] 正在执行: {prompt['name']}")
  241. try:
  242. result = cls._analyze_content(video, prompt['content'])
  243. # 提取 result 中的 "内容分段" 和 "视频简介"
  244. analysis_data['视频选题与要点理解'] = {
  245. "视频简介": result.get('视频简介', ''),
  246. "视频内容类型": result.get('视频内容类型', ''),
  247. "段落类型相似度": result.get('段落类型相似度', 1)
  248. }
  249. analysis_data['视频分段与时间点分析'] = {
  250. "内容分段": result.get('内容分段', [])
  251. }
  252. except Exception as e:
  253. analysis_data[prompt['name']] = {
  254. "error": str(e),
  255. "error_type": type(e).__name__
  256. }
  257. # print(f"[分析] 所有分析完成, 结果: {analysis_data}")
  258. coze_hook = CozeHook()
  259. demand_list = coze_hook.run(analysis_data["视频选题与要点理解"], analysis_data["视频分段与时间点分析"])
  260. # print(f"[分析] 所有分析完成, 结果: {demand_list}")
  261. genai.delete_file(name=video.name)
  262. os.remove(video_path)
  263. return analysis_data, demand_list
  264. except Exception as e:
  265. logger.error(f"[内容分析] 处理异常,异常信息{e}")
  266. os.remove(video_path)
  267. return f"[异常] {e}",""
  268. @classmethod
  269. def _analyze_content_with_api(cls, video_url):
  270. """使用API分析视频内容"""
  271. try:
  272. # 检查视频URL是否有效
  273. if not video_url or not video_url.startswith('http'):
  274. raise Exception("无效的视频URL")
  275. # 获取视频文件以确定正确的MIME类型
  276. try:
  277. response = requests.head(video_url, timeout=10)
  278. content_type = response.headers.get('content-type', '')
  279. if not content_type or 'video' not in content_type.lower():
  280. # 如果无法从HEAD请求获取正确的content-type,尝试GET请求
  281. response = requests.get(video_url, stream=True, timeout=10)
  282. content_type = response.headers.get('content-type', '')
  283. if not content_type or 'video' not in content_type.lower():
  284. content_type = 'video/mp4' # 默认使用mp4
  285. except Exception as e:
  286. logger.warning(f"[内容分析] 获取视频MIME类型失败: {str(e)}, 使用默认类型video/mp4")
  287. content_type = 'video/mp4'
  288. # 使用API分析视频内容
  289. response = requests.post(
  290. 'http://ai-api.piaoquantv.com/aigc-server/gemini/generateContent',
  291. json={
  292. "mediaUrl": video_url,
  293. "type": 2,
  294. "prompt": VIDEO_ANALYSIS_PROMPT,
  295. "model": "gemini-2.0-flash",
  296. "temperature": "0.3",
  297. "mimeType": content_type # 添加正确的MIME类型
  298. },
  299. timeout=300
  300. )
  301. response.raise_for_status()
  302. result = response.json()
  303. # print(f"[内容分析] API原始响应: {result}")
  304. if not result:
  305. raise Exception("API返回结果为空")
  306. if result.get('code') != 0:
  307. error_msg = result.get('msg', '未知错误')
  308. if 'data' in error_msg and 'error' in error_msg:
  309. try:
  310. error_data = orjson.loads(error_msg)
  311. if isinstance(error_data, dict) and 'error' in error_data:
  312. error_msg = f"API错误: {error_data['error'].get('message', error_msg)}"
  313. except:
  314. pass
  315. raise Exception(f"API返回错误: {error_msg}")
  316. if not result.get('data') or not result['data'].get('result'):
  317. raise Exception("API返回数据格式错误: 缺少result字段")
  318. try:
  319. # 解析返回的JSON字符串
  320. analysis_result = orjson.loads(result['data']['result'])
  321. if not isinstance(analysis_result, dict):
  322. raise ValueError("API返回的result不是有效的JSON对象")
  323. # 构建analysis_data
  324. analysis_data = {
  325. '视频选题与要点理解': {
  326. "视频简介": analysis_result.get('视频简介', ''),
  327. "视频内容类型": analysis_result.get('视频内容类型', ''),
  328. "段落类型相似度": analysis_result.get('段落类型相似度', 1)
  329. },
  330. '视频分段与时间点分析': {
  331. "内容分段": analysis_result.get('内容分段', [])
  332. }
  333. }
  334. # 使用coze_hook处理数据
  335. coze_hook = CozeHook()
  336. demand_list = coze_hook.run(
  337. analysis_data["视频选题与要点理解"],
  338. analysis_data["视频分段与时间点分析"]
  339. )
  340. if not demand_list:
  341. raise Exception("CozeHook处理结果为空")
  342. # print(f"[内容分析] API分析完成, 结果: {analysis_data}, {demand_list}")
  343. return analysis_data, demand_list
  344. except orjson.JSONDecodeError as e:
  345. raise Exception(f"解析API返回的JSON失败: {str(e)}")
  346. except Exception as e:
  347. raise Exception(f"处理API返回数据时出错: {str(e)}")
  348. except requests.exceptions.RequestException as e:
  349. error_msg = f"API请求失败: {str(e)}"
  350. logger.error(f"[内容分析] {error_msg}")
  351. return f"[异常] {error_msg}", None
  352. except Exception as e:
  353. error_msg = f"API分析失败: {str(e)}"
  354. logger.error(f"[内容分析] {error_msg}")
  355. return f"[异常] {error_msg}", None
  356. if __name__ == '__main__':
  357. # 使用示例:展示错误处理
  358. try:
  359. ai = GoogleAI()
  360. result = ai.run("AIzaSyDs7rd3qWV2ElnP4xtY_b0EiLUdt3yviRs",
  361. "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
  362. print(f"分析成功: {result}")
  363. except Exception as e:
  364. error_msg = str(e)
  365. print(f"分析失败: {error_msg}")
  366. # 根据错误类型进行不同的处理
  367. if "API密钥" in error_msg:
  368. print("请检查API密钥是否正确配置")
  369. elif "权限不足" in error_msg:
  370. print("请检查API密钥权限或服务可用性")
  371. elif "配额超限" in error_msg:
  372. print("请检查API配额或稍后重试")
  373. elif "网络" in error_msg:
  374. print("请检查网络连接")
  375. elif "服务器负载" in error_msg:
  376. print("服务器繁忙,请稍后重试")
  377. else:
  378. print("未知错误,请联系技术支持")
  379. # ai._analyze_content_with_api("http://rescdn.yishihui.com/longvideo/crawler_local/video/prod/20241206/5f98b0e4464d02d6c75907302793902d12277")