Преглед на файлове

视频理解脚本完善

jihuaqiang преди 1 месец
родител
ревизия
e5aaed820e
променени са 2 файла, в които са добавени 451 реда и са изтрити 87 реда
  1. 1 1
      content_indentify/indentify.py
  2. 450 86
      content_indentify/video_identifier.py

+ 1 - 1
content_indentify/indentify.py

@@ -181,7 +181,7 @@ class ContentIdentifier:
                 'title': title,
                 'content': content,
                 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
-                'videos': recognition_result.get('video_analysis', {}).get('videos_comprehension', []),
+                'videos': recognition_result.get('video_analysis', {}),
                 'meta': {
                     'author': author,
                     'like_count': like_count,

+ 450 - 86
content_indentify/video_identifier.py

@@ -2,20 +2,30 @@
 # -*- coding: utf-8 -*-
 """
 视频识别脚本
-主要功能:使用 Gemini API 分析视频内容
+主要功能:使用 Gemini API 从三个维度分析视频内容
+1. ASR (Automatic Speech Recognition) - 语音转文字
+2. OCR - 识别视频画面中的文字
+3. 关键帧提取与描述 - 提取视频关键帧并进行图像描述
 """
 
 import os
 import json
 import time
 import sys
+import uuid
+import requests
 from typing import Dict, Any, List, Optional
 from dotenv import load_dotenv
 
 # 导入自定义模块
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
-from gemini import GeminiProcessor
+# 导入Google Generative AI
+import google.generativeai as genai
+from google.generativeai.types import HarmCategory, HarmBlockThreshold
+
+# 缓存目录配置
+CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
 
 
 class VideoIdentifier:
@@ -23,18 +33,347 @@ class VideoIdentifier:
         # 加载环境变量
         load_dotenv()
         
-        # 初始化Gemini客户端
-        self.gemini = GeminiProcessor()
+        # 获取API密钥
+        self.api_key = os.getenv('GEMINI_API_KEY')
+        if not self.api_key:
+            raise ValueError("未找到GEMINI_API_KEY环境变量")
+        
+        # 配置Gemini
+        genai.configure(api_key=self.api_key)
         
-        # 系统提示词
-        self.video_system_prompt = """你是一个专业的视频内容分析专家。请分析视频中的内容,包括:
-1. 视频的主要内容和主题
-2. 视频中的文字内容(如果有)
-3. 视频的风格和特点
-4. 视频可能表达的情感或意图
-5. 视频的背景音乐或语音内容(如果有)
+        # 统一的系统提示词 - 三个维度分析
+        self.unified_system_prompt = """你是一个专业的视频内容分析专家。请从以下两个维度分析视频内容,并以JSON格式输出结果
+
+1. ASR (Automatic Speech Recognition) - 语音转文字:
+   - 仅提取视频中的语音内容,转换为文字
+   - 保持原始语音的准确性和完整性
+   - 不要添加分析、解释或评论
 
-请用简洁、准确的语言描述视频内容,重点关注文字内容和主要视觉元素。"""
+2. 关键帧提取与描述(包含OCR文字识别):
+   - 将视频分解为多个关键时间片段
+   - 对每个时间片段进行以下分析:
+     * 画面的主要视觉元素和内容
+     * 画面的构图和色彩特点
+     * 画面中的人物、物体、场景
+     * 画面中出现的所有文字内容(OCR识别)
+   - 每个时间片段应包含:
+     * content: 画面内容的详细描述
+     * ocr_content: 该时间段画面中出现的文字内容,仅做文字提取,不要做任何解释或总结
+
+请严格按照以下JSON格式输出,使用中文输出,不要添加任何其他文字:
+{
+    "asr_content": "提取的语音文字内容",
+    "iframe_details": [
+        {
+            "time_start": "开始时间(秒)",
+            "time_end": "结束时间(秒)",
+            "content": "该时间段画面内容的详细描述",
+            "ocr_content": "该时间段画面中出现的文字内容"
+        }
+    ]
+}"""
+    
+    def download_video(self, video_url: str) -> Optional[str]:
+        """下载视频到本地缓存"""
+        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
+        try:
+            # 确保缓存目录存在
+            try:
+                os.makedirs(CACHE_DIR, exist_ok=True)
+            except Exception as e:
+                print(f'创建缓存目录失败: {e}')
+                return None
+            
+            # 尝试下载视频
+            for attempt in range(3):
+                try:
+                    response = requests.get(url=video_url, timeout=60)
+                    if response.status_code == 200:
+                        try:
+                            with open(file_path, 'wb') as f:
+                                f.write(response.content)
+                            print(f'视频下载成功: {video_url} -> {file_path}')
+                            return file_path
+                        except Exception as e:
+                            print(f'视频保存失败: {e}')
+                            return None
+                    else:
+                        print(f'视频下载失败,状态码: {response.status_code}')
+                except Exception as e:
+                    print(f'下载尝试 {attempt + 1} 失败: {e}')
+                    if attempt < 2:  # 不是最后一次尝试
+                        time.sleep(1)
+                        continue
+                    else:
+                        print(f'所有下载尝试都失败了')
+                        return None
+                        
+        except Exception as e:
+            print(f'下载过程异常: {e}')
+            return None
+        
+        return None
+    
+    def upload_video_to_gemini(self, video_path: str) -> Optional[Any]:
+        """上传视频到Gemini进行分析"""
+        max_retries = 3
+        retry_delay = 5
+        
+        for attempt in range(max_retries):
+            try:
+                print(f"  开始上传视频到Gemini... (尝试 {attempt + 1}/{max_retries})")
+                print(f"    文件路径: {video_path}")
+                
+                # 1. 文件检查
+                if not os.path.exists(video_path):
+                    print(f"    错误: 文件不存在")
+                    return None
+                
+                file_size = os.path.getsize(video_path)
+                print(f"    文件大小: {file_size / (1024*1024):.2f} MB")
+                
+                if file_size == 0:
+                    print(f"    错误: 文件大小为0")
+                    return None
+                
+                # 2. 文件权限检查
+                try:
+                    with open(video_path, 'rb') as f:
+                        # 尝试读取文件开头,检查是否可读
+                        f.read(1024)
+                    print(f"    文件权限: 可读")
+                except Exception as e:
+                    print(f"    错误: 文件无法读取 - {e}")
+                    return None
+                
+                # 3. 网络连接检查
+                try:
+                    print(f"    检查网络连接...")
+                    # 测试基本网络连接
+                    test_response = requests.get("https://generativelanguage.googleapis.com", timeout=10)
+                    print(f"    网络连接: 正常 (状态码: {test_response.status_code})")
+                except Exception as e:
+                    print(f"    警告: 网络连接测试失败 - {e}")
+                    print(f"    继续尝试上传...")
+                
+                # 4. 尝试上传文件
+                print(f"    开始上传文件...")
+                try:
+                    video_file = genai.upload_file(path=video_path, mime_type='video/mp4')
+                    print(f"    文件上传请求已发送,文件ID: {video_file.name}")
+                except Exception as e:
+                    print(f"    错误: 文件上传请求失败 - {e}")
+                    print(f"    错误类型: {type(e).__name__}")
+                    print(f"    错误详情: {str(e)}")
+                    
+                    # 如果是网络相关错误,尝试重试
+                    if any(keyword in str(e).lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
+                        if attempt < max_retries - 1:
+                            print(f"    网络错误,等待 {retry_delay} 秒后重试...")
+                            time.sleep(retry_delay)
+                            retry_delay *= 2  # 指数退避
+                            continue
+                        else:
+                            print(f"    所有重试都失败了")
+                            return None
+                    else:
+                        # 非网络错误,直接返回
+                        return None
+                
+                # 5. 等待文件处理完成
+                print(f"    等待文件处理完成...")
+                max_wait_time = 120  # 最大等待2分钟
+                wait_count = 0
+                
+                while video_file.state.name == 'PROCESSING' and wait_count < max_wait_time:
+                    time.sleep(2)  # 每2秒检查一次
+                    wait_count += 2
+                    
+                    try:
+                        # 获取最新状态
+                        video_file = genai.get_file(name=video_file.name)
+                        current_state = video_file.state.name
+                        print(f"      状态: {current_state} ({wait_count}秒)")
+                        
+                        # 检查是否有错误状态
+                        if current_state in ['FAILED', 'ERROR', 'INVALID']:
+                            print(f"    错误: 文件处理失败,状态: {current_state}")
+                            if hasattr(video_file, 'error'):
+                                print(f"    错误详情: {video_file.error}")
+                            
+                            # 如果是处理失败,尝试重试
+                            if attempt < max_retries - 1:
+                                print(f"    文件处理失败,等待 {retry_delay} 秒后重试...")
+                                time.sleep(retry_delay)
+                                retry_delay *= 2
+                                break  # 跳出等待循环,进行重试
+                            else:
+                                return None
+                                
+                    except Exception as e:
+                        print(f"      警告: 获取文件状态失败 - {e}")
+                        if wait_count > 60:  # 超过1分钟后,尝试继续
+                            print(f"      继续等待...")
+                            continue
+                        else:
+                            print(f"    错误: 无法获取文件状态")
+                            return None
+                
+                # 6. 检查最终状态
+                if video_file.state.name == 'ACTIVE':
+                    print(f'    视频上传成功: {video_file.name}')
+                    print(f"    最终状态: {video_file.state.name}")
+                    return video_file
+                else:
+                    print(f'    错误: 视频文件上传失败')
+                    print(f"    最终状态: {video_file.state.name}")
+                    print(f"    等待时间: {wait_count}秒")
+                    
+                    # 尝试获取更多错误信息
+                    try:
+                        file_info = genai.get_file(name=video_file.name)
+                        print(f"    文件信息: {file_info}")
+                    except Exception as e:
+                        print(f"    无法获取文件详细信息: {e}")
+                    
+                    # 如果不是最后一次尝试,进行重试
+                    if attempt < max_retries - 1:
+                        print(f"    上传失败,等待 {retry_delay} 秒后重试...")
+                        time.sleep(retry_delay)
+                        retry_delay *= 2
+                        continue
+                    else:
+                        return None
+                        
+            except Exception as e:
+                error_type = type(e).__name__
+                error_msg = str(e)
+                
+                print(f'    错误: 视频上传到Gemini失败')
+                print(f"    错误类型: {error_type}")
+                print(f"    错误信息: {error_msg}")
+                
+                # 针对特定错误的处理建议
+                if "Broken pipe" in error_msg:
+                    print(f"    诊断: Broken pipe 错误通常表示:")
+                    print(f"      - 网络连接不稳定")
+                    print(f"      - 服务器连接中断")
+                    print(f"      - 防火墙或代理问题")
+                    print(f"    建议:")
+                    print(f"      - 检查网络连接")
+                    print(f"      - 尝试使用VPN或更换网络")
+                    print(f"      - 检查防火墙设置")
+                elif "Connection" in error_msg:
+                    print(f"    诊断: 连接错误")
+                    print(f"    建议: 检查网络连接和API密钥")
+                elif "Timeout" in error_msg:
+                    print(f"    诊断: 超时错误")
+                    print(f"    建议: 网络较慢,可以增加超时时间")
+                elif "Permission" in error_msg:
+                    print(f"    诊断: 权限错误")
+                    print(f"    建议: 检查API密钥和权限设置")
+                
+                # 如果是网络相关错误,尝试重试
+                if any(keyword in error_msg.lower() for keyword in ['broken pipe', 'connection', 'timeout', 'network']):
+                    if attempt < max_retries - 1:
+                        print(f"    网络错误,等待 {retry_delay} 秒后重试...")
+                        time.sleep(retry_delay)
+                        retry_delay *= 2
+                        continue
+                    else:
+                        print(f"    所有重试都失败了")
+                        return None
+                else:
+                    # 非网络错误,直接返回
+                    return None
+        
+        return None
+    
+    def analyze_video_with_gemini(self, video_file: Any, video_info: Dict[str, Any]) -> Dict[str, Any]:
+        """使用Gemini API分析视频内容"""
+        
+        try:
+            # 创建Gemini模型
+            model = genai.GenerativeModel(
+                model_name='gemini-2.0-flash',
+                generation_config=genai.GenerationConfig(
+                    response_mime_type='application/json',
+                    temperature=0.3,
+                    max_output_tokens=20480
+                ),
+                safety_settings={
+                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
+                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
+                }
+            )
+            
+            # 生成内容
+            response = model.generate_content(
+                contents=[video_file, self.unified_system_prompt],
+                request_options={'timeout': 300}
+            )
+            
+            # 检查错误
+            if hasattr(response, '_error') and response._error:
+                raise Exception(f"生成错误: {response._error}")
+            
+            # 解析JSON响应
+            try:
+                result = json.loads(response.text.strip())
+                print(f"[视频分析] 响应: {result}")
+                
+                if not isinstance(result, dict):
+                    raise ValueError("响应格式错误:非字典结构")
+                
+                # 确保包含所有必需字段
+                required_fields = ['asr_content', 'iframe_details']
+                for field in required_fields:
+                    if field not in result:
+                        if field == 'iframe_details':
+                            result[field] = [{
+                                'time_start': 0,
+                                'time_end': 0,
+                                'content': f'{field}分析失败',
+                                'ocr_content': f'{field}分析失败'
+                            }]
+                        else:
+                            result[field] = f"{field}分析失败"
+                
+                return result
+                
+            except json.JSONDecodeError as e:
+                print(f"JSON解析失败: {e}")
+                return {
+                    'asr_content': 'ASR分析失败:JSON解析错误',
+                    'iframe_details': [{
+                        'time_start': 0,
+                        'time_end': 0,
+                        'content': '关键帧分析失败:JSON解析错误',
+                        'ocr_content': '关键帧分析失败:JSON解析错误'
+                    }]
+                }
+                
+            else:
+                return {
+                    'asr_content': 'ASR分析失败:API无响应',
+                    'iframe_details': [{
+                        'time_start': 0,
+                        'time_end': 0,
+                        'content': '关键帧分析失败:API无响应',
+                        'ocr_content': '关键帧分析失败:API无响应'
+                    }]
+                }
+                
+        except Exception as e:
+            return {
+                'asr_content': f'ASR分析失败: {str(e)}',
+                'iframe_details': [{
+                    'time_start': 0,
+                    'time_end': 0,
+                    'content': f'关键帧分析失败: {str(e)}',
+                    'ocr_content': f'关键帧分析失败: {str(e)}'
+                }]
+            }
     
     def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
         """提取视频URL列表"""
@@ -50,74 +389,94 @@ class VideoIdentifier:
         
         return video_data
     
-    def analyze_videos_with_gemini(self, video_data: List[Dict[str, Any]]) -> Dict[str, Any]:
-        """使用Gemini API分析视频内容"""
+    def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
+        """处理单个视频的完整流程"""
+        print(f"开始处理视频: {video_info['url'][:50]}...")
+        
+        video_path = None
+        video_file = None
         try:
-            if not video_data:
-                return {"videos_comprehension": [], "error": "没有视频需要分析"}
+            # 1. 下载视频
+            print("  1. 下载视频...")
+            video_path = self.download_video(video_info['url'])
+            if not video_path:
+                print("  视频下载失败")
+                return {
+                    'url': video_info['url'],
+                    'duration': video_info['duration'],
+                    'asr_content': '视频下载失败',
+                    'iframe_details': [{
+                        'time_start': 0,
+                        'time_end': 0,
+                        'content': '视频下载失败',
+                        'ocr_content': '视频下载失败'
+                    }]
+                }
             
-            print(f"正在使用Gemini API分析 {len(video_data)} 个视频...")
+            # 2. 上传到Gemini
+            print("  2. 上传视频到Gemini...")
+            video_file = self.upload_video_to_gemini(video_path)
+            if not video_file:
+                print("  视频上传到Gemini失败")
+                return {
+                    'url': video_info['url'],
+                    'duration': video_info['duration'],
+                    'asr_content': '视频上传失败',
+                    'iframe_details': [{
+                        'time_start': 0,
+                        'time_end': 0,
+                        'content': '视频上传失败',
+                        'ocr_content': '视频上传失败'
+                    }]
+                }
             
-            videos_comprehension = []
-            for i, video in enumerate(video_data):
-                print(f"  分析视频 {i+1}/{len(video_data)}: {video['url'][:50]}...")
-                
-                # 构建分析提示
-                prompt = f"""请分析以下视频内容:
-视频时长: {video['duration']}秒
-视频链接: {video['url']}
-
-请从以下角度分析视频内容:
-1. 视频的主要内容和主题
-2. 视频中的文字内容(如果有)
-3. 视频的风格和特点
-4. 视频可能表达的情感或意图
-5. 视频的背景音乐或语音内容(如果有)
-
-请用简洁、准确的语言描述视频内容。"""
-                
-                # 调用Gemini API
-                try:
-                    response = self.gemini.process(
-                        content=prompt,
-                        system_prompt=self.video_system_prompt,
-                        model_name="gemini-2.5-flash"
-                    )
-                    
-                    if response:
-                        videos_comprehension.append({
-                            'video_url': video['url'],
-                            'duration': video['duration'],
-                            'comprehension': response,
-                            'analysis_timestamp': int(time.time() * 1000)
-                        })
-                    else:
-                        videos_comprehension.append({
-                            'video_url': video['url'],
-                            'duration': video['duration'],
-                            'comprehension': 'Gemini API分析失败',
-                            'analysis_timestamp': int(time.time() * 1000)
-                        })
-                    
-                    # 添加延迟避免API限制
-                    time.sleep(1)
-                    
-                except Exception as e:
-                    print(f"  视频 {i+1} 分析失败: {e}")
-                    videos_comprehension.append({
-                        'video_url': video['url'],
-                        'duration': video['duration'],
-                        'comprehension': f'分析失败: {str(e)}',
-                        'analysis_timestamp': int(time.time() * 1000)
-                    })
+            # 3. 使用Gemini分析
+            print("  3. 使用Gemini分析视频内容...")
+            analysis_result = self.analyze_video_with_gemini(video_file, video_info)
             
-            return {"videos_comprehension": videos_comprehension}
+            # 4. 组合结果
+            final_result = {
+                'url': video_info['url'],
+                'duration': video_info['duration'],
+                'asr_content': analysis_result.get('asr_content', 'ASR分析失败'),
+                'iframe_details': analysis_result.get('iframe_details', '关键帧分析失败'),
+            }
+            
+            print("  视频分析完成")
+            return final_result
             
         except Exception as e:
-            print(f"Gemini API调用失败: {e}")
-            return {"videos_comprehension": [], "error": f"Gemini API调用失败: {str(e)}"}
+            print(f"  视频处理异常: {e}")
+            return {
+                'url': video_info['url'],
+                'duration': video_info['duration'],
+                'asr_content': f'处理异常: {str(e)}',
+                'iframe_details': [{
+                    'time_start': 0,
+                    'time_end': 0,
+                    'content': f'处理异常: {str(e)}',
+                    'ocr_content': f'处理异常: {str(e)}'
+                }],
+                'analysis_timestamp': int(time.time() * 1000)
+            }
+        finally:
+            # 清理临时文件
+            if video_path and os.path.exists(video_path):
+                try:
+                    os.remove(video_path)
+                    print(f"  临时文件已清理: {video_path}")
+                except Exception as e:
+                    print(f"  清理临时文件失败: {e}")
+            
+            # 清理Gemini文件
+            if video_file and hasattr(video_file, 'name'):
+                try:
+                    genai.delete_file(name=video_file.name)
+                    print(f"  Gemini文件已清理: {video_file.name}")
+                except Exception as e:
+                    print(f"  清理Gemini文件失败: {e}")
     
-    def process_videos(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
+    def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
         """处理视频识别的主函数"""
         print("开始视频识别处理...")
         
@@ -127,17 +486,26 @@ class VideoIdentifier:
         
         if not video_data:
             print("没有视频需要分析")
-            return {"videos_comprehension": [], "error": "没有视频需要分析"}
+            return []
         
-        # 分析视频
-        result = self.analyze_videos_with_gemini(video_data)
+        # 逐个处理视频
+        results = []
+        for i, video_info in enumerate(video_data):
+            print(f"\n处理视频 {i+1}/{len(video_data)}")
+            result = self.process_video_single(video_info)
+            results.append(result)
+            
+            # 添加延迟避免API限制
+            if i < len(video_data) - 1:  # 不是最后一个视频
+                time.sleep(2)
         
-        if result.get("videos_comprehension"):
-            print(f"视频识别完成,共分析 {len(result['videos_comprehension'])} 个视频")
+        if results:
+            print(f"\n视频识别完成,共分析 {len(results)} 个视频")
+            print("分析维度:ASR、关键帧提取")
         else:
             print("视频识别失败")
         
-        return result
+        return results
 
 
 def main():
@@ -146,18 +514,14 @@ def main():
     test_content = {
         "video_url_list": [
             {
-                "video_url": "http://example.com/video1.mp4",
-                "video_duration": 30
+                "video_url": "https://vd9.bdstatic.com/mda-rf03dz9qrusbwrrb/mb/720p/mv_cae264_backtrack_720p_normal/1748751326307005666/mda-rf03dz9qrusbwrrb.mp4?v_from_s=hkapp-haokan-hbe&auth_key=1755078490-0-0-94814ae256d196c133940bc5fa7054ea&bcevod_channel=searchbox_feed&cr=2&cd=0&pd=1&pt=3&logid=2890204804&vid=12887026108358975692&klogid=2890204804&abtest=",
+                "video_duration": 187
             }
         ]
     }
     
     identifier = VideoIdentifier()
-    result = identifier.process_videos(
-        test_content["title"],
-        test_content["body_text"],
-        test_content
-    )
+    result = identifier.process_videos(test_content)
     
     print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")