Kaynağa Gözat

多进程处理图片和视频理解

jihuaqiang 2 hafta önce
ebeveyn
işleme
d86d768678
2 değiştirilmiş dosya ile 238 ekleme ve 275 silme
  1. 72 65
      tools/indentify/image_identifier.py
  2. 166 210
      tools/indentify/video_identifier.py

+ 72 - 65
tools/indentify/image_identifier.py

@@ -15,11 +15,28 @@ import google.generativeai as genai
 from PIL import Image
 import requests
 from io import BytesIO
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 # 导入自定义模块
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+from llm.openrouter import OpenRouterProcessor, OpenRouterModel
 
+# OCR prompt sent to the model together with every image (text extraction only).
+prompt = """
+#### 人设
+你是一名图像文字理解专家,请对输入的文章图片进行精准的文字提取和结构化整理。
 
+#### 任务要求如下:
+1. 仅提取图片中可见的文字内容,不需要改写、总结或推理隐藏信息。
+2. 如果图片包含结构(如表格、图表、标题、段落等),请按结构输出。
+3. 所有提取的内容需保持原始顺序和排版上下文的逻辑。
+4. 不需要进行OCR校正,只需要原样提取图中文字。
+5. 舍弃图片中和标题不相关的文字
+6. 对于结构不明确或自由排列的文字,按照从上到下、从左到右的顺序依次提取。
+
+#### 输出格式
+1. 仅输出提取的文字即可,不需要其他说明性的文字
+"""
 class ImageIdentifier:
     def __init__(self):
         # 加载环境变量
@@ -55,74 +72,60 @@ class ImageIdentifier:
         
         return image_urls
     
-    def analyze_image_with_gemini(self, image: Image.Image) -> Dict[str, Any]:
-        """使用Gemini API分析单张图片内容"""
-        try:
-            # 构建OCR提示词
-            prompt = """
-            #### 人设
-            你是一名图像文字理解专家,请对输入的文章图片进行精准的文字提取和结构化整理。
-
-            #### 任务要求如下:
-            1. 仅提取图片中可见的文字内容,不需要改写、总结或推理隐藏信息。
-            2. 如果图片包含结构(如表格、图表、标题、段落等),请按结构输出。
-            3. 所有提取的内容需保持原始顺序和排版上下文的逻辑。
-            4. 不需要进行OCR校正,只需要原样提取图中文字。
-            5. 舍弃图片中和标题不相关的文字
-            6. 对于结构不明确或自由排列的文字,按照从上到下、从左到右的顺序依次提取。
-            """
-            
-            response = self.model.generate_content([prompt, image])
-            
-            return {
-                "content": response.text,
-                "success": True
-            }
-            
-        except Exception as e:
-            print(f"Gemini API调用失败: {e}")
-            return {
-                "text_content": "",
-                "success": False,
-                "error": str(e)
-            }
-    
     def analyze_images_with_gemini(self, image_urls: List[str]) -> Dict[str, Any]:
-        """使用Gemini API分析多张图片内容"""
+        """Extract the text of each image with Gemini, using up to 5 worker threads.
+
+        Returns a dict whose "images_comprehension" list holds one entry per
+        input URL, in input order, with "url", "content" and "success" keys,
+        plus "error" when that image failed. A top-level "error" key is set
+        when there are no URLs or when the batch itself raises.
+        """
         try:
             if not image_urls:
                 return {"images_comprehension": [], "error": "没有图片需要分析"}
