#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Video recognition script.

Uses the Gemini API to analyse video content:
1. ASR (Automatic Speech Recognition) - speech to text.
2. OCR - text visible in the video frames.
3. Key-frame extraction and description.

The prompt sent to the model asks for OCR text as part of each key-frame
segment, so the result carries two top-level fields: ``asr_content`` and
``iframe_details``.
"""

import os
import json
import time
import sys
import uuid
import requests
from typing import Dict, Any, List, Optional, Tuple
from dotenv import load_dotenv

# Make the parent directory importable for project-local modules.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Google Generative AI SDK.
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold

# Directory where downloaded videos are stored until they have been processed.
CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')


class VideoIdentifier:
    """Download videos, upload them to Gemini and collect the analysis."""

    # Lower-case substrings of an error message that mark it as retryable.
    _NETWORK_ERROR_KEYWORDS = ('broken pipe', 'connection', 'timeout', 'network')

    # Remote file states that are treated as a failed processing run.
    _FAILED_STATES = ('FAILED', 'ERROR', 'INVALID')

    def __init__(self):
        """Load the environment, configure the SDK and build the prompt.

        Raises:
            ValueError: If the ``GEMINI_API_KEY`` environment variable is
                missing or empty.
        """
        load_dotenv()

        self.api_key = os.getenv('GEMINI_API_KEY')
        if not self.api_key:
            raise ValueError("未找到GEMINI_API_KEY环境变量")

        genai.configure(api_key=self.api_key)

        # Single prompt covering ASR plus key-frame description (with OCR).
        self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:

1. ASR (Automatic Speech Recognition) - 语音转文字:
   - 仅提取视频中的语音内容,转换为文字
   - 保持原始语音的准确性和完整性
   - 不要添加分析、解释或评论

2. 关键帧提取与描述(包含OCR文字识别):
   - 将视频分解为多个关键时间片段
   - 对每个时间片段进行以下分析:
     * 画面的主要视觉元素和内容
     * 画面的构图和色彩特点
     * 画面中的人物、物体、场景
     * 画面中出现的所有文字内容(OCR识别)
   - 每个时间片段应包含:
     * content: 画面内容的详细描述
     * ocr_content: 该时间段画面中出现的文字内容,仅做文字提取,不要做任何解释或总结

请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
{
    "asr_content": "提取的语音文字内容",
    "iframe_details": [
        {
            "time_start": "开始时间(秒)",
            "time_end": "结束时间(秒)",
            "content": "该时间段画面内容的详细描述",
            "ocr_content": "该时间段画面中出现的文字内容"
        }
    ]
}"""

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _failed_frames(message: str) -> List[Dict[str, Any]]:
        """Return a one-element ``iframe_details`` list carrying *message*."""
        return [{
            'time_start': 0,
            'time_end': 0,
            'content': message,
            'ocr_content': message
        }]

    @classmethod
    def _failed_analysis(cls, asr_message: str, frame_message: str) -> Dict[str, Any]:
        """Return an analysis dict that reports a failure in both fields."""
        return {
            'asr_content': asr_message,
            'iframe_details': cls._failed_frames(frame_message)
        }

    @classmethod
    def _is_network_error(cls, error_msg: str) -> bool:
        """Return True when *error_msg* mentions a retryable network problem."""
        lowered = error_msg.lower()
        return any(keyword in lowered for keyword in cls._NETWORK_ERROR_KEYWORDS)

    @staticmethod
    def _remove_file_quietly(path: str) -> None:
        """Delete *path* if it exists; cleanup is best-effort by design."""
        try:
            os.remove(path)
        except OSError:
            # Nothing useful can be done if a leftover file cannot be removed.
            pass

    @staticmethod
    def _delete_remote_file(video_file: Any) -> None:
        """Best-effort removal of an uploaded file from Gemini."""
        if not (video_file and hasattr(video_file, 'name')):
            return
        try:
            genai.delete_file(name=video_file.name)
            print(f" Gemini文件已清理: {video_file.name}")
        except Exception as e:
            print(f" 清理Gemini文件失败: {e}")

    # ------------------------------------------------------------------
    # Download
    # ------------------------------------------------------------------

    def download_video(self, video_url: str) -> Optional[str]:
        """Download a video into the local cache directory.

        The response is streamed to disk in chunks so that the whole video
        never has to fit in memory. Up to three attempts are made, with a
        one second pause between them.

        Args:
            video_url: URL of the video to fetch.

        Returns:
            Path of the downloaded file, or ``None`` when the cache directory
            cannot be created, the file cannot be written, or every attempt
            fails.
        """
        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')

        try:
            os.makedirs(CACHE_DIR, exist_ok=True)
        except Exception as e:
            print(f'创建缓存目录失败: {e}')
            return None

        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                with requests.get(url=video_url, timeout=60, stream=True) as response:
                    if response.status_code == 200:
                        with open(file_path, 'wb') as f:
                            for chunk in response.iter_content(chunk_size=1024 * 1024):
                                f.write(chunk)
                        print(f'视频下载成功: {video_url} -> {file_path}')
                        return file_path
                    print(f'视频下载失败,状态码: {response.status_code}')
            except requests.RequestException as e:
                # Must precede OSError: requests' exceptions derive from it.
                print(f'下载尝试 {attempt + 1} 失败: {e}')
            except OSError as e:
                # A local write problem will not be cured by retrying.
                print(f'视频保存失败: {e}')
                self._remove_file_quietly(file_path)
                return None
            except Exception as e:
                print(f'下载尝试 {attempt + 1} 失败: {e}')

            # Drop whatever a failed attempt may have written so far.
            self._remove_file_quietly(file_path)
            if attempt < max_attempts - 1:
                time.sleep(1)

        print('所有下载尝试都失败了')
        return None

    # ------------------------------------------------------------------
    # Upload
    # ------------------------------------------------------------------

    @staticmethod
    def _check_local_video(video_path: str) -> bool:
        """Return True when *video_path* exists, is non-empty and readable."""
        if not os.path.exists(video_path):
            print(" 错误: 文件不存在")
            return False

        file_size = os.path.getsize(video_path)
        print(f" 文件大小: {file_size / (1024*1024):.2f} MB")
        if file_size == 0:
            print(" 错误: 文件大小为0")
            return False

        try:
            with open(video_path, 'rb') as f:
                # Reading the first bytes is enough to prove readability.
                f.read(1024)
            print(" 文件权限: 可读")
        except Exception as e:
            print(f" 错误: 文件无法读取 - {e}")
            return False
        return True

    @staticmethod
    def _probe_gemini_endpoint() -> None:
        """Log whether the Gemini endpoint is reachable; never blocks upload."""
        try:
            print(" 检查网络连接...")
            test_response = requests.get("https://generativelanguage.googleapis.com", timeout=10)
            print(f" 网络连接: 正常 (状态码: {test_response.status_code})")
        except Exception as e:
            print(f" 警告: 网络连接测试失败 - {e}")
            print(" 继续尝试上传...")

    def _wait_for_processing(self, video_file: Any, max_wait_time: int = 120,
                             poll_interval: int = 2) -> Tuple[Any, int, bool]:
        """Poll the uploaded file until it leaves the PROCESSING state.

        Args:
            video_file: File handle returned by ``genai.upload_file``.
            max_wait_time: Upper bound, in seconds, on the total wait.
            poll_interval: Seconds to sleep between status checks.

        Returns:
            ``(video_file, waited, status_known)`` where ``video_file`` is the
            most recently fetched handle, ``waited`` is the number of seconds
            spent sleeping, and ``status_known`` is False when the status
            could not be fetched during the first minute of waiting.
        """
        waited = 0
        while video_file.state.name == 'PROCESSING' and waited < max_wait_time:
            time.sleep(poll_interval)
            waited += poll_interval
            try:
                video_file = genai.get_file(name=video_file.name)
                current_state = video_file.state.name
                print(f" 状态: {current_state} ({waited}秒)")

                if current_state in self._FAILED_STATES:
                    print(f" 错误: 文件处理失败,状态: {current_state}")
                    if hasattr(video_file, 'error'):
                        print(f" 错误详情: {video_file.error}")
                    # The caller decides whether to retry; it applies the
                    # back-off exactly once.
                    break
            except Exception as e:
                print(f" 警告: 获取文件状态失败 - {e}")
                if waited > 60:
                    # Past one minute a status error is tolerated.
                    print(" 继续等待...")
                    continue
                print(" 错误: 无法获取文件状态")
                return video_file, waited, False
        return video_file, waited, True

    @staticmethod
    def _print_upload_diagnosis(error_msg: str) -> None:
        """Print a hint for a few well-known upload error messages."""
        if "Broken pipe" in error_msg:
            print(" 诊断: Broken pipe 错误通常表示:")
            print(" - 网络连接不稳定")
            print(" - 服务器连接中断")
            print(" - 防火墙或代理问题")
            print(" 建议:")
            print(" - 检查网络连接")
            print(" - 尝试使用VPN或更换网络")
            print(" - 检查防火墙设置")
        elif "Connection" in error_msg:
            print(" 诊断: 连接错误")
            print(" 建议: 检查网络连接和API密钥")
        elif "Timeout" in error_msg:
            print(" 诊断: 超时错误")
            print(" 建议: 网络较慢,可以增加超时时间")
        elif "Permission" in error_msg:
            print(" 诊断: 权限错误")
            print(" 建议: 检查API密钥和权限设置")

    def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
        """Upload a local video to Gemini and wait until it is usable.

        Up to three attempts are made. The delay between attempts starts at
        five seconds and doubles after each retry. A remote file that ends up
        unusable is deleted before the next attempt or before giving up.

        Args:
            video_path: Path of the local video file.

        Returns:
            The Gemini file handle once its state is ``ACTIVE``, otherwise
            ``None``.
        """
        max_retries = 3
        retry_delay = 5

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                print(f" 开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
                print(f" 文件路径: {video_path}")

                # 1-2. Local file checks.
                if not self._check_local_video(video_path):
                    return None

                # 3. Connectivity probe (informational only).
                self._probe_gemini_endpoint()

                # 4. Upload.
                print(" 开始上传文件...")
                try:
                    video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
                    print(f" 文件上传请求已发送,文件ID: {video_file.name}")
                except Exception as e:
                    print(f" 错误: 文件上传请求失败 - {e}")
                    print(f" 错误类型: {type(e).__name__}")
                    print(f" 错误详情: {str(e)}")
                    if not self._is_network_error(str(e)):
                        return None
                    if is_last_attempt:
                        print(" 所有重试都失败了")
                        return None
                    print(f" 网络错误,等待 {retry_delay} 秒后重试...")
                    time.sleep(retry_delay)
                    retry_delay *= 2
                    continue

                # 5. Wait for server-side processing.
                print(" 等待文件处理完成...")
                video_file, wait_count, status_known = self._wait_for_processing(video_file)
                if not status_known:
                    # The handle is about to be dropped, so release the upload.
                    self._delete_remote_file(video_file)
                    return None

                # 6. Final state.
                if video_file.state.name == 'ACTIVE':
                    print(f' 视频上传成功: {video_file.name}')
                    print(f" 最终状态: {video_file.state.name}")
                    return video_file

                print(' 错误: 视频文件上传失败')
                print(f" 最终状态: {video_file.state.name}")
                print(f" 等待时间: {wait_count}秒")
                try:
                    file_info = genai.get_file(name=video_file.name)
                    print(f" 文件信息: {file_info}")
                except Exception as e:
                    print(f" 无法获取文件详细信息: {e}")

                # Do not leave the unusable upload behind.
                self._delete_remote_file(video_file)

                if is_last_attempt:
                    return None
                print(f" 上传失败,等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)
                retry_delay *= 2

            except Exception as e:
                error_msg = str(e)
                print(' 错误: 视频上传到Gemini失败')
                print(f" 错误类型: {type(e).__name__}")
                print(f" 错误信息: {error_msg}")
                self._print_upload_diagnosis(error_msg)

                if not self._is_network_error(error_msg):
                    return None
                if is_last_attempt:
                    print(" 所有重试都失败了")
                    return None
                print(f" 网络错误,等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)
                retry_delay *= 2

        return None

    # ------------------------------------------------------------------
    # Analysis
    # ------------------------------------------------------------------

    def analyze_video_with_gemini(self, video_file: Any, video_info: Dict[str, Any]) -> Dict[str, Any]:
        """Ask Gemini to analyse an uploaded video.

        Args:
            video_file: Handle of a file already uploaded to Gemini.
            video_info: Metadata of the video. Not read by this method; kept
                so that existing callers continue to work.

        Returns:
            A dict with ``asr_content`` and ``iframe_details``. This method
            does not raise: any failure is reported through placeholder
            values in those two fields.
        """
        try:
            model = genai.GenerativeModel(
                model_name='gemini-2.0-flash',
                generation_config=genai.GenerationConfig(
                    response_mime_type='application/json',
                    temperature=0.3,
                    max_output_tokens=20480
                ),
                safety_settings={
                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                }
            )

            response = model.generate_content(
                contents=[video_file, self.unified_system_prompt],
                request_options={'timeout': 300}
            )

            if hasattr(response, '_error') and response._error:
                raise Exception(f"生成错误: {response._error}")

            # An empty body is reported as "no response" instead of being
            # passed to the JSON parser.
            raw_text = (response.text or '').strip()
            if not raw_text:
                return self._failed_analysis('ASR分析失败:API无响应', '关键帧分析失败:API无响应')

            try:
                result = json.loads(raw_text)
            except json.JSONDecodeError as e:
                print(f"JSON解析失败: {e}")
                return self._failed_analysis('ASR分析失败:JSON解析错误', '关键帧分析失败:JSON解析错误')

            print(f"[视频分析] 响应: {result}")
            if not isinstance(result, dict):
                raise ValueError("响应格式错误:非字典结构")

            # Fill in any required field the model left out.
            result.setdefault('asr_content', 'asr_content分析失败')
            if 'iframe_details' not in result:
                result['iframe_details'] = self._failed_frames('iframe_details分析失败')
            return result

        except Exception as e:
            return self._failed_analysis(f'ASR分析失败: {str(e)}', f'关键帧分析失败: {str(e)}')

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect the videos listed under ``video_url_list``.

        Args:
            formatted_content: Mapping that may contain ``video_url_list``,
                a list of dicts with ``video_url`` and, optionally,
                ``video_duration``.

        Returns:
            One ``{'url': ..., 'duration': ...}`` dict per valid entry.
            Entries that are not dicts or lack ``video_url`` are skipped, and
            a missing or ``None`` list yields an empty result.
        """
        video_url_list = (formatted_content or {}).get('video_url_list') or []
        return [
            {
                'url': video_item['video_url'],
                'duration': video_item.get('video_duration', 0)
            }
            for video_item in video_url_list
            if isinstance(video_item, dict) and 'video_url' in video_item
        ]

    def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
        """Run download, upload and analysis for one video.

        Args:
            video_info: Dict with ``url`` and, optionally, ``duration``.

        Returns:
            A dict with ``url``, ``duration``, ``asr_content`` and
            ``iframe_details``. When an unexpected exception occurs the dict
            additionally carries ``analysis_timestamp`` in milliseconds.
        """
        url = video_info['url']
        duration = video_info.get('duration', 0)
        print(f"开始处理视频: {url[:50]}...")

        video_path = None
        video_file = None

        try:
            # 1. Download.
            print(" 1. 下载视频...")
            video_path = self.download_video(url)
            if not video_path:
                print(" 视频下载失败")
                return {
                    'url': url,
                    'duration': duration,
                    **self._failed_analysis('视频下载失败', '视频下载失败')
                }

            # 2. Upload.
            print(" 2. 上传视频到Gemini...")
            video_file = self.upload_video_to_gemini(video_path)
            if not video_file:
                print(" 视频上传到Gemini失败")
                return {
                    'url': url,
                    'duration': duration,
                    **self._failed_analysis('视频上传失败', '视频上传失败')
                }

            # 3. Analyse.
            print(" 3. 使用Gemini分析视频内容...")
            analysis_result = self.analyze_video_with_gemini(video_file, video_info)

            # 4. Assemble; only the two known fields are forwarded.
            final_result = {
                'url': url,
                'duration': duration,
                'asr_content': analysis_result.get('asr_content', 'ASR分析失败'),
                'iframe_details': analysis_result.get(
                    'iframe_details', self._failed_frames('关键帧分析失败')),
            }
            print(" 视频分析完成")
            return final_result

        except Exception as e:
            print(f" 视频处理异常: {e}")
            return {
                'url': url,
                'duration': duration,
                **self._failed_analysis(f'处理异常: {str(e)}', f'处理异常: {str(e)}'),
                'analysis_timestamp': int(time.time() * 1000)
            }

        finally:
            # Remove the cached download.
            if video_path and os.path.exists(video_path):
                try:
                    os.remove(video_path)
                    print(f" 临时文件已清理: {video_path}")
                except Exception as e:
                    print(f" 清理临时文件失败: {e}")

            # Remove the uploaded copy from Gemini.
            self._delete_remote_file(video_file)

    def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Analyse every video referenced by *formatted_content*.

        Args:
            formatted_content: Mapping passed to ``extract_video_urls``.

        Returns:
            One result dict per video, in input order; an empty list when no
            video is found.
        """
        print("开始视频识别处理...")

        video_data = self.extract_video_urls(formatted_content)
        print(f"提取到 {len(video_data)} 个视频")

        if not video_data:
            print("没有视频需要分析")
            return []

        results = []
        last_index = len(video_data) - 1
        for i, video_info in enumerate(video_data):
            print(f"\n处理视频 {i+1}/{len(video_data)}")
            results.append(self.process_video_single(video_info))

            # Pause between videos to stay clear of API rate limits.
            if i < last_index:
                time.sleep(2)

        # results holds one entry per video, so it is never empty here.
        print(f"\n视频识别完成,共分析 {len(results)} 个视频")
        print("分析维度:ASR、关键帧提取")
        return results


def main():
    """Run the pipeline against one sample video and print the result."""
    test_content = {
        "video_url_list": [
            {
                "video_url": "https://vd9.bdstatic.com/mda-rf03dz9qrusbwrrb/mb/720p/mv_cae264_backtrack_720p_normal/1748751326307005666/mda-rf03dz9qrusbwrrb.mp4?v_from_s=hkapp-haokan-hbe&auth_key=1755078490-0-0-94814ae256d196c133940bc5fa7054ea&bcevod_channel=searchbox_feed&cr=2&cd=0&pd=1&pt=3&logid=2890204804&vid=12887026108358975692&klogid=2890204804&abtest=",
                "video_duration": 187
            }
        ]
    }

    identifier = VideoIdentifier()
    result = identifier.process_videos(test_content)

    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")


if __name__ == '__main__':
    main()