""" 为每个 Workspace 启动 ``agent/workspace:latest`` 类沙箱容器,挂载: - 该 Workspace 目录 → 容器 ``/home/agent/workspace`` - 共享目录 → 容器 ``/home/agent/shared`` 挂载策略(``GATEWAY_WORKSPACE_MOUNT_MODE``): - ``bind``(默认):使用宿主机/当前命名空间下的目录路径做 bind mount(Gateway 在本机直连 Docker 时可用)。 - ``volume_subpath``:使用命名卷 + ``VolumeOptions.Subpath``(Gateway 在 Compose 内且与数据卷在同一 Docker 守护进程时推荐;需较新 Docker Engine)。 """ from __future__ import annotations import logging import re from pathlib import Path from typing import Any from utils.env_parse import env_bool, env_str from gateway.core.lifecycle.errors import WorkspaceDockerError logger = logging.getLogger(__name__) _SAFE_NAME_RE = re.compile(r"[^a-z0-9._-]+", re.IGNORECASE) def container_name_for_subdir(workspace_subdir: str) -> str: """Docker 容器名最长 63;workspace_subdir 为 64 位 hex,截断前缀保证唯一性足够。""" safe = _SAFE_NAME_RE.sub("-", workspace_subdir.lower()).strip("-") if not safe: safe = "ws" base = f"gws-{safe[:50]}" return base[:63] class WorkspaceDockerRunner: def __init__( self, *, image: str, network: str | None, mount_mode: str, workspace_volume: str | None, shared_volume: str | None, docker_enabled: bool, ) -> None: self._image = image self._network = network self._mount_mode = mount_mode self._workspace_volume = workspace_volume self._shared_volume = shared_volume self._enabled = docker_enabled self._client: Any = None @classmethod def from_env(cls) -> WorkspaceDockerRunner: net = env_str("GATEWAY_WORKSPACE_DOCKER_NETWORK", "") wvol = env_str("GATEWAY_WORKSPACE_DOCKER_VOLUME", "") svol = env_str("GATEWAY_SHARED_DOCKER_VOLUME", "") return cls( image=env_str("GATEWAY_WORKSPACE_IMAGE", "agent/workspace:latest"), network=net or None, mount_mode=env_str("GATEWAY_WORKSPACE_MOUNT_MODE", "bind").lower(), workspace_volume=wvol or None, shared_volume=svol or None, docker_enabled=env_bool("GATEWAY_WORKSPACE_DOCKER_ENABLED", True), ) def _get_client(self) -> Any: if self._client is not None: return self._client import docker try: self._client = docker.from_env() except Exception as e: raise WorkspaceDockerError(f"无法连接 Docker:{e}") from e return self._client def _build_mounts( self, *, workspace_host_path: Path, shared_host_path: Path, workspace_subdir: str, ) -> list[dict[str, Any]]: if self._mount_mode == "volume_subpath": if not self._workspace_volume or not self._shared_volume: raise WorkspaceDockerError( "volume_subpath 模式需设置 GATEWAY_WORKSPACE_DOCKER_VOLUME 与 GATEWAY_SHARED_DOCKER_VOLUME" ) m_ws: dict[str, Any] = { "Type": "volume", "Source": self._workspace_volume, "Target": "/home/agent/workspace", "VolumeOptions": {"Subpath": workspace_subdir}, } m_sh: dict[str, Any] = { "Type": "volume", "Source": self._shared_volume, "Target": "/home/agent/shared", } return [m_ws, m_sh] ws_abs = str(workspace_host_path.resolve()) sh_abs = str(shared_host_path.resolve()) return [ {"Type": "bind", "Source": ws_abs, "Target": "/home/agent/workspace"}, {"Type": "bind", "Source": sh_abs, "Target": "/home/agent/shared"}, ] def ensure_workspace_container( self, *, workspace_subdir: str, workspace_host_path: Path, shared_host_path: Path, ) -> str | None: """ 保证存在运行中的 Workspace 容器。返回 container id;未启用 Docker 时返回 None。 """ if not self._enabled: return None name = container_name_for_subdir(workspace_subdir) client = self._get_client() from docker.errors import APIError, NotFound try: existing = client.containers.get(name) if existing.status != "running": existing.start() return existing.id except NotFound: pass except APIError as e: if getattr(e, "status_code", None) != 404: raise WorkspaceDockerError(f"查询容器 {name} 失败:{e}") from e try: run_kw: dict[str, Any] = { "image": self._image, "name": name, "detach": True, "remove": False, "entrypoint": ["sleep", "infinity"], } if self._network: run_kw["network"] = self._network if self._mount_mode == "volume_subpath": run_kw["mounts"] = self._build_mounts( workspace_host_path=workspace_host_path, shared_host_path=shared_host_path, workspace_subdir=workspace_subdir, ) else: ws_abs = str(workspace_host_path.resolve()) sh_abs = str(shared_host_path.resolve()) run_kw["volumes"] = { ws_abs: {"bind": "/home/agent/workspace", "mode": "rw"}, sh_abs: {"bind": "/home/agent/shared", "mode": "rw"}, } container = client.containers.run(**run_kw) cid = getattr(container, "id", None) or container.get("Id") logger.info( "Workspace 容器已启动 name=%s id=%s image=%s mode=%s", name, cid, self._image, self._mount_mode, ) return str(cid) if cid else name except Exception as e: raise WorkspaceDockerError(f"启动 Workspace 容器失败({name}):{e}") from e def stop_workspace_container(self, workspace_subdir: str) -> None: """停止该 workspace 对应的沙箱容器;不存在或已停止则忽略(不抛 WorkspaceDockerError)。""" if not self._enabled: return name = container_name_for_subdir(workspace_subdir) try: client = self._get_client() from docker.errors import NotFound try: c = client.containers.get(name) except NotFound: return st = getattr(c, "status", None) or "" if st == "running": c.stop(timeout=15) logger.info("Workspace 容器已停止 name=%s", name) except Exception as e: logger.warning("停止 Workspace 容器失败 name=%s: %s", name, e)