image_identifier.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 图文识别脚本
  5. 主要功能:使用 Gemini API 进行图片OCR识别
  6. """
  7. import os
  8. import json
  9. import time
  10. import sys
  11. from typing import Dict, Any, List, Optional
  12. from dotenv import load_dotenv
  13. import google.generativeai as genai
  14. from PIL import Image
  15. import requests
  16. from io import BytesIO
  17. from concurrent.futures import ThreadPoolExecutor, as_completed
  18. # 导入自定义模块
  19. sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
  20. from llm.openrouter import OpenRouterProcessor, OpenRouterModel
  21. # 构建OCR提示词
  22. prompt = """
  23. #### 人设
  24. 你是一名图像文字理解专家,请对输入的文章图片进行精准的文字提取和结构化整理。
  25. #### 任务要求如下:
  26. 1. 仅提取图片中可见的文字内容,不需要改写、总结或推理隐藏信息。
  27. 2. 如果图片包含结构(如表格、图表、标题、段落等),请按结构输出。
  28. 3. 所有提取的内容需保持原始顺序和排版上下文的逻辑。
  29. 4. 不需要进行OCR校正,只需要原样提取图中文字。
  30. 5. 舍弃图片中和标题不相关的文字
  31. 6. 对于结构不明确或自由排列的文字,按照从上到下、从左到右的顺序依次提取。
  32. #### 输出格式
  33. 1. 仅输出提取的文字即可,不需要其他说明性的文字
  34. """
  35. class ImageIdentifier:
  36. def __init__(self):
  37. # 加载环境变量
  38. load_dotenv()
  39. # 延迟配置Gemini,在真正使用时再设置
  40. self._configured = False
  41. self.model = None
  42. def _ensure_configured(self):
  43. """确保Gemini已配置"""
  44. if not self._configured:
  45. self.api_key = os.getenv('GEMINI_API_KEY')
  46. print(f"配置Gemini: {self.api_key}")
  47. if not self.api_key:
  48. raise ValueError("请在环境变量中设置 GEMINI_API_KEY")
  49. genai.configure(api_key=self.api_key)
  50. self.model = genai.GenerativeModel('gemini-2.5-flash')
  51. self._configured = True
  52. def download_image(self, image_url: str) -> Optional[Image.Image]:
  53. """下载图片并转换为PIL Image对象"""
  54. try:
  55. response = requests.get(image_url, timeout=10)
  56. response.raise_for_status()
  57. image = Image.open(BytesIO(response.content))
  58. return image
  59. except Exception as e:
  60. print(f"下载图片失败 {image_url}: {e}")
  61. return None
  62. def extract_image_urls(self, formatted_content: Dict[str, Any]) -> List[str]:
  63. """提取图片URL列表"""
  64. image_urls = []
  65. image_url_list = formatted_content.get('image_url_list', [])
  66. for img_data in image_url_list:
  67. if isinstance(img_data, dict) and 'image_url' in img_data:
  68. image_urls.append(img_data['image_url'])
  69. return image_urls
  70. def analyze_images_with_gemini(self, image_urls: List[str]) -> Dict[str, Any]:
  71. """使用 Gemini 并发(最多5条)提取图片文字(仅内容提取)"""
  72. try:
  73. if not image_urls:
  74. return {"images_comprehension": [], "error": "没有图片需要分析"}
  75. # 系统提示:严格限制为"仅提取文字,不做分析" [[memory:7272937]]
  76. system_prompt = prompt
  77. # 保持输入顺序
  78. results: List[Dict[str, Any]] = [{} for _ in range(len(image_urls))]
  79. def analyze_image_job(idx_and_url) -> Dict[str, Any]:
  80. idx, url = idx_and_url
  81. try:
  82. # 下载图片
  83. image = self.download_image(url)
  84. if image is None:
  85. return {"idx": idx, "url": url, "content": "", "success": False, "error": "图片下载失败"}
  86. # 使用 Gemini 直接分析图片
  87. self._ensure_configured()
  88. response = self.model.generate_content([system_prompt, image])
  89. if response.text:
  90. return {"idx": idx, "url": url, "content": response.text, "success": True}
  91. else:
  92. return {"idx": idx, "url": url, "content": "", "success": False, "error": "识别失败或无内容返回"}
  93. except Exception as e:
  94. return {"idx": idx, "url": url, "content": "", "success": False, "error": str(e)}
  95. # 并发最多5条
  96. with ThreadPoolExecutor(max_workers=5) as executor:
  97. future_to_index = {}
  98. for idx, url in enumerate(image_urls):
  99. future = executor.submit(analyze_image_job, (idx, url))
  100. future_to_index[future] = idx
  101. for future in as_completed(list(future_to_index.keys())):
  102. result = future.result()
  103. idx = result["idx"]
  104. results[idx] = {
  105. "url": result["url"],
  106. "content": result["content"],
  107. "success": result["success"]
  108. }
  109. if not result["success"]:
  110. results[idx]["error"] = result["error"]
  111. return {"images_comprehension": results}
  112. except Exception as e:
  113. print(f"Gemini 并发调用失败: {e}")
  114. return {"images_comprehension": [], "error": f"Gemini API 调用失败: {str(e)}"}
  115. def process_images(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
  116. """处理图片识别的主函数"""
  117. # 提取图片URL
  118. image_urls = self.extract_image_urls(formatted_content)
  119. if not image_urls:
  120. print("没有图片需要分析")
  121. return {"images_comprehension": [], "error": "没有图片需要分析"}
  122. # 分析图片
  123. result = self.analyze_images_with_gemini(image_urls)
  124. if result.get("images_comprehension"):
  125. successful_count = sum(1 for img in result['images_comprehension'] if img.get('success', False))
  126. else:
  127. print("图片OCR识别失败")
  128. return result
  129. def main():
  130. """测试函数"""
  131. # 模拟数据
  132. test_content = {
  133. "image_url_list": [
  134. {
  135. "image_type": 2,
  136. "image_url": "http://rescdn.yishihui.com/pipeline/image/ea4f33e9-9e36-4124-aaec-138ea9bcadd9.jpg"
  137. },
  138. {
  139. "image_type": 2,
  140. "image_url": "http://rescdn.yishihui.com/pipeline/image/ea4f33e9-9e36-4124-aaec-138ea9bcadd9.jpg"
  141. }
  142. ]
  143. }
  144. try:
  145. identifier = ImageIdentifier()
  146. result = identifier.process_images(test_content)
  147. print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
  148. except Exception as e:
  149. print(f"初始化失败: {e}")
  150. if __name__ == '__main__':
  151. main()