123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- import asyncio
- import os
- import time
- import cv2
- import requests
- import google.generativeai as genai
- import uuid
- from google.generativeai.types import HarmCategory, HarmBlockThreshold
- from common import Common
- class VideoAnalyzer:
- def __init__(self, api_key):
- """初始化类,配置 API 密钥和视频路径"""
- genai.configure(api_key=api_key)
- self.video_file = None
- async def process_and_delete_file(self, file_path):
- """删除文件"""
- try:
- print(f"正在处理文件: {file_path}" )
- os.remove( file_path )
- print( f"文件已删除: {file_path}" )
- except Exception as e:
- print( f"处理或删除文件时发生错误: {str( e )}" )
- async def download_video(self, video_url, save_directory='/root/google_ai_studio/path'):
- # async def download_video(self, video_url, save_directory='/Users/tzld/Desktop/google_ai_studio/path'):
- """从给定的视频链接下载视频并保存到指定路径"""
- random_filename = f"{uuid.uuid4()}.mp4"
- save_path = os.path.join(save_directory, random_filename)
- max_retries = 3 # 最大重试次数
- for attempt in range(max_retries):
- try:
- # 发送 GET 请求获取视频内容
- headers = {}
- payload = {}
- response = requests.request("GET", video_url, headers=headers, data=payload)
- if response.status_code == 200:
- # 以二进制写入模式打开文件
- with open(save_path, "wb") as file:
- # 将响应内容写入文件
- file.write(response.content)
- print(f"视频已成功下载并保存到: {save_path}")
- return save_path
- else:
- print(f"请求失败,状态码: {response.status_code}")
- except requests.exceptions.RequestException as e:
- print(f"下载视频时出现错误: {e}")
- # 如果下载失败,等待一段时间后重试
- time.sleep(1)
- print("达到最大重试次数,视频下载失败")
- return None
- async def delete_video(self):
- try:
- self.video_file.delete()
- except Exception as e:
- Common.logger( "ai" ).info( f'视频删除异常{e}' )
- async def upload_video(self, save_path, mime_type = None):
- """上传视频文件并获取视频文件对象"""
- try:
- self.video_file = genai.upload_file(save_path, mime_type=mime_type)
- await self._wait_for_processing()
- except Exception as e:
- Common.logger("ai").info(f'上传视频文件并获取视频文件对象失败异常信息{e}')
- return f"上传视频文件并获取视频文件对象失败:{e}"
- async def _wait_for_processing(self):
- """等待视频文件处理完成"""
- while self.video_file.state.name == 'PROCESSING':
- print( '等待视频处理完成...' )
- await asyncio.sleep(2) # 使用异步睡眠代替阻塞睡眠
- self.video_file = genai.get_file( self.video_file.name )
- print( f'视频处理完成: {self.video_file.uri}' )
- async def create_cache(self):
- try:
- generation_config = {
- "response_mime_type": "application/json"
- }
- """创建缓存内容,并返回生成模型"""
- # 创建生成模型,使用 gemini-1.5-flash 模型
- model = genai.GenerativeModel(
- model_name="gemini-1.5-flash",
- generation_config=generation_config,
- safety_settings={
- HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
- }
- )
- return model
- except Exception as e:
- Common.logger("ai").info(f'视频创建缓存内容,并返回生成模型异常信息{e}')
- print(f"视频创建缓存内容,并返回生成模型异常信息{e}")
- return f"视频创建缓存内容,并返回生成模型异常信息{e}"
- async def analyze_video(self, model, questions, sample_data):
- try:
- chat_session = model.start_chat(history=[])
- message_content = {
- "parts": [
- self.video_file,
- str(questions) +
- "输出返回格式样例:\n" + str(sample_data)
- ]
- }
- response = chat_session.send_message( message_content )
- return response
- except Exception as e:
- Common.logger("ai").info(f'视频处理请求失败:{e}')
- print( f"视频处理请求失败:{e}" )
- return f"视频处理请求失败:{e}"
- def video_duration(self, filename):
- cap = cv2.VideoCapture( filename )
- if cap.isOpened():
- rate = cap.get( 5 )
- frame_num = cap.get( 7 )
- duration = frame_num / rate
- return int(duration)
- return 0
- async def main(video_path, api_key, prompt, sample_data):
- """主函数,执行视频上传、缓存创建、问题生成"""
- attempt = 0
- max_attempts = 1
- while attempt < max_attempts:
- try:
- # 初始化视频分析类
- analyzer = VideoAnalyzer(api_key )
- for file in genai.list_files():
- # file.delete()
- print( " ", file.name )
- duration = analyzer.video_duration( video_path )
- print( f"视频时长为{duration}秒" )
- if int( duration ) >= 600 or int( duration ) == 0:
- return f"视频时长过长/视频时长为:{duration}秒"
- save_path = await analyzer.download_video(video_path)
- if not save_path or save_path == None:
- if os.path.exists( save_path ):
- os.remove( save_path )
- print( f"文件已删除: {save_path}" )
- print("视频下载失败")
- return "视频下载失败"
- # 上传并处理视频
- upload_response = await analyzer.upload_video( save_path )
- if upload_response:
- await analyzer.delete_video()
- return upload_response
- # 创建缓存模型
- model =await analyzer.create_cache()
- if isinstance( model, str ):
- await analyzer.delete_video()
- return model
- print("创建缓存模型成功")
- # sample_data = {
- # "一、基础信息": {
- # "视觉/音乐/文字": "",
- # "内容选题": "",
- # "视频主题": ""
- # },
- # "二、主体和场景": {
- # "视频主体": "",
- # "视频场景": []
- # },
- # "三、情感与风格": {},
- # "四、视频传播性与观众": {
- # "片尾引导": {},
- # "传播性判断": "",
- # "观众画像": {}
- # },
- # "五、音画细节": {
- # "音频细节": {},
- # "视频水印": {},
- # "视频字幕": {},
- # "视频口播": ""
- # },
- # "六、人物与场景": {
- # "知名人物": {},
- # "人物年龄段": "",
- # "场景描述": []
- # },
- # "七、时效性与分类": {
- # "时效性": {},
- # "视频一级分类": "",
- # "二级分类": ["品类- 、分数-", "品类- 、分数-", "品类- 、分数-"]
- # }
- # }
- response =await analyzer.analyze_video( model, prompt, sample_data )
- if isinstance( response, str ):
- await analyzer.delete_video()
- return response
- print( response.usage_metadata )
- print(response.text)
- if os.path.exists( save_path ):
- os.remove( save_path )
- print( f"文件已删除: {save_path}" )
- await analyzer.delete_video()
- return response.text
- except Exception as e:
- attempt += 1 # 增加尝试次数
- if attempt < max_attempts:
- print(f"重试第 {attempt} 次...")
- else:
- print( "达到最大重试次数,处理失败" )
- Common.logger( "ai" ).info( f'视频分析处理失败异常信息{e}' )
- return f"视频分析处理失败:{e}"
- if __name__ == "__main__":
- proxy_url = 'http://127.0.0.1:1081'
- os.environ["http_proxy"] = proxy_url
- os.environ["https_proxy"] = proxy_url
- # video_path = 'http://temp.yishihui.com/longvideo/transcode/video/vpc/20240926/66510681PACx7zsp2wDBHJlicE.mp4'
- video_path = 'http://temp.yishihui.com/longvideo/transcode/video/vpc/20240605/68754804SJz5E9JNe5hAdSkRwF.mp4'
- api_key = "AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8"
- asyncio.run(main(video_path,api_key))
|