""" Grep Tool - 内容搜索工具 参考:vendor/opencode/packages/opencode/src/tool/grep.ts 核心功能: - 在文件中搜索正则表达式模式 - 支持文件类型过滤 - 按修改时间排序结果 """ import re import subprocess from pathlib import Path from typing import Optional, List, Tuple from agent.tools import tool, ToolResult, ToolContext # 常量 LIMIT = 100 # 最大返回匹配数(参考 opencode grep.ts:107) MAX_LINE_LENGTH = 2000 # 最大行长度(参考 opencode grep.ts:10) @tool(description="在文件内容中搜索模式") async def grep_content( pattern: str, path: Optional[str] = None, include: Optional[str] = None, context: Optional[ToolContext] = None ) -> ToolResult: """ 在文件中搜索正则表达式模式 参考 OpenCode 实现 优先使用 ripgrep(如果可用),否则使用 Python 实现。 Args: pattern: 正则表达式模式 path: 搜索目录(默认当前目录) include: 文件模式(如 "*.py", "*.{ts,tsx}") context: 工具上下文 Returns: ToolResult: 搜索结果 """ # 确定搜索路径 search_path = Path(path) if path else Path.cwd() if not search_path.is_absolute(): search_path = Path.cwd() / search_path if not search_path.exists(): return ToolResult( title="目录不存在", output=f"搜索目录不存在: {path}", error="Directory not found" ) # 尝试使用 ripgrep try: matches = await _ripgrep_search(pattern, search_path, include) except Exception: # ripgrep 不可用,使用 Python 实现 matches = await _python_search(pattern, search_path, include) # 按修改时间排序(参考 opencode:105) matches_with_mtime = [] for file_path, line_num, line_text in matches: try: mtime = file_path.stat().st_mtime matches_with_mtime.append((file_path, line_num, line_text, mtime)) except Exception: matches_with_mtime.append((file_path, line_num, line_text, 0)) matches_with_mtime.sort(key=lambda x: x[3], reverse=True) # 限制数量 truncated = len(matches_with_mtime) > LIMIT matches_with_mtime = matches_with_mtime[:LIMIT] # 格式化输出(参考 opencode:118-133) if not matches_with_mtime: output = "未找到匹配" else: output = f"找到 {len(matches_with_mtime)} 个匹配\n" current_file = None for file_path, line_num, line_text, _ in matches_with_mtime: if current_file != file_path: if current_file is not None: output += "\n" current_file = file_path output += f"\n{file_path}:\n" # 截断过长的行 if len(line_text) > MAX_LINE_LENGTH: line_text = line_text[:MAX_LINE_LENGTH] + "..." output += f" Line {line_num}: {line_text}\n" if truncated: output += "\n(结果已截断。考虑使用更具体的路径或模式。)" return ToolResult( title=f"搜索: {pattern}", output=output, metadata={ "matches": len(matches_with_mtime), "truncated": truncated, "pattern": pattern } ) async def _ripgrep_search( pattern: str, search_path: Path, include: Optional[str] ) -> List[Tuple[Path, int, str]]: """ 使用 ripgrep 搜索 参考 OpenCode 实现 """ args = [ "rg", "-nH", # 显示行号和文件名 "--hidden", "--follow", "--no-messages", "--field-match-separator=|", "--regexp", pattern ] if include: args.extend(["--glob", include]) args.append(str(search_path)) # 执行 ripgrep process = await subprocess.create_subprocess_exec( *args, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = await process.communicate() exit_code = process.returncode # Exit codes: 0 = matches, 1 = no matches, 2 = errors if exit_code == 1: return [] if exit_code != 0 and exit_code != 2: raise RuntimeError(f"ripgrep failed: {stderr.decode()}") # 解析输出 matches = [] for line in stdout.decode('utf-8', errors='replace').strip().split('\n'): if not line: continue parts = line.split('|', 2) if len(parts) < 3: continue file_path_str, line_num_str, line_text = parts matches.append(( Path(file_path_str), int(line_num_str), line_text )) return matches async def _python_search( pattern: str, search_path: Path, include: Optional[str] ) -> List[Tuple[Path, int, str]]: """ 使用 Python 正则实现搜索(fallback) """ try: regex = re.compile(pattern) except Exception as e: raise ValueError(f"无效的正则表达式: {e}") matches = [] # 确定要搜索的文件 if include: # 简单的 glob 匹配 import glob file_pattern = str(search_path / "**" / include) files = [Path(f) for f in glob.glob(file_pattern, recursive=True)] else: # 搜索所有文本文件 files = [f for f in search_path.rglob("*") if f.is_file()] # 搜索文件内容 for file_path in files: try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: for line_num, line in enumerate(f, 1): if regex.search(line): matches.append((file_path, line_num, line.rstrip('\n'))) # 限制数量避免过多搜索 if len(matches) >= LIMIT * 2: return matches except Exception: continue return matches