| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- """
- Write Tool - 文件写入工具
- 参考:vendor/opencode/packages/opencode/src/tool/write.ts
- 核心功能:
- - 创建新文件或覆盖现有文件
- - 支持追加模式(append)
- - 生成 diff 预览
- """
- from pathlib import Path
- from typing import Optional
- import difflib
- from agent.tools import tool, ToolResult, ToolContext
- @tool(description="写入文件内容(创建新文件、覆盖现有文件或追加内容)", hidden_params=["context"])
- async def write_file(
- file_path: str,
- content: str,
- append: bool = False,
- context: Optional[ToolContext] = None
- ) -> ToolResult:
- """
- 写入文件
- 参考 OpenCode 实现,并添加追加模式支持
- Args:
- file_path: 文件路径
- content: 文件内容
- append: 是否追加模式(默认 False,覆盖写入)
- context: 工具上下文
- Returns:
- ToolResult: 写入结果
- """
- # 解析路径
- path = Path(file_path)
- if not path.is_absolute():
- path = Path.cwd() / path
- # 检查是否为目录
- if path.exists() and path.is_dir():
- return ToolResult(
- title="路径错误",
- output=f"路径是目录,不是文件: {file_path}",
- error="Path is a directory"
- )
- # 读取旧内容(如果存在)
- existed = path.exists()
- old_content = ""
- if existed:
- try:
- with open(path, 'r', encoding='utf-8') as f:
- old_content = f.read()
- except Exception:
- old_content = ""
- # 确定最终内容
- if append and existed:
- new_content = old_content + content
- else:
- new_content = content
- # 生成 diff
- if existed and old_content:
- diff = _create_diff(str(path), old_content, new_content)
- else:
- diff = f"(新建文件: {path.name})"
- # 确保父目录存在
- path.parent.mkdir(parents=True, exist_ok=True)
- # 写入文件
- try:
- with open(path, 'w', encoding='utf-8') as f:
- f.write(new_content)
- except Exception as e:
- return ToolResult(
- title="写入失败",
- output=f"无法写入文件: {str(e)}",
- error=str(e)
- )
- # 统计
- lines = new_content.count('\n')
- # 构建操作描述
- if append and existed:
- operation = "追加内容到"
- elif existed:
- operation = "覆盖"
- else:
- operation = "创建"
- return ToolResult(
- title=path.name,
- output=f"文件写入成功 ({operation})\n\n{diff}",
- metadata={
- "existed": existed,
- "append": append,
- "lines": lines,
- "diff": diff
- },
- long_term_memory=f"{operation}文件 {path.name}"
- )
- def _create_diff(filepath: str, old_content: str, new_content: str) -> str:
- """生成 unified diff"""
- old_lines = old_content.splitlines(keepends=True)
- new_lines = new_content.splitlines(keepends=True)
- diff_lines = list(difflib.unified_diff(
- old_lines,
- new_lines,
- fromfile=f"a/{filepath}",
- tofile=f"b/{filepath}",
- lineterm=''
- ))
- if not diff_lines:
- return "(无变更)"
- return ''.join(diff_lines)
|