| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- """
- 为每个 Workspace 启动 ``agent/workspace:latest`` 类沙箱容器,挂载:
- - 该 Workspace 目录 → 容器 ``/home/agent/workspace``
- - 共享目录 → 容器 ``/home/agent/shared``
- 挂载策略(``GATEWAY_WORKSPACE_MOUNT_MODE``):
- - ``bind``(默认):使用宿主机/当前命名空间下的目录路径做 bind mount(Gateway 在本机直连 Docker 时可用)。
- - ``volume_subpath``:使用命名卷 + ``VolumeOptions.Subpath``(Gateway 在 Compose 内且与数据卷在同一 Docker 守护进程时推荐;需较新 Docker Engine)。
- """
- from __future__ import annotations
- import logging
- import re
- from pathlib import Path
- from typing import Any
- from utils.env_parse import env_bool, env_str
- from gateway.core.lifecycle.errors import WorkspaceDockerError
- logger = logging.getLogger(__name__)
# Matches every run of characters that is not allowed in the generated name.
# re.ASCII matters: with IGNORECASE alone (Unicode mode) the range ``a-z`` also
# matches a few non-ASCII letters such as U+0131 and U+017F, so the negated
# class would leave them in the name instead of replacing them.
_SAFE_NAME_RE = re.compile(r"[^a-z0-9._-]+", re.IGNORECASE | re.ASCII)


def container_name_for_subdir(workspace_subdir: str) -> str:
    """Derive the sandbox container name for a workspace sub-directory.

    The input is lower-cased, every run of characters outside
    ``[a-z0-9._-]`` becomes a single ``-``, and leading/trailing ``-`` are
    stripped. An input that sanitises to nothing falls back to ``"ws"``.
    The name is ``"gws-"`` plus at most the first 50 sanitised characters.

    Args:
        workspace_subdir: Workspace directory name. The original note says
            this is a 64-character hex id, so a 50-character prefix is
            considered unique enough — TODO confirm against the caller.

    Returns:
        The container name, never longer than 63 characters.
    """
    safe = _SAFE_NAME_RE.sub("-", workspace_subdir.lower()).strip("-")
    if not safe:
        safe = "ws"
    base = f"gws-{safe[:50]}"
    # The slice above already bounds the length at 54; the cap is kept as an
    # explicit guard for the 63-character limit.
    return base[:63]
