| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- """
- Bash Tool - 命令执行工具
- 核心功能:
- - 执行 shell 命令
- - 超时控制
- - 工作目录设置
- - 环境变量传递
- - 虚拟环境隔离(Python 命令,强制执行,LLM 不可控)
- - 目录白名单保护
- """
- import os
- import signal
- import asyncio
- import logging
- from pathlib import Path
- from typing import Optional, Dict
- from agent.tools import tool, ToolResult, ToolContext
- # 常量
- DEFAULT_TIMEOUT = 120
- MAX_OUTPUT_LENGTH = 50000
- GRACEFUL_KILL_WAIT = 3
- # ===== 安全配置(模块级,LLM 不可控)=====
- ENABLE_VENV = True # 是否强制启用虚拟环境
- VENV_DIR = ".venv" # 虚拟环境目录名(相对于项目根目录)
- ALLOWED_WORKDIR_PATTERNS = [
- ".",
- "examples",
- "examples/**",
- ".cache",
- ".cache/**",
- "tests",
- "tests/**",
- "output",
- "output/**",
- ]
- PYTHON_KEYWORDS = ["python", "python3", "pip", "pip3", "pytest", "poetry", "uv"]
- logger = logging.getLogger(__name__)
- def _get_project_root() -> Path:
- """获取项目根目录(bash.py 在 agent/tools/builtin/ 下)"""
- return Path(__file__).parent.parent.parent.parent
- def _is_safe_workdir(path: Path) -> bool:
- """检查工作目录是否在白名单内"""
- try:
- project_root = _get_project_root()
- resolved_path = path.resolve()
- resolved_root = project_root.resolve()
- if not resolved_path.is_relative_to(resolved_root):
- return False
- relative_path = resolved_path.relative_to(resolved_root)
- for pattern in ALLOWED_WORKDIR_PATTERNS:
- if pattern == ".":
- if relative_path == Path("."):
- return True
- elif pattern.endswith("/**"):
- base = Path(pattern[:-3])
- if relative_path == base or relative_path.is_relative_to(base):
- return True
- else:
- if relative_path == Path(pattern):
- return True
- return False
- except (ValueError, OSError) as e:
- logger.warning(f"路径检查失败: {e}")
- return False
- def _should_use_venv(command: str) -> bool:
- """判断命令是否应该在虚拟环境中执行"""
- command_lower = command.lower()
- for keyword in PYTHON_KEYWORDS:
- if keyword in command_lower:
- return True
- return False
- async def _ensure_venv(venv_path: Path) -> bool:
- """确保虚拟环境存在,不存在则创建"""
- if os.name == 'nt':
- python_exe = venv_path / "Scripts" / "python.exe"
- else:
- python_exe = venv_path / "bin" / "python"
- if venv_path.exists() and python_exe.exists():
- return True
- # 创建虚拟环境
- print(f"[bash] 正在创建虚拟环境: {venv_path}")
- logger.info(f"创建虚拟环境: {venv_path}")
- try:
- process = await asyncio.create_subprocess_shell(
- f"python -m venv {venv_path}",
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=60)
- if process.returncode == 0:
- print(f"[bash] ✅ 虚拟环境创建成功: {venv_path}")
- logger.info(f"虚拟环境创建成功: {venv_path}")
- return True
- else:
- err_text = stderr.decode('utf-8', errors='replace') if stderr else ""
- print(f"[bash] ❌ 虚拟环境创建失败: {err_text[:200]}")
- logger.error(f"虚拟环境创建失败: exit code {process.returncode}, {err_text[:200]}")
- return False
- except Exception as e:
- print(f"[bash] ❌ 虚拟环境创建异常: {e}")
- logger.error(f"虚拟环境创建异常: {e}")
- return False
- def _wrap_command_with_venv(command: str, venv_path: Path) -> str:
- """将命令包装为在虚拟环境中执行"""
- if os.name == 'nt':
- activate_script = venv_path / "Scripts" / "activate.bat"
- return f'call "{activate_script}" && {command}'
- else:
- activate_script = venv_path / "bin" / "activate"
- return f'source "{activate_script}" && {command}'
- def _kill_process_tree(pid: int) -> None:
- """先 SIGTERM 整个进程组,等 GRACEFUL_KILL_WAIT 秒后 SIGKILL 兜底。"""
- import time
- try:
- pgid = os.getpgid(pid)
- except ProcessLookupError:
- return
- try:
- os.killpg(pgid, signal.SIGTERM)
- except ProcessLookupError:
- return
- time.sleep(GRACEFUL_KILL_WAIT)
- try:
- os.killpg(pgid, signal.SIGKILL)
- except ProcessLookupError:
- pass
- @tool(description="执行 bash 命令")
- async def bash_command(
- command: str,
- timeout: Optional[int] = None,
- workdir: Optional[str] = None,
- env: Optional[Dict[str, str]] = None,
- description: str = "",
- context: Optional[ToolContext] = None
- ) -> ToolResult:
- """
- 执行 bash 命令
- Args:
- command: 要执行的命令
- timeout: 超时时间(秒),默认 120 秒
- workdir: 工作目录,默认为当前目录
- env: 环境变量字典(会合并到系统环境变量)
- description: 命令描述(5-10 个词)
- context: 工具上下文
- Returns:
- ToolResult: 命令输出
- """
- if timeout is not None and timeout < 0:
- return ToolResult(
- title="参数错误",
- output=f"无效的 timeout 值: {timeout}。必须是正数。",
- error="Invalid timeout"
- )
- timeout_sec = timeout or DEFAULT_TIMEOUT
- # 工作目录
- cwd = Path(workdir) if workdir else Path.cwd()
- if not cwd.exists():
- return ToolResult(
- title="目录不存在",
- output=f"工作目录不存在: {workdir}",
- error="Directory not found"
- )
- # 目录白名单检查
- if not _is_safe_workdir(cwd):
- project_root = _get_project_root()
- return ToolResult(
- title="目录不允许",
- output=(
- f"工作目录不在白名单内: {cwd}\n"
- f"项目根目录: {project_root}\n"
- f"允许的目录: {', '.join(ALLOWED_WORKDIR_PATTERNS)}"
- ),
- error="Directory not allowed"
- )
- # 虚拟环境处理(强制执行,LLM 不可绕过)
- actual_command = command
- if ENABLE_VENV and _should_use_venv(command):
- venv_dir = _get_project_root() / VENV_DIR
- venv_ok = await _ensure_venv(venv_dir)
- if venv_ok:
- actual_command = _wrap_command_with_venv(command, venv_dir)
- print(f"[bash] 🐍 使用虚拟环境: {venv_dir}")
- logger.info(f"[bash] 使用虚拟环境: {venv_dir}")
- else:
- logger.warning(f"[bash] 虚拟环境不可用,回退到系统环境: {venv_dir}")
- # 准备环境变量
- process_env = os.environ.copy()
- if env:
- process_env.update(env)
- # 执行命令
- try:
- if os.name == 'nt':
- process = await asyncio.create_subprocess_shell(
- actual_command,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- cwd=str(cwd),
- env=process_env,
- )
- else:
- process = await asyncio.create_subprocess_shell(
- actual_command,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- cwd=str(cwd),
- env=process_env,
- start_new_session=True,
- )
- try:
- stdout, stderr = await asyncio.wait_for(
- process.communicate(),
- timeout=timeout_sec
- )
- except asyncio.TimeoutError:
- _kill_process_tree(process.pid)
- try:
- await asyncio.wait_for(process.wait(), timeout=GRACEFUL_KILL_WAIT + 2)
- except asyncio.TimeoutError:
- pass
- return ToolResult(
- title="命令超时",
- output=f"命令执行超时(>{timeout_sec}s): {command[:100]}",
- error="Timeout",
- metadata={"command": command, "timeout": timeout_sec}
- )
- stdout_text = stdout.decode('utf-8', errors='replace') if stdout else ""
- stderr_text = stderr.decode('utf-8', errors='replace') if stderr else ""
- truncated = False
- if len(stdout_text) > MAX_OUTPUT_LENGTH:
- stdout_text = stdout_text[:MAX_OUTPUT_LENGTH] + f"\n\n(输出被截断,总长度: {len(stdout_text)} 字符)"
- truncated = True
- output = ""
- if stdout_text:
- output += stdout_text
- if stderr_text:
- if output:
- output += "\n\n--- stderr ---\n"
- output += stderr_text
- if not output:
- output = "(命令无输出)"
- exit_code = process.returncode
- success = exit_code == 0
- title = description or f"命令: {command[:50]}"
- if not success:
- title += f" (exit code: {exit_code})"
- return ToolResult(
- title=title,
- output=output,
- metadata={
- "exit_code": exit_code,
- "success": success,
- "truncated": truncated,
- "command": command,
- "cwd": str(cwd)
- },
- error=None if success else f"Command failed with exit code {exit_code}"
- )
- except Exception as e:
- return ToolResult(
- title="执行错误",
- output=f"命令执行失败: {str(e)}",
- error=str(e),
- metadata={"command": command}
- )
|