read.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. """
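Read Tool - file reading tool.
Modeled on the OpenCode read.ts implementation.
Core features:
- Text files and images are returned with their content; PDFs are identified by name only
- Paged reading (offset/limit)
- Binary file detection
- Per-line length and total byte limits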
  9. """
  10. import os
  11. import base64
  12. import mimetypes
  13. from pathlib import Path
  14. from typing import Optional
  15. from agent.tools import tool, ToolResult, ToolContext
  16. # 常量(参考 opencode)
  17. DEFAULT_READ_LIMIT = 2000
  18. MAX_LINE_LENGTH = 2000
  19. MAX_BYTES = 50 * 1024 # 50KB
  20. @tool(description="读取文件内容,支持文本文件、图片、PDF 等多种格式")
  21. async def read_file(
  22. file_path: str,
  23. offset: int = 0,
  24. limit: int = DEFAULT_READ_LIMIT,
  25. context: Optional[ToolContext] = None
  26. ) -> ToolResult:
  27. """
  28. 读取文件内容
  29. 参考 OpenCode 实现
  30. Args:
  31. file_path: 文件路径(绝对路径或相对路径)
  32. offset: 起始行号(从 0 开始)
  33. limit: 读取行数(默认 2000 行)
  34. context: 工具上下文
  35. Returns:
  36. ToolResult: 文件内容
  37. """
  38. # 解析路径
  39. path = Path(file_path)
  40. if not path.is_absolute():
  41. path = Path.cwd() / path
  42. # 检查文件是否存在
  43. if not path.exists():
  44. # 尝试提供建议(参考 opencode:44-60)
  45. parent_dir = path.parent
  46. if parent_dir.exists():
  47. candidates = [
  48. f for f in parent_dir.iterdir()
  49. if path.name.lower() in f.name.lower() or f.name.lower() in path.name.lower()
  50. ][:3]
  51. if candidates:
  52. suggestions = "\n".join(str(c) for c in candidates)
  53. return ToolResult(
  54. title=f"文件未找到: {path.name}",
  55. output=f"文件不存在: {file_path}\n\n你是否想要:\n{suggestions}",
  56. error="File not found"
  57. )
  58. return ToolResult(
  59. title="文件未找到",
  60. output=f"文件不存在: {file_path}",
  61. error="File not found"
  62. )
  63. # 检测文件类型
  64. mime_type, _ = mimetypes.guess_type(str(path))
  65. mime_type = mime_type or ""
  66. # 图片文件(参考 opencode:66-91)
  67. if mime_type.startswith("image/") and mime_type not in ["image/svg+xml", "image/vnd.fastbidsheet"]:
  68. try:
  69. raw = path.read_bytes()
  70. b64_data = base64.b64encode(raw).decode("ascii")
  71. return ToolResult(
  72. title=path.name,
  73. output=f"图片文件: {path.name} (MIME: {mime_type}, {len(raw)} bytes)",
  74. metadata={"mime_type": mime_type, "truncated": False},
  75. images=[{
  76. "type": "base64",
  77. "media_type": mime_type,
  78. "data": b64_data,
  79. }],
  80. )
  81. except Exception as e:
  82. return ToolResult(
  83. title=path.name,
  84. output=f"图片文件读取失败: {path.name}: {e}",
  85. error=str(e),
  86. )
  87. # PDF 文件
  88. if mime_type == "application/pdf":
  89. return ToolResult(
  90. title=path.name,
  91. output=f"PDF 文件: {path.name}",
  92. metadata={"mime_type": mime_type, "truncated": False}
  93. )
  94. # 二进制文件检测(参考 opencode:156-211)
  95. if _is_binary_file(path):
  96. return ToolResult(
  97. title="二进制文件",
  98. output=f"无法读取二进制文件: {path.name}",
  99. error="Binary file"
  100. )
  101. # 读取文本文件(参考 opencode:96-143)
  102. try:
  103. with open(path, 'r', encoding='utf-8') as f:
  104. lines = f.readlines()
  105. total_lines = len(lines)
  106. end_line = min(offset + limit, total_lines)
  107. # 截取行并处理长度限制
  108. output_lines = []
  109. total_bytes = 0
  110. truncated_by_bytes = False
  111. for i in range(offset, end_line):
  112. line = lines[i].rstrip('\n\r')
  113. # 行长度限制(参考 opencode:104)
  114. if len(line) > MAX_LINE_LENGTH:
  115. line = line[:MAX_LINE_LENGTH] + "..."
  116. # 字节限制(参考 opencode:105-112)
  117. line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
  118. if total_bytes + line_bytes > MAX_BYTES:
  119. truncated_by_bytes = True
  120. break
  121. output_lines.append(line)
  122. total_bytes += line_bytes
  123. # 格式化输出(参考 opencode:114-134)
  124. formatted = []
  125. for idx, line in enumerate(output_lines):
  126. line_num = offset + idx + 1
  127. formatted.append(f"{line_num:5d}| {line}")
  128. output = "<file>\n" + "\n".join(formatted)
  129. last_read_line = offset + len(output_lines)
  130. has_more = total_lines > last_read_line
  131. truncated = has_more or truncated_by_bytes
  132. # 添加提示
  133. if truncated_by_bytes:
  134. output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
  135. elif has_more:
  136. output += f"\n\n(文件还有更多内容。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
  137. else:
  138. output += f"\n\n(文件结束 - 共 {total_lines} 行)"
  139. output += "\n</file>"
  140. # 预览(前 20 行)
  141. preview = "\n".join(output_lines[:20])
  142. return ToolResult(
  143. title=path.name,
  144. output=output,
  145. metadata={
  146. "preview": preview,
  147. "truncated": truncated,
  148. "total_lines": total_lines,
  149. "read_lines": len(output_lines)
  150. }
  151. )
  152. except UnicodeDecodeError:
  153. return ToolResult(
  154. title="编码错误",
  155. output=f"无法解码文件(非 UTF-8 编码): {path.name}",
  156. error="Encoding error"
  157. )
  158. except Exception as e:
  159. return ToolResult(
  160. title="读取错误",
  161. output=f"读取文件时出错: {str(e)}",
  162. error=str(e)
  163. )
  164. def _is_binary_file(path: Path) -> bool:
  165. """
  166. 检测是否为二进制文件
  167. 参考 OpenCode 实现
  168. """
  169. # 常见二进制扩展名
  170. binary_exts = {
  171. '.zip', '.tar', '.gz', '.exe', '.dll', '.so', '.class',
  172. '.jar', '.war', '.7z', '.doc', '.docx', '.xls', '.xlsx',
  173. '.ppt', '.pptx', '.odt', '.ods', '.odp', '.bin', '.dat',
  174. '.obj', '.o', '.a', '.lib', '.wasm', '.pyc', '.pyo'
  175. }
  176. if path.suffix.lower() in binary_exts:
  177. return True
  178. # 检查文件内容
  179. try:
  180. file_size = path.stat().st_size
  181. if file_size == 0:
  182. return False
  183. # 读取前 4KB
  184. buffer_size = min(4096, file_size)
  185. with open(path, 'rb') as f:
  186. buffer = f.read(buffer_size)
  187. # 检测 null 字节
  188. if b'\x00' in buffer:
  189. return True
  190. # 统计非打印字符(参考 opencode:202-210)
  191. non_printable = 0
  192. for byte in buffer:
  193. if byte < 9 or (13 < byte < 32):
  194. non_printable += 1
  195. # 如果超过 30% 是非打印字符,认为是二进制
  196. return non_printable / len(buffer) > 0.3
  197. except Exception:
  198. return False