"""
Read Tool - file reading tool.

Modeled on the complete OpenCode read.ts implementation.

Core features:
- Supports text files, images and PDFs
- Paged reading (offset/limit)
- Binary file detection
- Line-length and byte limits
"""

import os
import base64
import mimetypes
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse

import httpx

from agent.tools import tool, ToolResult, ToolContext

# Limits (mirroring opencode)
DEFAULT_READ_LIMIT = 2000
MAX_LINE_LENGTH = 2000
MAX_BYTES = 50 * 1024  # 50KB

# Extensions that are treated as binary without inspecting the content.
_BINARY_EXTENSIONS = frozenset({
    '.zip', '.tar', '.gz', '.exe', '.dll', '.so', '.class',
    '.jar', '.war', '.7z', '.doc', '.docx', '.xls', '.xlsx',
    '.ppt', '.pptx', '.odt', '.ods', '.odp', '.bin', '.dat',
    '.obj', '.o', '.a', '.lib', '.wasm', '.pyc', '.pyo',
})

# URL suffixes recognised as images, with the media type to report when the
# server's Content-Type header is missing or is not an image type.
_IMAGE_URL_MIME_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".bmp": "image/bmp",
}


def _suggest_similar(path: Path) -> list:
    """Return up to three siblings of *path* whose names resemble its name.

    A sibling matches when either name contains the other, ignoring case.
    Returns an empty list when the parent is not a listable directory, so a
    failed lookup never hides the "file not found" result.
    """
    parent_dir = path.parent
    if not parent_dir.is_dir():
        return []
    wanted = path.name.lower()
    try:
        matches = [
            entry for entry in parent_dir.iterdir()
            if wanted in entry.name.lower() or entry.name.lower() in wanted
        ]
    except OSError:
        # e.g. permission denied, or the directory vanished meanwhile
        return []
    return matches[:3]


def _collect_lines(path: Path, offset: int, limit: int) -> tuple:
    """Stream a UTF-8 text file and collect the requested window of lines.

    Args:
        path: File to read.
        offset: 0-based index of the first line to collect (must be >= 0).
        limit: Maximum number of lines to collect (must be >= 0).

    Returns:
        ``(lines, total_lines, truncated_by_bytes)`` where ``lines`` are the
        collected lines without line terminators, ``total_lines`` is the
        number of lines in the whole file and ``truncated_by_bytes`` tells
        whether collection stopped early because of ``MAX_BYTES``.

    Raises:
        UnicodeDecodeError: If the file is not valid UTF-8.
        OSError: If the file cannot be opened or read.
    """
    stop = offset + limit
    collected = []
    used_bytes = 0
    total_lines = 0
    truncated_by_bytes = False

    with open(path, 'r', encoding='utf-8') as f:
        for index, raw_line in enumerate(f):
            # Keep iterating to the end so the total line count is known.
            total_lines = index + 1
            if truncated_by_bytes or index < offset or index >= stop:
                continue

            line = raw_line.rstrip('\n\r')

            # Line-length limit (cf. opencode:104)
            if len(line) > MAX_LINE_LENGTH:
                line = line[:MAX_LINE_LENGTH] + "..."

            # Byte limit (cf. opencode:105-112); the extra byte accounts for
            # the newline that joins this line to the previous one.
            line_bytes = len(line.encode('utf-8')) + (1 if collected else 0)
            if used_bytes + line_bytes > MAX_BYTES:
                truncated_by_bytes = True
                continue

            collected.append(line)
            used_bytes += line_bytes

    return collected, total_lines, truncated_by_bytes


