| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- """
- Grep Tool - 内容搜索工具
- 参考:vendor/opencode/packages/opencode/src/tool/grep.ts
- 核心功能:
- - 在文件中搜索正则表达式模式
- - 支持文件类型过滤
- - 按修改时间排序结果
- """
- import re
- import subprocess
- from pathlib import Path
- from typing import Optional, List, Tuple
- from agent.tools import tool, ToolResult, ToolContext
- # 常量
- LIMIT = 100 # 最大返回匹配数(参考 opencode grep.ts:107)
- MAX_LINE_LENGTH = 2000 # 最大行长度(参考 opencode grep.ts:10)
- @tool(description="在文件内容中搜索模式")
- async def grep_content(
- pattern: str,
- path: Optional[str] = None,
- include: Optional[str] = None,
- uid: str = "",
- context: Optional[ToolContext] = None
- ) -> ToolResult:
- """
- 在文件中搜索正则表达式模式
- 参考 OpenCode 实现
- 优先使用 ripgrep(如果可用),否则使用 Python 实现。
- Args:
- pattern: 正则表达式模式
- path: 搜索目录(默认当前目录)
- include: 文件模式(如 "*.py", "*.{ts,tsx}")
- uid: 用户 ID
- context: 工具上下文
- Returns:
- ToolResult: 搜索结果
- """
- # 确定搜索路径
- search_path = Path(path) if path else Path.cwd()
- if not search_path.is_absolute():
- search_path = Path.cwd() / search_path
- if not search_path.exists():
- return ToolResult(
- title="目录不存在",
- output=f"搜索目录不存在: {path}",
- error="Directory not found"
- )
- # 尝试使用 ripgrep
- try:
- matches = await _ripgrep_search(pattern, search_path, include)
- except Exception:
- # ripgrep 不可用,使用 Python 实现
- matches = await _python_search(pattern, search_path, include)
- # 按修改时间排序(参考 opencode:105)
- matches_with_mtime = []
- for file_path, line_num, line_text in matches:
- try:
- mtime = file_path.stat().st_mtime
- matches_with_mtime.append((file_path, line_num, line_text, mtime))
- except Exception:
- matches_with_mtime.append((file_path, line_num, line_text, 0))
- matches_with_mtime.sort(key=lambda x: x[3], reverse=True)
- # 限制数量
- truncated = len(matches_with_mtime) > LIMIT
- matches_with_mtime = matches_with_mtime[:LIMIT]
- # 格式化输出(参考 opencode:118-133)
- if not matches_with_mtime:
- output = "未找到匹配"
- else:
- output = f"找到 {len(matches_with_mtime)} 个匹配\n"
- current_file = None
- for file_path, line_num, line_text, _ in matches_with_mtime:
- if current_file != file_path:
- if current_file is not None:
- output += "\n"
- current_file = file_path
- output += f"\n{file_path}:\n"
- # 截断过长的行
- if len(line_text) > MAX_LINE_LENGTH:
- line_text = line_text[:MAX_LINE_LENGTH] + "..."
- output += f" Line {line_num}: {line_text}\n"
- if truncated:
- output += "\n(结果已截断。考虑使用更具体的路径或模式。)"
- return ToolResult(
- title=f"搜索: {pattern}",
- output=output,
- metadata={
- "matches": len(matches_with_mtime),
- "truncated": truncated,
- "pattern": pattern
- }
- )
- async def _ripgrep_search(
- pattern: str,
- search_path: Path,
- include: Optional[str]
- ) -> List[Tuple[Path, int, str]]:
- """
- 使用 ripgrep 搜索
- 参考 OpenCode 实现
- """
- args = [
- "rg",
- "-nH", # 显示行号和文件名
- "--hidden",
- "--follow",
- "--no-messages",
- "--field-match-separator=|",
- "--regexp", pattern
- ]
- if include:
- args.extend(["--glob", include])
- args.append(str(search_path))
- # 执行 ripgrep
- process = await subprocess.create_subprocess_exec(
- *args,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE
- )
- stdout, stderr = await process.communicate()
- exit_code = process.returncode
- # Exit codes: 0 = matches, 1 = no matches, 2 = errors
- if exit_code == 1:
- return []
- if exit_code != 0 and exit_code != 2:
- raise RuntimeError(f"ripgrep failed: {stderr.decode()}")
- # 解析输出
- matches = []
- for line in stdout.decode('utf-8', errors='replace').strip().split('\n'):
- if not line:
- continue
- parts = line.split('|', 2)
- if len(parts) < 3:
- continue
- file_path_str, line_num_str, line_text = parts
- matches.append((
- Path(file_path_str),
- int(line_num_str),
- line_text
- ))
- return matches
- async def _python_search(
- pattern: str,
- search_path: Path,
- include: Optional[str]
- ) -> List[Tuple[Path, int, str]]:
- """
- 使用 Python 正则实现搜索(fallback)
- """
- try:
- regex = re.compile(pattern)
- except Exception as e:
- raise ValueError(f"无效的正则表达式: {e}")
- matches = []
- # 确定要搜索的文件
- if include:
- # 简单的 glob 匹配
- import glob
- file_pattern = str(search_path / "**" / include)
- files = [Path(f) for f in glob.glob(file_pattern, recursive=True)]
- else:
- # 搜索所有文本文件
- files = [f for f in search_path.rglob("*") if f.is_file()]
- # 搜索文件内容
- for file_path in files:
- try:
- with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
- for line_num, line in enumerate(f, 1):
- if regex.search(line):
- matches.append((file_path, line_num, line.rstrip('\n')))
- # 限制数量避免过多搜索
- if len(matches) >= LIMIT * 2:
- return matches
- except Exception:
- continue
- return matches
|