| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- """
- Read Tool - 文件读取工具
- 参考 OpenCode read.ts 完整实现。
- 核心功能:
- - 支持文本文件、图片、PDF
- - 分页读取(offset/limit)
- - 二进制文件检测
- - 行长度和字节限制
- """
- import os
- import mimetypes
- from pathlib import Path
- from typing import Optional
- from agent.tools import tool, ToolResult, ToolContext
# Read limits (modelled on the OpenCode read tool).
# Default number of lines returned when the caller does not pass `limit`.
DEFAULT_READ_LIMIT = 2_000
# Lines longer than this many characters are cut and suffixed with "...".
MAX_LINE_LENGTH = 2_000
# Upper bound on the UTF-8 size of the returned text: 50 KiB.
MAX_BYTES = 1024 * 50
def _find_similar_paths(path: Path, max_results: int = 3) -> list:
    """Return up to *max_results* siblings of *path* whose names resemble it.

    A sibling matches when its lower-cased name contains the lower-cased
    requested name, or the other way round. The lookup is best-effort: when
    the parent is not a directory or cannot be listed, whatever was collected
    so far (possibly nothing) is returned instead of raising.
    """
    parent_dir = path.parent
    # exists() is not enough: the parent may be a regular file, and
    # iterdir() on a non-directory raises NotADirectoryError.
    if not parent_dir.is_dir():
        return []

    wanted = path.name.lower()
    matches = []
    try:
        for entry in parent_dir.iterdir():
            entry_name = entry.name.lower()
            if wanted in entry_name or entry_name in wanted:
                matches.append(entry)
                if len(matches) >= max_results:
                    break
    except OSError:
        # e.g. PermissionError: suggestions are optional, so never let the
        # lookup turn a "file not found" answer into a crash.
        pass
    return matches


