docker_runner.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
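"""
Start an ``agent/workspace:latest``-style sandbox container for each Workspace, mounting:

- the Workspace directory → ``/home/agent/workspace`` inside the container
- the shared directory → ``/home/agent/shared`` inside the container

Mount strategy (``GATEWAY_WORKSPACE_MOUNT_MODE``):

- ``bind`` (default): bind-mount directory paths taken from the host / current namespace
  (usable when the Gateway talks to Docker directly on the same machine).
- ``volume_subpath``: use a named volume plus ``VolumeOptions.Subpath`` (recommended when the
  Gateway runs inside Compose and shares a Docker daemon with the data volumes; requires a
  recent Docker Engine).
"""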
  9. from __future__ import annotations
  10. import logging
  11. import re
  12. from pathlib import Path
  13. from typing import Any
  14. from utils.env_parse import env_bool, env_str
  15. from gateway.core.lifecycle.errors import WorkspaceDockerError
  16. logger = logging.getLogger(__name__)
  17. _SAFE_NAME_RE = re.compile(r"[^a-z0-9._-]+", re.IGNORECASE)
  18. def container_name_for_subdir(workspace_subdir: str) -> str:
  19. """Docker 容器名最长 63;workspace_subdir 为 64 位 hex,截断前缀保证唯一性足够。"""
  20. safe = _SAFE_NAME_RE.sub("-", workspace_subdir.lower()).strip("-")
  21. if not safe:
  22. safe = "ws"
  23. base = f"gws-{safe[:50]}"
  24. return base[:63]
  25. class WorkspaceDockerRunner:
  26. def __init__(
  27. self,
  28. *,
  29. image: str,
  30. network: str | None,
  31. mount_mode: str,
  32. workspace_volume: str | None,
  33. shared_volume: str | None,
  34. docker_enabled: bool,
  35. ) -> None:
  36. self._image = image
  37. self._network = network
  38. self._mount_mode = mount_mode
  39. self._workspace_volume = workspace_volume
  40. self._shared_volume = shared_volume
  41. self._enabled = docker_enabled
  42. self._client: Any = None
  43. @classmethod
  44. def from_env(cls) -> WorkspaceDockerRunner:
  45. net = env_str("GATEWAY_WORKSPACE_DOCKER_NETWORK", "")
  46. wvol = env_str("GATEWAY_WORKSPACE_DOCKER_VOLUME", "")
  47. svol = env_str("GATEWAY_SHARED_DOCKER_VOLUME", "")
  48. return cls(
  49. image=env_str("GATEWAY_WORKSPACE_IMAGE", "agent/workspace:latest"),
  50. network=net or None,
  51. mount_mode=env_str("GATEWAY_WORKSPACE_MOUNT_MODE", "bind").lower(),
  52. workspace_volume=wvol or None,
  53. shared_volume=svol or None,
  54. docker_enabled=env_bool("GATEWAY_WORKSPACE_DOCKER_ENABLED", True),
  55. )
  56. def _get_client(self) -> Any:
  57. if self._client is not None:
  58. return self._client
  59. import docker
  60. try:
  61. self._client = docker.from_env()
  62. except Exception as e:
  63. raise WorkspaceDockerError(f"无法连接 Docker:{e}") from e
  64. return self._client
  65. def _build_mounts(
  66. self,
  67. *,
  68. workspace_host_path: Path,
  69. shared_host_path: Path,
  70. workspace_subdir: str,
  71. ) -> list[dict[str, Any]]:
  72. if self._mount_mode == "volume_subpath":
  73. if not self._workspace_volume or not self._shared_volume:
  74. raise WorkspaceDockerError(
  75. "volume_subpath 模式需设置 GATEWAY_WORKSPACE_DOCKER_VOLUME 与 GATEWAY_SHARED_DOCKER_VOLUME"
  76. )
  77. m_ws: dict[str, Any] = {
  78. "Type": "volume",
  79. "Source": self._workspace_volume,
  80. "Target": "/home/agent/workspace",
  81. "VolumeOptions": {"Subpath": workspace_subdir},
  82. }
  83. m_sh: dict[str, Any] = {
  84. "Type": "volume",
  85. "Source": self._shared_volume,
  86. "Target": "/home/agent/shared",
  87. }
  88. return [m_ws, m_sh]
  89. ws_abs = str(workspace_host_path.resolve())
  90. sh_abs = str(shared_host_path.resolve())
  91. return [
  92. {"Type": "bind", "Source": ws_abs, "Target": "/home/agent/workspace"},
  93. {"Type": "bind", "Source": sh_abs, "Target": "/home/agent/shared"},
  94. ]
  95. def ensure_workspace_container(
  96. self,
  97. *,
  98. workspace_subdir: str,
  99. workspace_host_path: Path,
  100. shared_host_path: Path,
  101. ) -> str | None:
  102. """
  103. 保证存在运行中的 Workspace 容器。返回 container id;未启用 Docker 时返回 None。
  104. """
  105. if not self._enabled:
  106. return None
  107. name = container_name_for_subdir(workspace_subdir)
  108. client = self._get_client()
  109. from docker.errors import APIError, NotFound
  110. try:
  111. existing = client.containers.get(name)
  112. if existing.status != "running":
  113. existing.start()
  114. return existing.id
  115. except NotFound:
  116. pass
  117. except APIError as e:
  118. if getattr(e, "status_code", None) != 404:
  119. raise WorkspaceDockerError(f"查询容器 {name} 失败:{e}") from e
  120. try:
  121. run_kw: dict[str, Any] = {
  122. "image": self._image,
  123. "name": name,
  124. "detach": True,
  125. "remove": False,
  126. "entrypoint": ["sleep", "infinity"],
  127. }
  128. if self._network:
  129. run_kw["network"] = self._network
  130. if self._mount_mode == "volume_subpath":
  131. run_kw["mounts"] = self._build_mounts(
  132. workspace_host_path=workspace_host_path,
  133. shared_host_path=shared_host_path,
  134. workspace_subdir=workspace_subdir,
  135. )
  136. else:
  137. ws_abs = str(workspace_host_path.resolve())
  138. sh_abs = str(shared_host_path.resolve())
  139. run_kw["volumes"] = {
  140. ws_abs: {"bind": "/home/agent/workspace", "mode": "rw"},
  141. sh_abs: {"bind": "/home/agent/shared", "mode": "rw"},
  142. }
  143. container = client.containers.run(**run_kw)
  144. cid = getattr(container, "id", None) or container.get("Id")
  145. logger.info(
  146. "Workspace 容器已启动 name=%s id=%s image=%s mode=%s",
  147. name,
  148. cid,
  149. self._image,
  150. self._mount_mode,
  151. )
  152. return str(cid) if cid else name
  153. except Exception as e:
  154. raise WorkspaceDockerError(f"启动 Workspace 容器失败({name}):{e}") from e
  155. def stop_workspace_container(self, workspace_subdir: str) -> None:
  156. """停止该 workspace 对应的沙箱容器;不存在或已停止则忽略(不抛 WorkspaceDockerError)。"""
  157. if not self._enabled:
  158. return
  159. name = container_name_for_subdir(workspace_subdir)
  160. try:
  161. client = self._get_client()
  162. from docker.errors import NotFound
  163. try:
  164. c = client.containers.get(name)
  165. except NotFound:
  166. return
  167. st = getattr(c, "status", None) or ""
  168. if st == "running":
  169. c.stop(timeout=15)
  170. logger.info("Workspace 容器已停止 name=%s", name)
  171. except Exception as e:
  172. logger.warning("停止 Workspace 容器失败 name=%s: %s", name, e)