-            
-            results = []
-            
-            for i, image_url in enumerate(image_urls):
-                
-                # 下载图片
-                image = self.download_image(image_url)
-                if image is None:
-                    results.append({
-                        "url": image_url,
-                        "content": "",
-                        "success": False,
-                        "error": "图片下载失败"
-                    })
-                    continue
-                
-                # 分析图片
-                result = self.analyze_image_with_gemini(image)
-                result["url"] = image_url
-                results.append(result)
-                
-                # 添加延迟避免API限制
-                time.sleep(1)
-            
-            return {
-                "images_comprehension": results
-            }
-                
+
+            # Module-level OCR prompt: restricts the model to text extraction only
+            system_prompt = prompt
+
+            # One pre-allocated slot per URL so results keep the input order
+            results: List[Dict[str, Any]] = [{} for _ in range(len(image_urls))]
+
+            def analyze_image_job(idx_and_url) -> Dict[str, Any]:
+                idx, url = idx_and_url
+                try:
+                    # Download the image
+                    image = self.download_image(url)
+                    if image is None:
+                        return {"idx": idx, "url": url, "content": "", "success": False, "error": "图片下载失败"}
+
+                    # Send prompt + image straight to the Gemini model
+                    response = self.model.generate_content([system_prompt, image])
+                    
+                    if response.text:
+                        return {"idx": idx, "url": url, "content": response.text, "success": True}
+                    else:
+                        return {"idx": idx, "url": url, "content": "", "success": False, "error": "识别失败或无内容返回"}
+                        
+                except Exception as e:
+                    return {"idx": idx, "url": url, "content": "", "success": False, "error": str(e)}
+
+            # At most 5 images are processed concurrently
+            with ThreadPoolExecutor(max_workers=5) as executor:
+                future_to_index = {}
+                for idx, url in enumerate(image_urls):
+                    future = executor.submit(analyze_image_job, (idx, url))
+                    future_to_index[future] = idx
+
+                for future in as_completed(list(future_to_index.keys())):
+                    result = future.result()
+                    idx = result["idx"]
+                    results[idx] = {
+                        "url": result["url"],
+                        "content": result["content"],
+                        "success": result["success"]
+                    }
+                    if not result["success"]:
+                        results[idx]["error"] = result["error"]
+
+            return {"images_comprehension": results}
+
         except Exception as e:
-            print(f"Gemini API批量调用失败: {e}")
-            return {"images_comprehension": [], "error": f"Gemini API调用失败: {str(e)}"}
+            print(f"Gemini 并发调用失败: {e}")
+            return {"images_comprehension": [], "error": f"Gemini API 调用失败: {str(e)}"}
     
     def process_images(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
         """处理图片识别的主函数"""
@@ -150,6 +153,10 @@ def main():
     # 模拟数据
     test_content = {
         "image_url_list": [
+            {
+                "image_type": 2,
+                "image_url": "http://rescdn.yishihui.com/pipeline/image/ea4f33e9-9e36-4124-aaec-138ea9bcadd9.jpg"
+            },
             {
                 "image_type": 2,
                 "image_url": "http://rescdn.yishihui.com/pipeline/image/ea4f33e9-9e36-4124-aaec-138ea9bcadd9.jpg"
@@ -161,7 +168,7 @@ def main():
         identifier = ImageIdentifier()
         result = identifier.process_images(test_content)
         
-        # print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
+        print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
     except Exception as e:
         print(f"初始化失败: {e}")
 

+ 166 - 210
tools/indentify/video_identifier.py

@@ -16,9 +16,11 @@ import uuid
 import requests
 from typing import Dict, Any, List, Optional
 from dotenv import load_dotenv
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 # 导入自定义模块
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+from llm.openrouter import OpenRouterProcessor, OpenRouterModel
 
 # 导入Google Generative AI
 import google.generativeai as genai
@@ -324,94 +326,6 @@ class VideoIdentifier:
         
         return None
     
-    def analyze_video_with_gemini(self, video_file: Any, video_info: Dict[str, Any]) -> Dict[str, Any]:
-        """使用Gemini API分析视频内容"""
-        
-        try:
-            # 创建Gemini模型
-            model = genai.GenerativeModel(
-                model_name='gemini-2.5-flash',
-                generation_config=genai.GenerationConfig(
-                    response_mime_type='application/json',
-                    temperature=0.3,
-                    max_output_tokens=40960
-                ),
-                safety_settings={
-                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
-                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
-                }
-            )
-            
-            # 生成内容
-            response = model.generate_content(
-                contents=[video_file, self.unified_system_prompt],
-                request_options={'timeout': 240}
-            )
-
-            # print(f"response: {response.text}")
-            
-            # 检查错误
-            if hasattr(response, '_error') and response._error:
-                raise Exception(f"生成错误: {response._error}")
-            
-            # 解析JSON响应
-            try:
-                result = json.loads(response.text.strip())
-                # print(f"[视频分析] 响应: {result}")
-                
-                if not isinstance(result, dict):
-                    raise ValueError("响应格式错误:非字典结构")
-                
-                # 确保包含所有必需字段
-                required_fields = ['asr_content', 'iframe_details']
-                for field in required_fields:
-                    if field not in result:
-                        if field == 'iframe_details':
-                            result[field] = [{
-                                'time_start': 0,
-                                'time_end': 0,
-                                'content': f'{field}分析失败',
-                                'ocr_content': f'{field}分析失败'
-                            }]
-                        else:
-                            result[field] = f"{field}分析失败"
-                
-                return result
-                
-            except json.JSONDecodeError as e:
-                print(f"JSON解析失败: {e}")
-                return {
-                    'asr_content': 'ASR分析失败:JSON解析错误',
-                    'iframe_details': [{
-                        'time_start': 0,
-                        'time_end': 0,
-                        'content': '关键帧分析失败:JSON解析错误',
-                        'ocr_content': '关键帧分析失败:JSON解析错误'
-                    }]
-                }
-                
-            else:
-                return {
-                    'asr_content': 'ASR分析失败:API无响应',
-                    'iframe_details': [{
-                        'time_start': 0,
-                        'time_end': 0,
-                        'content': '关键帧分析失败:API无响应',
-                        'ocr_content': '关键帧分析失败:API无响应'
-                    }]
-                }
-                
-        except Exception as e:
-            return {
-                'asr_content': f'ASR分析失败: {str(e)}',
-                'iframe_details': [{
-                    'time_start': 0,
-                    'time_end': 0,
-                    'content': f'关键帧分析失败: {str(e)}',
-                    'ocr_content': f'关键帧分析失败: {str(e)}'
-                }]
-            }
-    
     def extract_video_urls(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
         """提取视频URL列表"""
         video_data = []
@@ -426,132 +340,154 @@ class VideoIdentifier:
         
         return video_data
     
-    def process_video_single(self, video_info: Dict[str, Any]) -> Dict[str, Any]:
-        """处理单个视频的完整流程"""
-        # print(f"开始处理视频: {video_info['url'][:50]}...")
-        
-        video_path = None
-        video_file = None
-        try:
-            # 1. 下载视频
-            # print("  1. 下载视频...")
-            video_path = self.download_video(video_info['url'])
-            if not video_path:
-                # print("  视频下载失败")
-                return {
-                    'url': video_info['url'],
-                    'duration': video_info['duration'],
-                    'asr_content': '视频下载失败',
-                    'iframe_details': [{
-                        'time_start': 0,
-                        'time_end': 0,
-                        'content': '视频下载失败',
-                        'ocr_content': '视频下载失败'
-                    }]
-                }
-            
-            # 2. 上传到Gemini
-            # print("  2. 上传视频到Gemini...")
-            video_file = self.upload_video_to_gemini(video_path)
-            if not video_file:
-                # print("  视频上传到Gemini失败")
-                # 上传失败时也要清理缓存文件
-                if video_path and os.path.exists(video_path):
-                    try:
-                        os.remove(video_path)
-                        # print(f"  上传失败,缓存文件已清理: {video_path}")
-                    except Exception as e:
-                        print(f"  清理缓存文件失败: {e}")
-                
-                return {
-                    'url': video_info['url'],
-                    'duration': video_info['duration'],
-                    'asr_content': '视频上传失败',
-                    'iframe_details': [{
-                        'time_start': 0,
-                        'time_end': 0,
-                        'content': '视频上传失败',
-                        'ocr_content': '视频上传失败'
-                    }]
-                }
-            
-            # 3. 使用Gemini分析
-            # print("  3. 使用Gemini分析视频内容...")
-            analysis_result = self.analyze_video_with_gemini(video_file, video_info)
-            
-            # 4. 组合结果
-            final_result = {
-                'url': video_info['url'],
-                'duration': video_info['duration'],
-                'asr_content': analysis_result.get('asr_content', 'ASR分析失败'),
-                'iframe_details': analysis_result.get('iframe_details', '关键帧分析失败'),
-            }
-            
-            # print("  视频分析完成")
-            return final_result
-            
-        except Exception as e:
-            print(f"  视频处理异常: {e}")
-            # 异常情况下也要清理缓存文件
-            if video_path and os.path.exists(video_path):
-                try:
-                    os.remove(video_path)
-                    print(f"  异常处理,缓存文件已清理: {video_path}")
-                except Exception as e:
-                    print(f"  清理缓存文件失败: {e}")
-            
-            return {
-                'url': video_info['url'],
-                'duration': video_info['duration'],
-                'asr_content': f'处理异常: {str(e)}',
-                'iframe_details': [{
-                    'time_start': 0,
-                    'time_end': 0,
-                    'content': f'处理异常: {str(e)}',
-                    'ocr_content': f'处理异常: {str(e)}'
-                }],
-                'analysis_timestamp': int(time.time() * 1000)
-            }
-        finally:
-            # 清理临时文件
-            if video_path and os.path.exists(video_path):
-                try:
-                    os.remove(video_path)
-                    print(f"  临时文件已清理: {video_path}")
-                except Exception as e:
-                    print(f"  清理临时文件失败: {e}")
-            
-            # 清理Gemini文件
-            if video_file and hasattr(video_file, 'name'):
-                try:
-                    genai.delete_file(name=video_file.name)
-                    # print(f"  Gemini文件已清理: {video_file.name}")
-                except Exception as e:
-                    print(f"  清理Gemini文件失败: {e}")
-    
     def process_videos(self, formatted_content: Dict[str, Any]) -> List[Dict[str, Any]]:
         """处理视频识别的主函数"""
         
-        # 定期清理缓存
-        self.cleanup_cache()
-        
         # 提取视频URL
         video_data = self.extract_video_urls(formatted_content)
-        
         if not video_data:
             return []
-        
-        # 逐个处理视频
-        results = []
-        for i, video_info in enumerate(video_data):
-            result = self.process_video_single(video_info)
-            results.append(result)
-            
-            # 添加延迟避免API限制
-            if i < len(video_data) - 1:  # 不是最后一个视频
-                time.sleep(2)
-        
-        return results
+
+        # Hand the whole batch to the concurrent pipeline. Despite its name,
+        # analyze_videos_with_openrouter uploads to and analyses with Gemini (genai).
+        return self.analyze_videos_with_openrouter(video_data)
+
+    def analyze_videos_with_openrouter(self, video_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Run download -> upload -> analysis for every video, at most 5 at a time.
+
+        Despite the method name, the analysis is performed with the Gemini SDK
+        (``genai``); the name is kept so existing callers keep working.
+
+        Args:
+            video_data: Items carrying a ``url`` and, optionally, a ``duration``.
+
+        Returns:
+            One dict per input item, in input order (duplicate URLs each get
+            their own entry), with ``url``, ``duration``, ``asr_content`` and
+            ``iframe_details``. Failures are described inside the dict
+            instead of being raised.
+        """
+        try:
+            # Prompt that restricts the model to content extraction only
+            system_prompt = self.unified_system_prompt
+
+            def complete_video_job(item: Dict[str, Any]) -> Dict[str, Any]:
+                """Process one video: download, upload, analyse, clean up."""
+                print(f"开始处理视频: {item}")
+                url = item.get('url', '')
+                duration = item.get('duration', 0)
+                video_file = None
+
+                try:
+                    # 1. Download the video
+                    video_path = self.download_video(url)
+                    if not video_path:
+                        return {
+                            'url': url, 'duration': duration, 'asr_content': '视频下载失败', 'iframe_details': []
+                        }
+
+                    # 2. Upload to Gemini. The local copy is removed in a
+                    #    ``finally`` so it is not left on disk when the upload
+                    #    raises; it is still removed before the long analysis.
+                    # NOTE(review): duplicate URLs run as independent jobs; if
+                    # download_video maps a URL to a fixed cache path, two jobs
+                    # could remove each other's file - confirm.
+                    try:
+                        video_file = self.upload_video_to_gemini(video_path)
+                    finally:
+                        try:
+                            if os.path.exists(video_path):
+                                os.remove(video_path)
+                        except Exception as cleanup_error:
+                            # Best effort: report, never fail the job over cleanup
+                            print(f"  清理缓存文件失败: {cleanup_error}")
+
+                    if not video_file:
+                        return {
+                            'url': url, 'duration': duration, 'asr_content': '视频上传失败', 'iframe_details': []
+                        }
+
+                    # 3. Analyse the uploaded file with Gemini (JSON response requested)
+                    model = genai.GenerativeModel(
+                        model_name='gemini-2.5-flash',
+                        generation_config=genai.GenerationConfig(
+                            response_mime_type='application/json',
+                            temperature=0.3,
+                            max_output_tokens=40960
+                        ),
+                        safety_settings={
+                            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
+                            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
+                        }
+                    )
+                    print(f"开始内容分析: {video_file}")
+                    response = model.generate_content(
+                        contents=[video_file, system_prompt],
+                        request_options={'timeout': 240}
+                    )
+
+                    if hasattr(response, '_error') and response._error:
+                        raise Exception(f"生成错误: {response._error}")
+
+                    try:
+                        parsed = json.loads(response.text.strip())
+                        if not isinstance(parsed, dict):
+                            # Not a JSONDecodeError, so it is handled by the outer except
+                            raise ValueError("响应格式错误:非字典结构")
+
+                        # Fill in placeholders for any required field the model omitted
+                        required_fields = ['asr_content', 'iframe_details']
+                        for field in required_fields:
+                            if field not in parsed:
+                                if field == 'iframe_details':
+                                    parsed[field] = [{
+                                        'time_start': 0,
+                                        'time_end': 0,
+                                        'content': f'{field}分析失败',
+                                        'ocr_content': f'{field}分析失败'
+                                    }]
+                                else:
+                                    parsed[field] = f"{field}分析失败"
+
+                        asr = parsed.get('asr_content', '')
+                        frames = parsed.get('iframe_details', [])
+                        if not isinstance(frames, list):
+                            frames = []
+                        return {'url': url, 'duration': duration, 'asr_content': asr, 'iframe_details': frames}
+
+                    except json.JSONDecodeError as e:
+                        print(f"JSON解析失败: {e}")
+                        return {
+                            'url': url, 'duration': duration,
+                            'asr_content': 'ASR分析失败:JSON解析错误',
+                            'iframe_details': [{
+                                'time_start': 0, 'time_end': 0,
+                                'content': '关键帧分析失败:JSON解析错误',
+                                'ocr_content': '关键帧分析失败:JSON解析错误'
+                            }]
+                        }
+
+                except Exception as e:
+                    return {
+                        'url': url, 'duration': duration,
+                        'asr_content': f'处理失败: {str(e)}',
+                        'iframe_details': [{
+                            'time_start': 0, 'time_end': 0,
+                            'content': f'处理失败: {str(e)}',
+                            'ocr_content': f'处理失败: {str(e)}'
+                        }]
+                    }
+                finally:
+                    # 4. Remove the uploaded file from Gemini (best effort)
+                    if video_file and hasattr(video_file, 'name'):
+                        try:
+                            genai.delete_file(name=video_file.name)
+                        except Exception as cleanup_error:
+                            print(f"  清理Gemini文件失败: {cleanup_error}")
+
+            # One slot per input item. Results are placed by the item's index,
+            # not by URL, so duplicate URLs do not overwrite each other or
+            # leave empty placeholders behind.
+            results: List[Dict[str, Any]] = [{} for _ in video_data]
+
+            # Each worker thread runs the complete flow for one video
+            with ThreadPoolExecutor(max_workers=5) as pool:
+                future_to_index = {
+                    pool.submit(complete_video_job, item): idx
+                    for idx, item in enumerate(video_data)
+                }
+
+                for future in as_completed(future_to_index):
+                    results[future_to_index[future]] = future.result()
+
+            return results
+
+        except Exception as e:
+            print(f"OpenRouter 批量视频分析失败: {e}")
+            return [{
+                'url': item.get('url', ''),
+                'duration': item.get('duration', 0),
+                'asr_content': f'处理失败: {str(e)}',
+                'iframe_details': []
+            } for item in video_data]
 
 
 def main():
@@ -560,8 +496,28 @@ def main():
     test_content = {
         "video_url_list": [
             {
-                "video_url": "http://rescdn.yishihui.com/pipeline/video/6ab92036-a166-491d-935e-eeeb7c0f2779.mp4",
+                "video_url": "http://temp.yishihui.com/pipeline/video/202769cd-68a5-41a2-82d9-620d2c72a225.mp4",
                 "video_duration": 187
+            },
+            {
+                "video_url": "http://temp.yishihui.com/pipeline/video/43d11b20-6273-4ece-a146-94f63a3992a8.mp4",
+                "video_duration": 100
+            },
+            {
+                "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250731/57463792ND5eu5PAj95sVLi2gB.mp4",
+                "video_duration": 100
+            },
+            {
+                "video_url": "http://temp.yishihui.com/longvideo/transcode/crawler_local/video/prod/20250912/2c278614bd39fc2668f210d752141cb678956536.mp4",
+                "video_duration": 100
+            },
+            {
+                "video_url": "http://temp.yishihui.com/longvideo/transcode/video/vpc/20250809/5870d4dc9ba18ce57e5af27b81ff1398.mp4",
+                "video_duration": 100
+            },
+            {
+                "video_url": "http://temp.yishihui.com/pipeline/video/202769cd-68a5-41a2-82d9-620d2c72a225.mp4",
+                "video_duration": 100
             }
         ]
     }
@@ -569,7 +525,7 @@ def main():
     identifier = VideoIdentifier()
     result = identifier.process_videos(test_content)
     
-    # print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
+    print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
 
 
 if __name__ == '__main__':