#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
video_identifier.py - video recognition script.

Main purpose: use the Gemini API to analyse video content along three dimensions:
1. ASR (Automatic Speech Recognition) - speech to text
2. OCR - recognise the text that appears in the video frames
3. Key-frame extraction and description - extract key frames and describe them
"""
  10. import os
  11. import json
  12. import time
  13. import sys
  14. import uuid
  15. import requests
  16. from typing import Dict, Any, List, Optional
  17. from dotenv import load_dotenv
  18. # 导入自定义模块
  19. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  20. # 导入Google Generative AI
  21. import google.generativeai as genai
  22. from google.generativeai.types import HarmCategory, HarmBlockThreshold
  23. # 缓存目录配置
  24. CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
  25. # 缓存文件最大保留时间(秒)
  26. CACHE_MAX_AGE = 3600 # 1小时
  27. class VideoIdentifier:
  28. def __init__(self):
  29. # 加载环境变量
  30. load_dotenv()
  31. # 获取API密钥
  32. self.api_key = os.getenv('GEMINI_API_KEY')
  33. if not self.api_key:
  34. raise ValueError("未找到GEMINI_API_KEY环境变量")
  35. # 配置Gemini
  36. genai.configure(api_key=self.api_key)
  37. # 初始化缓存清理时间
  38. self.last_cache_cleanup = time.time()
  39. # 统一的系统提示词 - 三个维度分析
  40. self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:
  41. 1. ASR (Automatic Speech Recognition) - 语音转文字:
  42. - 仅提取视频中的语音内容,转换为文字
  43. - 保持原始语音的准确性和完整性
  44. - 不要添加分析、解释或评论
  45. 2. 关键帧提取与描述(包含OCR文字识别):
  46. - 将视频按照画面场景变化分解为多个关键时间片段
  47. - 对每个时间片段进行以下分析:
  48. * 画面的主要视觉元素和内容, 20个字以内
  49. * 画面中出现的所有文字内容(OCR识别),**注意忽略语音的字幕**
  50. - 每个时间片段应包含:
  51. * content: 画面内容的详细描述,15个字以内
  52. * ocr_content: 该时间段画面中出现的文字内容,仅做画面内文字提取,不要提取字幕文字,不要做任何解释或总结
  53. 请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
  54. {
  55. "asr_content": "提取的语音文字内容",
  56. "iframe_details": [
  57. {
  58. "time_start": "开始时间(秒)",
  59. "time_end": "结束时间(秒)",
  60. "content": "该时间段画面内容的详细描述",
  61. "ocr_content": "该时间段画面中出现的文字内容"
  62. }
  63. ]
  64. }"""
  65. def download_video(self, video_url: str) -> Optional[str]:
  66. """下载视频到本地缓存"""
  67. file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
  68. try:
  69. # 确保缓存目录存在
  70. try:
  71. os.makedirs(CACHE_DIR, exist_ok=True)
  72. except Exception as e:
  73. print(f'创建缓存目录失败: {e}')
  74. return None
  75. # 尝试下载视频
  76. for attempt in range(3):
  77. try:
  78. response = requests.get(url=video_url, timeout=60)
  79. if response.status_code == 200:
  80. try:
  81. with open(file_path, 'wb') as f:
  82. f.write(response.content)
  83. # print(f'视频下载成功: {video_url} -> {file_path}')
  84. return file_path
  85. except Exception as e:
  86. print(f'视频保存失败: {e}')
  87. # 保存失败时清理已创建的文件
  88. if os.path.exists(file_path):
  89. try:
  90. os.remove(file_path)
  91. print(f'已清理下载失败的文件: {file_path}')
  92. except:
  93. pass
  94. return None
  95. else:
  96. print(f'视频下载失败,状态码: {response.status_code}')
  97. if attempt == 2: # 最后一次尝试失败
  98. print(f'所有下载尝试都失败了')
  99. return None
  100. except Exception as e:
  101. print(f'下载尝试 {attempt + 1} 失败: {e}')
  102. if attempt < 2: # 不是最后一次尝试
  103. time.sleep(1)
  104. continue
  105. else:
  106. print(f'所有下载尝试都失败了')
  107. return None
  108. except Exception as e:
  109. print(f'下载过程异常: {e}')
  110. return None
  111. return None
  112. def cleanup_cache(self):
  113. """清理过期的缓存文件"""
  114. try:
  115. current_time = time.time()
  116. # 每小时清理一次缓存
  117. if current_time - self.last_cache_cleanup < 3600:
  118. return
  119. if not os.path.exists(CACHE_DIR):
  120. return
  121. cleaned_count = 0
  122. for filename in os.listdir(CACHE_DIR):
  123. file_path = os.path.join(CACHE_DIR, filename)
  124. if os.path.isfile(file_path):
  125. file_age = current_time - os.path.getmtime(file_path)
  126. if file_age > CACHE_MAX_AGE:
  127. try:
  128. os.remove(file_path)
  129. cleaned_count += 1
  130. except Exception as e:
  131. print(f'清理缓存文件失败: {file_path}, 错误: {e}')
  132. if cleaned_count > 0:
  133. print(f'已清理 {cleaned_count} 个过期缓存文件')
  134. self.last_cache_cleanup = current_time
  135. except Exception as e:
  136. print(f'清理缓存失败: {e}')
  137. def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
  138. """上传视频到Gemini进行分析"""
  139. max_retries = 3
  140. retry_delay = 5
  141. for attempt in range(max_retries):
  142. try:
  143. # print(f" 开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
  144. # print(f" 文件路径: {video_path}")
  145. # 1. 文件检查
  146. if not os.path.exists(video_path):
  147. print(f" 错误: 文件不存在")
  148. return None
  149. file_size = os.path.getsize(video_path)
  150. # print(f" 文件大小: {file_size / (1024*1024):.2f} MB")
  151. if file_size == 0:
  152. print(f" 错误: 文件大小为0")
  153. return None
  154. # 2. 文件权限检查
  155. try:
  156. with open(video_path, 'rb') as f:
  157. # 尝试读取文件开头,检查是否可读
  158. f.read(1024)
  159. # print(f" 文件权限: 可读")
  160. except Exception as e:
  161. print(f" 错误: 文件无法读取 - {e}")
  162. return None
  163. # 4. 尝试上传文件
  164. # print(f" 开始上传文件...")
  165. try:
  166. video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
  167. # print(f" 文件上传请求已发送,文件ID: {video_file.name}")
  168. except Exception as e:
  169. print(f" 错误: 文件上传请求失败 - {e}")
  170. print(f" 错误类型: {type(e).__name__}")
  171. print(f" 错误详情: {str(e)}")
  172. # 如果是网络相关错误,尝试重试
  173. if any(keyword in str(e).lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
  174. if attempt < max_retries - 1:
  175. print(f" 网络错误,等待 {retry_delay} 秒后重试...")
  176. time.sleep(retry_delay)
  177. retry_delay *= 2 # 指数退避
  178. continue
  179. else:
  180. print(f" 所有重试都失败了")
  181. return None
  182. else:
  183. # 非网络错误,直接返回
  184. return None
  185. # 5. 等待文件处理完成
  186. print(f" 等待文件处理完成...")
  187. max_wait_time = 120 # 最大等待2分钟
  188. wait_count = 0
  189. while video_file.state.name == 'PROCESSING' and wait_count < max_wait_time:
  190. time.sleep(2) # 每2秒检查一次
  191. wait_count += 2
  192. try:
  193. # 获取最新状态
  194. video_file = genai.get_file(name=video_file.name)
  195. current_state = video_file.state.name
  196. # print(f" 状态: {current_state} ({wait_count}秒)")
  197. # 检查是否有错误状态
  198. if current_state in ['FAILED', 'ERROR', 'INVALID']:
  199. print(f" 错误: 文件处理失败,状态: {current_state}")
  200. if hasattr(video_file, 'error'):
  201. print(f" 错误详情: {video_file.error}")
  202. # 如果是处理失败,尝试重试
  203. if attempt < max_retries - 1:
  204. print(f" 文件处理失败,等待 {retry_delay} 秒后重试...")
  205. time.sleep(retry_delay)
  206. retry_delay *= 2
  207. break # 跳出等待循环,进行重试
  208. else:
  209. return None
  210. except Exception as e:
  211. print(f" 警告: 获取文件状态失败 - {e}")
  212. if wait_count > 60: # 超过1分钟后,尝试继续
  213. print(f" 继续等待...")
  214. continue
  215. else:
  216. print(f" 错误: 无法获取文件状态")
  217. return None
  218. # 6. 检查最终状态
  219. if video_file.state.name == 'ACTIVE':
  220. print(f' 视频上传成功: {video_file.name}')
  221. # print(f" 最终状态: {video_file.state.name}")
  222. return video_file
  223. else:
  224. print(f' 错误: 视频文件上传失败')
  225. # print(f" 最终状态: {video_file.state.name}")
  226. # print(f" 等待时间: {wait_count}秒")
  227. # 尝试获取更多错误信息
  228. try:
  229. file_info = genai.get_file(name=video_file.name)
  230. # print(f" 文件信息: {file_info}")
  231. except Exception as e:
  232. print(f" 无法获取文件详细信息: {e}")
  233. # 如果不是最后一次尝试,进行重试
  234. if attempt < max_retries - 1:
  235. print(f" 上传失败,等待 {retry_delay} 秒后重试...")
  236. time.sleep(retry_delay)
  237. retry_delay *= 2
  238. continue
  239. else:
  240. return None
  241. except Exception as e:
  242. error_type = type(e).__name__
  243. error_msg = str(e)
  244. print(f' 错误: 视频上传到Gemini失败')
  245. print(f" 错误类型: {error_type}")
  246. print(f" 错误信息: {error_msg}")
  247. # 针对特定错误的处理建议
  248. if "Broken pipe" in error_msg:
  249. print(f" 诊断: Broken pipe 错误通常表示:")
  250. print(f" - 网络连接不稳定")
  251. print(f" - 服务器连接中断")
  252. print(f" - 防火墙或代理问题")
  253. print(f" 建议:")
  254. print(f" - 检查网络连接")
  255. print(f" - 尝试使用VPN或更换网络")
  256. print(f" - 检查防火墙设置")
  257. elif "Connection" in error_msg:
  258. print(f" 诊断: 连接错误")
  259. print(f" 建议: 检查网络连接和API密钥")
  260. elif "Timeout" in error_msg:
  261. print(f" 诊断: 超时错误")
  262. print(f" 建议: 网络较慢,可以增加超时时间")
  263. elif "Permission" in error_msg:
  264. print(f" 诊断: 权限错误")
  265. print(f" 建议: 检查API密钥和权限设置")
  266. # 如果是网络相关错误,尝试重试
  267. if any(keyword in error_msg.lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
  268. if attempt < max_retries - 1:
  269. print(f" 网络错误,等待 {retry_delay} 秒后重试...")
  270. time.sleep(retry_delay)
  271. retry_delay *= 2
  272. continue
  273. else:
  274. print(f" 所有重试都失败了")
  275. return None
  276. else:
  277. # 非网络错误,直接返回
  278. print(f" 非网络错误,不进行重试")
  279. return None
  280. return None
  281. def analyze_video_with_gemini(self, video_file: Any, video_info: Dict[str, Any]) -> Dict[str, Any]:
  282. """使用Gemini API分析视频内容"""
  283. try:
  284. # 创建Gemini模型
  285. model = genai.GenerativeModel(
  286. model_name='gemini-2.5-flash',
  287. generation_config=genai.GenerationConfig(
  288. response_mime_type='application/json',
  289. temperature=0.3,
  290. max_output_tokens=40960
  291. ),
  292. safety_settings={
  293. HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
  294. HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
  295. }
  296. )
  297. # 生成内容
  298. response = model.generate_content(
  299. contents=[video_file, self.unified_system_prompt],
  300. request_options={'timeout': 240}
  301. )
  302. # print(f"response: {response.text}")
  303. # 检查错误
  304. if hasattr(response, '_error') and response._error:
  305. raise Exception(f"生成错误: {response._error}")
  306. # 解析JSON响应
  307. try:
  308. result = json.loads(response.text.strip())
  309. # print(f"[视频分析] 响应: {result}")
  310. if not isinstance(result, dict):
  311. raise ValueError("响应格式错误:非字典结构")
  312. # 确保包含所有必需字段
  313. required_fields = ['asr_content', 'iframe_details']
  314. for field in required_fields:
  315. if field not in result:
  316. if field == 'iframe_details':
  317. result[field] = [{
  318. 'time_start': 0,
  319. 'time_end': 0,
  320. 'content': f'{field}分析失败',
  321. 'ocr_content': f'{field}分析失败'
  322. }]
  323. else:
  324. result[field] = f"{field}分析失败"
  325. return result
  326. except json.JSONDecodeError as e:
  327. print(f"JSON解析失败: {e}")
  328. return {
  329. 'asr_content': 'ASR分析失败:JSON解析错误',
  330. 'iframe_details': [{
  331. 'time_start': 0,
  332. 'time_end': 0,
  333. 'content': '关键帧分析失败:JSON解析错误',
  334. 'ocr_content': '关键帧分析失败:JSON解析错误'
  335. }]
  336. }
  337. else:
  338. return {
  339. 'asr_content': 'ASR分析失败:API无响应',
  340. 'iframe_details': [{
  341. 'time_start': 0,
  342. 'time_end': 0,
  343. 'content': '关键帧分析失败:API无响应',
  344. 'ocr_content': '关键帧分析失败:API无响应'
  345. }]
  346. }
  347. except Exception as e:
  348. return {
  349. 'asr_content': f'ASR分析失败: {str(e)}',
  350. 'iframe_details': [{
  351. 'time_start': 0,
  352. 'time_end': 0,
  353. 'content': f'关键帧分析失败: {str(e)}',
  354. 'ocr_content': f'关键帧分析失败: {str(e)}'
  355. }]
  356. }
  357. def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
  358. """提取视频URL列表"""
  359. video_data = []
  360. video_url_list = formatted_content.get('video_url_list', [])
  361. for video_item in video_url_list:
  362. if isinstance(video_item, dict) and 'video_url' in video_item:
  363. video_data.append({
  364. 'url': video_item['video_url'],
  365. 'duration': video_item.get('video_duration', 0)
  366. })
  367. return video_data
  368. def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
  369. """处理单个视频的完整流程"""
  370. # print(f"开始处理视频: {video_info['url'][:50]}...")
  371. video_path = None
  372. video_file = None
  373. try:
  374. # 1. 下载视频
  375. # print(" 1. 下载视频...")
  376. video_path = self.download_video(video_info['url'])
  377. if not video_path:
  378. # print(" 视频下载失败")
  379. return {
  380. 'url': video_info['url'],
  381. 'duration': video_info['duration'],
  382. 'asr_content': '视频下载失败',
  383. 'iframe_details': [{
  384. 'time_start': 0,
  385. 'time_end': 0,
  386. 'content': '视频下载失败',
  387. 'ocr_content': '视频下载失败'
  388. }]
  389. }
  390. # 2. 上传到Gemini
  391. # print(" 2. 上传视频到Gemini...")
  392. video_file = self.upload_video_to_gemini(video_path)
  393. if not video_file:
  394. # print(" 视频上传到Gemini失败")
  395. # 上传失败时也要清理缓存文件
  396. if video_path and os.path.exists(video_path):
  397. try:
  398. os.remove(video_path)
  399. # print(f" 上传失败,缓存文件已清理: {video_path}")
  400. except Exception as e:
  401. print(f" 清理缓存文件失败: {e}")
  402. return {
  403. 'url': video_info['url'],
  404. 'duration': video_info['duration'],
  405. 'asr_content': '视频上传失败',
  406. 'iframe_details': [{
  407. 'time_start': 0,
  408. 'time_end': 0,
  409. 'content': '视频上传失败',
  410. 'ocr_content': '视频上传失败'
  411. }]
  412. }
  413. # 3. 使用Gemini分析
  414. # print(" 3. 使用Gemini分析视频内容...")
  415. analysis_result = self.analyze_video_with_gemini(video_file, video_info)
  416. # 4. 组合结果
  417. final_result = {
  418. 'url': video_info['url'],
  419. 'duration': video_info['duration'],
  420. 'asr_content': analysis_result.get('asr_content', 'ASR分析失败'),
  421. 'iframe_details': analysis_result.get('iframe_details', '关键帧分析失败'),
  422. }
  423. # print(" 视频分析完成")
  424. return final_result
  425. except Exception as e:
  426. print(f" 视频处理异常: {e}")
  427. # 异常情况下也要清理缓存文件
  428. if video_path and os.path.exists(video_path):
  429. try:
  430. os.remove(video_path)
  431. print(f" 异常处理,缓存文件已清理: {video_path}")
  432. except Exception as e:
  433. print(f" 清理缓存文件失败: {e}")
  434. return {
  435. 'url': video_info['url'],
  436. 'duration': video_info['duration'],
  437. 'asr_content': f'处理异常: {str(e)}',
  438. 'iframe_details': [{
  439. 'time_start': 0,
  440. 'time_end': 0,
  441. 'content': f'处理异常: {str(e)}',
  442. 'ocr_content': f'处理异常: {str(e)}'
  443. }],
  444. 'analysis_timestamp': int(time.time() * 1000)
  445. }
  446. finally:
  447. # 清理临时文件
  448. if video_path and os.path.exists(video_path):
  449. try:
  450. os.remove(video_path)
  451. print(f" 临时文件已清理: {video_path}")
  452. except Exception as e:
  453. print(f" 清理临时文件失败: {e}")
  454. # 清理Gemini文件
  455. if video_file and hasattr(video_file, 'name'):
  456. try:
  457. genai.delete_file(name=video_file.name)
  458. # print(f" Gemini文件已清理: {video_file.name}")
  459. except Exception as e:
  460. print(f" 清理Gemini文件失败: {e}")
  461. def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
  462. """处理视频识别的主函数"""
  463. # 定期清理缓存
  464. self.cleanup_cache()
  465. # 提取视频URL
  466. video_data = self.extract_video_urls(formatted_content)
  467. if not video_data:
  468. return []
  469. # 逐个处理视频
  470. results = []
  471. for i, video_info in enumerate(video_data):
  472. result = self.process_video_single(video_info)
  473. results.append(result)
  474. # 添加延迟避免API限制
  475. if i < len(video_data) - 1: # 不是最后一个视频
  476. time.sleep(2)
  477. return results
  478. def main():
  479. """测试函数"""
  480. # 模拟数据
  481. test_content = {
  482. "video_url_list": [
  483. {
  484. "video_url": "http://rescdn.yishihui.com/pipeline/video/6ab92036-a166-491d-935e-eeeb7c0f2779.mp4",
  485. "video_duration": 187
  486. }
  487. ]
  488. }
  489. identifier = VideoIdentifier()
  490. result = identifier.process_videos(test_content)
  491. # print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
  492. if __name__ == '__main__':
  493. main()