#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Video recognition script.

Main purpose: use the Gemini API to analyse video content along three dimensions:
1. ASR (Automatic Speech Recognition) - speech to text
2. OCR - recognise the text shown in the video frames
3. Key-frame extraction and description - extract key frames and describe them
"""
- import os
- import json
- import time
- import sys
- import uuid
- import requests
- from typing import Dict, Any, List, Optional
- from dotenv import load_dotenv
- # 导入自定义模块
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- # 导入Google Generative AI
- import google.generativeai as genai
- from google.generativeai.types import HarmCategory, HarmBlockThreshold
# Cache directory configuration: downloaded videos are stored in a "cache"
# folder next to this module. The path is made absolute so that it does not
# depend on the working directory when __file__ happens to be relative.
CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache')
class VideoIdentifier:
    """Analyse videos with the Gemini API.

    For each video the pipeline is: download it into ``CACHE_DIR``, upload it
    with ``genai.upload_file``, ask the model for a JSON report (speech
    transcript plus per-segment frame descriptions including on-screen text),
    and finally delete both the local copy and the uploaded file.

    Apart from ``__init__``, the public methods report problems through
    ``print`` and encode failures in their return values instead of raising.
    """

    def __init__(self):
        """Load the environment, configure Gemini and build the prompt.

        Raises:
            ValueError: If the ``GEMINI_API_KEY`` environment variable is
                missing or empty.
        """
        # Load environment variables (e.g. from a .env file).
        load_dotenv()

        # Fetch the API key.
        self.api_key = os.getenv('GEMINI_API_KEY')
        if not self.api_key:
            raise ValueError("未找到GEMINI_API_KEY环境变量")

        # Configure Gemini.
        genai.configure(api_key=self.api_key)

        # Single prompt covering both requested dimensions: the speech
        # transcript and the key-frame descriptions (OCR text is part of the
        # key-frame entries).
        self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:
1. ASR (Automatic Speech Recognition) - 语音转文字:
- 仅提取视频中的语音内容,转换为文字
- 保持原始语音的准确性和完整性
- 不要添加分析、解释或评论
2. 关键帧提取与描述(包含OCR文字识别):
- 将视频分解为多个关键时间片段
- 对每个时间片段进行以下分析:
* 画面的主要视觉元素和内容
* 画面的构图和色彩特点
* 画面中的人物、物体、场景
* 画面中出现的所有文字内容(OCR识别)
- 每个时间片段应包含:
* content: 画面内容的详细描述
* ocr_content: 该时间段画面中出现的文字内容,仅做文字提取,不要做任何解释或总结
请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
{
"asr_content": "提取的语音文字内容",
"iframe_details": [
{
"time_start": "开始时间(秒)",
"time_end": "结束时间(秒)",
"content": "该时间段画面内容的详细描述",
"ocr_content": "该时间段画面中出现的文字内容"
}
]
}"""

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _placeholder_frames(message: str) -> List[Dict[str, Any]]:
        """Return a one-entry ``iframe_details`` list carrying ``message``."""
        return [{
            'time_start': 0,
            'time_end': 0,
            'content': message,
            'ocr_content': message
        }]

    @staticmethod
    def _analysis_failure(detail: str) -> Dict[str, Any]:
        """Build the analysis result used when Gemini analysis fails.

        ``detail`` is appended verbatim to the fixed failure prefixes, so it
        carries its own separator (for example ``':API无响应'``).
        """
        return {
            'asr_content': f'ASR分析失败{detail}',
            'iframe_details': VideoIdentifier._placeholder_frames(f'关键帧分析失败{detail}')
        }

    @staticmethod
    def _failed_result(url: Any, duration: Any, message: str) -> Dict[str, Any]:
        """Build the per-video result used when a pipeline step fails."""
        return {
            'url': url,
            'duration': duration,
            'asr_content': message,
            'iframe_details': VideoIdentifier._placeholder_frames(message)
        }

    @staticmethod
    def _is_network_error(message: str) -> bool:
        """Tell whether an error message looks like a network problem."""
        lowered = message.lower()
        return any(keyword in lowered
                   for keyword in ('broken pipe', 'connection', 'timeout', 'network'))

    @staticmethod
    def _discard_remote_file(video_file: Any) -> None:
        """Best-effort deletion of an uploaded Gemini file; never raises."""
        if not (video_file and hasattr(video_file, 'name')):
            return
        try:
            genai.delete_file(name=video_file.name)
            print(f" Gemini文件已清理: {video_file.name}")
        except Exception as e:
            print(f" 清理Gemini文件失败: {e}")

    # ------------------------------------------------------------------
    # Download
    # ------------------------------------------------------------------
    def download_video(self, video_url: str) -> Optional[str]:
        """Download a video into the local cache directory.

        The request is attempted up to three times with a one-second pause
        between attempts; transport errors and non-200 responses both count
        as a failed attempt. A failure to write the file is not retried.

        Args:
            video_url: URL of the video to fetch.

        Returns:
            The path of the saved ``.mp4`` file, or ``None`` on failure.
        """
        max_attempts = 3
        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
        try:
            # Make sure the cache directory exists.
            try:
                os.makedirs(CACHE_DIR, exist_ok=True)
            except Exception as e:
                print(f'创建缓存目录失败: {e}')
                return None

            for attempt in range(max_attempts):
                try:
                    response = requests.get(url=video_url, timeout=60)
                except Exception as e:
                    print(f'下载尝试 {attempt + 1} 失败: {e}')
                else:
                    if response.status_code == 200:
                        try:
                            with open(file_path, 'wb') as f:
                                f.write(response.content)
                        except Exception as e:
                            print(f'视频保存失败: {e}')
                            # Do not leave a truncated file behind in the cache.
                            if os.path.exists(file_path):
                                try:
                                    os.remove(file_path)
                                except OSError as cleanup_error:
                                    print(f" 清理临时文件失败: {cleanup_error}")
                            return None
                        print(f'视频下载成功: {video_url} -> {file_path}')
                        return file_path
                    print(f'视频下载失败,状态码: {response.status_code}')

                if attempt < max_attempts - 1:  # not the last attempt yet
                    time.sleep(1)
                else:
                    print('所有下载尝试都失败了')

        except Exception as e:
            print(f'下载过程异常: {e}')

        return None

    # ------------------------------------------------------------------
    # Upload
    # ------------------------------------------------------------------
    @staticmethod
    def _local_file_is_usable(video_path: str) -> bool:
        """Check that ``video_path`` exists, is non-empty and is readable."""
        if not os.path.exists(video_path):
            print(" 错误: 文件不存在")
            return False

        file_size = os.path.getsize(video_path)
        print(f" 文件大小: {file_size / (1024*1024):.2f} MB")
        if file_size == 0:
            print(" 错误: 文件大小为0")
            return False

        try:
            with open(video_path, 'rb') as f:
                # Read the beginning of the file to prove it is readable.
                f.read(1024)
            print(" 文件权限: 可读")
        except Exception as e:
            print(f" 错误: 文件无法读取 - {e}")
            return False
        return True

    @staticmethod
    def _report_network_status() -> None:
        """Probe the Gemini endpoint and print the outcome.

        This is informational only: a failed probe prints a warning and the
        upload is attempted anyway.
        """
        try:
            print(" 检查网络连接...")
            test_response = requests.get("https://generativelanguage.googleapis.com", timeout=10)
            print(f" 网络连接: 正常 (状态码: {test_response.status_code})")
        except Exception as e:
            print(f" 警告: 网络连接测试失败 - {e}")
            print(" 继续尝试上传...")

    @staticmethod
    def _print_upload_diagnosis(error_msg: str) -> None:
        """Print troubleshooting hints for well-known upload errors."""
        if "Broken pipe" in error_msg:
            print(" 诊断: Broken pipe 错误通常表示:")
            print(" - 网络连接不稳定")
            print(" - 服务器连接中断")
            print(" - 防火墙或代理问题")
            print(" 建议:")
            print(" - 检查网络连接")
            print(" - 尝试使用VPN或更换网络")
            print(" - 检查防火墙设置")
        elif "Connection" in error_msg:
            print(" 诊断: 连接错误")
            print(" 建议: 检查网络连接和API密钥")
        elif "Timeout" in error_msg:
            print(" 诊断: 超时错误")
            print(" 建议: 网络较慢,可以增加超时时间")
        elif "Permission" in error_msg:
            print(" 诊断: 权限错误")
            print(" 建议: 检查API密钥和权限设置")

    @staticmethod
    def _wait_for_processing(video_file: Any) -> Optional[tuple]:
        """Poll an uploaded file until it leaves the ``PROCESSING`` state.

        Polls every 2 seconds for at most 120 seconds.

        Returns:
            ``(video_file, waited_seconds)`` with the most recently fetched
            file object, or ``None`` when a status request failed within the
            first 60 seconds (the caller then gives up on the upload).
        """
        max_wait_time = 120  # wait at most two minutes
        wait_count = 0

        while video_file.state.name == 'PROCESSING' and wait_count < max_wait_time:
            time.sleep(2)  # poll every 2 seconds
            wait_count += 2

            try:
                # Fetch the latest state.
                video_file = genai.get_file(name=video_file.name)
                current_state = video_file.state.name
                print(f" 状态: {current_state} ({wait_count}秒)")

                if current_state in ['FAILED', 'ERROR', 'INVALID']:
                    print(f" 错误: 文件处理失败,状态: {current_state}")
                    if hasattr(video_file, 'error'):
                        print(f" 错误详情: {video_file.error}")
                    # Stop polling; the caller's final-state check decides
                    # whether to retry and applies the backoff exactly once.
                    break

            except Exception as e:
                print(f" 警告: 获取文件状态失败 - {e}")
                if wait_count > 60:  # past one minute: keep waiting
                    print(" 继续等待...")
                    continue
                print(" 错误: 无法获取文件状态")
                return None

        return video_file, wait_count

    def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
        """Upload a local video to Gemini and wait until it is usable.

        Up to three attempts are made. The delay between attempts starts at
        5 seconds and doubles after each failed attempt. Upload errors are
        retried only when the message looks network related; an upload that
        does not reach the ``ACTIVE`` state is retried as well. Uploads that
        are abandoned are deleted from Gemini on a best-effort basis.

        Args:
            video_path: Path of the local video file.

        Returns:
            The uploaded file object once its state is ``ACTIVE``, otherwise
            ``None``.
        """
        max_retries = 3
        retry_delay = 5

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            video_file = None
            try:
                print(f" 开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
                print(f" 文件路径: {video_path}")

                # 1-2. Local file checks (existence, size, readability).
                if not self._local_file_is_usable(video_path):
                    return None

                # 3. Network probe (informational only).
                self._report_network_status()

                # 4. Upload the file.
                print(" 开始上传文件...")
                try:
                    video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
                    print(f" 文件上传请求已发送,文件ID: {video_file.name}")
                except Exception as e:
                    print(f" 错误: 文件上传请求失败 - {e}")
                    print(f" 错误类型: {type(e).__name__}")
                    print(f" 错误详情: {str(e)}")

                    # Only network-related errors are worth retrying.
                    if not self._is_network_error(str(e)):
                        return None
                    if is_last_attempt:
                        print(" 所有重试都失败了")
                        return None
                    print(f" 网络错误,等待 {retry_delay} 秒后重试...")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                    continue

                # 5. Wait for server-side processing.
                print(" 等待文件处理完成...")
                polled = self._wait_for_processing(video_file)
                if polled is None:
                    self._discard_remote_file(video_file)
                    return None
                video_file, wait_count = polled

                # 6. Check the final state.
                if video_file.state.name == 'ACTIVE':
                    print(f' 视频上传成功: {video_file.name}')
                    print(f" 最终状态: {video_file.state.name}")
                    return video_file

                print(' 错误: 视频文件上传失败')
                print(f" 最终状态: {video_file.state.name}")
                print(f" 等待时间: {wait_count}秒")

                # Try to print more details about the failed file.
                try:
                    file_info = genai.get_file(name=video_file.name)
                    print(f" 文件信息: {file_info}")
                except Exception as e:
                    print(f" 无法获取文件详细信息: {e}")

                # The next attempt uploads a fresh copy, so drop this one.
                self._discard_remote_file(video_file)

                if is_last_attempt:
                    return None
                print(f" 上传失败,等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)
                retry_delay *= 2

            except Exception as e:
                error_msg = str(e)

                print(' 错误: 视频上传到Gemini失败')
                print(f" 错误类型: {type(e).__name__}")
                print(f" 错误信息: {error_msg}")
                self._print_upload_diagnosis(error_msg)

                # Whatever was uploaded in this attempt is abandoned.
                self._discard_remote_file(video_file)

                # Only network-related errors are worth retrying.
                if not self._is_network_error(error_msg):
                    return None
                if is_last_attempt:
                    print(" 所有重试都失败了")
                    return None
                print(f" 网络错误,等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)
                retry_delay *= 2

        return None

    # ------------------------------------------------------------------
    # Analysis
    # ------------------------------------------------------------------
    @classmethod
    def _parse_analysis_response(cls, text: Optional[str]) -> Dict[str, Any]:
        """Turn the model's raw text reply into the analysis dictionary.

        Returns a failure dictionary when the reply is empty or is not valid
        JSON. Missing ``asr_content`` / ``iframe_details`` keys are filled
        with failure placeholders.

        Raises:
            ValueError: If the reply is valid JSON but not a JSON object.
        """
        cleaned = (text or '').strip()
        if not cleaned:
            return cls._analysis_failure(':API无响应')

        try:
            result = json.loads(cleaned)
        except json.JSONDecodeError as e:
            print(f"JSON解析失败: {e}")
            return cls._analysis_failure(':JSON解析错误')

        print(f"[视频分析] 响应: {result}")
        if not isinstance(result, dict):
            raise ValueError("响应格式错误:非字典结构")

        # Make sure every required field is present.
        if 'asr_content' not in result:
            result['asr_content'] = 'asr_content分析失败'
        if 'iframe_details' not in result:
            result['iframe_details'] = cls._placeholder_frames('iframe_details分析失败')
        return result

    def analyze_video_with_gemini(self, video_file: Any, video_info: Dict[str, Any]) -> Dict[str, Any]:
        """Ask Gemini to analyse an uploaded video.

        Args:
            video_file: File object returned by ``upload_video_to_gemini``.
            video_info: Metadata of the video; currently not used here.

        Returns:
            A dictionary with ``asr_content`` and ``iframe_details``. Any
            error is caught and reported inside those two fields.
        """
        try:
            # Create the Gemini model.
            model = genai.GenerativeModel(
                model_name='gemini-2.0-flash',
                generation_config=genai.GenerationConfig(
                    response_mime_type='application/json',
                    temperature=0.3,
                    max_output_tokens=20480
                ),
                safety_settings={
                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                }
            )

            # Generate the content.
            response = model.generate_content(
                contents=[video_file, self.unified_system_prompt],
                request_options={'timeout': 300}
            )

            # Surface an error recorded on the response object, if any.
            generation_error = getattr(response, '_error', None)
            if generation_error:
                raise Exception(f"生成错误: {generation_error}")

            return self._parse_analysis_response(response.text)

        except Exception as e:
            return self._analysis_failure(f': {str(e)}')

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect ``url`` / ``duration`` pairs from ``video_url_list``.

        Entries that are not dictionaries or that lack ``video_url`` are
        skipped; a missing ``video_duration`` becomes ``0``. A missing or
        ``None`` list yields an empty result.
        """
        entries = formatted_content.get('video_url_list') or []
        return [
            {
                'url': video_item['video_url'],
                'duration': video_item.get('video_duration', 0)
            }
            for video_item in entries
            if isinstance(video_item, dict) and 'video_url' in video_item
        ]

    def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
        """Run the full download / upload / analyse pipeline for one video.

        Args:
            video_info: Dictionary with ``url`` and ``duration`` as produced
                by ``extract_video_urls``.

        Returns:
            A dictionary with ``url``, ``duration``, ``asr_content`` and
            ``iframe_details``. When an unexpected exception occurs the
            result additionally carries ``analysis_timestamp`` (milliseconds
            since the epoch). The local file and the uploaded Gemini file are
            cleaned up in every case.
        """
        url = video_info.get('url', '')
        duration = video_info.get('duration', 0)
        print(f"开始处理视频: {str(url)[:50]}...")

        video_path = None
        video_file = None
        try:
            # 1. Download the video.
            print(" 1. 下载视频...")
            video_path = self.download_video(url)
            if not video_path:
                print(" 视频下载失败")
                return self._failed_result(url, duration, '视频下载失败')

            # 2. Upload it to Gemini.
            print(" 2. 上传视频到Gemini...")
            video_file = self.upload_video_to_gemini(video_path)
            if not video_file:
                print(" 视频上传到Gemini失败")
                return self._failed_result(url, duration, '视频上传失败')

            # 3. Analyse it with Gemini.
            print(" 3. 使用Gemini分析视频内容...")
            analysis_result = self.analyze_video_with_gemini(video_file, video_info)

            # 4. Combine the result.
            final_result = {
                'url': url,
                'duration': duration,
                'asr_content': analysis_result.get('asr_content', 'ASR分析失败'),
                'iframe_details': analysis_result.get(
                    'iframe_details', self._placeholder_frames('关键帧分析失败')),
            }

            print(" 视频分析完成")
            return final_result

        except Exception as e:
            print(f" 视频处理异常: {e}")
            failure = self._failed_result(url, duration, f'处理异常: {str(e)}')
            failure['analysis_timestamp'] = int(time.time() * 1000)
            return failure
        finally:
            # Remove the temporary local file.
            if video_path and os.path.exists(video_path):
                try:
                    os.remove(video_path)
                    print(f" 临时文件已清理: {video_path}")
                except Exception as e:
                    print(f" 清理临时文件失败: {e}")

            # Remove the uploaded Gemini file.
            self._discard_remote_file(video_file)

    def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Analyse every video referenced by ``formatted_content``.

        Videos are processed one after another with a two-second pause in
        between to stay clear of API rate limits.

        Returns:
            One result dictionary per video, in input order; an empty list
            when there is no video to analyse.
        """
        print("开始视频识别处理...")

        # Extract the video URLs.
        video_data = self.extract_video_urls(formatted_content)
        print(f"提取到 {len(video_data)} 个视频")

        if not video_data:
            print("没有视频需要分析")
            return []

        total = len(video_data)
        results = []
        for position, video_info in enumerate(video_data, start=1):
            print(f"\n处理视频 {position}/{total}")
            results.append(self.process_video_single(video_info))

            # Pause between videos (not after the last one).
            if position < total:
                time.sleep(2)

        print(f"\n视频识别完成,共分析 {len(results)} 个视频")
        print("分析维度:ASR、关键帧提取")
        return results
def main():
    """Manual smoke test: analyse one sample video and print the result.

    Builds a ``VideoIdentifier`` (which requires ``GEMINI_API_KEY``), runs
    ``process_videos`` on a hard-coded sample payload and prints the results
    as indented JSON.
    """
    # Sample payload in the shape expected by ``process_videos``.
    test_content = {
        "video_url_list": [
            {
                "video_url": "https://vd9.bdstatic.com/mda-rf03dz9qrusbwrrb/mb/720p/mv_cae264_backtrack_720p_normal/1748751326307005666/mda-rf03dz9qrusbwrrb.mp4?v_from_s=hkapp-haokan-hbe&auth_key=1755078490-0-0-94814ae256d196c133940bc5fa7054ea&bcevod_channel=searchbox_feed&cr=2&cd=0&pd=1&pt=3&logid=2890204804&vid=12887026108358975692&klogid=2890204804&abtest=",
                "video_duration": 187
            }
        ]
    }

    identifier = VideoIdentifier()
    result = identifier.process_videos(test_content)

    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
# Run the smoke test only when the file is executed as a script.
if __name__ == '__main__':
    main()