read.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. """
  2. Read Tool - 文件读取工具
  3. 参考 OpenCode read.ts 完整实现。
  4. 核心功能:
  5. - 支持文本文件、图片、PDF
  6. - 分页读取(offset/limit)
  7. - 二进制文件检测
  8. - 行长度和字节限制
  9. """
  10. import os
  11. import base64
  12. import mimetypes
  13. from pathlib import Path
  14. from typing import Optional
  15. from urllib.parse import urlparse
  16. import httpx
  17. from agent.tools import tool, ToolResult, ToolContext
  18. # 常量(参考 opencode)
  19. DEFAULT_READ_LIMIT = 2000
  20. MAX_LINE_LENGTH = 2000
  21. MAX_BYTES = 50 * 1024 # 50KB
  22. @tool(description="读取单个文件内容,支持文本文件、图片、PDF 等多种格式,也支持 HTTP/HTTPS URL", hidden_params=["context"], groups=["core"])
  23. async def read_file(
  24. file_path: str,
  25. offset: int = 0,
  26. limit: int = DEFAULT_READ_LIMIT,
  27. context: Optional[ToolContext] = None
  28. ) -> ToolResult:
  29. """
  30. 读取单个文件内容
  31. 用于读取一个文本文件、PDF 或一张图片。如需批量读取多张图片(2 张以上)
  32. 并做对比/选图,请使用 read_images 工具,它支持自动降采样和网格拼图。
  33. 参考 OpenCode 实现
  34. Args:
  35. file_path: 文件路径(绝对路径、相对路径或 HTTP/HTTPS URL)
  36. offset: 起始行号(从 0 开始)
  37. limit: 读取行数(默认 2000 行)
  38. context: 工具上下文
  39. Returns:
  40. ToolResult: 文件内容
  41. """
  42. # 检测是否为 HTTP/HTTPS URL
  43. parsed = urlparse(file_path)
  44. if parsed.scheme in ("http", "https"):
  45. return await _read_from_url(file_path)
  46. # 解析路径
  47. path = Path(file_path)
  48. if not path.is_absolute():
  49. path = Path.cwd() / path
  50. # 检查文件是否存在
  51. if not path.exists():
  52. # 尝试提供建议(参考 opencode:44-60)
  53. parent_dir = path.parent
  54. if parent_dir.exists():
  55. candidates = [
  56. f for f in parent_dir.iterdir()
  57. if path.name.lower() in f.name.lower() or f.name.lower() in path.name.lower()
  58. ][:3]
  59. if candidates:
  60. suggestions = "\n".join(str(c) for c in candidates)
  61. return ToolResult(
  62. title=f"文件未找到: {path.name}",
  63. output=f"文件不存在: {file_path}\n\n你是否想要:\n{suggestions}",
  64. error="File not found"
  65. )
  66. return ToolResult(
  67. title="文件未找到",
  68. output=f"文件不存在: {file_path}",
  69. error="File not found"
  70. )
  71. # 检测文件类型
  72. mime_type, _ = mimetypes.guess_type(str(path))
  73. mime_type = mime_type or ""
  74. # 图片文件(参考 opencode:66-91)
  75. if mime_type.startswith("image/") and mime_type not in ["image/svg+xml", "image/vnd.fastbidsheet"]:
  76. try:
  77. raw = path.read_bytes()
  78. b64_data = base64.b64encode(raw).decode("ascii")
  79. return ToolResult(
  80. title=path.name,
  81. output=f"图片文件: {path.name} (MIME: {mime_type}, {len(raw)} bytes)",
  82. metadata={"mime_type": mime_type, "truncated": False},
  83. images=[{
  84. "type": "base64",
  85. "media_type": mime_type,
  86. "data": b64_data,
  87. }],
  88. )
  89. except Exception as e:
  90. return ToolResult(
  91. title=path.name,
  92. output=f"图片文件读取失败: {path.name}: {e}",
  93. error=str(e),
  94. )
  95. # PDF 文件
  96. if mime_type == "application/pdf":
  97. return ToolResult(
  98. title=path.name,
  99. output=f"PDF 文件: {path.name}",
  100. metadata={"mime_type": mime_type, "truncated": False}
  101. )
  102. # 二进制文件检测(参考 opencode:156-211)
  103. if _is_binary_file(path):
  104. return ToolResult(
  105. title="二进制文件",
  106. output=f"无法读取二进制文件: {path.name}",
  107. error="Binary file"
  108. )
  109. # 读取文本文件(参考 opencode:96-143)
  110. try:
  111. with open(path, 'r', encoding='utf-8') as f:
  112. lines = f.readlines()
  113. total_lines = len(lines)
  114. end_line = min(offset + limit, total_lines)
  115. # 截取行并处理长度限制
  116. output_lines = []
  117. total_bytes = 0
  118. truncated_by_bytes = False
  119. for i in range(offset, end_line):
  120. line = lines[i].rstrip('\n\r')
  121. # 行长度限制(参考 opencode:104)
  122. if len(line) > MAX_LINE_LENGTH:
  123. line = line[:MAX_LINE_LENGTH] + "..."
  124. # 字节限制(参考 opencode:105-112)
  125. line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
  126. if total_bytes + line_bytes > MAX_BYTES:
  127. truncated_by_bytes = True
  128. break
  129. output_lines.append(line)
  130. total_bytes += line_bytes
  131. # 格式化输出(参考 opencode:114-134)
  132. formatted = []
  133. for idx, line in enumerate(output_lines):
  134. line_num = offset + idx + 1
  135. formatted.append(f"{line_num:5d}| {line}")
  136. output = "<file>\n" + "\n".join(formatted)
  137. last_read_line = offset + len(output_lines)
  138. has_more = total_lines > last_read_line
  139. truncated = has_more or truncated_by_bytes
  140. # 添加提示
  141. if truncated_by_bytes:
  142. output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
  143. elif has_more:
  144. output += f"\n\n(文件还有更多内容。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
  145. else:
  146. output += f"\n\n(文件结束 - 共 {total_lines} 行)"
  147. output += "\n</file>"
  148. # 预览(前 20 行)
  149. preview = "\n".join(output_lines[:20])
  150. return ToolResult(
  151. title=path.name,
  152. output=output,
  153. metadata={
  154. "preview": preview,
  155. "truncated": truncated,
  156. "total_lines": total_lines,
  157. "read_lines": len(output_lines)
  158. }
  159. )
  160. except UnicodeDecodeError:
  161. return ToolResult(
  162. title="编码错误",
  163. output=f"无法解码文件(非 UTF-8 编码): {path.name}",
  164. error="Encoding error"
  165. )
  166. except Exception as e:
  167. return ToolResult(
  168. title="读取错误",
  169. output=f"读取文件时出错: {str(e)}",
  170. error=str(e)
  171. )
  172. def _is_binary_file(path: Path) -> bool:
  173. """
  174. 检测是否为二进制文件
  175. 参考 OpenCode 实现
  176. """
  177. # 常见二进制扩展名
  178. binary_exts = {
  179. '.zip', '.tar', '.gz', '.exe', '.dll', '.so', '.class',
  180. '.jar', '.war', '.7z', '.doc', '.docx', '.xls', '.xlsx',
  181. '.ppt', '.pptx', '.odt', '.ods', '.odp', '.bin', '.dat',
  182. '.obj', '.o', '.a', '.lib', '.wasm', '.pyc', '.pyo'
  183. }
  184. if path.suffix.lower() in binary_exts:
  185. return True
  186. # 检查文件内容
  187. try:
  188. file_size = path.stat().st_size
  189. if file_size == 0:
  190. return False
  191. # 读取前 4KB
  192. buffer_size = min(4096, file_size)
  193. with open(path, 'rb') as f:
  194. buffer = f.read(buffer_size)
  195. # 检测 null 字节
  196. if b'\x00' in buffer:
  197. return True
  198. # 统计非打印字符(参考 opencode:202-210)
  199. non_printable = 0
  200. for byte in buffer:
  201. if byte < 9 or (13 < byte < 32):
  202. non_printable += 1
  203. # 如果超过 30% 是非打印字符,认为是二进制
  204. return non_printable / len(buffer) > 0.3
  205. except Exception:
  206. return False
  207. async def _read_from_url(url: str) -> ToolResult:
  208. """
  209. 从 HTTP/HTTPS URL 读取文件内容。
  210. 主要用于图片等多媒体资源,自动转换为 base64。
  211. """
  212. try:
  213. async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
  214. response = await client.get(url)
  215. response.raise_for_status()
  216. content_type = response.headers.get("content-type", "")
  217. raw = response.content
  218. # 从 URL 提取文件名
  219. from urllib.parse import urlparse
  220. parsed = urlparse(url)
  221. filename = Path(parsed.path).name or "downloaded_file"
  222. # 图片文件
  223. if content_type.startswith("image/") or any(url.lower().endswith(ext) for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]):
  224. mime_type = content_type.split(";")[0] if content_type else "image/jpeg"
  225. b64_data = base64.b64encode(raw).decode("ascii")
  226. return ToolResult(
  227. title=filename,
  228. output=f"图片文件: {filename} (URL: {url}, MIME: {mime_type}, {len(raw)} bytes)",
  229. metadata={"mime_type": mime_type, "url": url, "truncated": False},
  230. images=[{
  231. "type": "base64",
  232. "media_type": mime_type,
  233. "data": b64_data,
  234. }],
  235. )
  236. # 文本文件
  237. if content_type.startswith("text/") or content_type == "application/json":
  238. text = raw.decode("utf-8", errors="replace")
  239. lines = text.split("\n")
  240. preview = "\n".join(lines[:20])
  241. return ToolResult(
  242. title=filename,
  243. output=f"<file>\n{text}\n</file>",
  244. metadata={
  245. "preview": preview,
  246. "url": url,
  247. "mime_type": content_type,
  248. "total_lines": len(lines),
  249. }
  250. )
  251. # 其他二进制文件
  252. return ToolResult(
  253. title=filename,
  254. output=f"二进制文件: {filename} (URL: {url}, {len(raw)} bytes)",
  255. metadata={"url": url, "mime_type": content_type, "size": len(raw)}
  256. )
  257. except httpx.HTTPStatusError as e:
  258. return ToolResult(
  259. title="HTTP 错误",
  260. output=f"无法下载文件: {url}\nHTTP {e.response.status_code}: {e.response.reason_phrase}",
  261. error=str(e)
  262. )
  263. except Exception as e:
  264. return ToolResult(
  265. title="下载失败",
  266. output=f"无法从 URL 读取文件: {url}\n错误: {str(e)}",
  267. error=str(e)
  268. )