@tool(description="读取文件内容,支持文本文件、图片、PDF 等多种格式")
async def read_file(
    file_path: str,
    offset: int = 0,
    limit: int = DEFAULT_READ_LIMIT,
    uid: str = "",
    context: Optional[ToolContext] = None
) -> ToolResult:
    """
    Read a file and return its content as a numbered, paginated listing.

    Modelled on the OpenCode read tool. Images (except SVG and
    image/vnd.fastbidsheet) and PDFs are recognised from the MIME type
    guessed from the file name; for those only a short placeholder
    description is returned, not the content. Files detected as binary are
    rejected. Text files are decoded as UTF-8.

    Args:
        file_path: Absolute path, or a path relative to the current
            working directory.
        offset: Zero-based index of the first line to return. Negative
            values are treated as 0.
        limit: Maximum number of lines to return (default 2000).
        uid: User ID (noted in the original as injected automatically);
            not used by this function.
        context: Tool context; not used by this function.

    Returns:
        ToolResult: On success, ``output`` holds the selected lines wrapped
        in ``<file>...</file>`` with 1-based line numbers and a trailing
        hint, and ``metadata`` carries ``preview``, ``truncated``,
        ``total_lines`` and ``read_lines``. Failures (missing file, binary
        file, decoding or I/O errors) are reported through ``error`` rather
        than raised.
    """
    # Resolve relative paths against the current working directory.
    path = Path(file_path)
    if not path.is_absolute():
        path = Path.cwd() / path

    # Missing file: try to suggest similarly named siblings.
    if not path.exists():
        candidates = _find_similar_paths(path)
        if candidates:
            suggestions = "\n".join(str(c) for c in candidates)
            return ToolResult(
                title=f"文件未找到: {path.name}",
                output=f"文件不存在: {file_path}\n\n你是否想要:\n{suggestions}",
                error="File not found"
            )
        return ToolResult(
            title="文件未找到",
            output=f"文件不存在: {file_path}",
            error="File not found"
        )

    # Guess the file type from its name.
    mime_type, _ = mimetypes.guess_type(str(path))
    mime_type = mime_type or ""

    # Image files: simplified handling, only a description is returned
    # (a full implementation would base64-encode the image).
    if mime_type.startswith("image/") and mime_type not in ["image/svg+xml", "image/vnd.fastbidsheet"]:
        return ToolResult(
            title=path.name,
            output=f"图片文件: {path.name} (MIME: {mime_type})",
            metadata={"mime_type": mime_type, "truncated": False}
        )

    # PDF files: likewise only a description is returned.
    if mime_type == "application/pdf":
        return ToolResult(
            title=path.name,
            output=f"PDF 文件: {path.name}",
            metadata={"mime_type": mime_type, "truncated": False}
        )

    # Refuse to dump binary content.
    if _is_binary_file(path):
        return ToolResult(
            title="二进制文件",
            output=f"无法读取二进制文件: {path.name}",
            error="Binary file"
        )

    # Read the text file.
    try:
        # A negative offset would index from the end of the file and yield
        # line numbers <= 0, so clamp it to the first line.
        offset = max(offset, 0)
        end_line = offset + limit

        output_lines = []
        total_lines = 0
        total_bytes = 0
        truncated_by_bytes = False

        # Stream the file instead of loading every line: only the requested
        # window is kept, the remainder is merely counted.
        with open(path, 'r', encoding='utf-8') as f:
            for index, raw_line in enumerate(f):
                total_lines = index + 1
                if truncated_by_bytes or index < offset or index >= end_line:
                    continue

                line = raw_line.rstrip('\n\r')
                # Per-line length cap.
                if len(line) > MAX_LINE_LENGTH:
                    line = line[:MAX_LINE_LENGTH] + "..."

                # Overall byte cap; the +1 accounts for the newline that
                # joins this line to the previous one.
                line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
                if total_bytes + line_bytes > MAX_BYTES:
                    truncated_by_bytes = True
                    continue

                output_lines.append(line)
                total_bytes += line_bytes

        # Prefix each line with its 1-based line number.
        formatted = [
            f"{offset + idx + 1:5d}| {line}"
            for idx, line in enumerate(output_lines)
        ]
        output = "<file>\n" + "\n".join(formatted)

        last_read_line = offset + len(output_lines)
        has_more = total_lines > last_read_line
        truncated = has_more or truncated_by_bytes

        # Trailing hint telling the caller how to continue.
        if truncated_by_bytes:
            output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
        elif has_more:
            output += f"\n\n(文件还有更多内容。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
        else:
            output += f"\n\n(文件结束 - 共 {total_lines} 行)"
        output += "\n</file>"

        # Preview: the first 20 returned lines, without line numbers.
        preview = "\n".join(output_lines[:20])

        return ToolResult(
            title=path.name,
            output=output,
            metadata={
                "preview": preview,
                "truncated": truncated,
                "total_lines": total_lines,
                "read_lines": len(output_lines)
            }
        )
    except UnicodeDecodeError:
        return ToolResult(
            title="编码错误",
            output=f"无法解码文件(非 UTF-8 编码): {path.name}",
            error="Encoding error"
        )
    except Exception as e:
        # Tool boundary: report any other failure (I/O error, directory
        # path, bad argument type) as a result instead of raising.
        return ToolResult(
            title="读取错误",
            output=f"读取文件时出错: {str(e)}",
            error=str(e)
        )
def _is_binary_file(path: Path) -> bool:
    """
    Heuristically decide whether *path* points to a binary file.

    Modelled on the OpenCode implementation. The checks run in this order:
    a known binary extension, then a NUL byte in the first 4 KiB, then more
    than 30% control bytes in that sample. Empty files, and files that
    cannot be inspected for any reason, are reported as not binary.
    """
    known_binary_suffixes = frozenset((
        '.zip', '.tar', '.gz', '.exe', '.dll', '.so', '.class',
        '.jar', '.war', '.7z', '.doc', '.docx', '.xls', '.xlsx',
        '.ppt', '.pptx', '.odt', '.ods', '.odp', '.bin', '.dat',
        '.obj', '.o', '.a', '.lib', '.wasm', '.pyc', '.pyo',
    ))
    suffix = path.suffix.lower()
    if suffix in known_binary_suffixes:
        return True

    # Fall back to sniffing the content.
    try:
        size = path.stat().st_size
        if not size:
            return False

        # Sample at most the first 4 KiB.
        with open(path, 'rb') as handle:
            sample = handle.read(min(4096, size))

        # Any NUL byte marks the file as binary.
        if sample.find(b'\x00') != -1:
            return True

        # Count control bytes, i.e. everything below 32 except the
        # range 9..13 (tab, LF, VT, FF, CR).
        control_count = sum(
            1 for value in sample
            if value < 32 and not (9 <= value <= 13)
        )

        # Binary when more than 30% of the sample is control bytes.
        return control_count / len(sample) > 0.3
    except Exception:
        return False
|