#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Video recognition script.

Uses the Gemini API to analyse video content along three dimensions:
1. ASR (Automatic Speech Recognition) - speech to text
2. OCR - text that appears in the video frames
3. Key-frame extraction and description
"""

import os
import json
import time
import sys
import uuid
import requests
from typing import Dict, Any, List, Optional
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed

# Project-local modules: the directory three levels above this file is put on
# sys.path before importing from the ``llm`` package.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from llm.openrouter import OpenRouterProcessor, OpenRouterModel

# Google Generative AI SDK
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold

# Directory that holds downloaded videos until they have been uploaded.
CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
# Maximum age (seconds) of a cache file before cleanup_cache() removes it.
CACHE_MAX_AGE = 3600  # 1 hour
# Minimum number of seconds between two cache sweeps.
CACHE_CLEANUP_INTERVAL = 3600
# Chunk size (bytes) used when streaming a download to disk.
_DOWNLOAD_CHUNK_SIZE = 1024 * 1024
# Lower-case substrings that mark an error message as network related (retryable).
_NETWORK_ERROR_KEYWORDS = ('broken pipe', 'connection', 'timeout', 'network')
# Remote file states that mean Gemini gave up processing the upload.
_FAILED_FILE_STATES = ('FAILED', 'ERROR', 'INVALID')
# (case-sensitive keyword, lines to print); the first matching keyword wins.
_UPLOAD_DIAGNOSES = (
    ("Broken pipe", (
        " 诊断: Broken pipe 错误通常表示:",
        " - 网络连接不稳定",
        " - 服务器连接中断",
        " - 防火墙或代理问题",
        " 建议:",
        " - 检查网络连接",
        " - 尝试使用VPN或更换网络",
        " - 检查防火墙设置",
    )),
    ("Connection", (
        " 诊断: 连接错误",
        " 建议: 检查网络连接和API密钥",
    )),
    ("Timeout", (
        " 诊断: 超时错误",
        " 建议: 网络较慢,可以增加超时时间",
    )),
    ("Permission", (
        " 诊断: 权限错误",
        " 建议: 检查API密钥和权限设置",
    )),
)


class VideoIdentifier:
    """Download videos, upload them to Gemini and extract ASR / key-frame / OCR data."""

    def __init__(self):
        """Load configuration from the environment and configure the Gemini SDK.

        Raises:
            ValueError: if the ``GEMINI_API_KEY`` environment variable is not set.
        """
        load_dotenv()

        self.api_key = os.getenv('GEMINI_API_KEY')
        if not self.api_key:
            raise ValueError("未找到GEMINI_API_KEY环境变量")

        genai.configure(api_key=self.api_key)

        # Timestamp of the last cache sweep; cleanup_cache() compares against it.
        self.last_cache_cleanup = time.time()

        # Single prompt sent with every video. The prompt text itself asks for
        # two sections (ASR, and key frames with OCR folded in) as JSON.
        self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:

1. ASR (Automatic Speech Recognition) - 语音转文字:
   - 仅提取视频中的语音内容,转换为文字
   - 保持原始语音的准确性和完整性
   - 不要添加分析、解释或评论

2. 关键帧提取与描述(包含OCR文字识别):
   - 将视频按照画面场景变化分解为多个关键时间片段
   - 对每个时间片段进行以下分析:
     * 画面的主要视觉元素和内容, 20个字以内
     * 画面中出现的所有文字内容(OCR识别),**注意忽略语音的字幕**
   - 每个时间片段应包含:
     * content: 画面内容的详细描述,15个字以内
     * ocr_content: 该时间段画面中出现的文字内容,仅做画面内文字提取,不要提取字幕文字,不要做任何解释或总结

请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
{
    "asr_content": "提取的语音文字内容",
    "iframe_details": [
        {
            "time_start": "开始时间(秒)",
            "time_end": "结束时间(秒)",
            "content": "该时间段画面内容的详细描述",
            "ocr_content": "该时间段画面中出现的文字内容"
        }
    ]
}"""

    @staticmethod
    def _discard_partial_file(file_path: str) -> None:
        """Best-effort removal of a partially written download."""
        if not os.path.exists(file_path):
            return
        try:
            os.remove(file_path)
            print(f'已清理下载失败的文件: {file_path}')
        except OSError as e:
            # Cleanup is best-effort; report the failure but never raise.
            print(f'清理缓存文件失败: {file_path}, 错误: {e}')

    def download_video(self, video_url: str) -> Optional[str]:
        """Download a video into the local cache directory.

        The body is streamed to disk in chunks so the whole video is never
        held in memory. Up to three attempts are made.

        Args:
            video_url: URL of the video to fetch.

        Returns:
            Path of the downloaded file, or ``None`` when the cache directory
            cannot be created, the file cannot be saved, or every attempt fails.
        """
        file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4')
        max_attempts = 3

        try:
            try:
                os.makedirs(CACHE_DIR, exist_ok=True)
            except Exception as e:
                print(f'创建缓存目录失败: {e}')
                return None

            for attempt in range(max_attempts):
                is_last_attempt = attempt == max_attempts - 1
                try:
                    with requests.get(url=video_url, timeout=60, stream=True) as response:
                        if response.status_code != 200:
                            print(f'视频下载失败,状态码: {response.status_code}')
                            if is_last_attempt:
                                print('所有下载尝试都失败了')
                                return None
                            continue

                        try:
                            with open(file_path, 'wb') as f:
                                for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                                    if chunk:
                                        f.write(chunk)
                        except requests.RequestException:
                            # Network failure while streaming the body: let the
                            # outer handler treat it as a failed attempt.
                            # (RequestException subclasses OSError, so it must
                            # be separated from real disk errors first.)
                            raise
                        except Exception as e:
                            print(f'视频保存失败: {e}')
                            self._discard_partial_file(file_path)
                            return None
                    return file_path
                except Exception as e:
                    print(f'下载尝试 {attempt + 1} 失败: {e}')
                    self._discard_partial_file(file_path)
                    if is_last_attempt:
                        print('所有下载尝试都失败了')
                        return None
                    time.sleep(1)
        except Exception as e:
            print(f'下载过程异常: {e}')
            return None

        return None

    def cleanup_cache(self):
        """Remove cache files older than ``CACHE_MAX_AGE``.

        Does nothing if the previous sweep ran less than
        ``CACHE_CLEANUP_INTERVAL`` seconds ago or the cache directory does not
        exist. Errors are printed, never raised. No caller of this method
        exists within this file.
        """
        try:
            current_time = time.time()

            if current_time - self.last_cache_cleanup < CACHE_CLEANUP_INTERVAL:
                return
            if not os.path.exists(CACHE_DIR):
                return

            cleaned_count = 0
            for filename in os.listdir(CACHE_DIR):
                file_path = os.path.join(CACHE_DIR, filename)
                if not os.path.isfile(file_path):
                    continue
                try:
                    # getmtime is inside the try so that a file vanishing
                    # mid-scan only skips that file instead of the whole sweep.
                    if current_time - os.path.getmtime(file_path) > CACHE_MAX_AGE:
                        os.remove(file_path)
                        cleaned_count += 1
                except Exception as e:
                    print(f'清理缓存文件失败: {file_path}, 错误: {e}')

            if cleaned_count > 0:
                print(f'已清理 {cleaned_count} 个过期缓存文件')

            self.last_cache_cleanup = current_time
        except Exception as e:
            print(f'清理缓存失败: {e}')

    @staticmethod
    def _is_network_error(error_msg: str) -> bool:
        """Return True when the message contains a network-related keyword."""
        lowered = error_msg.lower()
        return any(keyword in lowered for keyword in _NETWORK_ERROR_KEYWORDS)

    @staticmethod
    def _is_readable_video(video_path: str) -> bool:
        """Check that the local file exists, is non-empty and can be read."""
        if not os.path.exists(video_path):
            print(" 错误: 文件不存在")
            return False

        if os.path.getsize(video_path) == 0:
            print(" 错误: 文件大小为0")
            return False

        try:
            with open(video_path, 'rb') as f:
                # Reading the first bytes proves the file is actually readable.
                f.read(1024)
        except Exception as e:
            print(f" 错误: 文件无法读取 - {e}")
            return False
        return True

    @staticmethod
    def _delete_remote_file(video_file: Any) -> None:
        """Best-effort deletion of an uploaded Gemini file handle."""
        name = getattr(video_file, 'name', None) if video_file else None
        if not name:
            return
        try:
            genai.delete_file(name=name)
        except Exception as e:
            # Never let cleanup mask the real outcome of the job.
            print(f'清理Gemini文件失败: {name}, 错误: {e}')

    @staticmethod
    def _wait_for_processing(video_file: Any, max_wait_time: int = 120,
                             poll_interval: int = 2) -> Optional[Any]:
        """Poll Gemini until the uploaded file leaves the PROCESSING state.

        Args:
            video_file: handle returned by ``genai.upload_file``.
            max_wait_time: maximum number of seconds to keep polling.
            poll_interval: seconds to sleep between two polls.

        Returns:
            The most recent file handle (the caller inspects ``state.name``),
            or ``None`` when the status could not be fetched during the first
            60 seconds, which the caller treats as a hard failure.
        """
        waited = 0
        while video_file.state.name == 'PROCESSING' and waited < max_wait_time:
            time.sleep(poll_interval)
            waited += poll_interval
            try:
                video_file = genai.get_file(name=video_file.name)
            except Exception as e:
                print(f" 警告: 获取文件状态失败 - {e}")
                if waited > 60:
                    # Late polling errors are tolerated; keep waiting.
                    print(" 继续等待...")
                    continue
                print(" 错误: 无法获取文件状态")
                return None

            current_state = video_file.state.name
            if current_state in _FAILED_FILE_STATES:
                print(f" 错误: 文件处理失败,状态: {current_state}")
                if hasattr(video_file, 'error'):
                    print(f" 错误详情: {video_file.error}")
                return video_file
        return video_file

    @staticmethod
    def _print_upload_diagnosis(error_msg: str) -> None:
        """Print troubleshooting hints for the first keyword found in the message."""
        for keyword, lines in _UPLOAD_DIAGNOSES:
            if keyword in error_msg:
                for line in lines:
                    print(line)
                return

    def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
        """Upload a local video to Gemini and wait until it is ready.

        Up to three attempts are made with exponential back-off (5s, 10s).
        A remote file that failed processing, timed out, or whose status
        could not be polled is deleted (best-effort) before retrying or
        giving up, so abandoned attempts do not accumulate on the service.

        Args:
            video_path: path of the local video file.

        Returns:
            The Gemini file handle in ``ACTIVE`` state, or ``None`` when the
            local file is unusable, a non-network error occurs, or all
            retries are exhausted.
        """
        max_retries = 3
        retry_delay = 5

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            uploaded = None
            try:
                if not self._is_readable_video(video_path):
                    return None

                try:
                    uploaded = genai.upload_file(path=video_path, mime_type='video/mp4')
                except Exception as e:
                    print(f" 错误: 文件上传请求失败 - {e}")
                    print(f" 错误类型: {type(e).__name__}")
                    print(f" 错误详情: {str(e)}")
                    if not self._is_network_error(str(e)):
                        return None
                    if is_last_attempt:
                        print(" 所有重试都失败了")
                        return None
                    print(f" 网络错误,等待 {retry_delay} 秒后重试...")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential back-off
                    continue

                print(" 等待文件处理完成...")
                processed = self._wait_for_processing(uploaded)
                if processed is None:
                    # Status polling was aborted; drop the orphaned remote file.
                    self._delete_remote_file(uploaded)
                    return None

                if processed.state.name == 'ACTIVE':
                    print(f' 视频上传成功: {processed.name}')
                    return processed

                print(' 错误: 视频文件上传失败')
                self._delete_remote_file(processed)
                if is_last_attempt:
                    return None
                # Exactly one back-off per failed attempt.
                print(f" 上传失败,等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)
                retry_delay *= 2

            except Exception as e:
                error_msg = str(e)
                print(' 错误: 视频上传到Gemini失败')
                print(f" 错误类型: {type(e).__name__}")
                print(f" 错误信息: {error_msg}")
                self._print_upload_diagnosis(error_msg)
                self._delete_remote_file(uploaded)

                if not self._is_network_error(error_msg):
                    print(" 非网络错误,不进行重试")
                    return None
                if is_last_attempt:
                    print(" 所有重试都失败了")
                    return None
                print(f" 网络错误,等待 {retry_delay} 秒后重试...")
                time.sleep(retry_delay)
                retry_delay *= 2

        return None

    def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect ``{'url', 'duration'}`` entries from ``formatted_content``.

        Only dict items of ``video_url_list`` that contain a ``video_url`` key
        are kept; ``video_duration`` defaults to 0. A missing or ``None``
        list yields an empty result.
        """
        video_url_list = formatted_content.get('video_url_list') or []
        return [
            {
                'url': video_item['video_url'],
                'duration': video_item.get('video_duration', 0)
            }
            for video_item in video_url_list
            if isinstance(video_item, dict) and 'video_url' in video_item
        ]

    def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Entry point: analyse every video referenced by ``formatted_content``.

        Returns:
            One result dict per extracted video (input order), or ``[]`` when
            no video URL is found.
        """
        video_data = self.extract_video_urls(formatted_content)
        if not video_data:
            return []

        return self.analyze_videos_with_openrouter(video_data)

    @staticmethod
    def _placeholder_frame(message: str) -> Dict[str, Any]:
        """Build the single key-frame entry used to report a failure."""
        return {
            'time_start': 0,
            'time_end': 0,
            'content': message,
            'ocr_content': message
        }

    @classmethod
    def _failure_result(cls, url: str, duration: Any, asr_message: str,
                        frame_message: Optional[str] = None) -> Dict[str, Any]:
        """Build a result dict describing a failed job.

        ``iframe_details`` is empty when ``frame_message`` is ``None``,
        otherwise it holds one placeholder frame carrying that message.
        """
        frames = [] if frame_message is None else [cls._placeholder_frame(frame_message)]
        return {
            'url': url,
            'duration': duration,
            'asr_content': asr_message,
            'iframe_details': frames
        }

    @staticmethod
    def _extract_response_text(resp: Any) -> str:
        """Return the text of the first part of the first candidate.

        Reads the candidate structure directly so that a response without
        parts produces an error that names the ``finish_reason``. Falls back
        to ``resp.text`` only when the response has no candidates.

        Raises:
            ValueError: when no non-blank text can be found.
        """
        candidates = getattr(resp, 'candidates', None)
        if candidates:
            first = candidates[0]
            finish_reason = getattr(first, 'finish_reason', None)
            content = getattr(first, 'content', None)
            parts = getattr(content, 'parts', None) if content else None
            if parts:
                part0 = parts[0]
                # A part is usually an object with ``.text``; a plain dict is
                # accepted as well.
                if hasattr(part0, 'text'):
                    text = getattr(part0, 'text', None)
                elif isinstance(part0, dict):
                    text = part0.get('text')
                else:
                    text = None
                if isinstance(text, str) and text.strip():
                    return text
            reason = str(finish_reason) if finish_reason is not None else 'unknown'
            raise ValueError(f"无有效输出内容,finish_reason={reason}")

        # No candidates at all: fall back to resp.text (which may itself raise).
        if hasattr(resp, 'text') and isinstance(resp.text, str) and resp.text.strip():
            return resp.text
        raise ValueError("响应中没有可用的文本内容")

    @staticmethod
    def _generate_analysis(video_file: Any, system_prompt: str) -> Any:
        """Ask Gemini to analyse the uploaded video; return the raw response.

        Raises:
            RuntimeError: when the response carries a truthy ``_error``.
        """
        model = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            generation_config=genai.GenerationConfig(
                response_mime_type='application/json',
                temperature=0.3,
                max_output_tokens=40960
            ),
            safety_settings={
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            }
        )

        response = model.generate_content(
            contents=[video_file, system_prompt],
            request_options={'timeout': 240}
        )

        error = getattr(response, '_error', None)
        if error:
            raise RuntimeError(f"生成错误: {error}")
        return response

    @classmethod
    def _build_success_result(cls, parsed: Any, url: str, duration: Any) -> Dict[str, Any]:
        """Validate the parsed JSON payload and shape it into a result dict.

        Missing ``asr_content`` / ``iframe_details`` fields are replaced by
        failure placeholders; a non-list ``iframe_details`` becomes ``[]``.

        Raises:
            ValueError: when the payload is not a dict.
        """
        if not isinstance(parsed, dict):
            raise ValueError("响应格式错误:非字典结构")

        parsed.setdefault('asr_content', "asr_content分析失败")
        parsed.setdefault('iframe_details', [cls._placeholder_frame('iframe_details分析失败')])

        frames = parsed['iframe_details']
        if not isinstance(frames, list):
            frames = []
        return {
            'url': url,
            'duration': duration,
            'asr_content': parsed['asr_content'],
            'iframe_details': frames
        }

    def _process_single_video(self, item: Dict[str, Any], system_prompt: str) -> Dict[str, Any]:
        """Run the full pipeline for one video: download, upload, analyse, clean up.

        Never raises; every failure is reported inside the returned dict.
        """
        print(f"开始处理视频: {item}")
        url = item.get('url', '')
        duration = item.get('duration', 0)
        video_file = None

        try:
            # 1. Download the video.
            video_path = self.download_video(url)
            if not video_path:
                return self._failure_result(url, duration, '视频下载失败')

            # 2. Upload to Gemini; the local copy is removed whatever happens.
            try:
                video_file = self.upload_video_to_gemini(video_path)
            finally:
                try:
                    if os.path.exists(video_path):
                        os.remove(video_path)
                except OSError as e:
                    print(f'清理缓存文件失败: {video_path}, 错误: {e}')

            if not video_file:
                return self._failure_result(url, duration, '视频上传失败')

            # 3. Let Gemini analyse the uploaded file.
            response = self._generate_analysis(video_file, system_prompt)

            try:
                text_payload = self._extract_response_text(response).strip()
                parsed = json.loads(text_payload)
                return self._build_success_result(parsed, url, duration)
            except json.JSONDecodeError as e:
                print(f"JSON解析失败: {e}")
                return self._failure_result(
                    url, duration,
                    'ASR分析失败:JSON解析错误',
                    '关键帧分析失败:JSON解析错误'
                )
            except Exception as e:
                # Covers responses without parts / blocked finish reasons.
                message = f'处理失败: {str(e)}'
                return self._failure_result(url, duration, message, message)

        except Exception as e:
            message = f'处理失败: {str(e)}'
            return self._failure_result(url, duration, message, message)

        finally:
            # 4. Remove the uploaded file from Gemini.
            self._delete_remote_file(video_file)

    def analyze_videos_with_openrouter(self, video_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Analyse videos concurrently (at most 5 at a time) with Gemini.

        Despite its name, this method calls the Gemini SDK directly. Each
        worker performs the complete download -> upload -> analyse -> cleanup
        flow for one video.

        Args:
            video_data: items with ``url`` and ``duration`` keys.

        Returns:
            One result dict per input item, in input order. Results are
            placed by input position, so repeated URLs each get their own
            result.
        """
        try:
            system_prompt = self.unified_system_prompt

            # Pre-sized so results can be stored by input position.
            results: List[Dict[str, Any]] = [{} for _ in video_data]

            with ThreadPoolExecutor(max_workers=5) as pool:
                future_to_index = {
                    pool.submit(self._process_single_video, item, system_prompt): idx
                    for idx, item in enumerate(video_data)
                }
                for future in as_completed(future_to_index):
                    results[future_to_index[future]] = future.result()

            return results
        except Exception as e:
            print(f"OpenRouter 批量视频分析失败: {e}")
            return [
                self._failure_result(
                    item.get('url', ''),
                    item.get('duration', 0),
                    f'处理失败: {str(e)}'
                )
                for item in video_data
            ]


def main():
    """Manual smoke test: analyse a sample video and print the result."""
    # Sample data
    test_content = {
        "video_url_list": [
            {
                "video_url": "http://rescdn.yishihui.com/pipeline/video/489e7c31-4e7c-44cc-872d-b1b1dd42b12d.mp4",
                "video_duration": 187
            },
            # {
            #     "video_url": "http://temp.yishihui.com/pipeline/video/43d11b20-6273-4ece-a146-94f63a3992a8.mp4",
            #     "video_duration": 100
            # },
            # {
            #     "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250731/57463792ND5eu5PAj95sVLi2gB.mp4",
            #     "video_duration": 100
            # },
            # {
            #     "video_url": "http://temp.yishihui.com/longvideo/transcode/crawler_local/video/prod/20250912/2c278614bd39fc2668f210d752141cb678956536.mp4",
            #     "video_duration": 100
            # },
            # {
            #     "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250809/5870d4dc9ba18ce57e5af27b81ff1398.mp4",
            #     "video_duration": 100
            # },
            # {
            #     "video_url": "http://temp.yishihui.com/pipeline/video/202769cd-68a5-41a2-82d9-620d2c72a225.mp4",
            #     "video_duration": 100
            # }
        ]
    }

    identifier = VideoIdentifier()
    result = identifier.process_videos(test_content)

    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")


if __name__ == '__main__':
    main()