manager.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. """
  2. Workspace 目录、引用计数、Docker 沙箱容器编排。
  3. 目录布局(与 docker-compose 卷一致)::
  4. {workspaces_root}/ # 默认 /root/.gateway/workspaces
  5. <sha256(workspace_id)>/ # 实际数据目录
  6. .gateway/meta.json
  7. {shared_root}/ # 默认 /root/.gateway/shared
  8. """
  9. from __future__ import annotations
  10. import asyncio
  11. import json
  12. import logging
  13. import hashlib
  14. from pathlib import Path
  15. from typing import Any
  16. from utils.env_parse import env_bool, env_str
  17. from gateway.core.lifecycle.errors import LifecycleError, WorkspaceDockerError
  18. from gateway.core.lifecycle.workspace.docker_runner import WorkspaceDockerRunner
  19. logger = logging.getLogger(__name__)
  20. def workspace_subdir_key(workspace_id: str) -> str:
  21. return hashlib.sha256(workspace_id.encode("utf-8")).hexdigest()
  22. class WorkspaceManager:
  23. def __init__(
  24. self,
  25. *,
  26. workspaces_root: Path,
  27. shared_root: Path,
  28. docker_runner: WorkspaceDockerRunner,
  29. docker_required: bool,
  30. ) -> None:
  31. self._workspaces_root = workspaces_root
  32. self._shared_root = shared_root
  33. self._docker = docker_runner
  34. self._docker_required = docker_required
  35. self._lock = asyncio.Lock()
  36. self._refs: dict[str, set[str]] = {}
  37. self._trace_to_workspace: dict[str, str] = {}
  38. @classmethod
  39. def from_env(cls, docker_runner: WorkspaceDockerRunner | None = None) -> WorkspaceManager:
  40. ws = Path(env_str("GATEWAY_WORKSPACES_ROOT", "/root/.gateway/workspaces")).expanduser()
  41. sh = Path(env_str("GATEWAY_SHARED_ROOT", "/root/.gateway/shared")).expanduser()
  42. runner = docker_runner or WorkspaceDockerRunner.from_env()
  43. required = env_bool("GATEWAY_WORKSPACE_DOCKER_REQUIRED", False)
  44. return cls(
  45. workspaces_root=ws,
  46. shared_root=sh,
  47. docker_runner=runner,
  48. docker_required=required,
  49. )
  50. def _workspace_dir(self, workspace_id: str) -> Path:
  51. return self._workspaces_root / workspace_subdir_key(workspace_id)
  52. def _meta_path(self, workspace_id: str) -> Path:
  53. return self._workspace_dir(workspace_id) / ".gateway" / "meta.json"
  54. def _load_meta(self, workspace_id: str) -> dict[str, Any]:
  55. p = self._meta_path(workspace_id)
  56. if not p.is_file():
  57. return {}
  58. try:
  59. return json.loads(p.read_text(encoding="utf-8"))
  60. except (OSError, json.JSONDecodeError):
  61. return {}
  62. def get_workspace_container_id(self, workspace_id: str) -> str | None:
  63. """同步读取 meta 中的 Workspace 沙箱容器 ID(供 Gateway 调用 Agent API 时注入)。"""
  64. cid = self._load_meta(workspace_id).get("workspace_container_id")
  65. if cid is None:
  66. return None
  67. s = str(cid).strip()
  68. return s or None
  69. def _save_meta(self, workspace_id: str, data: dict[str, Any]) -> None:
  70. d = self._workspace_dir(workspace_id) / ".gateway"
  71. d.mkdir(parents=True, exist_ok=True)
  72. p = d / "meta.json"
  73. p.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
  74. async def create_workspace(self, workspace_id: str) -> str:
  75. """创建 Workspace 目录并返回绝对路径(幂等)。"""
  76. async with self._lock:
  77. return await self._create_workspace_unlocked(workspace_id)
  78. async def _create_workspace_unlocked(self, workspace_id: str) -> str:
  79. path = self._workspace_dir(workspace_id)
  80. path.mkdir(parents=True, exist_ok=True)
  81. self._shared_root.mkdir(parents=True, exist_ok=True)
  82. meta = self._load_meta(workspace_id)
  83. meta.setdefault("workspace_id", workspace_id)
  84. meta.setdefault("trace_refs", [])
  85. self._save_meta(workspace_id, meta)
  86. for tid in meta.get("trace_refs") or []:
  87. if isinstance(tid, str) and tid:
  88. self._refs.setdefault(workspace_id, set()).add(tid)
  89. self._trace_to_workspace[tid] = workspace_id
  90. return str(path.resolve())
  91. async def get_workspace_path(self, workspace_id: str) -> str:
  92. path = self._workspace_dir(workspace_id)
  93. if not path.is_dir():
  94. raise LifecycleError(f"Workspace 不存在: {workspace_id}")
  95. return str(path.resolve())
  96. async def ensure_session(self, workspace_id: str) -> str:
  97. """
  98. 会话启动:保证目录、共享目录存在,并按策略启动 Workspace 容器。
  99. 返回 Workspace 目录绝对路径。
  100. """
  101. async with self._lock:
  102. ws_path_str = await self._create_workspace_unlocked(workspace_id)
  103. ws_path = Path(ws_path_str)
  104. subdir = workspace_subdir_key(workspace_id)
  105. try:
  106. cid = self._docker.ensure_workspace_container(
  107. workspace_subdir=subdir,
  108. workspace_host_path=ws_path,
  109. shared_host_path=self._shared_root,
  110. )
  111. meta = self._load_meta(workspace_id)
  112. if cid:
  113. meta["workspace_container_id"] = cid
  114. self._save_meta(workspace_id, meta)
  115. except WorkspaceDockerError as e:
  116. logger.exception("Workspace Docker 失败 workspace_id=%s", workspace_id)
  117. if self._docker_required:
  118. raise
  119. logger.warning("Docker 未强制要求,继续无沙箱容器:%s", e)
  120. return ws_path_str
  121. async def add_trace_ref(self, workspace_id: str, trace_id: str) -> None:
  122. async with self._lock:
  123. await self._create_workspace_unlocked(workspace_id)
  124. s = self._refs.setdefault(workspace_id, set())
  125. s.add(trace_id)
  126. self._trace_to_workspace[trace_id] = workspace_id
  127. meta = self._load_meta(workspace_id)
  128. meta["workspace_id"] = workspace_id
  129. meta["trace_refs"] = sorted(s)
  130. self._save_meta(workspace_id, meta)
  131. async def remove_trace_ref(self, workspace_id: str, trace_id: str) -> None:
  132. async with self._lock:
  133. s = self._refs.setdefault(workspace_id, set())
  134. s.discard(trace_id)
  135. self._trace_to_workspace.pop(trace_id, None)
  136. meta = self._load_meta(workspace_id)
  137. meta["trace_refs"] = sorted(s)
  138. self._save_meta(workspace_id, meta)
  139. async def cleanup_workspace(self, workspace_id: str, *, force: bool = False) -> None:
  140. async with self._lock:
  141. refs = self._refs.get(workspace_id) or set()
  142. meta_refs = set(self._load_meta(workspace_id).get("trace_refs") or [])
  143. active = refs | meta_refs
  144. if active and not force:
  145. raise LifecycleError(f"Workspace 仍有 {len(active)} 个 Trace 引用,拒绝清理")
  146. self._refs.pop(workspace_id, None)
  147. for tid in list(meta_refs):
  148. self._trace_to_workspace.pop(tid, None)
  149. meta = self._load_meta(workspace_id)
  150. meta["trace_refs"] = []
  151. self._save_meta(workspace_id, meta)
  152. if force:
  153. import shutil
  154. p = self._workspace_dir(workspace_id)
  155. if p.is_dir():
  156. shutil.rmtree(p, ignore_errors=True)
  157. async def list_workspaces(self) -> list[dict[str, Any]]:
  158. async with self._lock:
  159. out: list[dict[str, Any]] = []
  160. if not self._workspaces_root.is_dir():
  161. return out
  162. for child in self._workspaces_root.iterdir():
  163. if not child.is_dir() or child.name.startswith("."):
  164. continue
  165. meta_path = child / ".gateway" / "meta.json"
  166. if not meta_path.is_file():
  167. continue
  168. try:
  169. meta = json.loads(meta_path.read_text(encoding="utf-8"))
  170. except (OSError, json.JSONDecodeError):
  171. continue
  172. wid = str(meta.get("workspace_id") or child.name)
  173. ref_count = len(meta.get("trace_refs") or [])
  174. container_id = meta.get("workspace_container_id")
  175. mem_refs = len(self._refs.get(wid, ()))
  176. ref_count = max(ref_count, mem_refs)
  177. out.append(
  178. {
  179. "workspace_id": wid,
  180. "path": str(child.resolve()),
  181. "ref_count": ref_count,
  182. "workspace_container_id": container_id,
  183. }
  184. )
  185. return out
  186. def get_workspace_id_for_trace(self, trace_id: str) -> str | None:
  187. return self._trace_to_workspace.get(trace_id)
  188. async def stop_workspace_sandbox(self, workspace_id: str) -> None:
  189. """停止该 workspace 的沙箱容器(不删目录、不改 trace 引用)。"""
  190. subdir = workspace_subdir_key(workspace_id)
  191. await asyncio.to_thread(self._docker.stop_workspace_container, subdir)