""" Write Tool - 文件写入工具 参考:vendor/opencode/packages/opencode/src/tool/write.ts 核心功能: - 创建新文件或覆盖现有文件 - 支持追加模式(append) - 生成 diff 预览 """ from pathlib import Path from typing import Optional import difflib from agent.tools import tool, ToolResult, ToolContext @tool(description="写入文件内容(创建新文件、覆盖现有文件或追加内容)") async def write_file( file_path: str, content: str, append: bool = False, context: Optional[ToolContext] = None ) -> ToolResult: """ 写入文件 参考 OpenCode 实现,并添加追加模式支持 Args: file_path: 文件路径 content: 文件内容 append: 是否追加模式(默认 False,覆盖写入) context: 工具上下文 Returns: ToolResult: 写入结果 """ # 解析路径 path = Path(file_path) if not path.is_absolute(): path = Path.cwd() / path # 检查是否为目录 if path.exists() and path.is_dir(): return ToolResult( title="路径错误", output=f"路径是目录,不是文件: {file_path}", error="Path is a directory" ) # 读取旧内容(如果存在) existed = path.exists() old_content = "" if existed: try: with open(path, 'r', encoding='utf-8') as f: old_content = f.read() except Exception: old_content = "" # 确定最终内容 if append and existed: new_content = old_content + content else: new_content = content # 生成 diff if existed and old_content: diff = _create_diff(str(path), old_content, new_content) else: diff = f"(新建文件: {path.name})" # 确保父目录存在 path.parent.mkdir(parents=True, exist_ok=True) # 写入文件 try: with open(path, 'w', encoding='utf-8') as f: f.write(new_content) except Exception as e: return ToolResult( title="写入失败", output=f"无法写入文件: {str(e)}", error=str(e) ) # 统计 lines = new_content.count('\n') # 构建操作描述 if append and existed: operation = "追加内容到" elif existed: operation = "覆盖" else: operation = "创建" return ToolResult( title=path.name, output=f"文件写入成功 ({operation})\n\n{diff}", metadata={ "existed": existed, "append": append, "lines": lines, "diff": diff }, long_term_memory=f"{operation}文件 {path.name}" ) def _create_diff(filepath: str, old_content: str, new_content: str) -> str: """生成 unified diff""" old_lines = old_content.splitlines(keepends=True) new_lines = new_content.splitlines(keepends=True) diff_lines = list(difflib.unified_diff( old_lines, new_lines, fromfile=f"a/{filepath}", tofile=f"b/{filepath}", lineterm='' )) if not diff_lines: return "(无变更)" return ''.join(diff_lines)