write.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. """
  2. Write Tool - 文件写入工具
  3. 参考:vendor/opencode/packages/opencode/src/tool/write.ts
  4. 核心功能:
  5. - 创建新文件或覆盖现有文件
  6. - 生成 diff 预览
  7. """
  8. from pathlib import Path
  9. from typing import Optional
  10. import difflib
  11. from agent.tools import tool, ToolResult, ToolContext
  12. @tool(description="写入文件内容(创建新文件或覆盖现有文件)")
  13. async def write_file(
  14. file_path: str,
  15. content: str,
  16. uid: str = "",
  17. context: Optional[ToolContext] = None
  18. ) -> ToolResult:
  19. """
  20. 写入文件
  21. 参考 OpenCode 实现
  22. Args:
  23. file_path: 文件路径
  24. content: 文件内容
  25. uid: 用户 ID
  26. context: 工具上下文
  27. Returns:
  28. ToolResult: 写入结果
  29. """
  30. # 解析路径
  31. path = Path(file_path)
  32. if not path.is_absolute():
  33. path = Path.cwd() / path
  34. # 检查是否为目录
  35. if path.exists() and path.is_dir():
  36. return ToolResult(
  37. title="路径错误",
  38. output=f"路径是目录,不是文件: {file_path}",
  39. error="Path is a directory"
  40. )
  41. # 读取旧内容(如果存在)
  42. existed = path.exists()
  43. old_content = ""
  44. if existed:
  45. try:
  46. with open(path, 'r', encoding='utf-8') as f:
  47. old_content = f.read()
  48. except Exception:
  49. old_content = ""
  50. # 生成 diff
  51. if existed and old_content:
  52. diff = _create_diff(str(path), old_content, content)
  53. else:
  54. diff = f"(新建文件: {path.name})"
  55. # 确保父目录存在
  56. path.parent.mkdir(parents=True, exist_ok=True)
  57. # 写入文件
  58. try:
  59. with open(path, 'w', encoding='utf-8') as f:
  60. f.write(content)
  61. except Exception as e:
  62. return ToolResult(
  63. title="写入失败",
  64. output=f"无法写入文件: {str(e)}",
  65. error=str(e)
  66. )
  67. # 统计
  68. lines = content.count('\n')
  69. return ToolResult(
  70. title=path.name,
  71. output=f"文件写入成功\n\n{diff}",
  72. metadata={
  73. "existed": existed,
  74. "lines": lines,
  75. "diff": diff
  76. },
  77. long_term_memory=f"{'覆盖' if existed else '创建'}文件 {path.name}"
  78. )
  79. def _create_diff(filepath: str, old_content: str, new_content: str) -> str:
  80. """生成 unified diff"""
  81. old_lines = old_content.splitlines(keepends=True)
  82. new_lines = new_content.splitlines(keepends=True)
  83. diff_lines = list(difflib.unified_diff(
  84. old_lines,
  85. new_lines,
  86. fromfile=f"a/{filepath}",
  87. tofile=f"b/{filepath}",
  88. lineterm=''
  89. ))
  90. if not diff_lines:
  91. return "(无变更)"
  92. return ''.join(diff_lines)