video_identifier.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 视频识别脚本
  5. 主要功能:使用 Gemini API 从三个维度分析视频内容
  6. 1. ASR (Automatic Speech Recognition) - 语音转文字
  7. 2. OCR - 识别视频画面中的文字
  8. 3. 关键帧提取与描述 - 提取视频关键帧并进行图像描述
  9. """
  10. import os
  11. import json
  12. import time
  13. import sys
  14. import uuid
  15. import requests
  16. from typing import Dict, Any, List, Optional
  17. from dotenv import load_dotenv
  18. # 导入自定义模块
  19. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  20. # 导入Google Generative AI
  21. import google.generativeai as genai
  22. from google.generativeai.types import HarmCategory, HarmBlockThreshold
  23. # 缓存目录配置
  24. CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
  25. class VideoIdentifier:
  26. def __init__(self):
  27. # 加载环境变量
  28. load_dotenv()
  29. # 获取API密钥
  30. self.api_key = os.getenv('GEMINI_API_KEY')
  31. if not self.api_key:
  32. raise ValueError("未找到GEMINI_API_KEY环境变量")
  33. # 配置Gemini
  34. genai.configure(api_key=self.api_key)
  35. # 统一的系统提示词 - 三个维度分析
  36. self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:
  37. 1. ASR (Automatic Speech Recognition) - 语音转文字:
  38. - 仅提取视频中的语音内容,转换为文字
  39. - 保持原始语音的准确性和完整性
  40. - 不要添加分析、解释或评论
  41. 2. 关键帧提取与描述(包含OCR文字识别):
  42. - 将视频分解为多个关键时间片段
  43. - 对每个时间片段进行以下分析:
  44. * 画面的主要视觉元素和内容
  45. * 画面的构图和色彩特点
  46. * 画面中的人物、物体、场景
  47. * 画面中出现的所有文字内容(OCR识别)
  48. - 每个时间片段应包含:
  49. * content: 画面内容的详细描述
  50. * ocr_content: 该时间段画面中出现的文字内容,仅做文字提取,不要做任何解释或总结
  51. 请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
  52. {
  53. "asr_content": "提取的语音文字内容",
  54. "iframe_details": [
  55. {
  56. "time_start": "开始时间(秒)",
  57. "time_end": "结束时间(秒)",
  58. "content": "该时间段画面内容的详细描述",
  59. "ocr_content": "该时间段画面中出现的文字内容"
  60. }
  61. ]
  62. }"""
  63. def download_video(self, video_url: str) -> Optional[str]:
  64. """下载视频到本地缓存"""
  65. file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
  66. try:
  67. # 确保缓存目录存在
  68. try:
  69. os.makedirs(CACHE_DIR, exist_ok=True)
  70. except Exception as e:
  71. print(f'创建缓存目录失败: {e}')
  72. return None
  73. # 尝试下载视频
  74. for attempt in range(3):
  75. try:
  76. response = requests.get(url=video_url, timeout=60)
  77. if response.status_code == 200:
  78. try:
  79. with open(file_path, 'wb') as f:
  80. f.write(response.content)
  81. print(f'视频下载成功: {video_url} -> {file_path}')
  82. return file_path
  83. except Exception as e:
  84. print(f'视频保存失败: {e}')
  85. return None
  86. else:
  87. print(f'视频下载失败,状态码: {response.status_code}')
  88. except Exception as e:
  89. print(f'下载尝试 {attempt + 1} 失败: {e}')
  90. if attempt < 2: # 不是最后一次尝试
  91. time.sleep(1)
  92. continue
  93. else:
  94. print(f'所有下载尝试都失败了')
  95. return None
  96. except Exception as e:
  97. print(f'下载过程异常: {e}')
  98. return None
  99. return None
  100. def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
  101. """上传视频到Gemini进行分析"""
  102. max_retries = 3
  103. retry_delay = 5
  104. for attempt in range(max_retries):
  105. try:
  106. print(f" 开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
  107. print(f" 文件路径: {video_path}")
  108. # 1. 文件检查
  109. if not os.path.exists(video_path):
  110. print(f" 错误: 文件不存在")
  111. return None
  112. file_size = os.path.getsize(video_path)
  113. print(f" 文件大小: {file_size / (1024*1024):.2f} MB")
  114. if file_size == 0:
  115. print(f" 错误: 文件大小为0")
  116. return None
  117. # 2. 文件权限检查
  118. try:
  119. with open(video_path, 'rb') as f:
  120. # 尝试读取文件开头,检查是否可读
  121. f.read(1024)
  122. print(f" 文件权限: 可读")
  123. except Exception as e:
  124. print(f" 错误: 文件无法读取 - {e}")
  125. return None
  126. # 3. 网络连接检查
  127. try:
  128. print(f" 检查网络连接...")
  129. # 测试基本网络连接
  130. test_response = requests.get("https://generativelanguage.googleapis.com", timeout=10)
  131. print(f" 网络连接: 正常 (状态码: {test_response.status_code})")
  132. except Exception as e:
  133. print(f" 警告: 网络连接测试失败 - {e}")
  134. print(f" 继续尝试上传...")
  135. # 4. 尝试上传文件
  136. print(f" 开始上传文件...")
  137. try:
  138. video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
  139. print(f" 文件上传请求已发送,文件ID: {video_file.name}")
  140. except Exception as e:
  141. print(f" 错误: 文件上传请求失败 - {e}")
  142. print(f" 错误类型: {type(e).__name__}")
  143. print(f" 错误详情: {str(e)}")
  144. # 如果是网络相关错误,尝试重试
  145. if any(keyword in str(e).lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
  146. if attempt < max_retries - 1:
  147. print(f" 网络错误,等待 {retry_delay} 秒后重试...")
  148. time.sleep(retry_delay)
  149. retry_delay *= 2 # 指数退避
  150. continue
  151. else:
  152. print(f" 所有重试都失败了")
  153. return None
  154. else:
  155. # 非网络错误,直接返回
  156. return None
  157. # 5. 等待文件处理完成
  158. print(f" 等待文件处理完成...")
  159. max_wait_time = 120 # 最大等待2分钟
  160. wait_count = 0
  161. while video_file.state.name == 'PROCESSING' and wait_count < max_wait_time:
  162. time.sleep(2) # 每2秒检查一次
  163. wait_count += 2
  164. try:
  165. # 获取最新状态
  166. video_file = genai.get_file(name=video_file.name)
  167. current_state = video_file.state.name
  168. print(f" 状态: {current_state} ({wait_count}秒)")
  169. # 检查是否有错误状态
  170. if current_state in ['FAILED', 'ERROR', 'INVALID']:
  171. print(f" 错误: 文件处理失败,状态: {current_state}")
  172. if hasattr(video_file, 'error'):
  173. print(f" 错误详情: {video_file.error}")
  174. # 如果是处理失败,尝试重试
  175. if attempt < max_retries - 1:
  176. print(f" 文件处理失败,等待 {retry_delay} 秒后重试...")
  177. time.sleep(retry_delay)
  178. retry_delay *= 2
  179. break # 跳出等待循环,进行重试
  180. else:
  181. return None
  182. except Exception as e:
  183. print(f" 警告: 获取文件状态失败 - {e}")
  184. if wait_count > 60: # 超过1分钟后,尝试继续
  185. print(f" 继续等待...")
  186. continue
  187. else:
  188. print(f" 错误: 无法获取文件状态")
  189. return None
  190. # 6. 检查最终状态
  191. if video_file.state.name == 'ACTIVE':
  192. print(f' 视频上传成功: {video_file.name}')
  193. print(f" 最终状态: {video_file.state.name}")
  194. return video_file
  195. else:
  196. print(f' 错误: 视频文件上传失败')
  197. print(f" 最终状态: {video_file.state.name}")
  198. print(f" 等待时间: {wait_count}秒")
  199. # 尝试获取更多错误信息
  200. try:
  201. file_info = genai.get_file(name=video_file.name)
  202. print(f" 文件信息: {file_info}")
  203. except Exception as e:
  204. print(f" 无法获取文件详细信息: {e}")
  205. # 如果不是最后一次尝试,进行重试
  206. if attempt < max_retries - 1:
  207. print(f" 上传失败,等待 {retry_delay} 秒后重试...")
  208. time.sleep(retry_delay)
  209. retry_delay *= 2
  210. continue
  211. else:
  212. return None
  213. except Exception as e:
  214. error_type = type(e).__name__
  215. error_msg = str(e)
  216. print(f' 错误: 视频上传到Gemini失败')
  217. print(f" 错误类型: {error_type}")
  218. print(f" 错误信息: {error_msg}")
  219. # 针对特定错误的处理建议
  220. if "Broken pipe" in error_msg:
  221. print(f" 诊断: Broken pipe 错误通常表示:")
  222. print(f" - 网络连接不稳定")
  223. print(f" - 服务器连接中断")
  224. print(f" - 防火墙或代理问题")
  225. print(f" 建议:")
  226. print(f" - 检查网络连接")
  227. print(f" - 尝试使用VPN或更换网络")
  228. print(f" - 检查防火墙设置")
  229. elif "Connection" in error_msg:
  230. print(f" 诊断: 连接错误")
  231. print(f" 建议: 检查网络连接和API密钥")
  232. elif "Timeout" in error_msg:
  233. print(f" 诊断: 超时错误")
  234. print(f" 建议: 网络较慢,可以增加超时时间")
  235. elif "Permission" in error_msg:
  236. print(f" 诊断: 权限错误")
  237. print(f" 建议: 检查API密钥和权限设置")
  238. # 如果是网络相关错误,尝试重试
  239. if any(keyword in error_msg.lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
  240. if attempt < max_retries - 1:
  241. print(f" 网络错误,等待 {retry_delay} 秒后重试...")
  242. time.sleep(retry_delay)
  243. retry_delay *= 2
  244. continue
  245. else:
  246. print(f" 所有重试都失败了")
  247. return None
  248. else:
  249. # 非网络错误,直接返回
  250. return None
  251. return None
  252. def analyze_video_with_gemini(self, video_file: Any, video_info: Dict[str, Any]) -> Dict[str, Any]:
  253. """使用Gemini API分析视频内容"""
  254. try:
  255. # 创建Gemini模型
  256. model = genai.GenerativeModel(
  257. model_name='gemini-2.0-flash',
  258. generation_config=genai.GenerationConfig(
  259. response_mime_type='application/json',
  260. temperature=0.3,
  261. max_output_tokens=20480
  262. ),
  263. safety_settings={
  264. HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
  265. HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
  266. }
  267. )
  268. # 生成内容
  269. response = model.generate_content(
  270. contents=[video_file, self.unified_system_prompt],
  271. request_options={'timeout': 300}
  272. )
  273. # 检查错误
  274. if hasattr(response, '_error') and response._error:
  275. raise Exception(f"生成错误: {response._error}")
  276. # 解析JSON响应
  277. try:
  278. result = json.loads(response.text.strip())
  279. print(f"[视频分析] 响应: {result}")
  280. if not isinstance(result, dict):
  281. raise ValueError("响应格式错误:非字典结构")
  282. # 确保包含所有必需字段
  283. required_fields = ['asr_content', 'iframe_details']
  284. for field in required_fields:
  285. if field not in result:
  286. if field == 'iframe_details':
  287. result[field] = [{
  288. 'time_start': 0,
  289. 'time_end': 0,
  290. 'content': f'{field}分析失败',
  291. 'ocr_content': f'{field}分析失败'
  292. }]
  293. else:
  294. result[field] = f"{field}分析失败"
  295. return result
  296. except json.JSONDecodeError as e:
  297. print(f"JSON解析失败: {e}")
  298. return {
  299. 'asr_content': 'ASR分析失败:JSON解析错误',
  300. 'iframe_details': [{
  301. 'time_start': 0,
  302. 'time_end': 0,
  303. 'content': '关键帧分析失败:JSON解析错误',
  304. 'ocr_content': '关键帧分析失败:JSON解析错误'
  305. }]
  306. }
  307. else:
  308. return {
  309. 'asr_content': 'ASR分析失败:API无响应',
  310. 'iframe_details': [{
  311. 'time_start': 0,
  312. 'time_end': 0,
  313. 'content': '关键帧分析失败:API无响应',
  314. 'ocr_content': '关键帧分析失败:API无响应'
  315. }]
  316. }
  317. except Exception as e:
  318. return {
  319. 'asr_content': f'ASR分析失败: {str(e)}',
  320. 'iframe_details': [{
  321. 'time_start': 0,
  322. 'time_end': 0,
  323. 'content': f'关键帧分析失败: {str(e)}',
  324. 'ocr_content': f'关键帧分析失败: {str(e)}'
  325. }]
  326. }
  327. def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
  328. """提取视频URL列表"""
  329. video_data = []
  330. video_url_list = formatted_content.get('video_url_list', [])
  331. for video_item in video_url_list:
  332. if isinstance(video_item, dict) and 'video_url' in video_item:
  333. video_data.append({
  334. 'url': video_item['video_url'],
  335. 'duration': video_item.get('video_duration', 0)
  336. })
  337. return video_data
  338. def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
  339. """处理单个视频的完整流程"""
  340. print(f"开始处理视频: {video_info['url'][:50]}...")
  341. video_path = None
  342. video_file = None
  343. try:
  344. # 1. 下载视频
  345. print(" 1. 下载视频...")
  346. video_path = self.download_video(video_info['url'])
  347. if not video_path:
  348. print(" 视频下载失败")
  349. return {
  350. 'url': video_info['url'],
  351. 'duration': video_info['duration'],
  352. 'asr_content': '视频下载失败',
  353. 'iframe_details': [{
  354. 'time_start': 0,
  355. 'time_end': 0,
  356. 'content': '视频下载失败',
  357. 'ocr_content': '视频下载失败'
  358. }]
  359. }
  360. # 2. 上传到Gemini
  361. print(" 2. 上传视频到Gemini...")
  362. video_file = self.upload_video_to_gemini(video_path)
  363. if not video_file:
  364. print(" 视频上传到Gemini失败")
  365. return {
  366. 'url': video_info['url'],
  367. 'duration': video_info['duration'],
  368. 'asr_content': '视频上传失败',
  369. 'iframe_details': [{
  370. 'time_start': 0,
  371. 'time_end': 0,
  372. 'content': '视频上传失败',
  373. 'ocr_content': '视频上传失败'
  374. }]
  375. }
  376. # 3. 使用Gemini分析
  377. print(" 3. 使用Gemini分析视频内容...")
  378. analysis_result = self.analyze_video_with_gemini(video_file, video_info)
  379. # 4. 组合结果
  380. final_result = {
  381. 'url': video_info['url'],
  382. 'duration': video_info['duration'],
  383. 'asr_content': analysis_result.get('asr_content', 'ASR分析失败'),
  384. 'iframe_details': analysis_result.get('iframe_details', '关键帧分析失败'),
  385. }
  386. print(" 视频分析完成")
  387. return final_result
  388. except Exception as e:
  389. print(f" 视频处理异常: {e}")
  390. return {
  391. 'url': video_info['url'],
  392. 'duration': video_info['duration'],
  393. 'asr_content': f'处理异常: {str(e)}',
  394. 'iframe_details': [{
  395. 'time_start': 0,
  396. 'time_end': 0,
  397. 'content': f'处理异常: {str(e)}',
  398. 'ocr_content': f'处理异常: {str(e)}'
  399. }],
  400. 'analysis_timestamp': int(time.time() * 1000)
  401. }
  402. finally:
  403. # 清理临时文件
  404. if video_path and os.path.exists(video_path):
  405. try:
  406. os.remove(video_path)
  407. print(f" 临时文件已清理: {video_path}")
  408. except Exception as e:
  409. print(f" 清理临时文件失败: {e}")
  410. # 清理Gemini文件
  411. if video_file and hasattr(video_file, 'name'):
  412. try:
  413. genai.delete_file(name=video_file.name)
  414. print(f" Gemini文件已清理: {video_file.name}")
  415. except Exception as e:
  416. print(f" 清理Gemini文件失败: {e}")
  417. def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
  418. """处理视频识别的主函数"""
  419. print("开始视频识别处理...")
  420. # 提取视频URL
  421. video_data = self.extract_video_urls(formatted_content)
  422. print(f"提取到 {len(video_data)} 个视频")
  423. if not video_data:
  424. print("没有视频需要分析")
  425. return []
  426. # 逐个处理视频
  427. results = []
  428. for i, video_info in enumerate(video_data):
  429. print(f"\n处理视频 {i+1}/{len(video_data)}")
  430. result = self.process_video_single(video_info)
  431. results.append(result)
  432. # 添加延迟避免API限制
  433. if i < len(video_data) - 1: # 不是最后一个视频
  434. time.sleep(2)
  435. if results:
  436. print(f"\n视频识别完成,共分析 {len(results)} 个视频")
  437. print("分析维度:ASR、关键帧提取")
  438. else:
  439. print("视频识别失败")
  440. return results
  441. def main():
  442. """测试函数"""
  443. # 模拟数据
  444. test_content = {
  445. "video_url_list": [
  446. {
  447. "video_url": "https://vd9.bdstatic.com/mda-rf03dz9qrusbwrrb/mb/720p/mv_cae264_backtrack_720p_normal/1748751326307005666/mda-rf03dz9qrusbwrrb.mp4?v_from_s=hkapp-haokan-hbe&auth_key=1755078490-0-0-94814ae256d196c133940bc5fa7054ea&bcevod_channel=searchbox_feed&cr=2&cd=0&pd=1&pt=3&logid=2890204804&vid=12887026108358975692&klogid=2890204804&abtest=",
  448. "video_duration": 187
  449. }
  450. ]
  451. }
  452. identifier = VideoIdentifier()
  453. result = identifier.process_videos(test_content)
  454. print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
  455. if __name__ == '__main__':
  456. main()