""" Bash Tool - 命令执行工具 核心功能: - 执行 shell 命令 - 超时控制 - 工作目录设置 - 环境变量传递 - 虚拟环境隔离(Python 命令,强制执行,LLM 不可控) - 目录白名单保护 """ import os import signal import asyncio import logging from pathlib import Path from typing import Optional, Dict from agent.tools import tool, ToolResult, ToolContext # 常量 DEFAULT_TIMEOUT = 120 MAX_OUTPUT_LENGTH = 50000 GRACEFUL_KILL_WAIT = 3 # ===== 安全配置(模块级,LLM 不可控)===== ENABLE_VENV = True # 是否强制启用虚拟环境 VENV_DIR = ".venv" # 虚拟环境目录名(相对于项目根目录) ALLOWED_WORKDIR_PATTERNS = [ ".", "examples", "examples/**", ".cache", ".cache/**", "tests", "tests/**", "output", "output/**", ] PYTHON_KEYWORDS = ["python", "python3", "pip", "pip3", "pytest", "poetry", "uv"] logger = logging.getLogger(__name__) def _get_project_root() -> Path: """获取项目根目录(bash.py 在 agent/tools/builtin/ 下)""" return Path(__file__).parent.parent.parent.parent def _is_safe_workdir(path: Path) -> bool: """检查工作目录是否在白名单内""" try: project_root = _get_project_root() resolved_path = path.resolve() resolved_root = project_root.resolve() if not resolved_path.is_relative_to(resolved_root): return False relative_path = resolved_path.relative_to(resolved_root) for pattern in ALLOWED_WORKDIR_PATTERNS: if pattern == ".": if relative_path == Path("."): return True elif pattern.endswith("/**"): base = Path(pattern[:-3]) if relative_path == base or relative_path.is_relative_to(base): return True else: if relative_path == Path(pattern): return True return False except (ValueError, OSError) as e: logger.warning(f"路径检查失败: {e}") return False def _should_use_venv(command: str) -> bool: """判断命令是否应该在虚拟环境中执行""" command_lower = command.lower() for keyword in PYTHON_KEYWORDS: if keyword in command_lower: return True return False async def _ensure_venv(venv_path: Path) -> bool: """确保虚拟环境存在,不存在则创建""" if os.name == 'nt': python_exe = venv_path / "Scripts" / "python.exe" else: python_exe = venv_path / "bin" / "python" if venv_path.exists() and python_exe.exists(): return True # 创建虚拟环境 print(f"[bash] 正在创建虚拟环境: {venv_path}") logger.info(f"创建虚拟环境: {venv_path}") try: process = await asyncio.create_subprocess_shell( f"python -m venv {venv_path}", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=60) if process.returncode == 0: print(f"[bash] ✅ 虚拟环境创建成功: {venv_path}") logger.info(f"虚拟环境创建成功: {venv_path}") return True else: err_text = stderr.decode('utf-8', errors='replace') if stderr else "" print(f"[bash] ❌ 虚拟环境创建失败: {err_text[:200]}") logger.error(f"虚拟环境创建失败: exit code {process.returncode}, {err_text[:200]}") return False except Exception as e: print(f"[bash] ❌ 虚拟环境创建异常: {e}") logger.error(f"虚拟环境创建异常: {e}") return False def _wrap_command_with_venv(command: str, venv_path: Path) -> str: """将命令包装为在虚拟环境中执行""" if os.name == 'nt': activate_script = venv_path / "Scripts" / "activate.bat" return f'call "{activate_script}" && {command}' else: activate_script = venv_path / "bin" / "activate" return f'source "{activate_script}" && {command}' def _kill_process_tree(pid: int) -> None: """先 SIGTERM 整个进程组,等 GRACEFUL_KILL_WAIT 秒后 SIGKILL 兜底。""" import time try: pgid = os.getpgid(pid) except ProcessLookupError: return try: os.killpg(pgid, signal.SIGTERM) except ProcessLookupError: return time.sleep(GRACEFUL_KILL_WAIT) try: os.killpg(pgid, signal.SIGKILL) except ProcessLookupError: pass @tool(description="执行 bash 命令", hidden_params=["context"]) async def bash_command( command: str, timeout: Optional[int] = None, workdir: Optional[str] = None, env: Optional[Dict[str, str]] = None, description: str = "", context: Optional[ToolContext] = None ) -> ToolResult: """ 执行 bash 命令 Args: command: 要执行的命令 timeout: 超时时间(秒),默认 120 秒 workdir: 工作目录,默认为当前目录 env: 环境变量字典(会合并到系统环境变量) description: 命令描述(5-10 个词) context: 工具上下文 Returns: ToolResult: 命令输出 """ if timeout is not None and timeout < 0: return ToolResult( title="参数错误", output=f"无效的 timeout 值: {timeout}。必须是正数。", error="Invalid timeout" ) timeout_sec = timeout or DEFAULT_TIMEOUT # 工作目录 cwd = Path(workdir) if workdir else Path.cwd() if not cwd.exists(): return ToolResult( title="目录不存在", output=f"工作目录不存在: {workdir}", error="Directory not found" ) # 目录白名单检查 if not _is_safe_workdir(cwd): project_root = _get_project_root() return ToolResult( title="目录不允许", output=( f"工作目录不在白名单内: {cwd}\n" f"项目根目录: {project_root}\n" f"允许的目录: {', '.join(ALLOWED_WORKDIR_PATTERNS)}" ), error="Directory not allowed" ) # 虚拟环境处理(强制执行,LLM 不可绕过) actual_command = command if ENABLE_VENV and _should_use_venv(command): venv_dir = _get_project_root() / VENV_DIR venv_ok = await _ensure_venv(venv_dir) if venv_ok: actual_command = _wrap_command_with_venv(command, venv_dir) print(f"[bash] 🐍 使用虚拟环境: {venv_dir}") logger.info(f"[bash] 使用虚拟环境: {venv_dir}") else: logger.warning(f"[bash] 虚拟环境不可用,回退到系统环境: {venv_dir}") # 准备环境变量 process_env = os.environ.copy() if env: process_env.update(env) # 执行命令 try: if os.name == 'nt': process = await asyncio.create_subprocess_shell( actual_command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(cwd), env=process_env, ) else: process = await asyncio.create_subprocess_shell( actual_command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(cwd), env=process_env, start_new_session=True, ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout_sec ) except asyncio.TimeoutError: _kill_process_tree(process.pid) try: await asyncio.wait_for(process.wait(), timeout=GRACEFUL_KILL_WAIT + 2) except asyncio.TimeoutError: pass return ToolResult( title="命令超时", output=f"命令执行超时(>{timeout_sec}s): {command[:100]}", error="Timeout", metadata={"command": command, "timeout": timeout_sec} ) stdout_text = stdout.decode('utf-8', errors='replace') if stdout else "" stderr_text = stderr.decode('utf-8', errors='replace') if stderr else "" truncated = False if len(stdout_text) > MAX_OUTPUT_LENGTH: stdout_text = stdout_text[:MAX_OUTPUT_LENGTH] + f"\n\n(输出被截断,总长度: {len(stdout_text)} 字符)" truncated = True output = "" if stdout_text: output += stdout_text if stderr_text: if output: output += "\n\n--- stderr ---\n" output += stderr_text if not output: output = "(命令无输出)" exit_code = process.returncode success = exit_code == 0 title = description or f"命令: {command[:50]}" if not success: title += f" (exit code: {exit_code})" return ToolResult( title=title, output=output, metadata={ "exit_code": exit_code, "success": success, "truncated": truncated, "command": command, "cwd": str(cwd) }, error=None if success else f"Command failed with exit code {exit_code}" ) except Exception as e: return ToolResult( title="执行错误", output=f"命令执行失败: {str(e)}", error=str(e), metadata={"command": command} )