read.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. """
  2. Read Tool - 文件读取工具
  3. 参考 OpenCode read.ts 完整实现。
  4. 核心功能:
  5. - 支持文本文件、图片、PDF
  6. - 分页读取(offset/limit)
  7. - 二进制文件检测
  8. - 行长度和字节限制
  9. """
  10. import os
  11. import base64
  12. import mimetypes
  13. from pathlib import Path
  14. from typing import Optional
  15. from urllib.parse import urlparse
  16. import httpx
  17. from agent.tools import tool, ToolResult, ToolContext
# Constants (modelled on the OpenCode read tool)
DEFAULT_READ_LIMIT = 2000  # default value of read_file's `limit` parameter (number of lines)
MAX_LINE_LENGTH = 2000  # lines longer than this many characters are cut and suffixed with "..."
MAX_BYTES = 50 * 1024 # 50KB; cap on the UTF-8 size of the text returned by one read
  22. @tool(description="读取文件内容,支持文本文件、图片、PDF 等多种格式,也支持 HTTP/HTTPS URL")
  23. async def read_file(
  24. file_path: str,
  25. offset: int = 0,
  26. limit: int = DEFAULT_READ_LIMIT,
  27. context: Optional[ToolContext] = None
  28. ) -> ToolResult:
  29. """
  30. 读取文件内容
  31. 参考 OpenCode 实现
  32. Args:
  33. file_path: 文件路径(绝对路径、相对路径或 HTTP/HTTPS URL)
  34. offset: 起始行号(从 0 开始)
  35. limit: 读取行数(默认 2000 行)
  36. context: 工具上下文
  37. Returns:
  38. ToolResult: 文件内容
  39. """
  40. # 检测是否为 HTTP/HTTPS URL
  41. parsed = urlparse(file_path)
  42. if parsed.scheme in ("http", "https"):
  43. return await _read_from_url(file_path)
  44. # 解析路径
  45. path = Path(file_path)
  46. if not path.is_absolute():
  47. path = Path.cwd() / path
  48. # 检查文件是否存在
  49. if not path.exists():
  50. # 尝试提供建议(参考 opencode:44-60)
  51. parent_dir = path.parent
  52. if parent_dir.exists():
  53. candidates = [
  54. f for f in parent_dir.iterdir()
  55. if path.name.lower() in f.name.lower() or f.name.lower() in path.name.lower()
  56. ][:3]
  57. if candidates:
  58. suggestions = "\n".join(str(c) for c in candidates)
  59. return ToolResult(
  60. title=f"文件未找到: {path.name}",
  61. output=f"文件不存在: {file_path}\n\n你是否想要:\n{suggestions}",
  62. error="File not found"
  63. )
  64. return ToolResult(
  65. title="文件未找到",
  66. output=f"文件不存在: {file_path}",
  67. error="File not found"
  68. )
  69. # 检测文件类型
  70. mime_type, _ = mimetypes.guess_type(str(path))
  71. mime_type = mime_type or ""
  72. # 图片文件(参考 opencode:66-91)
  73. if mime_type.startswith("image/") and mime_type not in ["image/svg+xml", "image/vnd.fastbidsheet"]:
  74. try:
  75. raw = path.read_bytes()
  76. b64_data = base64.b64encode(raw).decode("ascii")
  77. return ToolResult(
  78. title=path.name,
  79. output=f"图片文件: {path.name} (MIME: {mime_type}, {len(raw)} bytes)",
  80. metadata={"mime_type": mime_type, "truncated": False},
  81. images=[{
  82. "type": "base64",
  83. "media_type": mime_type,
  84. "data": b64_data,
  85. }],
  86. )
  87. except Exception as e:
  88. return ToolResult(
  89. title=path.name,
  90. output=f"图片文件读取失败: {path.name}: {e}",
  91. error=str(e),
  92. )
  93. # PDF 文件
  94. if mime_type == "application/pdf":
  95. return ToolResult(
  96. title=path.name,
  97. output=f"PDF 文件: {path.name}",
  98. metadata={"mime_type": mime_type, "truncated": False}
  99. )
  100. # 二进制文件检测(参考 opencode:156-211)
  101. if _is_binary_file(path):
  102. return ToolResult(
  103. title="二进制文件",
  104. output=f"无法读取二进制文件: {path.name}",
  105. error="Binary file"
  106. )
  107. # 读取文本文件(参考 opencode:96-143)
  108. try:
  109. with open(path, 'r', encoding='utf-8') as f:
  110. lines = f.readlines()
  111. total_lines = len(lines)
  112. end_line = min(offset + limit, total_lines)
  113. # 截取行并处理长度限制
  114. output_lines = []
  115. total_bytes = 0
  116. truncated_by_bytes = False
  117. for i in range(offset, end_line):
  118. line = lines[i].rstrip('\n\r')
  119. # 行长度限制(参考 opencode:104)
  120. if len(line) > MAX_LINE_LENGTH:
  121. line = line[:MAX_LINE_LENGTH] + "..."
  122. # 字节限制(参考 opencode:105-112)
  123. line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
  124. if total_bytes + line_bytes > MAX_BYTES:
  125. truncated_by_bytes = True
  126. break
  127. output_lines.append(line)
  128. total_bytes += line_bytes
  129. # 格式化输出(参考 opencode:114-134)
  130. formatted = []
  131. for idx, line in enumerate(output_lines):
  132. line_num = offset + idx + 1
  133. formatted.append(f"{line_num:5d}| {line}")
  134. output = "<file>\n" + "\n".join(formatted)
  135. last_read_line = offset + len(output_lines)
  136. has_more = total_lines > last_read_line
  137. truncated = has_more or truncated_by_bytes
  138. # 添加提示
  139. if truncated_by_bytes:
  140. output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
  141. elif has_more:
  142. output += f"\n\n(文件还有更多内容。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
  143. else:
  144. output += f"\n\n(文件结束 - 共 {total_lines} 行)"
  145. output += "\n</file>"
  146. # 预览(前 20 行)
  147. preview = "\n".join(output_lines[:20])
  148. return ToolResult(
  149. title=path.name,
  150. output=output,
  151. metadata={
  152. "preview": preview,
  153. "truncated": truncated,
  154. "total_lines": total_lines,
  155. "read_lines": len(output_lines)
  156. }
  157. )
  158. except UnicodeDecodeError:
  159. return ToolResult(
  160. title="编码错误",
  161. output=f"无法解码文件(非 UTF-8 编码): {path.name}",
  162. error="Encoding error"
  163. )
  164. except Exception as e:
  165. return ToolResult(
  166. title="读取错误",
  167. output=f"读取文件时出错: {str(e)}",
  168. error=str(e)
  169. )
  170. def _is_binary_file(path: Path) -> bool:
  171. """
  172. 检测是否为二进制文件
  173. 参考 OpenCode 实现
  174. """
  175. # 常见二进制扩展名
  176. binary_exts = {
  177. '.zip', '.tar', '.gz', '.exe', '.dll', '.so', '.class',
  178. '.jar', '.war', '.7z', '.doc', '.docx', '.xls', '.xlsx',
  179. '.ppt', '.pptx', '.odt', '.ods', '.odp', '.bin', '.dat',
  180. '.obj', '.o', '.a', '.lib', '.wasm', '.pyc', '.pyo'
  181. }
  182. if path.suffix.lower() in binary_exts:
  183. return True
  184. # 检查文件内容
  185. try:
  186. file_size = path.stat().st_size
  187. if file_size == 0:
  188. return False
  189. # 读取前 4KB
  190. buffer_size = min(4096, file_size)
  191. with open(path, 'rb') as f:
  192. buffer = f.read(buffer_size)
  193. # 检测 null 字节
  194. if b'\x00' in buffer:
  195. return True
  196. # 统计非打印字符(参考 opencode:202-210)
  197. non_printable = 0
  198. for byte in buffer:
  199. if byte < 9 or (13 < byte < 32):
  200. non_printable += 1
  201. # 如果超过 30% 是非打印字符,认为是二进制
  202. return non_printable / len(buffer) > 0.3
  203. except Exception:
  204. return False
  205. async def _read_from_url(url: str) -> ToolResult:
  206. """
  207. 从 HTTP/HTTPS URL 读取文件内容。
  208. 主要用于图片等多媒体资源,自动转换为 base64。
  209. """
  210. try:
  211. async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
  212. response = await client.get(url)
  213. response.raise_for_status()
  214. content_type = response.headers.get("content-type", "")
  215. raw = response.content
  216. # 从 URL 提取文件名
  217. from urllib.parse import urlparse
  218. parsed = urlparse(url)
  219. filename = Path(parsed.path).name or "downloaded_file"
  220. # 图片文件
  221. if content_type.startswith("image/") or any(url.lower().endswith(ext) for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]):
  222. mime_type = content_type.split(";")[0] if content_type else "image/jpeg"
  223. b64_data = base64.b64encode(raw).decode("ascii")
  224. return ToolResult(
  225. title=filename,
  226. output=f"图片文件: {filename} (URL: {url}, MIME: {mime_type}, {len(raw)} bytes)",
  227. metadata={"mime_type": mime_type, "url": url, "truncated": False},
  228. images=[{
  229. "type": "base64",
  230. "media_type": mime_type,
  231. "data": b64_data,
  232. }],
  233. )
  234. # 文本文件
  235. if content_type.startswith("text/") or content_type == "application/json":
  236. text = raw.decode("utf-8", errors="replace")
  237. lines = text.split("\n")
  238. preview = "\n".join(lines[:20])
  239. return ToolResult(
  240. title=filename,
  241. output=f"<file>\n{text}\n</file>",
  242. metadata={
  243. "preview": preview,
  244. "url": url,
  245. "mime_type": content_type,
  246. "total_lines": len(lines),
  247. }
  248. )
  249. # 其他二进制文件
  250. return ToolResult(
  251. title=filename,
  252. output=f"二进制文件: {filename} (URL: {url}, {len(raw)} bytes)",
  253. metadata={"url": url, "mime_type": content_type, "size": len(raw)}
  254. )
  255. except httpx.HTTPStatusError as e:
  256. return ToolResult(
  257. title="HTTP 错误",
  258. output=f"无法下载文件: {url}\nHTTP {e.response.status_code}: {e.response.reason_phrase}",
  259. error=str(e)
  260. )
  261. except Exception as e:
  262. return ToolResult(
  263. title="下载失败",
  264. output=f"无法从 URL 读取文件: {url}\n错误: {str(e)}",
  265. error=str(e)
  266. )