# image.py
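"""
Shared image-processing utilities.

Provides batch image loading, downscaling, grid montage and other common
logic. Shared by read_images, the content tool family, etc., to avoid
duplicated code.

Core functions:
- load_image: load a local path or a URL as a PIL Image
- load_images: load several sources concurrently, preserving their order
- downscale: proportionally downscale to a given maximum side length
- build_image_grid: tile several images into a grid with index badges + captions
- encode_base64: PIL Image -> base64 string (JPEG by default to save tokens)
"""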
  11. import asyncio
  12. import base64
  13. import io
  14. import math
  15. from pathlib import Path
  16. from typing import List, Optional, Sequence, Tuple
  17. import httpx
  18. from PIL import Image, ImageDraw, ImageFont
  19. # ── 网格拼图默认参数 ──
  20. DEFAULT_THUMB_SIZE = 250 # 每格缩略图边长
  21. DEFAULT_TEXT_HEIGHT = 80 # 每格下方文字区高度
  22. DEFAULT_GRID_COLS = 5 # 每行几格
  23. DEFAULT_PADDING = 12
  24. DEFAULT_BG_COLOR = (255, 255, 255)
  25. DEFAULT_TEXT_COLOR = (30, 30, 30)
  26. DEFAULT_INDEX_COLOR = (220, 60, 60)
  27. # ── 字体候选(跨平台中文支持) ──
  28. # 注意:macOS 的 PingFang.ttc 因为格式原因 PIL/FreeType 无法读取,
  29. # 必须使用 Hiragino 或 STHeiti 等其他中文字体。
  30. _FONT_CANDIDATES = [
  31. # macOS(按优先级)
  32. "/System/Library/Fonts/Hiragino Sans GB.ttc", # 冬青黑体,macOS 自带
  33. "/System/Library/Fonts/STHeiti Medium.ttc", # 华文黑体
  34. "/System/Library/Fonts/Supplemental/Arial Unicode.ttf",
  35. # Linux
  36. "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
  37. "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
  38. "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf",
  39. # Windows
  40. "msyh.ttc", # 微软雅黑
  41. "simhei.ttf", # 黑体
  42. "simsun.ttc", # 宋体
  43. ]
  44. def _load_fonts(title_size: int = 16, index_size: int = 32):
  45. """加载中文字体,全部失败则退回默认字体"""
  46. for path in _FONT_CANDIDATES:
  47. try:
  48. return (
  49. ImageFont.truetype(path, title_size),
  50. ImageFont.truetype(path, index_size),
  51. )
  52. except Exception:
  53. continue
  54. default = ImageFont.load_default()
  55. return default, default
  56. # ── 加载图片 ──
  57. async def _load_image_from_url(client: httpx.AsyncClient, url: str) -> Optional[Image.Image]:
  58. """下载单张图片,失败返回 None"""
  59. try:
  60. resp = await client.get(url, timeout=15.0)
  61. resp.raise_for_status()
  62. return Image.open(io.BytesIO(resp.content)).convert("RGB")
  63. except Exception:
  64. return None
  65. def _load_image_from_path(path: str) -> Optional[Image.Image]:
  66. """从本地路径加载图片,失败返回 None"""
  67. try:
  68. return Image.open(path).convert("RGB")
  69. except Exception:
  70. return None
  71. async def load_image(source: str, client: Optional[httpx.AsyncClient] = None) -> Optional[Image.Image]:
  72. """
  73. 通用图片加载:自动识别 URL 或本地路径。
  74. Args:
  75. source: HTTP(S) URL 或本地文件路径
  76. client: 可选的 httpx 客户端(URL 加载时复用连接)
  77. Returns:
  78. PIL Image 对象(RGB 模式),失败返回 None
  79. """
  80. if source.startswith(("http://", "https://")):
  81. if client is not None:
  82. return await _load_image_from_url(client, source)
  83. async with httpx.AsyncClient() as c:
  84. return await _load_image_from_url(c, source)
  85. else:
  86. # 本地路径:在 executor 中执行以避免阻塞事件循环
  87. loop = asyncio.get_event_loop()
  88. return await loop.run_in_executor(None, _load_image_from_path, source)
  89. async def load_images(sources: Sequence[str]) -> List[Tuple[str, Optional[Image.Image]]]:
  90. """
  91. 并发批量加载图片。
  92. Returns:
  93. [(source, image_or_none), ...] — 保留原始顺序,失败项值为 None
  94. """
  95. async with httpx.AsyncClient() as client:
  96. tasks = [load_image(src, client) for src in sources]
  97. images = await asyncio.gather(*tasks)
  98. return list(zip(sources, images))
  99. # ── 降采样 ──
  100. def downscale(image: Image.Image, max_dimension: int) -> Image.Image:
  101. """
  102. 等比降采样到最大边不超过 max_dimension。
  103. 如果图片已经足够小则原样返回。
  104. """
  105. if max(image.width, image.height) <= max_dimension:
  106. return image
  107. scale = max_dimension / max(image.width, image.height)
  108. new_size = (int(image.width * scale), int(image.height * scale))
  109. return image.resize(new_size, Image.LANCZOS)
  110. # ── 网格拼图 ──
  111. def build_image_grid(
  112. images: Sequence[Image.Image],
  113. labels: Optional[Sequence[str]] = None,
  114. columns: int = DEFAULT_GRID_COLS,
  115. thumb_size: int = DEFAULT_THUMB_SIZE,
  116. text_height: int = DEFAULT_TEXT_HEIGHT,
  117. padding: int = DEFAULT_PADDING,
  118. show_index: bool = True,
  119. ) -> Image.Image:
  120. """
  121. 将多张图片拼成带索引编号 + 标题的网格图。
  122. 每个单元格包含:
  123. - 左上角红底白字的序号(1, 2, 3...)
  124. - 等比缩放居中的缩略图
  125. - 下方的标题文字(可选,自动按像素宽度换行)
  126. Args:
  127. images: 待拼接的 PIL Image 列表
  128. labels: 每张图的标题(与 images 等长);None 则不显示标题
  129. columns: 每行几格
  130. thumb_size: 每个缩略图格子的边长
  131. text_height: 每格下方文字区高度(labels 为 None 时自动置 0)
  132. padding: 格子间距和画布边距
  133. show_index: 是否显示左上角序号
  134. Returns:
  135. 拼接后的 PIL Image
  136. """
  137. if not images:
  138. raise ValueError("images 不能为空")
  139. if labels is None:
  140. labels = [""] * len(images)
  141. text_height = 0
  142. elif len(labels) != len(images):
  143. raise ValueError(f"labels 长度 {len(labels)} 与 images {len(images)} 不匹配")
  144. count = len(images)
  145. cols = min(columns, count)
  146. rows = math.ceil(count / cols)
  147. cell_w = thumb_size + padding
  148. cell_h = thumb_size + text_height + padding
  149. canvas_w = cols * cell_w + padding
  150. canvas_h = rows * cell_h + padding
  151. canvas = Image.new("RGB", (canvas_w, canvas_h), DEFAULT_BG_COLOR)
  152. draw = ImageDraw.Draw(canvas)
  153. # 索引框按 thumb_size 比例缩放,保证视觉比例恒定(约 20% 占比)
  154. index_box_size = max(40, thumb_size // 5)
  155. index_font_size = int(index_box_size * 0.65)
  156. # 标题字体略与 thumb_size 相关,但下限保证小图时可读
  157. title_font_size = max(14, thumb_size // 18)
  158. font_title, font_index = _load_fonts(
  159. title_size=title_font_size,
  160. index_size=index_font_size,
  161. )
  162. for idx, (img, label) in enumerate(zip(images, labels), start=1):
  163. col = (idx - 1) % cols
  164. row = (idx - 1) // cols
  165. x = padding + col * cell_w
  166. y = padding + row * cell_h
  167. # 等比缩放居中
  168. scale = min(thumb_size / img.width, thumb_size / img.height)
  169. new_w = int(img.width * scale)
  170. new_h = int(img.height * scale)
  171. thumb = img.resize((new_w, new_h), Image.LANCZOS)
  172. offset_x = x + (thumb_size - new_w) // 2
  173. offset_y = y + (thumb_size - new_h) // 2
  174. canvas.paste(thumb, (offset_x, offset_y))
  175. # 左上角序号(跟随实际缩略图位置,大小按比例)
  176. if show_index:
  177. index_text = str(idx)
  178. idx_x = offset_x
  179. idx_y = offset_y
  180. draw.rectangle(
  181. [idx_x, idx_y, idx_x + index_box_size, idx_y + index_box_size],
  182. fill=DEFAULT_INDEX_COLOR,
  183. )
  184. bbox = draw.textbbox((0, 0), index_text, font=font_index)
  185. tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
  186. # 文本垂直对齐用 bbox 的 top 偏移修正(font bbox 的 top 可能不为 0)
  187. text_x = idx_x + (index_box_size - tw) // 2 - bbox[0]
  188. text_y = idx_y + (index_box_size - th) // 2 - bbox[1]
  189. draw.text((text_x, text_y), index_text, fill=(255, 255, 255), font=font_index)
  190. # 下方标题(自动按像素宽度换行)
  191. if label and text_height > 0:
  192. lines = _wrap_text_by_pixel(label, font_title, thumb_size, draw)
  193. for line_i, line in enumerate(lines):
  194. draw.text(
  195. (x, y + thumb_size + 6 + line_i * 22),
  196. line,
  197. fill=DEFAULT_TEXT_COLOR,
  198. font=font_title,
  199. )
  200. return canvas
  201. def _wrap_text_by_pixel(text: str, font, max_width: int, draw: ImageDraw.ImageDraw) -> List[str]:
  202. """按像素宽度自动换行,兼容中英文混排(逐字符判断)"""
  203. lines = []
  204. current = ""
  205. for ch in text:
  206. test = current + ch
  207. bbox = draw.textbbox((0, 0), test, font=font)
  208. if bbox[2] - bbox[0] > max_width:
  209. if current:
  210. lines.append(current)
  211. current = ch
  212. else:
  213. current = test
  214. if current:
  215. lines.append(current)
  216. return lines
  217. # ── 编码为 base64 ──
  218. def encode_base64(image: Image.Image, format: str = "JPEG", quality: int = 75) -> Tuple[str, str]:
  219. """
  220. 将 PIL Image 编码为 base64 字符串。
  221. Args:
  222. image: PIL Image 对象
  223. format: "JPEG" 或 "PNG"。JPEG 体积更小,推荐用于多模态 LLM 输入
  224. quality: JPEG 质量(1-100),PNG 忽略此参数
  225. Returns:
  226. (base64_data, media_type) 元组,如 ("iVBOR...", "image/png")
  227. """
  228. buf = io.BytesIO()
  229. save_kwargs = {"format": format}
  230. if format.upper() == "JPEG":
  231. # JPEG 不支持透明通道
  232. if image.mode in ("RGBA", "LA", "P"):
  233. image = image.convert("RGB")
  234. save_kwargs["quality"] = quality
  235. save_kwargs["optimize"] = True
  236. image.save(buf, **save_kwargs)
  237. data = base64.b64encode(buf.getvalue()).decode("utf-8")
  238. media_type = f"image/{format.lower()}"
  239. if format.upper() == "JPEG":
  240. media_type = "image/jpeg"
  241. return data, media_type