""" Docker 内执行工具(模块 ``agent.tools.docker_runner``;与 ``agent.core.runner.AgentRunner`` 无关)。 解析顺序:**ContextVar(Runner 注入)** → **Trace.context['gateway_exec']** → **环境变量默认容器** (``AGENT_DEFAULT_DOCKER_CONTAINER``,可选 ``AGENT_DEFAULT_DOCKER_WORKDIR`` / ``AGENT_DEFAULT_DOCKER_USER``)。有有效 ``docker_container`` 时,``bash_command``、 ``read_file`` / ``write_file`` / ``edit_file`` / ``glob_files`` / ``grep_content`` 走容器内 ``docker exec``;否则仍走原有 builtin(本机)。 - ``GatewayExecResolver`` / ``active_gateway_exec``:``AgentRunner`` 在 ``tools.execute`` 前后 set/reset ContextVar。 - ``BashGatewayDispatcher`` / ``WorkspaceFileToolsDispatcher``:在 ``import agent.tools``(builtin 注册之后)时向 ``ToolRegistry`` 注册包装函数。 需要 API 进程能访问 Docker(例如挂载 ``/var/run/docker.sock``)。 """ from __future__ import annotations import asyncio import base64 import io import json import logging import mimetypes import os import posixpath import tarfile from contextvars import ContextVar from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, ClassVar, Coroutine, Dict, List, Optional, Tuple from urllib.parse import urlparse from agent.tools.builtin.file.edit import replace as edit_replace from agent.tools.builtin.file.grep import LIMIT as GREP_LIMIT from agent.tools.builtin.file.read import DEFAULT_READ_LIMIT, MAX_BYTES, MAX_LINE_LENGTH from agent.tools.builtin.file.write import _create_diff from agent.tools.builtin.glob_tool import LIMIT as GLOB_LIMIT from agent.tools.models import ToolContext, ToolResult if TYPE_CHECKING: from agent.tools.registry import ToolRegistry logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Trace.gateway_exec:ContextVar + 路径解析 # --------------------------------------------------------------------------- class GatewayExecResolver: """从工具 context / ContextVar 解析 ``gateway_exec``,并把用户路径映射到容器内路径。""" ACTIVE: ClassVar[ContextVar[Optional[dict[str, Any]]]] = ContextVar( "active_gateway_exec", default=None ) @classmethod def from_tool_context(cls, context: Any) -> dict[str, Any] | None: if not isinstance(context, dict): return None tc = context.get("trace_context") if not isinstance(tc, dict): return None ge = tc.get("gateway_exec") return ge if isinstance(ge, dict) else None @classmethod def default_gateway_exec_from_env(cls) -> dict[str, Any] | None: """无 Trace.gateway_exec 时,用环境变量指定默认 Workspace 容器(直连 API / 本地调试)。""" container = os.getenv("AGENT_DEFAULT_DOCKER_CONTAINER", "").strip() if not container: return None out: dict[str, Any] = {"docker_container": container} wd = os.getenv("AGENT_DEFAULT_DOCKER_WORKDIR", "").strip() if wd: out["container_workdir"] = wd user = os.getenv("AGENT_DEFAULT_DOCKER_USER", "").strip() if user: out["container_user"] = user return out @classmethod def for_trace_context(cls, trace_context: dict[str, Any] | None) -> dict[str, Any] | None: """Trace.context 中的 gateway_exec 优先,否则环境变量默认容器。""" tc = trace_context or {} ge = tc.get("gateway_exec") if isinstance(ge, dict) and str(ge.get("docker_container") or "").strip(): return ge return cls.default_gateway_exec_from_env() @classmethod def effective(cls, context: Any) -> dict[str, Any] | None: ge = cls.ACTIVE.get() if isinstance(ge, dict) and str(ge.get("docker_container") or "").strip(): return ge if isinstance(context, dict): tc = context.get("trace_context") if isinstance(tc, dict): return cls.for_trace_context(tc) return cls.default_gateway_exec_from_env() @staticmethod def workdir(ge: dict[str, Any]) -> str: w = str(ge.get("container_workdir") or "/home/agent/workspace").strip() return w.rstrip("/") or "/home/agent/workspace" @staticmethod def user(ge: dict[str, Any]) -> str: u = str(ge.get("container_user") or "agent").strip() return u or "agent" @staticmethod def _host_mapping_root() -> str | None: raw = os.getenv("AGENT_WORKSPACE_HOST_PROJECT_ROOT", "").strip() if raw: return str(Path(raw).resolve()) try: return str(Path.cwd().resolve()) except Exception: return None @classmethod def resolve_path(cls, ge: dict[str, Any], user_path: str | None, *, is_dir: bool) -> str | None: wd = cls.workdir(ge) if not user_path or not str(user_path).strip(): return wd if is_dir else None raw = str(user_path).strip().replace("\\", "/") host_root = cls._host_mapping_root() if posixpath.isabs(raw): norm = posixpath.normpath(raw) if norm == wd or norm.startswith(wd + "/"): return norm if host_root: hr = host_root.replace("\\", "/").rstrip("/") if norm == hr or norm.startswith(hr + "/"): rel = posixpath.relpath(norm, hr) if rel.startswith("../"): return None candidate = posixpath.normpath(posixpath.join(wd, rel)) if candidate == wd or candidate.startswith(wd + "/"): return candidate return None return None for seg in raw.split("/"): if seg == "..": return None candidate = posixpath.normpath(posixpath.join(wd, raw)) if candidate == wd or candidate.startswith(wd + "/"): return candidate return None # 兼容旧导入:runner 使用 ``active_gateway_exec.set`` / ``reset`` active_gateway_exec = GatewayExecResolver.ACTIVE gateway_exec_from_tool_context = GatewayExecResolver.from_tool_context effective_gateway_exec = GatewayExecResolver.effective container_workdir = GatewayExecResolver.workdir container_user = GatewayExecResolver.user resolve_container_path = GatewayExecResolver.resolve_path # --------------------------------------------------------------------------- # 单会话:Docker 容器内 exec / 读写 / 工具级 read/write/glob/grep/bash # --------------------------------------------------------------------------- class DockerWorkspaceClient: """绑定一份 ``gateway_exec`` 字典,封装对该 Workspace 容器的所有 I/O。""" __slots__ = ("_ge",) _BINARY_EXTS = frozenset({ ".zip", ".tar", ".gz", ".exe", ".dll", ".so", ".class", ".jar", ".war", ".7z", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".odt", ".ods", ".odp", ".bin", ".dat", ".obj", ".o", ".a", ".lib", ".wasm", ".pyc", ".pyo", }) def __init__(self, ge: dict[str, Any]) -> None: self._ge = ge @property def ge(self) -> dict[str, Any]: return self._ge def container_id(self) -> str | None: c = self._ge.get("docker_container") if c is None: return None s = str(c).strip() return s or None def _docker_container(self): import docker cid = self.container_id() if not cid: raise ValueError("gateway_exec 缺少 docker_container") return docker.from_env().containers.get(cid) def sync_exec_argv( self, argv: List[str], *, workdir: str, environment: Optional[Dict[str, str]] = None, ) -> Tuple[int, bytes, bytes]: ct = self._docker_container() user = GatewayExecResolver.user(self._ge) exit_code, output = ct.exec_run( argv, user=user, workdir=workdir, environment=environment, demux=True, ) if isinstance(output, tuple) and len(output) == 2: stdout_b, stderr_b = output else: stdout_b = output if isinstance(output, (bytes, bytearray)) else b"" stderr_b = b"" if stdout_b is None: stdout_b = b"" if stderr_b is None: stderr_b = b"" code = int(exit_code) if exit_code is not None else -1 return code, bytes(stdout_b), bytes(stderr_b) async def async_exec_argv( self, argv: List[str], *, workdir: str, environment: Optional[Dict[str, str]] = None, ) -> Tuple[int, bytes, bytes]: loop = asyncio.get_running_loop() return await loop.run_in_executor( None, lambda: self.sync_exec_argv(argv, workdir=workdir, environment=environment), ) def sync_read_file_bytes(self, container_path: str) -> bytes: ct = self._docker_container() try: _stat, stream = ct.get_archive(container_path) except Exception as e: logger.debug("get_archive failed path=%s: %s", container_path, e) raise FileNotFoundError(container_path) from e # get_archive 流在部分 SDK/传输下会产出 str,不能 b"".join;str 用 latin-1 按字节还原 parts: list[bytes] = [] for chunk in stream: if isinstance(chunk, (bytes, bytearray, memoryview)): parts.append(bytes(chunk)) elif isinstance(chunk, str): parts.append(chunk.encode("latin-1")) else: parts.append(bytes(chunk)) bio = io.BytesIO(b"".join(parts)) with tarfile.open(fileobj=bio, mode="r") as tar: member = tar.next() if member is None: return b"" if member.isdir(): raise IsADirectoryError(container_path) ef = tar.extractfile(member) if ef is None: return b"" return ef.read() @staticmethod def _posixpath_dir(p: str) -> str: return os.path.dirname(p.replace("\\", "/")) @staticmethod def _posixpath_basename(p: str) -> str: return os.path.basename(p.replace("\\", "/")) def sync_write_file_bytes(self, container_path: str, data: bytes) -> None: ct = self._docker_container() parent = self._posixpath_dir(container_path) or "/" base = self._posixpath_basename(container_path) if not base: raise ValueError("invalid container_path") code, _out, err = self.sync_exec_argv( ["mkdir", "-p", parent], workdir="/", ) if code != 0: raise RuntimeError( f"mkdir -p failed: {parent!r} code={code} stderr={err.decode('utf-8', errors='replace')}" ) tar_stream = io.BytesIO() with tarfile.open(fileobj=tar_stream, mode="w") as tar: ti = tarfile.TarInfo(name=base) ti.size = len(data) ti.mode = 0o644 tar.addfile(ti, io.BytesIO(data)) tar_stream.seek(0) ok = ct.put_archive(parent, tar_stream) if not ok: raise RuntimeError(f"put_archive failed: {container_path!r}") async def async_read_file_bytes(self, container_path: str) -> bytes: loop = asyncio.get_running_loop() return await loop.run_in_executor(None, lambda: self.sync_read_file_bytes(container_path)) async def async_write_file_bytes(self, container_path: str, data: bytes) -> None: loop = asyncio.get_running_loop() await loop.run_in_executor(None, lambda: self.sync_write_file_bytes(container_path, data)) def sync_path_exists(self, container_path: str, *, is_dir: bool) -> bool: flag = "d" if is_dir else "f" code, _, _ = self.sync_exec_argv( ["test", "-" + flag, container_path], workdir="/", ) return code == 0 @classmethod def _is_binary_buffer(cls, data: bytes, suffix: str) -> bool: if suffix.lower() in cls._BINARY_EXTS: return True if not data: return False buf = data[:4096] if b"\x00" in buf: return True non_printable = sum(1 for b in buf if b < 9 or (13 < b < 32)) return (non_printable / len(buf)) > 0.3 if buf else False MAX_BASH_OUT: ClassVar[int] = 50_000 async def run_bash_tool( self, command: str, *, timeout: Optional[int], workdir: Optional[str], env: Optional[Dict[str, str]], description: str, ) -> ToolResult: _ = description timeout_sec = timeout if timeout is not None and timeout > 0 else 120 container = self.container_id() if not container: return ToolResult(title="配置错误", output="gateway_exec 缺少 docker_container", error="missing_container") default_wd = GatewayExecResolver.workdir(self._ge) inner_wd = str(workdir).strip() if workdir else default_wd loop = asyncio.get_running_loop() try: code, stdout, stderr = await asyncio.wait_for( loop.run_in_executor( None, lambda: self.sync_exec_argv( ["bash", "-lc", command], workdir=inner_wd, environment=env, ), ), timeout=timeout_sec, ) except asyncio.TimeoutError: return ToolResult( title="命令超时", output=f"docker exec 超时(>{timeout_sec}s): {command[:100]}", error="Timeout", metadata={"command": command, "timeout": timeout_sec}, ) except Exception as e: logger.exception("docker exec 失败 container=%s", container) return ToolResult( title="Docker 执行失败", output=( f"{e}\n\n请确认 API 容器已挂载 /var/run/docker.sock," "且已安装 docker Python 包;容器名在 gateway_exec.docker_container。" ), error="docker_error", ) stdout_text = stdout.decode("utf-8", errors="replace") if stdout else "" stderr_text = stderr.decode("utf-8", errors="replace") if stderr else "" truncated = False if len(stdout_text) > self.MAX_BASH_OUT: stdout_text = stdout_text[: self.MAX_BASH_OUT] + f"\n\n(输出被截断,总长度: {len(stdout_text)} 字符)" truncated = True parts: list[str] = [] if stdout_text: parts.append(stdout_text) if stderr_text: parts.append("\n\n--- stderr ---\n" + stderr_text) output = "\n".join(parts) if parts else "(命令无输出)" ok = code == 0 meta: dict[str, Any] = {"exit_code": code, "docker_container": container, "truncated": truncated} return ToolResult( title=f"docker bash (exit {code})", output=output, error=None if ok else f"exit code {code}", metadata=meta, ) async def tool_read_file(self, file_path: str, offset: int, limit: int) -> ToolResult: cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False) if not cpath: return ToolResult( title="路径无效", output="在 Workspace 模式下路径须相对于工作区根,或为工作区/映射项目根下的绝对路径。", error="invalid_path", ) name = Path(cpath.replace("\\", "/")).name try: raw = await self.async_read_file_bytes(cpath) except FileNotFoundError: return ToolResult( title="文件未找到", output=f"文件不存在: {file_path}", error="File not found", ) except IsADirectoryError: return ToolResult( title="路径错误", output=f"路径是目录: {file_path}", error="Is a directory", ) except Exception as e: logger.exception("workspace read_file") return ToolResult(title="读取失败", output=str(e), error=str(e)) mime_type, _ = mimetypes.guess_type(name) mime_type = mime_type or "" if mime_type.startswith("image/") and mime_type not in ("image/svg+xml", "image/vnd.fastbidsheet"): b64_data = base64.b64encode(raw).decode("ascii") return ToolResult( title=name, output=f"图片文件: {name} (MIME: {mime_type}, {len(raw)} bytes)", metadata={"mime_type": mime_type, "truncated": False, "workspace_container": True}, images=[{"type": "base64", "media_type": mime_type, "data": b64_data}], ) if mime_type == "application/pdf": return ToolResult( title=name, output=f"PDF 文件: {name}", metadata={"mime_type": mime_type, "truncated": False, "workspace_container": True}, ) if self._is_binary_buffer(raw, Path(name).suffix): return ToolResult( title="二进制文件", output=f"无法读取二进制文件: {name}", error="Binary file", ) try: text = raw.decode("utf-8") except UnicodeDecodeError: return ToolResult( title="编码错误", output=f"无法解码文件(非 UTF-8): {name}", error="Encoding error", ) lines_no_keep = text.splitlines() total_lines = len(lines_no_keep) end_line = min(offset + limit, total_lines) output_lines: list[str] = [] total_bytes = 0 truncated_by_bytes = False for i in range(offset, end_line): line = lines_no_keep[i] if len(line) > MAX_LINE_LENGTH: line = line[:MAX_LINE_LENGTH] + "..." line_bytes = len(line.encode("utf-8")) + (1 if output_lines else 0) if total_bytes + line_bytes > MAX_BYTES: truncated_by_bytes = True break output_lines.append(line) total_bytes += line_bytes formatted = [f"{offset + idx + 1:5d}| {ln}" for idx, ln in enumerate(output_lines)] output = "\n" + "\n".join(formatted) last_read_line = offset + len(output_lines) has_more = total_lines > last_read_line truncated = has_more or truncated_by_bytes if truncated_by_bytes: output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 offset 读取第 {last_read_line} 行之后)" elif has_more: output += f"\n\n(还有更多内容。使用 offset 读取第 {last_read_line} 行之后)" else: output += f"\n\n(文件结束 - 共 {total_lines} 行)" output += "\n" preview = "\n".join(output_lines[:20]) return ToolResult( title=name, output=output, metadata={ "preview": preview, "truncated": truncated, "total_lines": total_lines, "read_lines": len(output_lines), "workspace_container": True, }, ) async def tool_write_file(self, file_path: str, content: str, append: bool) -> ToolResult: cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False) if not cpath: return ToolResult(title="路径无效", output="路径不在工作区内。", error="invalid_path") name = Path(cpath.replace("\\", "/")).name if self.sync_path_exists(cpath, is_dir=True): return ToolResult(title="路径错误", output=f"路径是目录: {file_path}", error="Path is a directory") existed = self.sync_path_exists(cpath, is_dir=False) old_content = "" if existed: try: old_content = (await self.async_read_file_bytes(cpath)).decode("utf-8", errors="replace") except Exception: old_content = "" if append and existed: new_content = old_content + content else: new_content = content if existed and old_content: diff = _create_diff(str(file_path), old_content, new_content) else: diff = f"(新建文件: {name})" try: await self.async_write_file_bytes(cpath, new_content.encode("utf-8")) except Exception as e: logger.exception("workspace write_file") return ToolResult(title="写入失败", output=str(e), error=str(e)) lines = new_content.count("\n") if append and existed: operation = "追加内容到" elif existed: operation = "覆盖" else: operation = "创建" return ToolResult( title=name, output=f"文件写入成功 ({operation})\n\n{diff}", metadata={"existed": existed, "append": append, "lines": lines, "diff": diff, "workspace_container": True}, long_term_memory=f"{operation}文件 {name}", ) async def tool_edit_file( self, file_path: str, old_string: str, new_string: str, replace_all: bool, ) -> ToolResult: cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False) if not cpath: return ToolResult(title="路径无效", output="路径不在工作区内。", error="invalid_path") name = Path(cpath.replace("\\", "/")).name if not self.sync_path_exists(cpath, is_dir=False): return ToolResult(title="文件未找到", output=f"文件不存在: {file_path}", error="File not found") if self.sync_path_exists(cpath, is_dir=True): return ToolResult(title="路径错误", output=f"路径是目录: {file_path}", error="Path is a directory") try: content_old = (await self.async_read_file_bytes(cpath)).decode("utf-8") except Exception as e: return ToolResult(title="读取失败", output=str(e), error=str(e)) try: content_new = edit_replace(content_old, old_string, new_string, replace_all) except ValueError as e: return ToolResult(title="替换失败", output=str(e), error=str(e)) diff = _create_diff(file_path, content_old, content_new) try: await self.async_write_file_bytes(cpath, content_new.encode("utf-8")) except Exception as e: return ToolResult(title="写入失败", output=str(e), error=str(e)) return ToolResult( title=name, output=f"编辑成功\n\n{diff}", metadata={ "replace_all": replace_all, "workspace_container": True, "old_lines": content_old.count("\n"), "new_lines": content_new.count("\n"), }, long_term_memory=f"编辑文件 {name}", ) async def tool_glob(self, pattern: str, path: Optional[str]) -> ToolResult: wd = GatewayExecResolver.workdir(self._ge) sp = GatewayExecResolver.resolve_path(self._ge, path, is_dir=True) if path else wd if not sp: return ToolResult(title="路径无效", output="搜索目录无效。", error="invalid_path") if not self.sync_path_exists(sp, is_dir=True): return ToolResult(title="目录不存在", output=f"搜索目录不存在: {path}", error="Directory not found") cfg = json.dumps({"pattern": pattern, "root": sp, "fetch": GLOB_LIMIT + 1}, ensure_ascii=False) script = ( "import glob,json,os;" "from pathlib import Path;" "c=json.loads(__import__('os').environ['GW_GLOB_CFG']);" "os.chdir(c['root']);pat=c['pattern'];n=int(c['fetch']);" "paths=[str(p) for p in Path('.').glob(pat) if p.is_file()] if '**' in pat " "else [p for p in glob.glob(pat) if os.path.isfile(p)];" "mt=sorted([(p,os.path.getmtime(p)) for p in paths],key=lambda x:-x[1]);" "print(json.dumps([p for p,_ in mt[:n]]))" ) code, out, err = await self.async_exec_argv( ["python3", "-c", script], workdir=sp, environment={"GW_GLOB_CFG": cfg}, ) if code != 0: return ToolResult( title="glob 失败", output=err.decode("utf-8", errors="replace") or out.decode("utf-8", errors="replace"), error="glob_failed", ) try: file_paths: List[str] = json.loads(out.decode("utf-8") or "[]") except json.JSONDecodeError: return ToolResult(title="glob 解析失败", output=out.decode("utf-8", errors="replace"), error="bad_json") truncated = len(file_paths) > GLOB_LIMIT file_paths = file_paths[:GLOB_LIMIT] if not file_paths: output = "未找到匹配的文件" else: output = "\n".join(file_paths) if truncated: output += "\n\n(结果已截断。考虑使用更具体的路径或模式。)" return ToolResult( title=f"Glob: {pattern}", output=output, metadata={"count": len(file_paths), "truncated": truncated, "workspace_container": True}, ) async def tool_grep( self, pattern: str, path: Optional[str], include: Optional[str], ) -> ToolResult: wd = GatewayExecResolver.workdir(self._ge) search_path = GatewayExecResolver.resolve_path(self._ge, path, is_dir=True) if path else wd if not search_path: return ToolResult(title="路径无效", output="搜索目录无效。", error="invalid_path") if not self.sync_path_exists(search_path, is_dir=True): return ToolResult(title="目录不存在", output=f"搜索目录不存在: {path}", error="Directory not found") args: List[str] = [ "rg", "-nH", "--hidden", "--follow", "--no-messages", "--field-match-separator=|", "--regexp", pattern, ] if include: args.extend(["--glob", include]) args.append(search_path) code, stdout_b, stderr_b = await self.async_exec_argv(args, workdir=search_path) if code == 1: matches: List[Tuple[str, int, str]] = [] elif code != 0 and code != 2: return ToolResult( title="ripgrep 失败", output=stderr_b.decode("utf-8", errors="replace"), error="rg_failed", ) else: matches = [] for line in stdout_b.decode("utf-8", errors="replace").strip().split("\n"): if not line: continue parts = line.split("|", 2) if len(parts) < 3: continue file_path_str, line_num_str, line_text = parts try: matches.append((file_path_str, int(line_num_str), line_text)) except ValueError: continue matches.sort(key=lambda x: x[0], reverse=True) truncated = len(matches) > GREP_LIMIT matches = matches[:GREP_LIMIT] if not matches: output = "未找到匹配" else: output = f"找到 {len(matches)} 个匹配\n" current_file = None for file_path_str, line_num, line_text in matches: if current_file != file_path_str: if current_file is not None: output += "\n" current_file = file_path_str output += f"\n{file_path_str}:\n" if len(line_text) > 2000: line_text = line_text[:2000] + "..." output += f" Line {line_num}: {line_text}\n" if truncated: output += "\n(结果已截断。考虑使用更具体的路径或模式。)" return ToolResult( title=f"搜索: {pattern}", output=output, metadata={"matches": len(matches), "truncated": truncated, "pattern": pattern, "workspace_container": True}, ) # --------------------------------------------------------------------------- # 注册表:bash / 文件工具分发 # --------------------------------------------------------------------------- class BashGatewayDispatcher: """将 ``bash_command`` 覆盖为:有 ``gateway_exec`` 时走 ``DockerWorkspaceClient.run_bash_tool``。""" _builtin: ClassVar[Callable[..., Coroutine[Any, Any, ToolResult]] | None] = None _installed: ClassVar[bool] = False @classmethod def install(cls, registry: ToolRegistry) -> None: if cls._installed: return entry = registry._tools.get("bash_command") if not entry: logger.warning("docker_runner: bash_command 未注册,跳过覆盖") return cls._builtin = entry["func"] schema = entry["schema"] hidden = list(entry.get("hidden_params") or ["context"]) dispatch = cls._make_dispatch() registry.register( dispatch, schema=schema, hidden_params=hidden, inject_params=dict(entry.get("inject_params") or {}), requires_confirmation=entry["ui_metadata"].get("requires_confirmation", False), editable_params=list(entry["ui_metadata"].get("editable_params") or []), display=dict(entry["ui_metadata"].get("display") or {}), url_patterns=entry.get("url_patterns"), ) cls._installed = True logger.info("bash_command 已启用 gateway_exec → docker exec 分发") @classmethod def _make_dispatch(cls) -> Callable[..., Coroutine[Any, Any, ToolResult]]: async def bash_command( command: str, timeout: Optional[int] = None, workdir: Optional[str] = None, env: Optional[Dict[str, str]] = None, description: str = "", context: Optional[ToolContext] = None, ) -> ToolResult: ge = GatewayExecResolver.effective(context) if ge: ws = DockerWorkspaceClient(ge) if ws.container_id(): return await ws.run_bash_tool( command, timeout=timeout, workdir=workdir, env=env, description=description, ) if cls._builtin is None: return ToolResult(title="内部错误", output="builtin bash_command 未初始化", error="no_builtin") return await cls._builtin( command=command, timeout=timeout, workdir=workdir, env=env, description=description, context=context, ) bash_command.__name__ = "bash_command" bash_command.__doc__ = ( "执行 bash 命令(Trace.gateway_exec 或 AGENT_DEFAULT_DOCKER_CONTAINER 时在容器内 docker exec)" ) return bash_command class WorkspaceFileToolsDispatcher: """将 read/write/edit/glob/grep 在有 ``gateway_exec`` 时转发到 ``DockerWorkspaceClient``。""" _orig: ClassVar[dict[str, Callable[..., Coroutine[Any, Any, ToolResult]]]] = {} _installed: ClassVar[bool] = False @classmethod def install(cls, registry: ToolRegistry) -> None: if cls._installed: return async def read_file( file_path: str, offset: int = 0, limit: int = DEFAULT_READ_LIMIT, context: Optional[ToolContext] = None, ) -> ToolResult: ge = GatewayExecResolver.effective(context) parsed = urlparse(file_path) if parsed.scheme in ("http", "https"): return await cls._orig["read_file"](file_path=file_path, offset=offset, limit=limit, context=context) if ge and ge.get("docker_container"): return await DockerWorkspaceClient(ge).tool_read_file(file_path, offset, limit) return await cls._orig["read_file"](file_path=file_path, offset=offset, limit=limit, context=context) async def write_file( file_path: str, content: str, append: bool = False, context: Optional[ToolContext] = None, ) -> ToolResult: ge = GatewayExecResolver.effective(context) if ge and ge.get("docker_container"): return await DockerWorkspaceClient(ge).tool_write_file(file_path, content, append) return await cls._orig["write_file"](file_path=file_path, content=content, append=append, context=context) async def edit_file( file_path: str, old_string: str, new_string: str, replace_all: bool = False, context: Optional[ToolContext] = None, ) -> ToolResult: ge = GatewayExecResolver.effective(context) if ge and ge.get("docker_container"): return await DockerWorkspaceClient(ge).tool_edit_file( file_path, old_string, new_string, replace_all ) return await cls._orig["edit_file"]( file_path=file_path, old_string=old_string, new_string=new_string, replace_all=replace_all, context=context, ) async def glob_files( pattern: str, path: Optional[str] = None, context: Optional[ToolContext] = None, ) -> ToolResult: ge = GatewayExecResolver.effective(context) if ge and ge.get("docker_container"): return await DockerWorkspaceClient(ge).tool_glob(pattern, path) return await cls._orig["glob_files"](pattern=pattern, path=path, context=context) async def grep_content( pattern: str, path: Optional[str] = None, include: Optional[str] = None, context: Optional[ToolContext] = None, ) -> ToolResult: ge = GatewayExecResolver.effective(context) if ge and ge.get("docker_container"): return await DockerWorkspaceClient(ge).tool_grep(pattern, path, include) orig = cls._orig["grep_content"] return await orig(pattern=pattern, path=path, include=include, context=context) read_file.__name__ = "read_file" write_file.__name__ = "write_file" edit_file.__name__ = "edit_file" glob_files.__name__ = "glob_files" grep_content.__name__ = "grep_content" for name, fn in [ ("read_file", read_file), ("write_file", write_file), ("edit_file", edit_file), ("glob_files", glob_files), ("grep_content", grep_content), ]: cls._register_override(registry, name, fn) cls._installed = True logger.info("read/write/edit/glob/grep 已启用 gateway_exec → Workspace 容器分发") @classmethod def _register_override( cls, registry: ToolRegistry, name: str, dispatch: Callable[..., Coroutine[Any, Any, ToolResult]], ) -> None: entry = registry._tools.get(name) if not entry: logger.warning("docker_runner: 工具 %s 未注册,跳过覆盖", name) return cls._orig[name] = entry["func"] registry.register( dispatch, schema=entry["schema"], hidden_params=list(entry.get("hidden_params") or []), inject_params=dict(entry.get("inject_params") or {}), requires_confirmation=entry["ui_metadata"].get("requires_confirmation", False), editable_params=list(entry["ui_metadata"].get("editable_params") or []), display=dict(entry["ui_metadata"].get("display") or {}), url_patterns=entry.get("url_patterns"), ) def install_bash_gateway_dispatch(registry: ToolRegistry) -> None: BashGatewayDispatcher.install(registry) def install_workspace_file_tools_dispatch(registry: ToolRegistry) -> None: WorkspaceFileToolsDispatcher.install(registry) __all__ = [ "GatewayExecResolver", "DockerWorkspaceClient", "BashGatewayDispatcher", "WorkspaceFileToolsDispatcher", "active_gateway_exec", "gateway_exec_from_tool_context", "effective_gateway_exec", "container_workdir", "container_user", "resolve_container_path", "install_bash_gateway_dispatch", "install_workspace_file_tools_dispatch", ]