write.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. """
  2. Write Tool - 文件写入工具
  3. 参考:vendor/opencode/packages/opencode/src/tool/write.ts
  4. 核心功能:
  5. - 创建新文件或覆盖现有文件
  6. - 支持追加模式(append)
  7. - 生成 diff 预览
  8. """
  9. from pathlib import Path
  10. from typing import Optional
  11. import difflib
  12. from agent.tools import tool, ToolResult, ToolContext
  13. @tool(description="写入文件内容(创建新文件、覆盖现有文件或追加内容)", hidden_params=["context"], groups=["core"])
  14. async def write_file(
  15. file_path: str,
  16. content: str,
  17. append: bool = False,
  18. context: Optional[ToolContext] = None
  19. ) -> ToolResult:
  20. """
  21. 写入文件
  22. 参考 OpenCode 实现,并添加追加模式支持
  23. Args:
  24. file_path: 文件路径
  25. content: 文件内容
  26. append: 是否追加模式(默认 False,覆盖写入)
  27. context: 工具上下文
  28. Returns:
  29. ToolResult: 写入结果
  30. """
  31. # 解析路径
  32. path = Path(file_path)
  33. if not path.is_absolute():
  34. path = Path.cwd() / path
  35. # 检查是否为目录
  36. if path.exists() and path.is_dir():
  37. return ToolResult(
  38. title="路径错误",
  39. output=f"路径是目录,不是文件: {file_path}",
  40. error="Path is a directory"
  41. )
  42. # 读取旧内容(如果存在)
  43. existed = path.exists()
  44. old_content = ""
  45. if existed:
  46. try:
  47. with open(path, 'r', encoding='utf-8') as f:
  48. old_content = f.read()
  49. except Exception:
  50. old_content = ""
  51. # 确定最终内容
  52. if append and existed:
  53. new_content = old_content + content
  54. else:
  55. new_content = content
  56. # 生成 diff
  57. if existed and old_content:
  58. diff = _create_diff(str(path), old_content, new_content)
  59. else:
  60. diff = f"(新建文件: {path.name})"
  61. # 确保父目录存在
  62. path.parent.mkdir(parents=True, exist_ok=True)
  63. # 落盘前自动将内容中的外站图片 URL 替换为自有 CDN 链接(仅处理文本文件)
  64. if not append: # 追加模式不做替换,避免重复处理
  65. try:
  66. from agent.tools.builtin.file.image_cdn import replace_image_urls
  67. new_content = await replace_image_urls(new_content)
  68. except Exception as cdn_err:
  69. import logging
  70. logging.getLogger(__name__).warning("[write_file] CDN mirror step failed, writing original: %s", cdn_err)
  71. # 写入文件
  72. try:
  73. with open(path, 'w', encoding='utf-8') as f:
  74. f.write(new_content)
  75. except Exception as e:
  76. return ToolResult(
  77. title="写入失败",
  78. output=f"无法写入文件: {str(e)}",
  79. error=str(e)
  80. )
  81. # 统计
  82. lines = new_content.count('\n')
  83. # 构建操作描述
  84. if append and existed:
  85. operation = "追加内容到"
  86. elif existed:
  87. operation = "覆盖"
  88. else:
  89. operation = "创建"
  90. return ToolResult(
  91. title=path.name,
  92. output=f"文件写入成功 ({operation})\n\n{diff}",
  93. metadata={
  94. "existed": existed,
  95. "append": append,
  96. "lines": lines,
  97. "diff": diff
  98. },
  99. long_term_memory=f"{operation}文件 {path.name}"
  100. )
  101. def _create_diff(filepath: str, old_content: str, new_content: str) -> str:
  102. """生成 unified diff"""
  103. old_lines = old_content.splitlines(keepends=True)
  104. new_lines = new_content.splitlines(keepends=True)
  105. diff_lines = list(difflib.unified_diff(
  106. old_lines,
  107. new_lines,
  108. fromfile=f"a/{filepath}",
  109. tofile=f"b/{filepath}",
  110. lineterm=''
  111. ))
  112. if not diff_lines:
  113. return "(无变更)"
  114. return ''.join(diff_lines)