"""
Read Tool - 文件读取工具
参考 OpenCode read.ts 完整实现。
核心功能:
- 支持文本文件、图片、PDF
- 分页读取(offset/limit)
- 二进制文件检测
- 行长度和字节限制
"""
import os
import base64
import mimetypes
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
import httpx
from agent.tools import tool, ToolResult, ToolContext
# Constants (modelled on opencode)
DEFAULT_READ_LIMIT = 2_000  # default value of read_file's `limit` (lines per call)
MAX_LINE_LENGTH = 2_000  # lines longer than this many characters are cut and suffixed with "..."
MAX_BYTES = 1024 * 50  # 50KB cap on the UTF-8 size of the returned text
@tool(description="读取文件内容,支持文本文件、图片、PDF 等多种格式,也支持 HTTP/HTTPS URL", hidden_params=["context"])
async def read_file(
    file_path: str,
    offset: int = 0,
    limit: int = DEFAULT_READ_LIMIT,
    context: Optional[ToolContext] = None
) -> ToolResult:
    """
    Read a file and return its content.

    Modelled on the OpenCode read tool. HTTP/HTTPS URLs are delegated to
    ``_read_from_url``. Local images are returned base64-encoded, PDFs yield
    only a placeholder result (no content is extracted), files detected as
    binary are rejected, and text files are returned as numbered lines
    limited by ``limit``, ``MAX_LINE_LENGTH`` and ``MAX_BYTES``.

    Args:
        file_path: File path (absolute, relative to the current working directory, or an HTTP/HTTPS URL)
        offset: Index of the first line to read (0-based); negative values are treated as 0
        limit: Number of lines to read (default 2000); negative values are treated as 0
        context: Tool context (not used by this function)

    Returns:
        ToolResult: The file content, or a result with ``error`` set when the
        file is missing, binary, not valid UTF-8, or cannot be read.
    """
    # Remote resources are handled by the URL reader.
    if urlparse(file_path).scheme in ("http", "https"):
        return await _read_from_url(file_path)

    # A negative offset would otherwise become a negative list index and emit
    # lines from the end of the file with wrong line numbers.
    offset = max(offset, 0)
    limit = max(limit, 0)

    # Resolve relative paths against the current working directory.
    path = Path(file_path)
    if not path.is_absolute():
        path = Path.cwd() / path

    if not path.exists():
        # Suggest up to three siblings whose name contains, or is contained
        # in, the requested name (case-insensitive).
        wanted = path.name.lower()
        candidates = []
        try:
            for entry in path.parent.iterdir():
                entry_name = entry.name.lower()
                if wanted in entry_name or entry_name in wanted:
                    candidates.append(entry)
                    if len(candidates) == 3:
                        break
        except OSError:
            # Parent is missing, is not a directory, or cannot be listed:
            # fall through with whatever suggestions were collected.
            pass

        if candidates:
            suggestions = "\n".join(str(c) for c in candidates)
            return ToolResult(
                title=f"文件未找到: {path.name}",
                output=f"文件不存在: {file_path}\n\n你是否想要:\n{suggestions}",
                error="File not found"
            )
        return ToolResult(
            title="文件未找到",
            output=f"文件不存在: {file_path}",
            error="File not found"
        )

    # Guess the file type from the file name.
    mime_type, _ = mimetypes.guess_type(str(path))
    mime_type = mime_type or ""

    # Images are returned base64-encoded; these two image/* types are
    # excluded and continue to the checks below.
    if mime_type.startswith("image/") and mime_type not in ["image/svg+xml", "image/vnd.fastbidsheet"]:
        try:
            raw = path.read_bytes()
            b64_data = base64.b64encode(raw).decode("ascii")
            return ToolResult(
                title=path.name,
                output=f"图片文件: {path.name} (MIME: {mime_type}, {len(raw)} bytes)",
                metadata={"mime_type": mime_type, "truncated": False},
                images=[{
                    "type": "base64",
                    "media_type": mime_type,
                    "data": b64_data,
                }],
            )
        except Exception as e:
            return ToolResult(
                title=path.name,
                output=f"图片文件读取失败: {path.name}: {e}",
                error=str(e),
            )

    # PDF files: only a placeholder result is produced.
    if mime_type == "application/pdf":
        return ToolResult(
            title=path.name,
            output=f"PDF 文件: {path.name}",
            metadata={"mime_type": mime_type, "truncated": False}
        )

    # Refuse files that look binary.
    if _is_binary_file(path):
        return ToolResult(
            title="二进制文件",
            output=f"无法读取二进制文件: {path.name}",
            error="Binary file"
        )

    # Text files.
    try:
        with open(path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        total_lines = len(lines)

        # Collect the requested window, enforcing per-line and total-size limits.
        output_lines = []
        total_bytes = 0
        truncated_by_bytes = False
        for raw_line in lines[offset:offset + limit]:
            line = raw_line.rstrip('\n\r')
            if len(line) > MAX_LINE_LENGTH:
                line = line[:MAX_LINE_LENGTH] + "..."
            # Every line after the first also costs one byte for the joining newline.
            line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
            if total_bytes + line_bytes > MAX_BYTES:
                truncated_by_bytes = True
                break
            output_lines.append(line)
            total_bytes += line_bytes

        # Prefix each line with its 1-based line number.
        formatted = [
            f"{line_num:5d}| {line}"
            for line_num, line in enumerate(output_lines, start=offset + 1)
        ]
        output = "\n" + "\n".join(formatted)

        last_read_line = offset + len(output_lines)
        has_more = total_lines > last_read_line
        truncated = has_more or truncated_by_bytes

        # Trailing hint describing how to continue reading.
        if truncated_by_bytes:
            output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
        elif has_more:
            output += f"\n\n(文件还有更多内容。使用 'offset' 参数读取第 {last_read_line} 行之后的内容)"
        else:
            output += f"\n\n(文件结束 - 共 {total_lines} 行)"
        output += "\n"

        # Preview: the first 20 lines that were read, without line numbers.
        preview = "\n".join(output_lines[:20])

        return ToolResult(
            title=path.name,
            output=output,
            metadata={
                "preview": preview,
                "truncated": truncated,
                "total_lines": total_lines,
                "read_lines": len(output_lines)
            }
        )
    except UnicodeDecodeError:
        return ToolResult(
            title="编码错误",
            output=f"无法解码文件(非 UTF-8 编码): {path.name}",
            error="Encoding error"
        )
    except Exception as e:
        return ToolResult(
            title="读取错误",
            output=f"读取文件时出错: {str(e)}",
            error=str(e)
        )
def _is_binary_file(path: Path) -> bool:
    """
    Heuristically decide whether ``path`` is a binary file.

    Modelled on the OpenCode implementation. A file counts as binary when its
    extension is in a fixed list, when the first 4KB contain a NUL byte, or
    when more than 30% of those bytes are control characters other than
    byte values 9-13. Empty files and files that cannot be inspected are
    reported as not binary.
    """
    # Extensions that are treated as binary without looking at the content.
    known_binary_suffixes = frozenset((
        '.zip', '.tar', '.gz', '.exe', '.dll', '.so', '.class',
        '.jar', '.war', '.7z', '.doc', '.docx', '.xls', '.xlsx',
        '.ppt', '.pptx', '.odt', '.ods', '.odp', '.bin', '.dat',
        '.obj', '.o', '.a', '.lib', '.wasm', '.pyc', '.pyo',
    ))
    if path.suffix.lower() in known_binary_suffixes:
        return True

    # Otherwise sniff the beginning of the file.
    try:
        size = path.stat().st_size
        if not size:
            return False

        # Sample at most the first 4KB.
        with path.open('rb') as handle:
            head = handle.read(min(4096, size))

        # Any NUL byte marks the file as binary.
        if 0 in head:
            return True

        # Count control bytes, excluding byte values 9 through 13.
        control_count = sum(1 for b in head if b < 9 or 13 < b < 32)

        # More than 30% control bytes means binary.
        return control_count / len(head) > 0.3
    except Exception:
        return False
async def _read_from_url(url: str) -> ToolResult:
    """
    Download ``url`` over HTTP/HTTPS and wrap the response in a ToolResult.

    Mainly intended for images and other media, which are converted to
    base64. Text and JSON responses are decoded as UTF-8 (undecodable bytes
    are replaced) and returned in full. Any other content is reported as a
    binary file with its size only.

    Args:
        url: The HTTP/HTTPS URL to fetch (redirects are followed, 30s timeout).

    Returns:
        ToolResult: The downloaded content, or a result with ``error`` set
        when the server answers with an error status or the download fails.
    """
    # Image extensions recognised in the URL, with the MIME type used when
    # the server does not declare an image/* Content-Type itself.
    image_ext_mime = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".bmp": "image/bmp",
    }
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            response = await client.get(url)
            response.raise_for_status()
            content_type = response.headers.get("content-type", "")
            raw = response.content

        # Derive a file name from the URL path.
        parsed = urlparse(url)
        filename = Path(parsed.path).name or "downloaded_file"

        # Drop parameters such as "; charset=utf-8" so comparisons see only
        # the media type itself.
        media_type = content_type.split(";", 1)[0].strip().lower()

        # Match the extension against the URL path (so a query string does
        # not hide it) and against the full URL (as before).
        url_path = parsed.path.lower()
        url_lower = url.lower()
        ext_mime = next(
            (
                mime for ext, mime in image_ext_mime.items()
                if url_path.endswith(ext) or url_lower.endswith(ext)
            ),
            None,
        )

        # Images
        if media_type.startswith("image/") or ext_mime is not None:
            # Prefer the server-declared image type; otherwise fall back to
            # the type implied by the extension rather than passing through a
            # non-image Content-Type.
            mime_type = media_type if media_type.startswith("image/") else ext_mime
            b64_data = base64.b64encode(raw).decode("ascii")
            return ToolResult(
                title=filename,
                output=f"图片文件: {filename} (URL: {url}, MIME: {mime_type}, {len(raw)} bytes)",
                metadata={"mime_type": mime_type, "url": url, "truncated": False},
                images=[{
                    "type": "base64",
                    "media_type": mime_type,
                    "data": b64_data,
                }],
            )

        # Text and JSON
        if media_type.startswith("text/") or media_type == "application/json":
            text = raw.decode("utf-8", errors="replace")
            lines = text.split("\n")
            preview = "\n".join(lines[:20])
            return ToolResult(
                title=filename,
                output=f"\n{text}\n",
                metadata={
                    "preview": preview,
                    "url": url,
                    "mime_type": content_type,
                    "total_lines": len(lines),
                }
            )

        # Anything else is reported as binary.
        return ToolResult(
            title=filename,
            output=f"二进制文件: {filename} (URL: {url}, {len(raw)} bytes)",
            metadata={"url": url, "mime_type": content_type, "size": len(raw)}
        )
    except httpx.HTTPStatusError as e:
        return ToolResult(
            title="HTTP 错误",
            output=f"无法下载文件: {url}\nHTTP {e.response.status_code}: {e.response.reason_phrase}",
            error=str(e)
        )
    except Exception as e:
        return ToolResult(
            title="下载失败",
            output=f"无法从 URL 读取文件: {url}\n错误: {str(e)}",
            error=str(e)
        )