123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 视频识别脚本
- 主要功能:使用 Gemini API 从三个维度分析视频内容
- 1. ASR (Automatic Speech Recognition) - 语音转文字
- 2. OCR - 识别视频画面中的文字
- 3. 关键帧提取与描述 - 提取视频关键帧并进行图像描述
- """
- import os
- import json
- import time
- import sys
- import uuid
- import requests
- from typing import Dict, Any, List, Optional
- from dotenv import load_dotenv
- from concurrent.futures import ThreadPoolExecutor, as_completed
- # 导入自定义模块
- sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- from llm.openrouter import OpenRouterProcessor, OpenRouterModel
- # 导入Google Generative AI
- import google.generativeai as genai
- from google.generativeai.types import HarmCategory, HarmBlockThreshold
- # 缓存目录配置
- CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
- # 缓存文件最大保留时间(秒)
- CACHE_MAX_AGE = 3600 # 1小时
- class VideoIdentifier:
- def __init__(self):
- # 加载环境变量
- load_dotenv()
-
- # 获取API密钥
- self.api_key = os.getenv('GEMINI_API_KEY')
- if not self.api_key:
- raise ValueError("未找到GEMINI_API_KEY环境变量")
-
- # 配置Gemini
- genai.configure(api_key=self.api_key)
-
- # 初始化缓存清理时间
- self.last_cache_cleanup = time.time()
-
- # 统一的系统提示词 - 三个维度分析
- self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:
- 1. ASR (Automatic Speech Recognition) - 语音转文字:
- - 仅提取视频中的语音内容,转换为文字
- - 保持原始语音的准确性和完整性
- - 不要添加分析、解释或评论
- 2. 关键帧提取与描述(包含OCR文字识别):
- - 将视频按照画面场景变化分解为多个关键时间片段
- - 对每个时间片段进行以下分析:
- * 画面的主要视觉元素和内容, 20个字以内
- * 画面中出现的所有文字内容(OCR识别),**注意忽略语音的字幕**
- - 每个时间片段应包含:
- * content: 画面内容的详细描述,15个字以内
- * ocr_content: 该时间段画面中出现的文字内容,仅做画面内文字提取,不要提取字幕文字,不要做任何解释或总结
- 请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
- {
- "asr_content": "提取的语音文字内容",
- "iframe_details": [
- {
- "time_start": "开始时间(秒)",
- "time_end": "结束时间(秒)",
- "content": "该时间段画面内容的详细描述",
- "ocr_content": "该时间段画面中出现的文字内容"
- }
- ]
- }"""
-
- def download_video(self, video_url: str) -> Optional[str]:
- """下载视频到本地缓存"""
- file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
- try:
- # 确保缓存目录存在
- try:
- os.makedirs(CACHE_DIR, exist_ok=True)
- except Exception as e:
- print(f'创建缓存目录失败: {e}')
- return None
-
- # 尝试下载视频
- for attempt in range(3):
- try:
- response = requests.get(url=video_url, timeout=60)
- if response.status_code == 200:
- try:
- with open(file_path, 'wb') as f:
- f.write(response.content)
- # print(f'视频下载成功: {video_url} -> {file_path}')
- return file_path
- except Exception as e:
- print(f'视频保存失败: {e}')
- # 保存失败时清理已创建的文件
- if os.path.exists(file_path):
- try:
- os.remove(file_path)
- print(f'已清理下载失败的文件: {file_path}')
- except:
- pass
- return None
- else:
- print(f'视频下载失败,状态码: {response.status_code}')
- if attempt == 2: # 最后一次尝试失败
- print(f'所有下载尝试都失败了')
- return None
- except Exception as e:
- print(f'下载尝试 {attempt + 1} 失败: {e}')
- if attempt < 2: # 不是最后一次尝试
- time.sleep(1)
- continue
- else:
- print(f'所有下载尝试都失败了')
- return None
-
- except Exception as e:
- print(f'下载过程异常: {e}')
- return None
-
- return None
-
- def cleanup_cache(self):
- """清理过期的缓存文件"""
- try:
- current_time = time.time()
- # 每小时清理一次缓存
- if current_time - self.last_cache_cleanup < 3600:
- return
-
- if not os.path.exists(CACHE_DIR):
- return
-
- cleaned_count = 0
- for filename in os.listdir(CACHE_DIR):
- file_path = os.path.join(CACHE_DIR, filename)
- if os.path.isfile(file_path):
- file_age = current_time - os.path.getmtime(file_path)
- if file_age > CACHE_MAX_AGE:
- try:
- os.remove(file_path)
- cleaned_count += 1
- except Exception as e:
- print(f'清理缓存文件失败: {file_path}, 错误: {e}')
-
- if cleaned_count > 0:
- print(f'已清理 {cleaned_count} 个过期缓存文件')
-
- self.last_cache_cleanup = current_time
-
- except Exception as e:
- print(f'清理缓存失败: {e}')
-
- def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
- """上传视频到Gemini进行分析"""
- max_retries = 3
- retry_delay = 5
-
- for attempt in range(max_retries):
- try:
- # print(f" 开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
- # print(f" 文件路径: {video_path}")
-
- # 1. 文件检查
- if not os.path.exists(video_path):
- print(f" 错误: 文件不存在")
- return None
-
- file_size = os.path.getsize(video_path)
- # print(f" 文件大小: {file_size / (1024*1024):.2f} MB")
-
- if file_size == 0:
- print(f" 错误: 文件大小为0")
- return None
-
- # 2. 文件权限检查
- try:
- with open(video_path, 'rb') as f:
- # 尝试读取文件开头,检查是否可读
- f.read(1024)
- # print(f" 文件权限: 可读")
- except Exception as e:
- print(f" 错误: 文件无法读取 - {e}")
- return None
-
- # 4. 尝试上传文件
- # print(f" 开始上传文件...")
- try:
- video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
- # print(f" 文件上传请求已发送,文件ID: {video_file.name}")
- except Exception as e:
- print(f" 错误: 文件上传请求失败 - {e}")
- print(f" 错误类型: {type(e).__name__}")
- print(f" 错误详情: {str(e)}")
-
- # 如果是网络相关错误,尝试重试
- if any(keyword in str(e).lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
- if attempt < max_retries - 1:
- print(f" 网络错误,等待 {retry_delay} 秒后重试...")
- time.sleep(retry_delay)
- retry_delay *= 2 # 指数退避
- continue
- else:
- print(f" 所有重试都失败了")
- return None
- else:
- # 非网络错误,直接返回
- return None
-
- # 5. 等待文件处理完成
- print(f" 等待文件处理完成...")
- max_wait_time = 120 # 最大等待2分钟
- wait_count = 0
-
- while video_file.state.name == 'PROCESSING' and wait_count < max_wait_time:
- time.sleep(2) # 每2秒检查一次
- wait_count += 2
-
- try:
- # 获取最新状态
- video_file = genai.get_file(name=video_file.name)
- current_state = video_file.state.name
- # print(f" 状态: {current_state} ({wait_count}秒)")
-
- # 检查是否有错误状态
- if current_state in ['FAILED', 'ERROR', 'INVALID']:
- print(f" 错误: 文件处理失败,状态: {current_state}")
- if hasattr(video_file, 'error'):
- print(f" 错误详情: {video_file.error}")
-
- # 如果是处理失败,尝试重试
- if attempt < max_retries - 1:
- print(f" 文件处理失败,等待 {retry_delay} 秒后重试...")
- time.sleep(retry_delay)
- retry_delay *= 2
- break # 跳出等待循环,进行重试
- else:
- return None
-
- except Exception as e:
- print(f" 警告: 获取文件状态失败 - {e}")
- if wait_count > 60: # 超过1分钟后,尝试继续
- print(f" 继续等待...")
- continue
- else:
- print(f" 错误: 无法获取文件状态")
- return None
-
- # 6. 检查最终状态
- if video_file.state.name == 'ACTIVE':
- print(f' 视频上传成功: {video_file.name}')
- # print(f" 最终状态: {video_file.state.name}")
- return video_file
- else:
- print(f' 错误: 视频文件上传失败')
- # print(f" 最终状态: {video_file.state.name}")
- # print(f" 等待时间: {wait_count}秒")
-
- # 尝试获取更多错误信息
- try:
- file_info = genai.get_file(name=video_file.name)
- # print(f" 文件信息: {file_info}")
- except Exception as e:
- print(f" 无法获取文件详细信息: {e}")
-
- # 如果不是最后一次尝试,进行重试
- if attempt < max_retries - 1:
- print(f" 上传失败,等待 {retry_delay} 秒后重试...")
- time.sleep(retry_delay)
- retry_delay *= 2
- continue
- else:
- return None
-
- except Exception as e:
- error_type = type(e).__name__
- error_msg = str(e)
-
- print(f' 错误: 视频上传到Gemini失败')
- print(f" 错误类型: {error_type}")
- print(f" 错误信息: {error_msg}")
-
- # 针对特定错误的处理建议
- if "Broken pipe" in error_msg:
- print(f" 诊断: Broken pipe 错误通常表示:")
- print(f" - 网络连接不稳定")
- print(f" - 服务器连接中断")
- print(f" - 防火墙或代理问题")
- print(f" 建议:")
- print(f" - 检查网络连接")
- print(f" - 尝试使用VPN或更换网络")
- print(f" - 检查防火墙设置")
- elif "Connection" in error_msg:
- print(f" 诊断: 连接错误")
- print(f" 建议: 检查网络连接和API密钥")
- elif "Timeout" in error_msg:
- print(f" 诊断: 超时错误")
- print(f" 建议: 网络较慢,可以增加超时时间")
- elif "Permission" in error_msg:
- print(f" 诊断: 权限错误")
- print(f" 建议: 检查API密钥和权限设置")
-
- # 如果是网络相关错误,尝试重试
- if any(keyword in error_msg.lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
- if attempt < max_retries - 1:
- print(f" 网络错误,等待 {retry_delay} 秒后重试...")
- time.sleep(retry_delay)
- retry_delay *= 2
- continue
- else:
- print(f" 所有重试都失败了")
- return None
- else:
- # 非网络错误,直接返回
- print(f" 非网络错误,不进行重试")
- return None
-
- return None
-
- def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
- """提取视频URL列表"""
- video_data = []
- video_url_list = formatted_content.get('video_url_list', [])
-
- for video_item in video_url_list:
- if isinstance(video_item, dict) and 'video_url' in video_item:
- video_data.append({
- 'url': video_item['video_url'],
- 'duration': video_item.get('video_duration', 0)
- })
-
- return video_data
-
- def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
- """处理视频识别的主函数"""
-
- # 提取视频URL
- video_data = self.extract_video_urls(formatted_content)
- if not video_data:
- return []
- # 使用 OpenRouter 批量处理,避免逐个上传/分析
- return self.analyze_videos_with_openrouter(video_data)
- def analyze_videos_with_openrouter(self, video_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """使用 Gemini 并发(最多5条)完成上传+分析的完整流程"""
- try:
- # 系统提示:严格限制为"仅提取内容,不做分析" [[memory:7272937]]
- system_prompt = self.unified_system_prompt
- # 保持输入顺序的结果数组
- results: List[Dict[str, Any]] = [{} for _ in range(len(video_data))]
- url_to_index = {item['url']: idx for idx, item in enumerate(video_data)}
- def complete_video_job(item: Dict[str, Any]) -> Dict[str, Any]:
- """完整的视频处理流程:下载->上传->分析->清理"""
- print(f"开始处理视频: {item}")
- url = item.get('url', '')
- duration = item.get('duration', 0)
- video_file = None
-
- try:
- # 1. 下载视频
- video_path = self.download_video(url)
- if not video_path:
- return {
- 'url': url, 'duration': duration, 'asr_content': '视频下载失败', 'iframe_details': []
- }
- # 2. 上传到 Gemini
- video_file = self.upload_video_to_gemini(video_path)
-
- # 清理本地缓存文件
- try:
- if video_path and os.path.exists(video_path):
- os.remove(video_path)
- except Exception:
- pass
- if not video_file:
- return {
- 'url': url, 'duration': duration, 'asr_content': '视频上传失败', 'iframe_details': []
- }
- # 3. 使用 Gemini 直接分析视频文件
- model = genai.GenerativeModel(
- model_name='gemini-2.5-flash',
- generation_config=genai.GenerationConfig(
- response_mime_type='application/json',
- temperature=0.3,
- max_output_tokens=40960
- ),
- safety_settings={
- HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
- HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
- }
- )
- response = model.generate_content(
- contents=[video_file, system_prompt],
- request_options={'timeout': 240}
- )
- if hasattr(response, '_error') and response._error:
- raise Exception(f"生成错误: {response._error}")
- # 安全获取文本:避免在无 Part 时访问 response.text 抛错
- def safe_extract_text(resp: Any) -> str:
- try:
- # 优先从 candidates 结构中提取
- candidates = getattr(resp, 'candidates', None)
- if candidates and len(candidates) > 0:
- first = candidates[0]
- # 记录 finish_reason 以便错误信息更清晰
- finish_reason = getattr(first, 'finish_reason', None)
- # parts 路径
- content = getattr(first, 'content', None)
- parts = getattr(content, 'parts', None) if content else None
- if parts and len(parts) > 0:
- # 兼容 text 或直接包含的内容
- # 常见 part 为 {text: str}
- part0 = parts[0]
- text = getattr(part0, 'text', None) if hasattr(part0, 'text') else part0.get('text') if isinstance(part0, dict) else None
- if isinstance(text, str) and text.strip():
- return text
- # 若无 parts 或为空,根据 finish_reason 返回清晰错误
- reason = str(finish_reason) if finish_reason is not None else 'unknown'
- raise ValueError(f"无有效输出内容,finish_reason={reason}")
- # 退化到 resp.text(可能抛错)
- if hasattr(resp, 'text') and isinstance(resp.text, str) and resp.text.strip():
- return resp.text
- raise ValueError("响应中没有可用的文本内容")
- except Exception as ex:
- raise ex
- try:
- text_payload = safe_extract_text(response).strip()
- parsed = json.loads(text_payload)
- if not isinstance(parsed, dict):
- raise ValueError("响应格式错误:非字典结构")
-
- # 确保包含所有必需字段
- required_fields = ['asr_content', 'iframe_details']
- for field in required_fields:
- if field not in parsed:
- if field == 'iframe_details':
- parsed[field] = [{
- 'time_start': 0,
- 'time_end': 0,
- 'content': f'{field}分析失败',
- 'ocr_content': f'{field}分析失败'
- }]
- else:
- parsed[field] = f"{field}分析失败"
-
- asr = parsed.get('asr_content', '')
- frames = parsed.get('iframe_details', [])
- if not isinstance(frames, list):
- frames = []
- return {'url': url, 'duration': duration, 'asr_content': asr, 'iframe_details': frames}
-
- except json.JSONDecodeError as e:
- print(f"JSON解析失败: {e}")
- return {
- 'url': url, 'duration': duration,
- 'asr_content': 'ASR分析失败:JSON解析错误',
- 'iframe_details': [{
- 'time_start': 0, 'time_end': 0,
- 'content': '关键帧分析失败:JSON解析错误',
- 'ocr_content': '关键帧分析失败:JSON解析错误'
- }]
- }
- except Exception as e:
- # 捕获无 Part 或 finish_reason 封锁等导致的无法提取文本问题
- err_msg = str(e)
- return {
- 'url': url, 'duration': duration,
- 'asr_content': f'处理失败: {err_msg}',
- 'iframe_details': [{
- 'time_start': 0, 'time_end': 0,
- 'content': f'处理失败: {err_msg}',
- 'ocr_content': f'处理失败: {err_msg}'
- }]
- }
-
- except Exception as e:
- return {
- 'url': url, 'duration': duration,
- 'asr_content': f'处理失败: {str(e)}',
- 'iframe_details': [{
- 'time_start': 0, 'time_end': 0,
- 'content': f'处理失败: {str(e)}',
- 'ocr_content': f'处理失败: {str(e)}'
- }]
- }
- finally:
- # 4. 清理 Gemini 文件
- if video_file and hasattr(video_file, 'name'):
- try:
- genai.delete_file(name=video_file.name)
- except Exception:
- pass
- # 并发处理所有视频(每个线程完成完整流程)
- with ThreadPoolExecutor(max_workers=5) as pool:
- future_to_item = {pool.submit(complete_video_job, item): item for item in video_data}
-
- for future in as_completed(list(future_to_item.keys())):
- result = future.result()
- url = result['url']
- idx = url_to_index[url]
- results[idx] = result
- return results
- except Exception as e:
- print(f"OpenRouter 批量视频分析失败: {e}")
- return [{
- 'url': item.get('url', ''),
- 'duration': item.get('duration', 0),
- 'asr_content': f'处理失败: {str(e)}',
- 'iframe_details': []
- } for item in video_data]
- def main():
- """测试函数"""
- # 模拟数据
- test_content = {
- "video_url_list": [
- {
- "video_url": "http://rescdn.yishihui.com/pipeline/video/489e7c31-4e7c-44cc-872d-b1b1dd42b12d.mp4",
- "video_duration": 187
- },
- # {
- # "video_url": "http://temp.yishihui.com/pipeline/video/43d11b20-6273-4ece-a146-94f63a3992a8.mp4",
- # "video_duration": 100
- # },
- # {
- # "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250731/57463792ND5eu5PAj95sVLi2gB.mp4",
- # "video_duration": 100
- # },
- # {
- # "video_url": "http://temp.yishihui.com/longvideo/transcode/crawler_local/video/prod/20250912/2c278614bd39fc2668f210d752141cb678956536.mp4",
- # "video_duration": 100
- # },
- # {
- # "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250809/5870d4dc9ba18ce57e5af27b81ff1398.mp4",
- # "video_duration": 100
- # },
- # {
- # "video_url": "http://temp.yishihui.com/pipeline/video/202769cd-68a5-41a2-82d9-620d2c72a225.mp4",
- # "video_duration": 100
- # }
- ]
- }
-
- identifier = VideoIdentifier()
- result = identifier.process_videos(test_content)
-
- print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
- if __name__ == '__main__':
- main()
|