grep.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. """
Grep Tool - content search tool.

Reference: vendor/opencode/packages/opencode/src/tool/grep.ts

Core features:
- Search files for a regular-expression pattern
- Filter by file type
- Sort results by modification time
  8. """
  9. import re
  10. import subprocess
  11. from pathlib import Path
  12. from typing import Optional, List, Tuple
  13. from agent.tools import tool, ToolResult, ToolContext
# Constants
LIMIT = 100  # Maximum number of matches returned (see opencode grep.ts:107)
MAX_LINE_LENGTH = 2000  # Maximum displayed line length before truncation (see opencode grep.ts:10)
  17. @tool(description="在文件内容中搜索模式", hidden_params=["context"])
  18. async def grep_content(
  19. pattern: str,
  20. path: Optional[str] = None,
  21. include: Optional[str] = None,
  22. context: Optional[ToolContext] = None
  23. ) -> ToolResult:
  24. """
  25. 在文件中搜索正则表达式模式
  26. 参考 OpenCode 实现
  27. 优先使用 ripgrep(如果可用),否则使用 Python 实现。
  28. Args:
  29. pattern: 正则表达式模式
  30. path: 搜索目录(默认当前目录)
  31. include: 文件模式(如 "*.py", "*.{ts,tsx}")
  32. context: 工具上下文
  33. Returns:
  34. ToolResult: 搜索结果
  35. """
  36. # 确定搜索路径
  37. search_path = Path(path) if path else Path.cwd()
  38. if not search_path.is_absolute():
  39. search_path = Path.cwd() / search_path
  40. if not search_path.exists():
  41. return ToolResult(
  42. title="目录不存在",
  43. output=f"搜索目录不存在: {path}",
  44. error="Directory not found"
  45. )
  46. # 尝试使用 ripgrep
  47. try:
  48. matches = await _ripgrep_search(pattern, search_path, include)
  49. except Exception:
  50. # ripgrep 不可用,使用 Python 实现
  51. matches = await _python_search(pattern, search_path, include)
  52. # 按修改时间排序(参考 opencode:105)
  53. matches_with_mtime = []
  54. for file_path, line_num, line_text in matches:
  55. try:
  56. mtime = file_path.stat().st_mtime
  57. matches_with_mtime.append((file_path, line_num, line_text, mtime))
  58. except Exception:
  59. matches_with_mtime.append((file_path, line_num, line_text, 0))
  60. matches_with_mtime.sort(key=lambda x: x[3], reverse=True)
  61. # 限制数量
  62. truncated = len(matches_with_mtime) > LIMIT
  63. matches_with_mtime = matches_with_mtime[:LIMIT]
  64. # 格式化输出(参考 opencode:118-133)
  65. if not matches_with_mtime:
  66. output = "未找到匹配"
  67. else:
  68. output = f"找到 {len(matches_with_mtime)} 个匹配\n"
  69. current_file = None
  70. for file_path, line_num, line_text, _ in matches_with_mtime:
  71. if current_file != file_path:
  72. if current_file is not None:
  73. output += "\n"
  74. current_file = file_path
  75. output += f"\n{file_path}:\n"
  76. # 截断过长的行
  77. if len(line_text) > MAX_LINE_LENGTH:
  78. line_text = line_text[:MAX_LINE_LENGTH] + "..."
  79. output += f" Line {line_num}: {line_text}\n"
  80. if truncated:
  81. output += "\n(结果已截断。考虑使用更具体的路径或模式。)"
  82. return ToolResult(
  83. title=f"搜索: {pattern}",
  84. output=output,
  85. metadata={
  86. "matches": len(matches_with_mtime),
  87. "truncated": truncated,
  88. "pattern": pattern
  89. }
  90. )
  91. async def _ripgrep_search(
  92. pattern: str,
  93. search_path: Path,
  94. include: Optional[str]
  95. ) -> List[Tuple[Path, int, str]]:
  96. """
  97. 使用 ripgrep 搜索
  98. 参考 OpenCode 实现
  99. """
  100. args = [
  101. "rg",
  102. "-nH", # 显示行号和文件名
  103. "--hidden",
  104. "--follow",
  105. "--no-messages",
  106. "--field-match-separator=|",
  107. "--regexp", pattern
  108. ]
  109. if include:
  110. args.extend(["--glob", include])
  111. args.append(str(search_path))
  112. # 执行 ripgrep
  113. process = await subprocess.create_subprocess_exec(
  114. *args,
  115. stdout=subprocess.PIPE,
  116. stderr=subprocess.PIPE
  117. )
  118. stdout, stderr = await process.communicate()
  119. exit_code = process.returncode
  120. # Exit codes: 0 = matches, 1 = no matches, 2 = errors
  121. if exit_code == 1:
  122. return []
  123. if exit_code != 0 and exit_code != 2:
  124. raise RuntimeError(f"ripgrep failed: {stderr.decode()}")
  125. # 解析输出
  126. matches = []
  127. for line in stdout.decode('utf-8', errors='replace').strip().split('\n'):
  128. if not line:
  129. continue
  130. parts = line.split('|', 2)
  131. if len(parts) < 3:
  132. continue
  133. file_path_str, line_num_str, line_text = parts
  134. matches.append((
  135. Path(file_path_str),
  136. int(line_num_str),
  137. line_text
  138. ))
  139. return matches
  140. async def _python_search(
  141. pattern: str,
  142. search_path: Path,
  143. include: Optional[str]
  144. ) -> List[Tuple[Path, int, str]]:
  145. """
  146. 使用 Python 正则实现搜索(fallback)
  147. """
  148. try:
  149. regex = re.compile(pattern)
  150. except Exception as e:
  151. raise ValueError(f"无效的正则表达式: {e}")
  152. matches = []
  153. # 确定要搜索的文件
  154. if include:
  155. # 简单的 glob 匹配
  156. import glob
  157. file_pattern = str(search_path / "**" / include)
  158. files = [Path(f) for f in glob.glob(file_pattern, recursive=True)]
  159. else:
  160. # 搜索所有文本文件
  161. files = [f for f in search_path.rglob("*") if f.is_file()]
  162. # 搜索文件内容
  163. for file_path in files:
  164. try:
  165. with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
  166. for line_num, line in enumerate(f, 1):
  167. if regex.search(line):
  168. matches.append((file_path, line_num, line.rstrip('\n')))
  169. # 限制数量避免过多搜索
  170. if len(matches) >= LIMIT * 2:
  171. return matches
  172. except Exception:
  173. continue
  174. return matches