image_identifier.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 图文识别脚本
  5. 主要功能:使用 Gemini API 进行图片OCR识别
  6. """
  7. import os
  8. import json
  9. import time
  10. import sys
  11. from typing import Dict, Any, List, Optional
  12. from dotenv import load_dotenv
  13. import google.generativeai as genai
  14. from PIL import Image
  15. import requests
  16. from io import BytesIO
  17. from concurrent.futures import ThreadPoolExecutor, as_completed
  18. # 导入自定义模块
  19. sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
  20. from llm.openrouter import OpenRouterProcessor, OpenRouterModel
  21. # 构建OCR提示词
  22. prompt = """
  23. #### 人设
  24. 你是一名图像文字理解专家,请对输入的文章图片进行精准的文字提取和结构化整理。
  25. #### 任务要求如下:
  26. 1. 仅提取图片中可见的文字内容,不需要改写、总结或推理隐藏信息。
  27. 2. 如果图片包含结构(如表格、图表、标题、段落等),请按结构输出。
  28. 3. 所有提取的内容需保持原始顺序和排版上下文的逻辑。
  29. 4. 不需要进行OCR校正,只需要原样提取图中文字。
  30. 5. 舍弃图片中和标题不相关的文字
  31. 6. 对于结构不明确或自由排列的文字,按照从上到下、从左到右的顺序依次提取。
  32. #### 输出格式
  33. 1. 仅输出提取的文字即可,不需要其他说明性的文字
  34. """
  35. class ImageIdentifier:
  36. def __init__(self):
  37. # 加载环境变量
  38. load_dotenv()
  39. # 初始化Gemini API
  40. api_key = os.getenv('GEMINI_API_KEY')
  41. if not api_key:
  42. raise ValueError("请在环境变量中设置 GEMINI_API_KEY")
  43. genai.configure(api_key=api_key)
  44. self.model = genai.GenerativeModel('gemini-2.5-flash')
  45. def download_image(self, image_url: str) -> Optional[Image.Image]:
  46. """下载图片并转换为PIL Image对象"""
  47. try:
  48. response = requests.get(image_url, timeout=10)
  49. response.raise_for_status()
  50. image = Image.open(BytesIO(response.content))
  51. return image
  52. except Exception as e:
  53. print(f"下载图片失败 {image_url}: {e}")
  54. return None
  55. def extract_image_urls(self, formatted_content: Dict[str, Any]) -> List[str]:
  56. """提取图片URL列表"""
  57. image_urls = []
  58. image_url_list = formatted_content.get('image_url_list', [])
  59. for img_data in image_url_list:
  60. if isinstance(img_data, dict) and 'image_url' in img_data:
  61. image_urls.append(img_data['image_url'])
  62. return image_urls
  63. def analyze_images_with_gemini(self, image_urls: List[str]) -> Dict[str, Any]:
  64. """使用 Gemini 并发(最多5条)提取图片文字(仅内容提取)"""
  65. try:
  66. if not image_urls:
  67. return {"images_comprehension": [], "error": "没有图片需要分析"}
  68. # 系统提示:严格限制为"仅提取文字,不做分析" [[memory:7272937]]
  69. system_prompt = prompt
  70. # 保持输入顺序
  71. results: List[Dict[str, Any]] = [{} for _ in range(len(image_urls))]
  72. def analyze_image_job(idx_and_url) -> Dict[str, Any]:
  73. idx, url = idx_and_url
  74. try:
  75. # 下载图片
  76. image = self.download_image(url)
  77. if image is None:
  78. return {"idx": idx, "url": url, "content": "", "success": False, "error": "图片下载失败"}
  79. # 使用 Gemini 直接分析图片
  80. response = self.model.generate_content([system_prompt, image])
  81. if response.text:
  82. return {"idx": idx, "url": url, "content": response.text, "success": True}
  83. else:
  84. return {"idx": idx, "url": url, "content": "", "success": False, "error": "识别失败或无内容返回"}
  85. except Exception as e:
  86. return {"idx": idx, "url": url, "content": "", "success": False, "error": str(e)}
  87. # 并发最多5条
  88. with ThreadPoolExecutor(max_workers=5) as executor:
  89. future_to_index = {}
  90. for idx, url in enumerate(image_urls):
  91. future = executor.submit(analyze_image_job, (idx, url))
  92. future_to_index[future] = idx
  93. for future in as_completed(list(future_to_index.keys())):
  94. result = future.result()
  95. idx = result["idx"]
  96. results[idx] = {
  97. "url": result["url"],
  98. "content": result["content"],
  99. "success": result["success"]
  100. }
  101. if not result["success"]:
  102. results[idx]["error"] = result["error"]
  103. return {"images_comprehension": results}
  104. except Exception as e:
  105. print(f"Gemini 并发调用失败: {e}")
  106. return {"images_comprehension": [], "error": f"Gemini API 调用失败: {str(e)}"}
  107. def process_images(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
  108. """处理图片识别的主函数"""
  109. # 提取图片URL
  110. image_urls = self.extract_image_urls(formatted_content)
  111. if not image_urls:
  112. print("没有图片需要分析")
  113. return {"images_comprehension": [], "error": "没有图片需要分析"}
  114. # 分析图片
  115. result = self.analyze_images_with_gemini(image_urls)
  116. if result.get("images_comprehension"):
  117. successful_count = sum(1 for img in result['images_comprehension'] if img.get('success', False))
  118. else:
  119. print("图片OCR识别失败")
  120. return result
  121. def main():
  122. """测试函数"""
  123. # 模拟数据
  124. test_content = {
  125. "image_url_list": [
  126. {
  127. "image_type": 2,
  128. "image_url": "http://rescdn.yishihui.com/pipeline/image/ea4f33e9-9e36-4124-aaec-138ea9bcadd9.jpg"
  129. },
  130. {
  131. "image_type": 2,
  132. "image_url": "http://rescdn.yishihui.com/pipeline/image/ea4f33e9-9e36-4124-aaec-138ea9bcadd9.jpg"
  133. }
  134. ]
  135. }
  136. try:
  137. identifier = ImageIdentifier()
  138. result = identifier.process_images(test_content)
  139. print(f"识别结果: {json.dumps(result, ensure_ascii=False, indent=2)}")
  140. except Exception as e:
  141. print(f"初始化失败: {e}")
  142. if __name__ == '__main__':
  143. main()