import asyncio import os import time import cv2 import requests import google.generativeai as genai import uuid from google.generativeai.types import HarmCategory, HarmBlockThreshold from common import Common class VideoAnalyzer: def __init__(self, api_key): """初始化类,配置 API 密钥和视频路径""" genai.configure(api_key=api_key) self.video_file = None async def process_and_delete_file(self, file_path): """删除文件""" try: print(f"正在处理文件: {file_path}" ) os.remove( file_path ) print( f"文件已删除: {file_path}" ) except Exception as e: print( f"处理或删除文件时发生错误: {str( e )}" ) async def download_video(self, video_url, save_directory='/root/google_ai_studio/path'): # async def download_video(self, video_url, save_directory='/Users/tzld/Desktop/google_ai_studio/path'): """从给定的视频链接下载视频并保存到指定路径""" random_filename = f"{uuid.uuid4()}.mp4" save_path = os.path.join(save_directory, random_filename) max_retries = 3 # 最大重试次数 for attempt in range(max_retries): try: # 发送 GET 请求获取视频内容 headers = {} payload = {} response = requests.request("GET", video_url, headers=headers, data=payload) if response.status_code == 200: # 以二进制写入模式打开文件 with open(save_path, "wb") as file: # 将响应内容写入文件 file.write(response.content) print(f"视频已成功下载并保存到: {save_path}") return save_path else: print(f"请求失败,状态码: {response.status_code}") except requests.exceptions.RequestException as e: print(f"下载视频时出现错误: {e}") # 如果下载失败,等待一段时间后重试 time.sleep(1) print("达到最大重试次数,视频下载失败") return None async def delete_video(self): try: self.video_file.delete() except Exception as e: Common.logger( "ai" ).info( f'视频删除异常{e}' ) async def upload_video(self, save_path, mime_type = None): """上传视频文件并获取视频文件对象""" try: self.video_file = genai.upload_file(save_path, mime_type=mime_type) await self._wait_for_processing() except Exception as e: Common.logger("ai").info(f'上传视频文件并获取视频文件对象失败异常信息{e}') return f"上传视频文件并获取视频文件对象失败:{e}" async def _wait_for_processing(self): """等待视频文件处理完成""" while self.video_file.state.name == 'PROCESSING': print( '等待视频处理完成...' ) await asyncio.sleep(2) # 使用异步睡眠代替阻塞睡眠 self.video_file = genai.get_file( self.video_file.name ) print( f'视频处理完成: {self.video_file.uri}' ) async def create_cache(self): try: generation_config = { "response_mime_type": "application/json" } """创建缓存内容,并返回生成模型""" # 创建生成模型,使用 gemini-1.5-flash 模型 model = genai.GenerativeModel( model_name="gemini-1.5-flash", generation_config=generation_config, safety_settings={ HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, } ) return model except Exception as e: Common.logger("ai").info(f'视频创建缓存内容,并返回生成模型异常信息{e}') print(f"视频创建缓存内容,并返回生成模型异常信息{e}") return f"视频创建缓存内容,并返回生成模型异常信息{e}" async def analyze_video(self, model, questions, sample_data): try: chat_session = model.start_chat(history=[]) message_content = { "parts": [ self.video_file, str(questions) + "输出返回格式样例:\n" + str(sample_data) ] } response = chat_session.send_message( message_content ) return response except Exception as e: Common.logger("ai").info(f'视频处理请求失败:{e}') print( f"视频处理请求失败:{e}" ) return f"视频处理请求失败:{e}" def video_duration(self, filename): cap = cv2.VideoCapture( filename ) if cap.isOpened(): rate = cap.get( 5 ) frame_num = cap.get( 7 ) duration = frame_num / rate return int(duration) return 0 async def main(video_path, api_key, prompt, sample_data): """主函数,执行视频上传、缓存创建、问题生成""" attempt = 0 max_attempts = 1 while attempt < max_attempts: try: # 初始化视频分析类 analyzer = VideoAnalyzer(api_key ) for file in genai.list_files(): # file.delete() print( " ", file.name ) duration = analyzer.video_duration( video_path ) print( f"视频时长为{duration}秒" ) if int( duration ) >= 600 or int( duration ) == 0: return f"视频时长过长/视频时长为:{duration}秒" save_path = await analyzer.download_video(video_path) if not save_path or save_path == None: if os.path.exists( save_path ): os.remove( save_path ) print( f"文件已删除: {save_path}" ) print("视频下载失败") return "视频下载失败" # 上传并处理视频 upload_response = await analyzer.upload_video( save_path ) if upload_response: await analyzer.delete_video() return upload_response # 创建缓存模型 model =await analyzer.create_cache() if isinstance( model, str ): await analyzer.delete_video() return model print("创建缓存模型成功") # sample_data = { # "一、基础信息": { # "视觉/音乐/文字": "", # "内容选题": "", # "视频主题": "" # }, # "二、主体和场景": { # "视频主体": "", # "视频场景": [] # }, # "三、情感与风格": {}, # "四、视频传播性与观众": { # "片尾引导": {}, # "传播性判断": "", # "观众画像": {} # }, # "五、音画细节": { # "音频细节": {}, # "视频水印": {}, # "视频字幕": {}, # "视频口播": "" # }, # "六、人物与场景": { # "知名人物": {}, # "人物年龄段": "", # "场景描述": [] # }, # "七、时效性与分类": { # "时效性": {}, # "视频一级分类": "", # "二级分类": ["品类- 、分数-", "品类- 、分数-", "品类- 、分数-"] # } # } response =await analyzer.analyze_video( model, prompt, sample_data ) if isinstance( response, str ): await analyzer.delete_video() return response print( response.usage_metadata ) print(response.text) if os.path.exists( save_path ): os.remove( save_path ) print( f"文件已删除: {save_path}" ) await analyzer.delete_video() return response.text except Exception as e: attempt += 1 # 增加尝试次数 if attempt < max_attempts: print(f"重试第 {attempt} 次...") else: print( "达到最大重试次数,处理失败" ) Common.logger( "ai" ).info( f'视频分析处理失败异常信息{e}' ) return f"视频分析处理失败:{e}" if __name__ == "__main__": proxy_url = 'http://127.0.0.1:1081' os.environ["http_proxy"] = proxy_url os.environ["https_proxy"] = proxy_url # video_path = 'http://temp.yishihui.com/longvideo/transcode/video/vpc/20240926/66510681PACx7zsp2wDBHJlicE.mp4' video_path = 'http://temp.yishihui.com/longvideo/transcode/video/vpc/20240605/68754804SJz5E9JNe5hAdSkRwF.mp4' api_key = "AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8" asyncio.run(main(video_path,api_key))