import os import time import uuid from typing import Optional import google.generativeai as genai import orjson import requests from google.generativeai.types import (HarmBlockThreshold, HarmCategory) from loguru import logger from utils.coze_hook import CozeHook from utils.google_ai_prompt import VIDEO_TOPIC_ANALYSIS_PROMPT, VIDEO_SEGMENT_ANALYSIS_PROMPT, VIDEO_ANALYSIS_PROMPT # from utils.feishu_data import Material CACHE_DIR = os.path.join(os.getcwd(), 'video_cache') # CACHE_DIR = '/Users/z/Downloads/' # PROXY_ADDR = 'http://localhost:1081' # os.environ['http_proxy'] = PROXY_ADDR # os.environ['https_proxy'] = PROXY_ADDR def load_prompts(): """从prompt.py加载Prompt""" try: print("\n[初始化] 从prompt.py加载Prompt") prompts = [ # { # "name": "视频选题与要点理解", # "content": VIDEO_TOPIC_ANALYSIS_PROMPT # }, # { # "name": "视频分段与时间点分析", # "content": VIDEO_SEGMENT_ANALYSIS_PROMPT # } { "name": "视频内容分析", "content": VIDEO_ANALYSIS_PROMPT } ] print(f"[成功] 加载 {len(prompts)} 个Prompt") return prompts except Exception as e: raise Exception(f"加载Prompt失败: {str(e)}") class GoogleAI(object): @classmethod def download_video(cls, video_link: str) -> Optional[str]: file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4') try: # 确保缓存目录存在 try: os.makedirs(CACHE_DIR, exist_ok=True) except Exception as e: error_info = { "error_type": type(e).__name__, "error_message": str(e), "cache_dir": CACHE_DIR, "current_dir": os.getcwd(), "dir_exists": os.path.exists(CACHE_DIR), "dir_permissions": oct(os.stat(os.path.dirname(CACHE_DIR)).st_mode)[-3:] if os.path.exists(os.path.dirname(CACHE_DIR)) else "N/A" } error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8') logger.error(f'[内容分析] 创建缓存目录失败: {error_json}') return None for _ in range(3): try: response = requests.get(url=video_link, timeout=60) print(f"response content: {file_path}") if response.status_code == 200: try: with open(file_path, 'wb') as f: f.write(response.content) logger.info(f'[内容分析] 视频链接: {video_link}, 存储地址: {file_path}') except Exception as e: error_info = { "error_type": type(e).__name__, "error_message": str(e), "file_path": file_path, "content_length": len(response.content) if response.content else 0 } error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8') logger.error(f'[内容分析] 视频保存失败: {error_json}') return None return file_path except Exception: time.sleep(1) continue except Exception: logger.error(f'[内容分析] 创建缓存目录失败') return None @classmethod def _analyze_content(cls, video, prompt): """增强版内容分析""" model = genai.GenerativeModel( model_name='gemini-2.0-flash', generation_config=genai.GenerationConfig( response_mime_type='application/json', temperature=0.3, max_output_tokens=20480 ), safety_settings={ HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, } ) try: response = model.generate_content( contents=[video, prompt], request_options={'timeout': 300} ) if hasattr(response, '_error') and response._error: raise Exception(f"生成错误: {response._error}") result = orjson.loads(response.text.strip()) print(f"[视频分析] 响应: {result}") if not isinstance(result, dict): raise ValueError("响应格式错误:非字典结构") return result except orjson.JSONDecodeError: raise Exception("响应解析失败,非JSON格式") except Exception as e: raise Exception(f"分析失败: {str(e)}") @classmethod def run(cls, api_key, video_url): print(f"api_key:{api_key},video_url:{video_url}") video_path = None try: genai.configure(api_key=api_key) video_path = cls.download_video(video_link=video_url) if not video_path: logger.error(f'[内容分析] 视频下载失败, 跳过任务') os.remove(video_path) logger.info(f"[内容分析] 文件已删除: {video_path}") return "[异常] 视频下载失败","" video = genai.upload_file(path=video_path, mime_type='video/mp4') while video.state.name == 'PROCESSING': time.sleep(1) video = genai.get_file(name=video.name) if video.state.name != 'ACTIVE': genai.delete_file(name=video.name) os.remove(video_path) return "[异常] 上传视频失败", "" prompts = load_prompts() analysis_data = {} for prompt in prompts[:3]: # print(f"[分析] 正在执行: {prompt['name']}") try: result = cls._analyze_content(video, prompt['content']) # 提取 result 中的 "内容分段" 和 "视频简介" analysis_data['视频选题与要点理解'] = { "视频简介": result.get('视频简介', ''), "视频内容类型": result.get('视频内容类型', ''), "段落类型相似度": result.get('段落类型相似度', 1) } analysis_data['视频分段与时间点分析'] = { "内容分段": result.get('内容分段', []) } except Exception as e: analysis_data[prompt['name']] = { "error": str(e), "error_type": type(e).__name__ } # print(f"[分析] 所有分析完成, 结果: {analysis_data}") coze_hook = CozeHook() demand_list = coze_hook.run(analysis_data["视频选题与要点理解"], analysis_data["视频分段与时间点分析"]) # print(f"[分析] 所有分析完成, 结果: {demand_list}") genai.delete_file(name=video.name) os.remove(video_path) return analysis_data, demand_list except Exception as e: logger.error(f"[内容分析] 处理异常,异常信息{e}") os.remove(video_path) return f"[异常] {e}","" @classmethod def _analyze_content_with_api(cls, video_url): """使用API分析视频内容""" try: # 检查视频URL是否有效 if not video_url or not video_url.startswith('http'): raise Exception("无效的视频URL") # 获取视频文件以确定正确的MIME类型 try: response = requests.head(video_url, timeout=10) content_type = response.headers.get('content-type', '') if not content_type or 'video' not in content_type.lower(): # 如果无法从HEAD请求获取正确的content-type,尝试GET请求 response = requests.get(video_url, stream=True, timeout=10) content_type = response.headers.get('content-type', '') if not content_type or 'video' not in content_type.lower(): content_type = 'video/mp4' # 默认使用mp4 except Exception as e: logger.warning(f"[内容分析] 获取视频MIME类型失败: {str(e)}, 使用默认类型video/mp4") content_type = 'video/mp4' # 使用API分析视频内容 response = requests.post( 'http://ai-api.piaoquantv.com/aigc-server/gemini/generateContent', json={ "mediaUrl": video_url, "type": 2, "prompt": VIDEO_ANALYSIS_PROMPT, "model": "gemini-2.0-flash", "temperature": "0.3", "mimeType": content_type # 添加正确的MIME类型 }, timeout=300 ) response.raise_for_status() result = response.json() # print(f"[内容分析] API原始响应: {result}") if not result: raise Exception("API返回结果为空") if result.get('code') != 0: error_msg = result.get('msg', '未知错误') if 'data' in error_msg and 'error' in error_msg: try: error_data = orjson.loads(error_msg) if isinstance(error_data, dict) and 'error' in error_data: error_msg = f"API错误: {error_data['error'].get('message', error_msg)}" except: pass raise Exception(f"API返回错误: {error_msg}") if not result.get('data') or not result['data'].get('result'): raise Exception("API返回数据格式错误: 缺少result字段") try: # 解析返回的JSON字符串 analysis_result = orjson.loads(result['data']['result']) if not isinstance(analysis_result, dict): raise ValueError("API返回的result不是有效的JSON对象") # 构建analysis_data analysis_data = { '视频选题与要点理解': { "视频简介": analysis_result.get('视频简介', ''), "视频内容类型": analysis_result.get('视频内容类型', ''), "段落类型相似度": analysis_result.get('段落类型相似度', 1) }, '视频分段与时间点分析': { "内容分段": analysis_result.get('内容分段', []) } } # 使用coze_hook处理数据 coze_hook = CozeHook() demand_list = coze_hook.run( analysis_data["视频选题与要点理解"], analysis_data["视频分段与时间点分析"] ) if not demand_list: raise Exception("CozeHook处理结果为空") # print(f"[内容分析] API分析完成, 结果: {analysis_data}, {demand_list}") return analysis_data, demand_list except orjson.JSONDecodeError as e: raise Exception(f"解析API返回的JSON失败: {str(e)}") except Exception as e: raise Exception(f"处理API返回数据时出错: {str(e)}") except requests.exceptions.RequestException as e: error_msg = f"API请求失败: {str(e)}" logger.error(f"[内容分析] {error_msg}") return f"[异常] {error_msg}", None except Exception as e: error_msg = f"API分析失败: {str(e)}" logger.error(f"[内容分析] {error_msg}") return f"[异常] {error_msg}", None if __name__ == '__main__': ai = GoogleAI() ai.run("AIzaSyBziAREIdTRtOZCp4KmNUjNpNDpbysgY-s", "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213") # ai._analyze_content_with_api("http://rescdn.yishihui.com/longvideo/crawler_local/video/prod/20241206/5f98b0e4464d02d6c75907302793902d12277")