@tool(description="读取文件内容,支持文本文件、图片、PDF 等多种格式,也支持 HTTP/HTTPS URL", hidden_params=["context"])
async def read_file(
    file_path: str,
    offset: int = 0,
    limit: int = DEFAULT_READ_LIMIT,
    context: Optional[ToolContext] = None
) -> ToolResult:
    """
    Read the content of a file.

    Modeled on the OpenCode implementation.

    Args:
        file_path: File path (absolute, relative, or an HTTP/HTTPS URL)
        offset: Index of the first line to read (0-based); negative values
            are treated as 0
        limit: Number of lines to read (default 2000); negative values are
            treated as 0
        context: Tool context

    Returns:
        ToolResult: The file content, or an error result
    """
    # HTTP/HTTPS URLs are fetched instead of being read from disk
    parsed = urlparse(file_path)
    if parsed.scheme in ("http", "https"):
        return await _read_from_url(file_path)

    # A negative offset would index from the end of the file and yield
    # duplicated lines with non-positive line numbers, so clamp both values.
    offset = max(offset, 0)
    limit = max(limit, 0)

    # Resolve relative paths against the current working directory
    path = Path(file_path)
    if not path.is_absolute():
        path = Path.cwd() / path

    # Check that the file exists
    if not path.exists():
        # Try to offer suggestions (cf. opencode:44-60)
        candidates = _suggest_similar(path)
        if candidates:
            suggestions = "\n".join(str(c) for c in candidates)
            return ToolResult(
                title=f"文件未找到: {path.name}",
                output=f"文件不存在: {file_path}\n\n你是否想要:\n{suggestions}",
                error="File not found"
            )

        return ToolResult(
            title="文件未找到",
            output=f"文件不存在: {file_path}",
            error="File not found"
        )

    # Detect the file type from its name
    mime_type, _ = mimetypes.guess_type(str(path))
    mime_type = mime_type or ""

    # Image files (cf. opencode:66-91); SVG is excluded here and therefore
    # falls through to the text branch.
    if mime_type.startswith("image/") and mime_type not in ["image/svg+xml", "image/vnd.fastbidsheet"]:
        try:
            raw = path.read_bytes()
            b64_data = base64.b64encode(raw).decode("ascii")
            return ToolResult(
                title=path.name,
                output=f"图片文件: {path.name} (MIME: {mime_type}, {len(raw)} bytes)",
                metadata={"mime_type": mime_type, "truncated": False},
                images=[{
                    "type": "base64",
                    "media_type": mime_type,
                    "data": b64_data,
                }],
            )
        except Exception as e:
            return ToolResult(
                title=path.name,
                output=f"图片文件读取失败: {path.name}: {e}",
                error=str(e),
            )

    # PDF files: only a notice is returned, the content is not extracted
    if mime_type == "application/pdf":
        return ToolResult(
            title=path.name,
            output=f"PDF 文件: {path.name}",
            metadata={"mime_type": mime_type, "truncated": False}
        )

    # Binary file detection (cf. opencode:156-211)
    if _is_binary_file(path):
        return ToolResult(
            title="二进制文件",
            output=f"无法读取二进制文件: {path.name}",
            error="Binary file"
        )

    # Read as text (cf. opencode:96-143)
    try:
        output_lines, total_lines, truncated_by_bytes = _collect_lines(path, offset, limit)

        # Format the output with 1-based line numbers (cf. opencode:114-134)
        formatted = [
            f"{line_num:5d}| {line}"
            for line_num, line in enumerate(output_lines, start=offset + 1)
        ]
        output = "\n" + "\n".join(formatted)

        last_read_line = offset + len(output_lines)
        has_more = total_lines > last_read_line
        truncated = has_more or truncated_by_bytes

        # Append a hint telling the caller how to continue
        if truncated_by_bytes:
            output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
        elif has_more:
            output += f"\n\n(文件还有更多内容。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
        else:
            output += f"\n\n(文件结束 - 共 {total_lines} 行)"

        output += "\n"

        # Preview (first 20 lines)
        preview = "\n".join(output_lines[:20])

        return ToolResult(
            title=path.name,
            output=output,
            metadata={
                "preview": preview,
                "truncated": truncated,
                "total_lines": total_lines,
                "read_lines": len(output_lines)
            }
        )

    except UnicodeDecodeError:
        return ToolResult(
            title="编码错误",
            output=f"无法解码文件(非 UTF-8 编码): {path.name}",
            error="Encoding error"
        )
    except Exception as e:
        # Tool boundary: report any other failure as a result, do not raise
        return ToolResult(
            title="读取错误",
            output=f"读取文件时出错: {str(e)}",
            error=str(e)
        )


