"""Workspace directories, trace reference counting and Docker sandbox orchestration.

Directory layout (matches the docker-compose volumes)::

    {workspaces_root}/                  # default: /root/.gateway/workspaces
        <sha256(workspace_id)>/         # actual data directory
            .gateway/meta.json
    {shared_root}/                      # default: /root/.gateway/shared
"""

from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import shutil
from pathlib import Path
from typing import Any

from utils.env_parse import env_bool, env_str

from gateway.core.lifecycle.errors import LifecycleError, WorkspaceDockerError
from gateway.core.lifecycle.workspace.docker_runner import WorkspaceDockerRunner

logger = logging.getLogger(__name__)


def workspace_subdir_key(workspace_id: str) -> str:
    """Return the on-disk directory name for *workspace_id*.

    The name is the hex SHA-256 digest of the UTF-8 encoded ID, so arbitrary
    workspace IDs map to a fixed-length, filesystem-safe directory name.
    """
    return hashlib.sha256(workspace_id.encode("utf-8")).hexdigest()


class WorkspaceManager:
    """Manage workspace directories, their trace references and sandbox containers.

    State is kept both in memory (``_refs`` / ``_trace_to_workspace``) and on
    disk in ``<workspace dir>/.gateway/meta.json``. All mutating coroutines
    serialize on a single ``asyncio.Lock``.
    """

    def __init__(
        self,
        *,
        workspaces_root: Path,
        shared_root: Path,
        docker_runner: WorkspaceDockerRunner,
        docker_required: bool,
    ) -> None:
        """Store the configuration; no filesystem or Docker access happens here.

        Args:
            workspaces_root: Parent directory of all workspace directories.
            shared_root: Directory passed to the sandbox as the shared host path.
            docker_runner: Runner used to start/stop sandbox containers.
            docker_required: When true, a Docker failure in ``ensure_session``
                is re-raised instead of being logged and ignored.
        """
        self._workspaces_root = workspaces_root
        self._shared_root = shared_root
        self._docker = docker_runner
        self._docker_required = docker_required
        self._lock = asyncio.Lock()
        # workspace_id -> trace IDs currently referencing that workspace.
        self._refs: dict[str, set[str]] = {}
        # Reverse index: trace_id -> workspace_id.
        self._trace_to_workspace: dict[str, str] = {}

    @classmethod
    def from_env(cls, docker_runner: WorkspaceDockerRunner | None = None) -> WorkspaceManager:
        """Build a manager from ``GATEWAY_*`` environment variables.

        Args:
            docker_runner: Optional runner; when omitted (or falsy) one is
                created with ``WorkspaceDockerRunner.from_env()``.
        """
        ws = Path(env_str("GATEWAY_WORKSPACES_ROOT", "/root/.gateway/workspaces")).expanduser()
        sh = Path(env_str("GATEWAY_SHARED_ROOT", "/root/.gateway/shared")).expanduser()
        runner = docker_runner or WorkspaceDockerRunner.from_env()
        required = env_bool("GATEWAY_WORKSPACE_DOCKER_REQUIRED", False)
        return cls(
            workspaces_root=ws,
            shared_root=sh,
            docker_runner=runner,
            docker_required=required,
        )

    # ------------------------------------------------------------------ paths

    def _workspace_dir(self, workspace_id: str) -> Path:
        """Return the data directory of *workspace_id* (may not exist yet)."""
        return self._workspaces_root / workspace_subdir_key(workspace_id)

    def _meta_path(self, workspace_id: str) -> Path:
        """Return the path of the workspace's ``.gateway/meta.json`` file."""
        return self._workspace_dir(workspace_id) / ".gateway" / "meta.json"

    # ------------------------------------------------------------------- meta

    @staticmethod
    def _read_meta_file(path: Path) -> dict[str, Any] | None:
        """Parse a meta file; return ``None`` if missing, unreadable or not a JSON object."""
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            return None
        return data if isinstance(data, dict) else None

    @staticmethod
    def _persisted_refs(meta: dict[str, Any]) -> set[str]:
        """Return the valid (non-empty string) trace IDs stored in *meta*."""
        raw = meta.get("trace_refs")
        if not isinstance(raw, list):
            return set()
        return {tid for tid in raw if isinstance(tid, str) and tid}

    def _adopt_persisted_refs(self, workspace_id: str, meta: dict[str, Any]) -> set[str]:
        """Merge the refs persisted in *meta* into memory and return the live ref set."""
        refs = self._refs.setdefault(workspace_id, set())
        for tid in self._persisted_refs(meta):
            refs.add(tid)
            self._trace_to_workspace[tid] = workspace_id
        return refs

    def _load_meta(self, workspace_id: str) -> dict[str, Any]:
        """Load the workspace meta; return ``{}`` when it is missing or invalid."""
        meta = self._read_meta_file(self._meta_path(workspace_id))
        return {} if meta is None else meta

    def get_workspace_container_id(self, workspace_id: str) -> str | None:
        """Synchronously read the workspace sandbox container ID from meta.

        (Injected by the Gateway when it calls the Agent API.)

        Returns:
            The stripped container ID, or ``None`` if absent or blank.
        """
        cid = self._load_meta(workspace_id).get("workspace_container_id")
        if cid is None:
            return None
        s = str(cid).strip()
        return s or None

    def _save_meta(self, workspace_id: str, data: dict[str, Any]) -> None:
        """Persist *data* as the workspace meta, creating ``.gateway/`` if needed.

        The JSON is written to a sibling temp file and then renamed over
        ``meta.json`` so an interrupted write cannot leave a truncated file.
        """
        target = self._meta_path(workspace_id)
        target.parent.mkdir(parents=True, exist_ok=True)
        tmp = target.with_name(target.name + ".tmp")
        tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
        tmp.replace(target)

    # -------------------------------------------------------------- lifecycle

    async def create_workspace(self, workspace_id: str) -> str:
        """Create the workspace directory and return its absolute path (idempotent)."""
        async with self._lock:
            return await self._create_workspace_unlocked(workspace_id)

    async def _create_workspace_unlocked(self, workspace_id: str) -> str:
        """Create directories and meta for *workspace_id*; caller must hold the lock.

        Also re-hydrates the in-memory reference tables from the persisted
        ``trace_refs`` so state survives a process restart.
        """
        path = self._workspace_dir(workspace_id)
        path.mkdir(parents=True, exist_ok=True)
        self._shared_root.mkdir(parents=True, exist_ok=True)

        meta = self._load_meta(workspace_id)
        meta.setdefault("workspace_id", workspace_id)
        meta.setdefault("trace_refs", [])
        self._save_meta(workspace_id, meta)

        self._adopt_persisted_refs(workspace_id, meta)
        return str(path.resolve())

    async def get_workspace_path(self, workspace_id: str) -> str:
        """Return the absolute path of an existing workspace.

        Raises:
            LifecycleError: If the workspace directory does not exist.
        """
        path = self._workspace_dir(workspace_id)
        if not path.is_dir():
            raise LifecycleError(f"Workspace 不存在: {workspace_id}")
        return str(path.resolve())

    async def ensure_session(self, workspace_id: str) -> str:
        """Session start: ensure directories exist and start the sandbox container.

        The container ID returned by the runner (if any) is stored in meta under
        ``workspace_container_id``.

        Returns:
            The absolute path of the workspace directory.

        Raises:
            WorkspaceDockerError: Only when Docker fails and ``docker_required``
                is set; otherwise the failure is logged and ignored.
        """
        async with self._lock:
            ws_path_str = await self._create_workspace_unlocked(workspace_id)
            try:
                # The runner is synchronous; run it in a worker thread so the
                # event loop is not blocked (same as stop_workspace_sandbox).
                cid = await asyncio.to_thread(
                    self._docker.ensure_workspace_container,
                    workspace_subdir=workspace_subdir_key(workspace_id),
                    workspace_host_path=Path(ws_path_str),
                    shared_host_path=self._shared_root,
                )
            except WorkspaceDockerError as e:
                logger.exception("Workspace Docker 失败 workspace_id=%s", workspace_id)
                if self._docker_required:
                    raise
                logger.warning("Docker 未强制要求,继续无沙箱容器:%s", e)
            else:
                if cid:
                    meta = self._load_meta(workspace_id)
                    meta["workspace_container_id"] = cid
                    self._save_meta(workspace_id, meta)
            return ws_path_str

    # ------------------------------------------------------------- references

    async def add_trace_ref(self, workspace_id: str, trace_id: str) -> None:
        """Register *trace_id* as a reference to *workspace_id* (creating it if needed)."""
        async with self._lock:
            await self._create_workspace_unlocked(workspace_id)
            refs = self._refs.setdefault(workspace_id, set())
            refs.add(trace_id)
            self._trace_to_workspace[trace_id] = workspace_id

            meta = self._load_meta(workspace_id)
            meta["workspace_id"] = workspace_id
            meta["trace_refs"] = sorted(refs)
            self._save_meta(workspace_id, meta)

    async def remove_trace_ref(self, workspace_id: str, trace_id: str) -> None:
        """Drop the reference from *trace_id* to *workspace_id*.

        Persisted references are merged into memory first, so removing one
        trace after a process restart does not erase the other references
        stored in meta. Nothing is written when the workspace directory does
        not exist, to avoid creating a ghost workspace.
        """
        async with self._lock:
            meta = self._load_meta(workspace_id)
            refs = self._adopt_persisted_refs(workspace_id, meta)
            refs.discard(trace_id)
            self._trace_to_workspace.pop(trace_id, None)

            if self._workspace_dir(workspace_id).is_dir():
                meta["trace_refs"] = sorted(refs)
                self._save_meta(workspace_id, meta)

    async def cleanup_workspace(self, workspace_id: str, *, force: bool = False) -> None:
        """Clear all references of a workspace and, with ``force``, delete its directory.

        Args:
            workspace_id: Workspace to clean up.
            force: When true, clean up even if references remain and remove the
                workspace directory (best effort, errors ignored).

        Raises:
            LifecycleError: If references remain and ``force`` is false.
        """
        async with self._lock:
            meta = self._load_meta(workspace_id)
            active = set(self._refs.get(workspace_id, ())) | self._persisted_refs(meta)
            if active and not force:
                raise LifecycleError(f"Workspace 仍有 {len(active)} 个 Trace 引用,拒绝清理")

            self._refs.pop(workspace_id, None)
            # Drop reverse mappings for in-memory refs as well as persisted ones.
            for tid in active:
                self._trace_to_workspace.pop(tid, None)

            ws_dir = self._workspace_dir(workspace_id)
            if not ws_dir.is_dir():
                # Nothing on disk; do not create a directory just to write meta.
                return

            meta["trace_refs"] = []
            self._save_meta(workspace_id, meta)
            if force:
                shutil.rmtree(ws_dir, ignore_errors=True)

    # ---------------------------------------------------------------- queries

    async def list_workspaces(self) -> list[dict[str, Any]]:
        """List workspaces found under ``workspaces_root``.

        Directories that are hidden (name starts with ``.``) or that have no
        readable JSON-object meta file are skipped.

        Returns:
            One dict per workspace with the keys ``workspace_id``, ``path``,
            ``ref_count`` (the larger of the persisted and in-memory counts)
            and ``workspace_container_id``.
        """
        async with self._lock:
            out: list[dict[str, Any]] = []
            if not self._workspaces_root.is_dir():
                return out

            for child in self._workspaces_root.iterdir():
                if not child.is_dir() or child.name.startswith("."):
                    continue
                meta = self._read_meta_file(child / ".gateway" / "meta.json")
                if meta is None:
                    continue

                # The directory name is a hash, so fall back to it only when
                # meta does not record the original workspace ID.
                wid = str(meta.get("workspace_id") or child.name)
                persisted_count = len(self._persisted_refs(meta))
                memory_count = len(self._refs.get(wid, ()))
                out.append(
                    {
                        "workspace_id": wid,
                        "path": str(child.resolve()),
                        "ref_count": max(persisted_count, memory_count),
                        "workspace_container_id": meta.get("workspace_container_id"),
                    }
                )
            return out

    def get_workspace_id_for_trace(self, trace_id: str) -> str | None:
        """Return the workspace ID referenced by *trace_id* from the in-memory index."""
        return self._trace_to_workspace.get(trace_id)

    async def stop_workspace_sandbox(self, workspace_id: str) -> None:
        """Stop the workspace's sandbox container.

        Does not delete the directory and does not touch trace references.
        """
        subdir = workspace_subdir_key(workspace_id)
        await asyncio.to_thread(self._docker.stop_workspace_container, subdir)