class WorkspaceDockerRunner:
    """Start and stop one sandbox container per workspace through the Docker SDK.

    Every container mounts the workspace directory at
    ``/home/agent/workspace`` and the shared directory at
    ``/home/agent/shared``. When ``mount_mode`` is ``"volume_subpath"`` the
    mounts come from named volumes; any other value is handled as a
    host-path bind mount.
    """

    def __init__(
        self,
        *,
        image: str,
        network: str | None,
        mount_mode: str,
        workspace_volume: str | None,
        shared_volume: str | None,
        docker_enabled: bool,
    ) -> None:
        """Store the configuration; no Docker connection is opened here.

        Args:
            image: Image used for new workspace containers.
            network: Docker network to attach to, or ``None`` to omit it.
            mount_mode: ``"volume_subpath"`` or anything else for bind mounts.
            workspace_volume: Named volume holding the workspace directories
                (needed only in ``volume_subpath`` mode).
            shared_volume: Named volume holding the shared directory
                (needed only in ``volume_subpath`` mode).
            docker_enabled: When false, the public methods do nothing.
        """
        self._image = image
        self._network = network
        self._mount_mode = mount_mode
        self._workspace_volume = workspace_volume
        self._shared_volume = shared_volume
        self._enabled = docker_enabled
        # Created on first use by _get_client().
        self._client: Any = None

    @classmethod
    def from_env(cls) -> WorkspaceDockerRunner:
        """Build a runner from the ``GATEWAY_*`` environment variables.

        Empty network / volume values are turned into ``None`` and the mount
        mode is lower-cased. The second argument passed to ``env_str`` /
        ``env_bool`` is presumably the fallback value — confirm in
        ``utils.env_parse``.
        """
        net = env_str("GATEWAY_WORKSPACE_DOCKER_NETWORK", "")
        wvol = env_str("GATEWAY_WORKSPACE_DOCKER_VOLUME", "")
        svol = env_str("GATEWAY_SHARED_DOCKER_VOLUME", "")
        return cls(
            image=env_str("GATEWAY_WORKSPACE_IMAGE", "agent/workspace:latest"),
            network=net or None,
            mount_mode=env_str("GATEWAY_WORKSPACE_MOUNT_MODE", "bind").lower(),
            workspace_volume=wvol or None,
            shared_volume=svol or None,
            docker_enabled=env_bool("GATEWAY_WORKSPACE_DOCKER_ENABLED", True),
        )

    def _get_client(self) -> Any:
        """Return the cached Docker client, creating it on first use.

        Raises:
            WorkspaceDockerError: If the Docker SDK cannot be imported or the
                client cannot be created from the environment.
        """
        if self._client is not None:
            return self._client
        try:
            # Imported lazily, and inside the ``try`` so that a missing SDK is
            # reported as WorkspaceDockerError like any other failure here.
            import docker

            self._client = docker.from_env()
        except Exception as e:
            raise WorkspaceDockerError(f"无法连接 Docker:{e}") from e
        return self._client

    def _build_mounts(
        self,
        *,
        workspace_host_path: Path,
        shared_host_path: Path,
        workspace_subdir: str,
    ) -> list[dict[str, Any]]:
        """Build the mount specifications for a workspace container.

        In ``volume_subpath`` mode the workspace mount is the workspace
        volume restricted to ``workspace_subdir`` via ``VolumeOptions.Subpath``
        and the shared mount is the whole shared volume. Otherwise both
        entries are bind mounts of the resolved host paths.

        Note that ``ensure_workspace_container`` only calls this method in
        ``volume_subpath`` mode; the bind branch is kept for other callers.

        Raises:
            WorkspaceDockerError: In ``volume_subpath`` mode when either
                volume name is not configured.
        """
        if self._mount_mode == "volume_subpath":
            if not self._workspace_volume or not self._shared_volume:
                raise WorkspaceDockerError(
                    "volume_subpath 模式需设置 GATEWAY_WORKSPACE_DOCKER_VOLUME 与 GATEWAY_SHARED_DOCKER_VOLUME"
                )
            m_ws: dict[str, Any] = {
                "Type": "volume",
                "Source": self._workspace_volume,
                "Target": "/home/agent/workspace",
                "VolumeOptions": {"Subpath": workspace_subdir},
            }
            m_sh: dict[str, Any] = {
                "Type": "volume",
                "Source": self._shared_volume,
                "Target": "/home/agent/shared",
            }
            return [m_ws, m_sh]
        ws_abs = str(workspace_host_path.resolve())
        sh_abs = str(shared_host_path.resolve())
        return [
            {"Type": "bind", "Source": ws_abs, "Target": "/home/agent/workspace"},
            {"Type": "bind", "Source": sh_abs, "Target": "/home/agent/shared"},
        ]

    def ensure_workspace_container(
        self,
        *,
        workspace_subdir: str,
        workspace_host_path: Path,
        shared_host_path: Path,
    ) -> str | None:
        """Make sure a running container exists for the workspace.

        An existing container with the derived name is reused: it is
        unpaused when paused and started when in any other non-running
        state. If none exists, a new detached container is created with
        ``sleep infinity`` as its entrypoint.

        Args:
            workspace_subdir: Workspace directory name; determines the
                container name and, in ``volume_subpath`` mode, the subpath.
            workspace_host_path: Workspace directory used for the bind mount.
            shared_host_path: Shared directory used for the bind mount.

        Returns:
            The container id (or the container name if the created object
            exposes no id); ``None`` when Docker support is disabled.

        Raises:
            WorkspaceDockerError: If the lookup, resume or creation fails.
        """
        if not self._enabled:
            return None
        name = container_name_for_subdir(workspace_subdir)
        client = self._get_client()
        from docker.errors import APIError, NotFound

        try:
            existing = client.containers.get(name)
            status = existing.status
            if status == "paused":
                # Docker rejects ``start`` on a paused container; it has to
                # be resumed with ``unpause`` instead.
                existing.unpause()
            elif status != "running":
                existing.start()
            return existing.id
        except NotFound:
            pass
        except APIError as e:
            # A 404 reported as a plain APIError also means "create it".
            if getattr(e, "status_code", None) != 404:
                raise WorkspaceDockerError(f"查询容器 {name} 失败:{e}") from e

        try:
            run_kw: dict[str, Any] = {
                "image": self._image,
                "name": name,
                "detach": True,
                "remove": False,
                "entrypoint": ["sleep", "infinity"],
            }
            if self._network:
                run_kw["network"] = self._network
            if self._mount_mode == "volume_subpath":
                run_kw["mounts"] = self._build_mounts(
                    workspace_host_path=workspace_host_path,
                    shared_host_path=shared_host_path,
                    workspace_subdir=workspace_subdir,
                )
            else:
                ws_abs = str(workspace_host_path.resolve())
                sh_abs = str(shared_host_path.resolve())
                run_kw["volumes"] = {
                    ws_abs: {"bind": "/home/agent/workspace", "mode": "rw"},
                    sh_abs: {"bind": "/home/agent/shared", "mode": "rw"},
                }
            container = client.containers.run(**run_kw)
            cid = getattr(container, "id", None) or container.get("Id")
            logger.info(
                "Workspace 容器已启动 name=%s id=%s image=%s mode=%s",
                name,
                cid,
                self._image,
                self._mount_mode,
            )
            return str(cid) if cid else name
        except WorkspaceDockerError:
            # Configuration errors from _build_mounts are already in their
            # final form; do not wrap them a second time.
            raise
        except Exception as e:
            raise WorkspaceDockerError(f"启动 Workspace 容器失败({name}):{e}") from e

    def stop_workspace_container(self, workspace_subdir: str) -> None:
        """Stop the workspace's sandbox container on a best-effort basis.

        Does nothing when Docker support is disabled, when the container
        does not exist, or when its status is not ``"running"``. Any failure
        is logged as a warning and never raised.
        """
        if not self._enabled:
            return
        name = container_name_for_subdir(workspace_subdir)
        try:
            client = self._get_client()
            from docker.errors import NotFound

            try:
                c = client.containers.get(name)
            except NotFound:
                return
            st = getattr(c, "status", None) or ""
            if st == "running":
                c.stop(timeout=15)
                logger.info("Workspace 容器已停止 name=%s", name)
        except Exception as e:
            logger.warning("停止 Workspace 容器失败 name=%s: %s", name, e)
|