read.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. """
  2. Read Tool - 文件读取工具
  3. 参考 OpenCode read.ts 完整实现。
  4. 核心功能:
  5. - 支持文本文件、图片、PDF
  6. - 分页读取(offset/limit)
  7. - 二进制文件检测
  8. - 行长度和字节限制
  9. """
  10. import os
  11. import base64
  12. import mimetypes
  13. from pathlib import Path
  14. from typing import Optional
  15. from urllib.parse import urlparse
  16. import httpx
  17. from agent.tools import tool, ToolResult, ToolContext
  18. # 常量(参考 opencode)
  19. DEFAULT_READ_LIMIT = 2000
  20. MAX_LINE_LENGTH = 2000
  21. MAX_BYTES = 50 * 1024 # 50KB
  22. @tool(description="读取单个文件内容,支持文本文件、图片、PDF 等多种格式,也支持 HTTP/HTTPS URL", hidden_params=["context"], groups=["core"])
  23. async def read_file(
  24. file_path: str,
  25. offset: int = 0,
  26. limit: int = DEFAULT_READ_LIMIT,
  27. char_offset: int = 0,
  28. context: Optional[ToolContext] = None
  29. ) -> ToolResult:
  30. """
  31. 读取单个文件内容
  32. 用于读取一个文本文件、PDF 或一张图片。如需批量读取多张图片(2 张以上)
  33. 并做对比/选图,请使用 read_images 工具,它支持自动降采样和网格拼图。
  34. 参考 OpenCode 实现
  35. Args:
  36. file_path: 文件路径(绝对路径、相对路径或 HTTP/HTTPS URL)
  37. offset: 起始行号(从 0 开始)
  38. limit: 读取行数(默认 2000 行)
  39. char_offset: 在**起始行**内跳过的字符数(默认 0)。用于分段续读超长单行:
  40. 当某行超过单行上限被截断时,输出会提示用 char_offset=<位置> 接着读该行。
  41. 适合 JSON 等"长字符串挤在一行"的数据文件(行级 offset 翻不动单行)。
  42. context: 工具上下文
  43. Returns:
  44. ToolResult: 文件内容
  45. """
  46. # 检测是否为 HTTP/HTTPS URL
  47. parsed = urlparse(file_path)
  48. if parsed.scheme in ("http", "https"):
  49. return await _read_from_url(file_path)
  50. # 解析路径
  51. path = Path(file_path)
  52. if not path.is_absolute():
  53. path = Path.cwd() / path
  54. # 检查文件是否存在
  55. if not path.exists():
  56. # 尝试提供建议(参考 opencode:44-60)
  57. parent_dir = path.parent
  58. if parent_dir.exists():
  59. candidates = [
  60. f for f in parent_dir.iterdir()
  61. if path.name.lower() in f.name.lower() or f.name.lower() in path.name.lower()
  62. ][:3]
  63. if candidates:
  64. suggestions = "\n".join(str(c) for c in candidates)
  65. return ToolResult(
  66. title=f"文件未找到: {path.name}",
  67. output=f"文件不存在: {file_path}\n\n你是否想要:\n{suggestions}",
  68. error="File not found"
  69. )
  70. return ToolResult(
  71. title="文件未找到",
  72. output=f"文件不存在: {file_path}",
  73. error="File not found"
  74. )
  75. # 检测文件类型
  76. mime_type, _ = mimetypes.guess_type(str(path))
  77. mime_type = mime_type or ""
  78. # 图片文件(参考 opencode:66-91)
  79. if mime_type.startswith("image/") and mime_type not in ["image/svg+xml", "image/vnd.fastbidsheet"]:
  80. try:
  81. raw = path.read_bytes()
  82. b64_data = base64.b64encode(raw).decode("ascii")
  83. return ToolResult(
  84. title=path.name,
  85. output=f"图片文件: {path.name} (MIME: {mime_type}, {len(raw)} bytes)",
  86. metadata={"mime_type": mime_type, "truncated": False},
  87. images=[{
  88. "type": "base64",
  89. "media_type": mime_type,
  90. "data": b64_data,
  91. }],
  92. )
  93. except Exception as e:
  94. return ToolResult(
  95. title=path.name,
  96. output=f"图片文件读取失败: {path.name}: {e}",
  97. error=str(e),
  98. )
  99. # PDF 文件
  100. if mime_type == "application/pdf":
  101. return ToolResult(
  102. title=path.name,
  103. output=f"PDF 文件: {path.name}",
  104. metadata={"mime_type": mime_type, "truncated": False}
  105. )
  106. # 二进制文件检测(参考 opencode:156-211)
  107. if _is_binary_file(path):
  108. return ToolResult(
  109. title="二进制文件",
  110. output=f"无法读取二进制文件: {path.name}",
  111. error="Binary file"
  112. )
  113. # 读取文本文件(参考 opencode:96-143)
  114. try:
  115. with open(path, 'r', encoding='utf-8') as f:
  116. lines = f.readlines()
  117. total_lines = len(lines)
  118. end_line = min(offset + limit, total_lines)
  119. # 截取行并处理长度限制
  120. output_lines = []
  121. total_bytes = 0
  122. truncated_by_bytes = False
  123. line_continues_at = None # (行号0based, 该行已读到的字符位置) — 超长行可续读
  124. for i in range(offset, end_line):
  125. line = lines[i].rstrip('\n\r')
  126. # char_offset 只作用于起始行: 跳过已读过的前缀, 实现"接着读这一行"
  127. start = char_offset if (i == offset and char_offset > 0) else 0
  128. if start:
  129. line = line[start:]
  130. # 行长度限制(参考 opencode:104)。超长不再是"砍掉就没了":
  131. # 记录可续读的字符位置, 让 Agent 用 char_offset 分段读完整行。
  132. if len(line) > MAX_LINE_LENGTH:
  133. line = line[:MAX_LINE_LENGTH] + "..."
  134. line_continues_at = (i, start + MAX_LINE_LENGTH)
  135. # 字节限制(参考 opencode:105-112)
  136. line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
  137. if total_bytes + line_bytes > MAX_BYTES:
  138. truncated_by_bytes = True
  139. line_continues_at = None # 这行没真正放进去, 别误导续读
  140. break
  141. output_lines.append(line)
  142. total_bytes += line_bytes
  143. # 长行被截断: 先停在这, 让 Agent 用 char_offset 续读该行 (而非跳到下一行漏内容)
  144. if line_continues_at is not None:
  145. break
  146. # 格式化输出(参考 opencode:114-134)
  147. formatted = []
  148. for idx, line in enumerate(output_lines):
  149. line_num = offset + idx + 1
  150. formatted.append(f"{line_num:5d}| {line}")
  151. output = "<file>\n" + "\n".join(formatted)
  152. last_read_line = offset + len(output_lines)
  153. has_more = total_lines > last_read_line
  154. truncated = has_more or truncated_by_bytes or (line_continues_at is not None)
  155. # 添加提示 (优先提示"续读超长行", 否则才是行级翻页)
  156. if line_continues_at is not None:
  157. li, cpos = line_continues_at
  158. output += (f"\n\n(第 {li + 1} 行过长被截断, 已读到第 {cpos} 字符。"
  159. f"续读该行剩余内容: read_file(file_path=..., offset={li}, char_offset={cpos}))")
  160. elif truncated_by_bytes:
  161. output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
  162. elif has_more:
  163. output += f"\n\n(文件还有更多内容。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
  164. else:
  165. output += f"\n\n(文件结束 - 共 {total_lines} 行)"
  166. output += "\n</file>"
  167. # 预览(前 20 行)
  168. preview = "\n".join(output_lines[:20])
  169. return ToolResult(
  170. title=path.name,
  171. output=output,
  172. metadata={
  173. "preview": preview,
  174. "truncated": truncated,
  175. "total_lines": total_lines,
  176. "read_lines": len(output_lines)
  177. }
  178. )
  179. except UnicodeDecodeError:
  180. return ToolResult(
  181. title="编码错误",
  182. output=f"无法解码文件(非 UTF-8 编码): {path.name}",
  183. error="Encoding error"
  184. )
  185. except Exception as e:
  186. return ToolResult(
  187. title="读取错误",
  188. output=f"读取文件时出错: {str(e)}",
  189. error=str(e)
  190. )
  191. def _is_binary_file(path: Path) -> bool:
  192. """
  193. 检测是否为二进制文件
  194. 参考 OpenCode 实现
  195. """
  196. # 常见二进制扩展名
  197. binary_exts = {
  198. '.zip', '.tar', '.gz', '.exe', '.dll', '.so', '.class',
  199. '.jar', '.war', '.7z', '.doc', '.docx', '.xls', '.xlsx',
  200. '.ppt', '.pptx', '.odt', '.ods', '.odp', '.bin', '.dat',
  201. '.obj', '.o', '.a', '.lib', '.wasm', '.pyc', '.pyo'
  202. }
  203. if path.suffix.lower() in binary_exts:
  204. return True
  205. # 检查文件内容
  206. try:
  207. file_size = path.stat().st_size
  208. if file_size == 0:
  209. return False
  210. # 读取前 4KB
  211. buffer_size = min(4096, file_size)
  212. with open(path, 'rb') as f:
  213. buffer = f.read(buffer_size)
  214. # 检测 null 字节
  215. if b'\x00' in buffer:
  216. return True
  217. # 统计非打印字符(参考 opencode:202-210)
  218. non_printable = 0
  219. for byte in buffer:
  220. if byte < 9 or (13 < byte < 32):
  221. non_printable += 1
  222. # 如果超过 30% 是非打印字符,认为是二进制
  223. return non_printable / len(buffer) > 0.3
  224. except Exception:
  225. return False
  226. async def _read_from_url(url: str) -> ToolResult:
  227. """
  228. 从 HTTP/HTTPS URL 读取文件内容。
  229. 主要用于图片等多媒体资源,自动转换为 base64。
  230. """
  231. try:
  232. async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
  233. response = await client.get(url)
  234. response.raise_for_status()
  235. content_type = response.headers.get("content-type", "")
  236. raw = response.content
  237. # 从 URL 提取文件名
  238. from urllib.parse import urlparse
  239. parsed = urlparse(url)
  240. filename = Path(parsed.path).name or "downloaded_file"
  241. # 图片文件
  242. if content_type.startswith("image/") or any(url.lower().endswith(ext) for ext in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"]):
  243. mime_type = content_type.split(";")[0] if content_type else "image/jpeg"
  244. b64_data = base64.b64encode(raw).decode("ascii")
  245. return ToolResult(
  246. title=filename,
  247. output=f"图片文件: {filename} (URL: {url}, MIME: {mime_type}, {len(raw)} bytes)",
  248. metadata={"mime_type": mime_type, "url": url, "truncated": False},
  249. images=[{
  250. "type": "base64",
  251. "media_type": mime_type,
  252. "data": b64_data,
  253. }],
  254. )
  255. # 文本文件
  256. if content_type.startswith("text/") or content_type == "application/json":
  257. text = raw.decode("utf-8", errors="replace")
  258. lines = text.split("\n")
  259. preview = "\n".join(lines[:20])
  260. return ToolResult(
  261. title=filename,
  262. output=f"<file>\n{text}\n</file>",
  263. metadata={
  264. "preview": preview,
  265. "url": url,
  266. "mime_type": content_type,
  267. "total_lines": len(lines),
  268. }
  269. )
  270. # 其他二进制文件
  271. return ToolResult(
  272. title=filename,
  273. output=f"二进制文件: {filename} (URL: {url}, {len(raw)} bytes)",
  274. metadata={"url": url, "mime_type": content_type, "size": len(raw)}
  275. )
  276. except httpx.HTTPStatusError as e:
  277. return ToolResult(
  278. title="HTTP 错误",
  279. output=f"无法下载文件: {url}\nHTTP {e.response.status_code}: {e.response.reason_phrase}",
  280. error=str(e)
  281. )
  282. except Exception as e:
  283. return ToolResult(
  284. title="下载失败",
  285. output=f"无法从 URL 读取文件: {url}\n错误: {str(e)}",
  286. error=str(e)
  287. )