video_identifier.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 视频识别脚本
  5. 主要功能:使用 Gemini API 从三个维度分析视频内容
  6. 1. ASR (Automatic Speech Recognition) - 语音转文字
  7. 2. OCR - 识别视频画面中的文字
  8. 3. 关键帧提取与描述 - 提取视频关键帧并进行图像描述
  9. """
  10. import os
  11. import json
  12. import time
  13. import sys
  14. import uuid
  15. import requests
  16. from typing import Dict, Any, List, Optional
  17. from dotenv import load_dotenv
  18. from utils.logging_config import get_logger
  19. # 创建 logger
  20. logger = get_logger('VideoIdentifier')
  21. # 导入自定义模块
  22. sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
  23. from llm.openrouter import OpenRouterProcessor, OpenRouterModel
  24. # 导入Google Generative AI
  25. import google.generativeai as genai
  26. from google.generativeai.types import HarmCategory, HarmBlockThreshold
  27. # 缓存目录配置
  28. CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
  29. # 缓存文件最大保留时间(秒)
  30. CACHE_MAX_AGE = 3600 # 1小时
  31. class VideoIdentifier:
  32. def __init__(self):
  33. # 加载环境变量
  34. load_dotenv()
  35. # 延迟配置Gemini,在真正使用时再设置
  36. self._configured = False
  37. # 初始化缓存清理时间
  38. self.last_cache_cleanup = time.time()
  39. # 统一的系统提示词 - 三个维度分析
  40. self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果:
  41. 1. ASR (Automatic Speech Recognition) - 语音转文字:
  42. - 仅提取视频中的语音内容,转换为文字
  43. - 保持原始语音的准确性和完整性
  44. - 不要添加分析、解释或评论
  45. 2. 关键帧提取与描述(包含OCR文字识别):
  46. - 将视频按照画面场景变化分解为多个关键时间片段
  47. - 对每个时间片段进行以下分析:
  48. * 画面的主要视觉元素和内容, 20个字以内
  49. * 画面中出现的所有文字内容(OCR识别),**注意忽略语音的字幕**
  50. - 每个时间片段应包含:
  51. * content: 画面内容的详细描述,15个字以内
  52. * ocr_content: 该时间段画面中出现的文字内容,仅做画面内文字提取,不要提取字幕文字,不要做任何解释或总结
  53. 请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
  54. {
  55. "asr_content": "提取的语音文字内容",
  56. "iframe_details": [
  57. {
  58. "time_start": "开始时间(秒)",
  59. "time_end": "结束时间(秒)",
  60. "content": "该时间段画面内容的详细描述",
  61. "ocr_content": "该时间段画面中出现的文字内容"
  62. }
  63. ]
  64. }"""
  65. def _ensure_configured(self):
  66. """确保Gemini已配置"""
  67. if not self._configured:
  68. self.api_key = os.getenv('GEMINI_API_KEY')
  69. print(f"配置Gemini: {self.api_key}")
  70. genai.configure(api_key=self.api_key)
  71. self._configured = True
  72. def download_video(self, video_url: str) -> Optional[str]:
  73. """下载视频到本地缓存"""
  74. file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
  75. try:
  76. # 确保缓存目录存在
  77. try:
  78. os.makedirs(CACHE_DIR, exist_ok=True)
  79. except Exception as e:
  80. print(f'创建缓存目录失败: {e}')
  81. return None
  82. # 尝试下载视频
  83. for attempt in range(3):
  84. try:
  85. response = requests.get(url=video_url, timeout=60)
  86. if response.status_code == 200:
  87. try:
  88. with open(file_path, 'wb') as f:
  89. f.write(response.content)
  90. # print(f'视频下载成功: {video_url} -> {file_path}')
  91. return file_path
  92. except Exception as e:
  93. print(f'视频保存失败: {e}')
  94. # 保存失败时清理已创建的文件
  95. if os.path.exists(file_path):
  96. try:
  97. os.remove(file_path)
  98. print(f'已清理下载失败的文件: {file_path}')
  99. except:
  100. pass
  101. return None
  102. else:
  103. print(f'视频下载失败,状态码: {response.status_code}')
  104. if attempt == 2: # 最后一次尝试失败
  105. print(f'所有下载尝试都失败了')
  106. return None
  107. except Exception as e:
  108. print(f'下载尝试 {attempt + 1} 失败: {e}')
  109. if attempt < 2: # 不是最后一次尝试
  110. time.sleep(1)
  111. continue
  112. else:
  113. print(f'所有下载尝试都失败了')
  114. return None
  115. except Exception as e:
  116. print(f'下载过程异常: {e}')
  117. return None
  118. return None
  119. def cleanup_cache(self):
  120. """清理过期的缓存文件"""
  121. try:
  122. current_time = time.time()
  123. # 每小时清理一次缓存
  124. if current_time - self.last_cache_cleanup < 3600:
  125. return
  126. if not os.path.exists(CACHE_DIR):
  127. return
  128. cleaned_count = 0
  129. for filename in os.listdir(CACHE_DIR):
  130. file_path = os.path.join(CACHE_DIR, filename)
  131. if os.path.isfile(file_path):
  132. file_age = current_time - os.path.getmtime(file_path)
  133. if file_age > CACHE_MAX_AGE:
  134. try:
  135. os.remove(file_path)
  136. cleaned_count += 1
  137. except Exception as e:
  138. print(f'清理缓存文件失败: {file_path}, 错误: {e}')
  139. if cleaned_count > 0:
  140. print(f'已清理 {cleaned_count} 个过期缓存文件')
  141. self.last_cache_cleanup = current_time
  142. except Exception as e:
  143. print(f'清理缓存失败: {e}')
  144. def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
  145. """上传视频到Gemini进行分析"""
  146. max_retries = 3
  147. retry_delay = 5
  148. for attempt in range(max_retries):
  149. try:
  150. # print(f" 开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
  151. # print(f" 文件路径: {video_path}")
  152. # 1. 文件检查
  153. if not os.path.exists(video_path):
  154. print(f" 错误: 文件不存在")
  155. return None
  156. file_size = os.path.getsize(video_path)
  157. # print(f" 文件大小: {file_size / (1024*1024):.2f} MB")
  158. if file_size == 0:
  159. print(f" 错误: 文件大小为0")
  160. return None
  161. # 2. 文件权限检查
  162. try:
  163. with open(video_path, 'rb') as f:
  164. # 尝试读取文件开头,检查是否可读
  165. f.read(1024)
  166. # print(f" 文件权限: 可读")
  167. except Exception as e:
  168. print(f" 错误: 文件无法读取 - {e}")
  169. return None
  170. # 4. 尝试上传文件
  171. # print(f" 开始上传文件...")
  172. try:
  173. video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
  174. # print(f" 文件上传请求已发送,文件ID: {video_file.name}")
  175. except Exception as e:
  176. print(f" 错误: 文件上传请求失败 - {e}")
  177. print(f" 错误类型: {type(e).__name__}")
  178. print(f" 错误详情: {str(e)}")
  179. # 如果是网络相关错误,尝试重试
  180. if any(keyword in str(e).lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
  181. if attempt < max_retries - 1:
  182. print(f" 网络错误,等待 {retry_delay} 秒后重试...")
  183. time.sleep(retry_delay)
  184. retry_delay *= 2 # 指数退避
  185. continue
  186. else:
  187. print(f" 所有重试都失败了")
  188. return None
  189. else:
  190. # 非网络错误,直接返回
  191. return None
  192. # 5. 等待文件处理完成
  193. print(f" 等待文件处理完成...")
  194. max_wait_time = 120 # 最大等待2分钟
  195. wait_count = 0
  196. while video_file.state.name == 'PROCESSING' and wait_count < max_wait_time:
  197. time.sleep(2) # 每2秒检查一次
  198. wait_count += 2
  199. try:
  200. # 获取最新状态
  201. video_file = genai.get_file(name=video_file.name)
  202. current_state = video_file.state.name
  203. # print(f" 状态: {current_state} ({wait_count}秒)")
  204. # 检查是否有错误状态
  205. if current_state in ['FAILED', 'ERROR', 'INVALID']:
  206. print(f" 错误: 文件处理失败,状态: {current_state}")
  207. if hasattr(video_file, 'error'):
  208. print(f" 错误详情: {video_file.error}")
  209. # 如果是处理失败,尝试重试
  210. if attempt < max_retries - 1:
  211. print(f" 文件处理失败,等待 {retry_delay} 秒后重试...")
  212. time.sleep(retry_delay)
  213. retry_delay *= 2
  214. break # 跳出等待循环,进行重试
  215. else:
  216. return None
  217. except Exception as e:
  218. print(f" 警告: 获取文件状态失败 - {e}")
  219. if wait_count > 60: # 超过1分钟后,尝试继续
  220. print(f" 继续等待...")
  221. continue
  222. else:
  223. print(f" 错误: 无法获取文件状态")
  224. return None
  225. # 6. 检查最终状态
  226. if video_file.state.name == 'ACTIVE':
  227. print(f' 视频上传成功: {video_file.name}')
  228. # print(f" 最终状态: {video_file.state.name}")
  229. return video_file
  230. else:
  231. print(f' 错误: 视频文件上传失败')
  232. # print(f" 最终状态: {video_file.state.name}")
  233. # print(f" 等待时间: {wait_count}秒")
  234. # 尝试获取更多错误信息
  235. try:
  236. file_info = genai.get_file(name=video_file.name)
  237. # print(f" 文件信息: {file_info}")
  238. except Exception as e:
  239. print(f" 无法获取文件详细信息: {e}")
  240. # 如果不是最后一次尝试,进行重试
  241. if attempt < max_retries - 1:
  242. print(f" 上传失败,等待 {retry_delay} 秒后重试...")
  243. time.sleep(retry_delay)
  244. retry_delay *= 2
  245. continue
  246. else:
  247. return None
  248. except Exception as e:
  249. error_type = type(e).__name__
  250. error_msg = str(e)
  251. print(f' 错误: 视频上传到Gemini失败')
  252. print(f" 错误类型: {error_type}")
  253. print(f" 错误信息: {error_msg}")
  254. # 针对特定错误的处理建议
  255. if "Broken pipe" in error_msg:
  256. print(f" 诊断: Broken pipe 错误通常表示:")
  257. print(f" - 网络连接不稳定")
  258. print(f" - 服务器连接中断")
  259. print(f" - 防火墙或代理问题")
  260. print(f" 建议:")
  261. print(f" - 检查网络连接")
  262. print(f" - 尝试使用VPN或更换网络")
  263. print(f" - 检查防火墙设置")
  264. elif "Connection" in error_msg:
  265. print(f" 诊断: 连接错误")
  266. print(f" 建议: 检查网络连接和API密钥")
  267. elif "Timeout" in error_msg:
  268. print(f" 诊断: 超时错误")
  269. print(f" 建议: 网络较慢,可以增加超时时间")
  270. elif "Permission" in error_msg:
  271. print(f" 诊断: 权限错误")
  272. print(f" 建议: 检查API密钥和权限设置")
  273. # 如果是网络相关错误,尝试重试
  274. if any(keyword in error_msg.lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
  275. if attempt < max_retries - 1:
  276. print(f" 网络错误,等待 {retry_delay} 秒后重试...")
  277. time.sleep(retry_delay)
  278. retry_delay *= 2
  279. continue
  280. else:
  281. print(f" 所有重试都失败了")
  282. return None
  283. else:
  284. # 非网络错误,直接返回
  285. print(f" 非网络错误,不进行重试")
  286. return None
  287. return None
  288. def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
  289. """提取视频URL列表"""
  290. video_data = []
  291. video_url_list = formatted_content.get('video_url_list', [])
  292. for video_item in video_url_list:
  293. if isinstance(video_item, dict) and 'video_url' in video_item:
  294. video_data.append({
  295. 'url': video_item['video_url'],
  296. 'duration': video_item.get('video_duration', 0)
  297. })
  298. return video_data
  299. def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
  300. """处理视频识别的主函数"""
  301. # 提取视频URL
  302. video_data = self.extract_video_urls(formatted_content)
  303. if not video_data:
  304. return []
  305. # 使用 OpenRouter 批量处理,避免逐个上传/分析
  306. return self.analyze_videos_with_openrouter(video_data)
  307. def analyze_videos_with_openrouter(self, video_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
  308. """使用 Gemini 并发(最多5条)完成上传+分析的完整流程"""
  309. try:
  310. # 系统提示:严格限制为"仅提取内容,不做分析" [[memory:7272937]]
  311. system_prompt = self.unified_system_prompt
  312. # 保持输入顺序的结果数组
  313. results: List[Dict[str, Any]] = [{} for _ in range(len(video_data))]
  314. url_to_index = {item['url']: idx for idx, item in enumerate(video_data)}
  315. def complete_video_job(item: Dict[str, Any]) -> Dict[str, Any]:
  316. """完整的视频处理流程:下载->上传->分析->清理"""
  317. print(f"开始处理视频: {item}")
  318. url = item.get('url', '')
  319. duration = item.get('duration', 0)
  320. video_file = None
  321. # 确保Gemini已配置
  322. self._ensure_configured()
  323. logger.info(f"配置Gemini: {self.api_key}")
  324. try:
  325. # 1. 下载视频
  326. video_path = self.download_video(url)
  327. if not video_path:
  328. return {
  329. 'url': url, 'duration': duration, 'asr_content': '视频下载失败', 'iframe_details': []
  330. }
  331. # 2. 上传到 Gemini
  332. video_file = self.upload_video_to_gemini(video_path)
  333. # 清理本地缓存文件
  334. try:
  335. if video_path and os.path.exists(video_path):
  336. os.remove(video_path)
  337. except Exception:
  338. pass
  339. if not video_file:
  340. return {
  341. 'url': url, 'duration': duration, 'asr_content': '视频上传失败', 'iframe_details': []
  342. }
  343. # 3. 使用 Gemini 直接分析视频文件
  344. model = genai.GenerativeModel(
  345. model_name='gemini-2.5-flash',
  346. generation_config=genai.GenerationConfig(
  347. response_mime_type='application/json',
  348. temperature=0.3,
  349. max_output_tokens=40960
  350. ),
  351. safety_settings={
  352. HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
  353. HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
  354. }
  355. )
  356. response = model.generate_content(
  357. contents=[video_file, system_prompt],
  358. request_options={'timeout': 240}
  359. )
  360. if hasattr(response, '_error') and response._error:
  361. raise Exception(f"生成错误: {response._error}")
  362. # 安全获取文本:避免在无 Part 时访问 response.text 抛错
  363. def safe_extract_text(resp: Any) -> str:
  364. try:
  365. # 优先从 candidates 结构中提取
  366. candidates = getattr(resp, 'candidates', None)
  367. if candidates and len(candidates) > 0:
  368. first = candidates[0]
  369. # 记录 finish_reason 以便错误信息更清晰
  370. finish_reason = getattr(first, 'finish_reason', None)
  371. # parts 路径
  372. content = getattr(first, 'content', None)
  373. parts = getattr(content, 'parts', None) if content else None
  374. if parts and len(parts) > 0:
  375. # 兼容 text 或直接包含的内容
  376. # 常见 part 为 {text: str}
  377. part0 = parts[0]
  378. text = getattr(part0, 'text', None) if hasattr(part0, 'text') else part0.get('text') if isinstance(part0, dict) else None
  379. if isinstance(text, str) and text.strip():
  380. return text
  381. # 若无 parts 或为空,根据 finish_reason 返回清晰错误
  382. reason = str(finish_reason) if finish_reason is not None else 'unknown'
  383. raise ValueError(f"无有效输出内容,finish_reason={reason}")
  384. # 退化到 resp.text(可能抛错)
  385. if hasattr(resp, 'text') and isinstance(resp.text, str) and resp.text.strip():
  386. return resp.text
  387. raise ValueError("响应中没有可用的文本内容")
  388. except Exception as ex:
  389. raise ex
  390. try:
  391. text_payload = safe_extract_text(response).strip()
  392. parsed = json.loads(text_payload)
  393. if not isinstance(parsed, dict):
  394. raise ValueError("响应格式错误:非字典结构")
  395. # 确保包含所有必需字段
  396. required_fields = ['asr_content', 'iframe_details']
  397. for field in required_fields:
  398. if field not in parsed:
  399. if field == 'iframe_details':
  400. parsed[field] = [{
  401. 'time_start': 0,
  402. 'time_end': 0,
  403. 'content': f'{field}分析失败',
  404. 'ocr_content': f'{field}分析失败'
  405. }]
  406. else:
  407. parsed[field] = f"{field}分析失败"
  408. asr = parsed.get('asr_content', '')
  409. frames = parsed.get('iframe_details', [])
  410. if not isinstance(frames, list):
  411. frames = []
  412. return {'url': url, 'duration': duration, 'asr_content': asr, 'iframe_details': frames}
  413. except json.JSONDecodeError as e:
  414. print(f"JSON解析失败: {e}")
  415. return {
  416. 'url': url, 'duration': duration,
  417. 'asr_content': 'ASR分析失败:JSON解析错误',
  418. 'iframe_details': [{
  419. 'time_start': 0, 'time_end': 0,
  420. 'content': '关键帧分析失败:JSON解析错误',
  421. 'ocr_content': '关键帧分析失败:JSON解析错误'
  422. }]
  423. }
  424. except Exception as e:
  425. # 捕获无 Part 或 finish_reason 封锁等导致的无法提取文本问题
  426. err_msg = str(e)
  427. return {
  428. 'url': url, 'duration': duration,
  429. 'asr_content': f'处理失败: {err_msg}',
  430. 'iframe_details': [{
  431. 'time_start': 0, 'time_end': 0,
  432. 'content': f'处理失败: {err_msg}',
  433. 'ocr_content': f'处理失败: {err_msg}'
  434. }]
  435. }
  436. except Exception as e:
  437. return {
  438. 'url': url, 'duration': duration,
  439. 'asr_content': f'处理失败: {str(e)}',
  440. 'iframe_details': [{
  441. 'time_start': 0, 'time_end': 0,
  442. 'content': f'处理失败: {str(e)}',
  443. 'ocr_content': f'处理失败: {str(e)}'
  444. }]
  445. }
  446. finally:
  447. # 4. 清理 Gemini 文件
  448. if video_file and hasattr(video_file, 'name'):
  449. try:
  450. genai.delete_file(name=video_file.name)
  451. except Exception:
  452. pass
  453. # 单独遍历处理所有视频
  454. for i, item in enumerate(video_data):
  455. result = complete_video_job(item)
  456. results[i] = result
  457. return results
  458. except Exception as e:
  459. print(f"OpenRouter 批量视频分析失败: {e}")
  460. return [{
  461. 'url': item.get('url', ''),
  462. 'duration': item.get('duration', 0),
  463. 'asr_content': f'处理失败: {str(e)}',
  464. 'iframe_details': []
  465. } for item in video_data]
  466. def main():
  467. """测试函数"""
  468. # 模拟数据
  469. test_content = {
  470. "video_url_list": [
  471. {
  472. "video_url": "http://rescdn.yishihui.com/pipeline/video/489e7c31-4e7c-44cc-872d-b1b1dd42b12d.mp4",
  473. "video_duration": 187
  474. },
  475. {
  476. "video_url": "http://temp.yishihui.com/pipeline/video/43d11b20-6273-4ece-a146-94f63a3992a8.mp4",
  477. "video_duration": 100
  478. },
  479. # {
  480. # "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250731/57463792ND5eu5PAj95sVLi2gB.mp4",
  481. # "video_duration": 100
  482. # },
  483. # {
  484. # "video_url": "http://temp.yishihui.com/longvideo/transcode/crawler_local/video/prod/20250912/2c278614bd39fc2668f210d752141cb678956536.mp4",
  485. # "video_duration": 100
  486. # },
  487. # {
  488. # "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250809/5870d4dc9ba18ce57e5af27b81ff1398.mp4",
  489. # "video_duration": 100
  490. # },
  491. # {
  492. # "video_url": "http://temp.yishihui.com/pipeline/video/202769cd-68a5-41a2-82d9-620d2c72a225.mp4",
  493. # "video_duration": 100
  494. # }
  495. ]
  496. }
  497. identifier = VideoIdentifier()
  498. result = identifier.process_videos(test_content)
  499. print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
  500. if __name__ == '__main__':
  501. main()