def _is_binary_file(path: Path) -> bool:
    """
    Detect whether a file is binary.

    Modeled on the OpenCode implementation. A file is considered binary when
    its extension is in a known list, when its first 4 KB contain a NUL byte,
    or when more than 30% of those bytes are non-printable control bytes.
    Empty or unreadable files are reported as not binary.
    """
    # Well-known binary extensions
    if path.suffix.lower() in _BINARY_EXTENSIONS:
        return True

    # Inspect the file content
    try:
        file_size = path.stat().st_size
        if file_size == 0:
            return False

        # Read the first 4KB
        buffer_size = min(4096, file_size)
        with open(path, 'rb') as f:
            buffer = f.read(buffer_size)
    except OSError:
        return False

    # The file may have shrunk between stat() and read()
    if not buffer:
        return False

    # NUL bytes do not occur in ordinary text
    if b'\x00' in buffer:
        return True

    # Count non-printable bytes (cf. opencode:202-210); bytes 9-13
    # (tab, LF, VT, FF, CR) are treated as text.
    non_printable = sum(
        1 for byte in buffer if byte < 9 or (13 < byte < 32)
    )

    # More than 30% non-printable bytes means binary
    return non_printable / len(buffer) > 0.3


def _image_mime_from_url(url: str, url_path: str) -> Optional[str]:
    """Return the image media type implied by the URL's suffix, if any.

    Both the path component and the full URL are checked, so
    ``/a.png?x=1`` and ``/get?name=a.png`` are both recognised.
    """
    lowered_path = url_path.lower()
    lowered_url = url.lower()
    for ext, mime in _IMAGE_URL_MIME_TYPES.items():
        if lowered_path.endswith(ext) or lowered_url.endswith(ext):
            return mime
    return None


async def _read_from_url(url: str) -> ToolResult:
    """
    Read file content from an HTTP/HTTPS URL.

    Mainly intended for images and other media, which are converted to
    base64. Text and JSON responses are returned as text; anything else is
    reported as a binary file without its content.

    Args:
        url: The HTTP/HTTPS URL to download.

    Returns:
        ToolResult: The downloaded content, or an error result when the
        request fails.
    """
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            response = await client.get(url)
            response.raise_for_status()

            content_type = response.headers.get("content-type", "")
            # Drop parameters such as "; charset=utf-8" and normalise case so
            # that e.g. "Application/JSON; charset=utf-8" is still matched.
            media_type = content_type.split(";", 1)[0].strip().lower()
            raw = response.content

            # Derive a file name from the URL
            parsed = urlparse(url)
            filename = Path(parsed.path).name or "downloaded_file"

            # Image files
            suffix_mime = _image_mime_from_url(url, parsed.path)
            if media_type.startswith("image/") or suffix_mime is not None:
                # Prefer the server's image type; fall back to the type
                # implied by the suffix rather than forwarding a non-image
                # Content-Type as the image media type.
                if media_type.startswith("image/"):
                    mime_type = media_type
                else:
                    mime_type = suffix_mime or "image/jpeg"
                b64_data = base64.b64encode(raw).decode("ascii")
                return ToolResult(
                    title=filename,
                    output=f"图片文件: {filename} (URL: {url}, MIME: {mime_type}, {len(raw)} bytes)",
                    metadata={"mime_type": mime_type, "url": url, "truncated": False},
                    images=[{
                        "type": "base64",
                        "media_type": mime_type,
                        "data": b64_data,
                    }],
                )

            # Text files
            if media_type.startswith("text/") or media_type == "application/json":
                text = raw.decode("utf-8", errors="replace")
                lines = text.split("\n")
                preview = "\n".join(lines[:20])
                return ToolResult(
                    title=filename,
                    output=f"\n{text}\n",
                    metadata={
                        "preview": preview,
                        "url": url,
                        "mime_type": content_type,
                        "total_lines": len(lines),
                    }
                )

            # Any other content is reported as binary
            return ToolResult(
                title=filename,
                output=f"二进制文件: {filename} (URL: {url}, {len(raw)} bytes)",
                metadata={"url": url, "mime_type": content_type, "size": len(raw)}
            )

    except httpx.HTTPStatusError as e:
        return ToolResult(
            title="HTTP 错误",
            output=f"无法下载文件: {url}\nHTTP {e.response.status_code}: {e.response.reason_phrase}",
            error=str(e)
        )
    except Exception as e:
        # Tool boundary: network and other errors become a result
        return ToolResult(
            title="下载失败",
            output=f"无法从 URL 读取文件: {url}\n错误: {str(e)}",
            error=str(e)
        )