grep.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
"""
Grep Tool - content search tool.

Reference: vendor/opencode/packages/opencode/src/tool/grep.ts

Core features:
- Search files for a regular-expression pattern
- Filter by file type
- Sort results by modification time
"""
  9. import re
  10. import subprocess
  11. from pathlib import Path
  12. from typing import Optional, List, Tuple
  13. from agent.tools import tool, ToolResult, ToolContext
# Constants
LIMIT = 100  # Maximum number of matches returned (cf. opencode grep.ts:107)
MAX_LINE_LENGTH = 2000  # Maximum line length before truncation (cf. opencode grep.ts:10)
  17. @tool(description="在文件内容中搜索模式")
  18. async def grep_content(
  19. pattern: str,
  20. path: Optional[str] = None,
  21. include: Optional[str] = None,
  22. uid: str = "",
  23. context: Optional[ToolContext] = None
  24. ) -> ToolResult:
  25. """
  26. 在文件中搜索正则表达式模式
  27. 参考 OpenCode 实现
  28. 优先使用 ripgrep(如果可用),否则使用 Python 实现。
  29. Args:
  30. pattern: 正则表达式模式
  31. path: 搜索目录(默认当前目录)
  32. include: 文件模式(如 "*.py", "*.{ts,tsx}")
  33. uid: 用户 ID
  34. context: 工具上下文
  35. Returns:
  36. ToolResult: 搜索结果
  37. """
  38. # 确定搜索路径
  39. search_path = Path(path) if path else Path.cwd()
  40. if not search_path.is_absolute():
  41. search_path = Path.cwd() / search_path
  42. if not search_path.exists():
  43. return ToolResult(
  44. title="目录不存在",
  45. output=f"搜索目录不存在: {path}",
  46. error="Directory not found"
  47. )
  48. # 尝试使用 ripgrep
  49. try:
  50. matches = await _ripgrep_search(pattern, search_path, include)
  51. except Exception:
  52. # ripgrep 不可用,使用 Python 实现
  53. matches = await _python_search(pattern, search_path, include)
  54. # 按修改时间排序(参考 opencode:105)
  55. matches_with_mtime = []
  56. for file_path, line_num, line_text in matches:
  57. try:
  58. mtime = file_path.stat().st_mtime
  59. matches_with_mtime.append((file_path, line_num, line_text, mtime))
  60. except Exception:
  61. matches_with_mtime.append((file_path, line_num, line_text, 0))
  62. matches_with_mtime.sort(key=lambda x: x[3], reverse=True)
  63. # 限制数量
  64. truncated = len(matches_with_mtime) > LIMIT
  65. matches_with_mtime = matches_with_mtime[:LIMIT]
  66. # 格式化输出(参考 opencode:118-133)
  67. if not matches_with_mtime:
  68. output = "未找到匹配"
  69. else:
  70. output = f"找到 {len(matches_with_mtime)} 个匹配\n"
  71. current_file = None
  72. for file_path, line_num, line_text, _ in matches_with_mtime:
  73. if current_file != file_path:
  74. if current_file is not None:
  75. output += "\n"
  76. current_file = file_path
  77. output += f"\n{file_path}:\n"
  78. # 截断过长的行
  79. if len(line_text) > MAX_LINE_LENGTH:
  80. line_text = line_text[:MAX_LINE_LENGTH] + "..."
  81. output += f" Line {line_num}: {line_text}\n"
  82. if truncated:
  83. output += "\n(结果已截断。考虑使用更具体的路径或模式。)"
  84. return ToolResult(
  85. title=f"搜索: {pattern}",
  86. output=output,
  87. metadata={
  88. "matches": len(matches_with_mtime),
  89. "truncated": truncated,
  90. "pattern": pattern
  91. }
  92. )
  93. async def _ripgrep_search(
  94. pattern: str,
  95. search_path: Path,
  96. include: Optional[str]
  97. ) -> List[Tuple[Path, int, str]]:
  98. """
  99. 使用 ripgrep 搜索
  100. 参考 OpenCode 实现
  101. """
  102. args = [
  103. "rg",
  104. "-nH", # 显示行号和文件名
  105. "--hidden",
  106. "--follow",
  107. "--no-messages",
  108. "--field-match-separator=|",
  109. "--regexp", pattern
  110. ]
  111. if include:
  112. args.extend(["--glob", include])
  113. args.append(str(search_path))
  114. # 执行 ripgrep
  115. process = await subprocess.create_subprocess_exec(
  116. *args,
  117. stdout=subprocess.PIPE,
  118. stderr=subprocess.PIPE
  119. )
  120. stdout, stderr = await process.communicate()
  121. exit_code = process.returncode
  122. # Exit codes: 0 = matches, 1 = no matches, 2 = errors
  123. if exit_code == 1:
  124. return []
  125. if exit_code != 0 and exit_code != 2:
  126. raise RuntimeError(f"ripgrep failed: {stderr.decode()}")
  127. # 解析输出
  128. matches = []
  129. for line in stdout.decode('utf-8', errors='replace').strip().split('\n'):
  130. if not line:
  131. continue
  132. parts = line.split('|', 2)
  133. if len(parts) < 3:
  134. continue
  135. file_path_str, line_num_str, line_text = parts
  136. matches.append((
  137. Path(file_path_str),
  138. int(line_num_str),
  139. line_text
  140. ))
  141. return matches
  142. async def _python_search(
  143. pattern: str,
  144. search_path: Path,
  145. include: Optional[str]
  146. ) -> List[Tuple[Path, int, str]]:
  147. """
  148. 使用 Python 正则实现搜索(fallback)
  149. """
  150. try:
  151. regex = re.compile(pattern)
  152. except Exception as e:
  153. raise ValueError(f"无效的正则表达式: {e}")
  154. matches = []
  155. # 确定要搜索的文件
  156. if include:
  157. # 简单的 glob 匹配
  158. import glob
  159. file_pattern = str(search_path / "**" / include)
  160. files = [Path(f) for f in glob.glob(file_pattern, recursive=True)]
  161. else:
  162. # 搜索所有文本文件
  163. files = [f for f in search_path.rglob("*") if f.is_file()]
  164. # 搜索文件内容
  165. for file_path in files:
  166. try:
  167. with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
  168. for line_num, line in enumerate(f, 1):
  169. if regex.search(line):
  170. matches.append((file_path, line_num, line.rstrip('\n')))
  171. # 限制数量避免过多搜索
  172. if len(matches) >= LIMIT * 2:
  173. return matches
  174. except Exception:
  175. continue
  176. return matches