write.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. """
  2. Write Tool - 文件写入工具
  3. 参考:vendor/opencode/packages/opencode/src/tool/write.ts
  4. 核心功能:
  5. - 创建新文件或覆盖现有文件
  6. - 支持追加模式(append)
  7. - 生成 diff 预览
  8. """
  9. from pathlib import Path
  10. from typing import Optional
  11. import difflib
  12. from agent.tools import tool, ToolResult, ToolContext
  13. @tool(description="写入文件内容(创建新文件、覆盖现有文件或追加内容)")
  14. async def write_file(
  15. file_path: str,
  16. content: str,
  17. append: bool = False,
  18. context: Optional[ToolContext] = None
  19. ) -> ToolResult:
  20. """
  21. 写入文件
  22. 参考 OpenCode 实现,并添加追加模式支持
  23. Args:
  24. file_path: 文件路径
  25. content: 文件内容
  26. append: 是否追加模式(默认 False,覆盖写入)
  27. context: 工具上下文
  28. Returns:
  29. ToolResult: 写入结果
  30. """
  31. # 解析路径
  32. path = Path(file_path)
  33. if not path.is_absolute():
  34. path = Path.cwd() / path
  35. # 检查是否为目录
  36. if path.exists() and path.is_dir():
  37. return ToolResult(
  38. title="路径错误",
  39. output=f"路径是目录,不是文件: {file_path}",
  40. error="Path is a directory"
  41. )
  42. # 读取旧内容(如果存在)
  43. existed = path.exists()
  44. old_content = ""
  45. if existed:
  46. try:
  47. with open(path, 'r', encoding='utf-8') as f:
  48. old_content = f.read()
  49. except Exception:
  50. old_content = ""
  51. # 确定最终内容
  52. if append and existed:
  53. new_content = old_content + content
  54. else:
  55. new_content = content
  56. # 生成 diff
  57. if existed and old_content:
  58. diff = _create_diff(str(path), old_content, new_content)
  59. else:
  60. diff = f"(新建文件: {path.name})"
  61. # 确保父目录存在
  62. path.parent.mkdir(parents=True, exist_ok=True)
  63. # 写入文件
  64. try:
  65. with open(path, 'w', encoding='utf-8') as f:
  66. f.write(new_content)
  67. except Exception as e:
  68. return ToolResult(
  69. title="写入失败",
  70. output=f"无法写入文件: {str(e)}",
  71. error=str(e)
  72. )
  73. # 统计
  74. lines = new_content.count('\n')
  75. # 构建操作描述
  76. if append and existed:
  77. operation = "追加内容到"
  78. elif existed:
  79. operation = "覆盖"
  80. else:
  81. operation = "创建"
  82. return ToolResult(
  83. title=path.name,
  84. output=f"文件写入成功 ({operation})\n\n{diff}",
  85. metadata={
  86. "existed": existed,
  87. "append": append,
  88. "lines": lines,
  89. "diff": diff
  90. },
  91. long_term_memory=f"{operation}文件 {path.name}"
  92. )
  93. def _create_diff(filepath: str, old_content: str, new_content: str) -> str:
  94. """生成 unified diff"""
  95. old_lines = old_content.splitlines(keepends=True)
  96. new_lines = new_content.splitlines(keepends=True)
  97. diff_lines = list(difflib.unified_diff(
  98. old_lines,
  99. new_lines,
  100. fromfile=f"a/{filepath}",
  101. tofile=f"b/{filepath}",
  102. lineterm=''
  103. ))
  104. if not diff_lines:
  105. return "(无变更)"
  106. return ''.join(diff_lines)