| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
"""
Workspace directories, trace reference counting, and Docker sandbox container orchestration.

Directory layout (matches the docker-compose volumes)::

    {workspaces_root}/                  # default: /root/.gateway/workspaces
        <sha256(workspace_id)>/         # actual data directory
            .gateway/meta.json
    {shared_root}/                      # default: /root/.gateway/shared
"""
- from __future__ import annotations
- import asyncio
- import json
- import logging
- import hashlib
- from pathlib import Path
- from typing import Any
- from utils.env_parse import env_bool, env_str
- from gateway.core.lifecycle.errors import LifecycleError, WorkspaceDockerError
- from gateway.core.lifecycle.workspace.docker_runner import WorkspaceDockerRunner
- logger = logging.getLogger(__name__)
def workspace_subdir_key(workspace_id: str) -> str:
    """Return the directory name used for ``workspace_id``.

    The name is the lowercase hexadecimal SHA-256 digest of the UTF-8 encoded
    identifier, so arbitrary identifiers map to a fixed-length, path-safe name.
    """
    hasher = hashlib.sha256()
    hasher.update(workspace_id.encode("utf-8"))
    return hasher.hexdigest()
class WorkspaceManager:
    """Manage workspace directories, trace references and sandbox containers.

    Each workspace lives in ``workspaces_root / workspace_subdir_key(workspace_id)``
    and stores its metadata in ``.gateway/meta.json`` inside that directory.  The
    keys written by this class are ``workspace_id``, ``trace_refs`` and
    ``workspace_container_id``.

    Trace references are kept both in memory and in ``meta.json``; the persisted
    list is merged back into memory whenever a workspace is touched.  Coroutines
    that mutate state serialize on a single ``asyncio.Lock``.
    """

    def __init__(
        self,
        *,
        workspaces_root: Path,
        shared_root: Path,
        docker_runner: WorkspaceDockerRunner,
        docker_required: bool,
    ) -> None:
        """Store configuration and start with empty in-memory reference tables.

        Args:
            workspaces_root: Parent directory of all workspace directories.
            shared_root: Directory passed to the Docker runner as the shared host path.
            docker_runner: Object used to start/stop workspace containers.
            docker_required: When true, a ``WorkspaceDockerError`` raised while
                starting the container is re-raised from ``ensure_session``;
                otherwise it is logged and the session continues without a container.
        """
        self._workspaces_root = workspaces_root
        self._shared_root = shared_root
        self._docker = docker_runner
        self._docker_required = docker_required
        self._lock = asyncio.Lock()
        # workspace_id -> trace ids currently referencing that workspace
        self._refs: dict[str, set[str]] = {}
        # trace_id -> workspace_id (reverse lookup for get_workspace_id_for_trace)
        self._trace_to_workspace: dict[str, str] = {}

    @classmethod
    def from_env(cls, docker_runner: WorkspaceDockerRunner | None = None) -> WorkspaceManager:
        """Build a manager from environment variables.

        Reads ``GATEWAY_WORKSPACES_ROOT``, ``GATEWAY_SHARED_ROOT`` and
        ``GATEWAY_WORKSPACE_DOCKER_REQUIRED``.  When ``docker_runner`` is falsy,
        ``WorkspaceDockerRunner.from_env()`` supplies one.
        """
        ws = Path(env_str("GATEWAY_WORKSPACES_ROOT", "/root/.gateway/workspaces")).expanduser()
        sh = Path(env_str("GATEWAY_SHARED_ROOT", "/root/.gateway/shared")).expanduser()
        runner = docker_runner or WorkspaceDockerRunner.from_env()
        required = env_bool("GATEWAY_WORKSPACE_DOCKER_REQUIRED", False)
        return cls(
            workspaces_root=ws,
            shared_root=sh,
            docker_runner=runner,
            docker_required=required,
        )

    # ------------------------------------------------------------------ paths

    def _workspace_dir(self, workspace_id: str) -> Path:
        """Return the data directory for ``workspace_id`` (it may not exist yet)."""
        return self._workspaces_root / workspace_subdir_key(workspace_id)

    def _meta_path(self, workspace_id: str) -> Path:
        """Return the path of the workspace's ``.gateway/meta.json`` file."""
        return self._workspace_dir(workspace_id) / ".gateway" / "meta.json"

    # --------------------------------------------------------------- metadata

    @staticmethod
    def _read_meta_file(path: Path) -> dict[str, Any] | None:
        """Parse ``path`` as a JSON object; return ``None`` when that is not possible.

        ``None`` covers a missing/unreadable file, invalid UTF-8, invalid JSON,
        and valid JSON whose top-level value is not an object.
        """
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError is the base of both json.JSONDecodeError and UnicodeDecodeError.
            return None
        return payload if isinstance(payload, dict) else None

    @staticmethod
    def _meta_trace_refs(meta: dict[str, Any]) -> set[str]:
        """Return the non-empty string entries of ``meta["trace_refs"]`` as a set."""
        raw = meta.get("trace_refs")
        if not isinstance(raw, list):
            return set()
        return {tid for tid in raw if isinstance(tid, str) and tid}

    def _load_meta(self, workspace_id: str) -> dict[str, Any]:
        """Load the workspace metadata, or ``{}`` when it is missing or unusable."""
        meta = self._read_meta_file(self._meta_path(workspace_id))
        return {} if meta is None else meta

    def get_workspace_container_id(self, workspace_id: str) -> str | None:
        """Synchronously read the sandbox container ID stored in the workspace meta.

        Returns the stripped ``workspace_container_id`` value, or ``None`` when it
        is absent or blank.
        """
        cid = self._load_meta(workspace_id).get("workspace_container_id")
        if cid is None:
            return None
        s = str(cid).strip()
        return s or None

    def _save_meta(self, workspace_id: str, data: dict[str, Any]) -> None:
        """Persist ``data`` as the workspace's ``meta.json``, creating directories.

        The JSON is written to a sibling temporary file and then renamed over
        ``meta.json`` so an interrupted write cannot leave a truncated file
        (which ``_load_meta`` would silently treat as empty metadata).
        """
        meta_dir = self._workspace_dir(workspace_id) / ".gateway"
        meta_dir.mkdir(parents=True, exist_ok=True)
        target = meta_dir / "meta.json"
        staging = meta_dir / "meta.json.tmp"
        staging.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
        staging.replace(target)

    def _hydrate_refs(self, workspace_id: str, meta: dict[str, Any]) -> None:
        """Merge the trace references persisted in ``meta`` into the in-memory tables."""
        persisted = self._meta_trace_refs(meta)
        if not persisted:
            return
        self._refs.setdefault(workspace_id, set()).update(persisted)
        for tid in persisted:
            self._trace_to_workspace[tid] = workspace_id

    # -------------------------------------------------------------- lifecycle

    async def create_workspace(self, workspace_id: str) -> str:
        """Create the workspace directory and return its absolute path (idempotent)."""
        async with self._lock:
            return await self._create_workspace_unlocked(workspace_id)

    async def _create_workspace_unlocked(self, workspace_id: str) -> str:
        """Create directories and meta defaults; the caller must hold ``self._lock``.

        Also reloads any trace references persisted in ``meta.json`` into memory.
        Returns the resolved workspace directory as a string.
        """
        path = self._workspace_dir(workspace_id)
        path.mkdir(parents=True, exist_ok=True)
        self._shared_root.mkdir(parents=True, exist_ok=True)
        meta = self._load_meta(workspace_id)
        meta.setdefault("workspace_id", workspace_id)
        meta.setdefault("trace_refs", [])
        self._save_meta(workspace_id, meta)
        self._hydrate_refs(workspace_id, meta)
        return str(path.resolve())

    async def get_workspace_path(self, workspace_id: str) -> str:
        """Return the resolved workspace directory.

        Raises:
            LifecycleError: If the workspace directory does not exist.
        """
        path = self._workspace_dir(workspace_id)
        if not path.is_dir():
            raise LifecycleError(f"Workspace 不存在: {workspace_id}")
        return str(path.resolve())

    async def ensure_session(self, workspace_id: str) -> str:
        """Prepare a workspace for a session and start its sandbox container.

        Ensures the workspace and shared directories exist, asks the Docker
        runner for the workspace container, and records a truthy container ID
        in ``meta.json``.  Returns the absolute workspace directory path.

        Raises:
            WorkspaceDockerError: If starting the container fails and Docker is
                required; otherwise the failure is logged and ignored.
        """
        async with self._lock:
            ws_path_str = await self._create_workspace_unlocked(workspace_id)
            try:
                # The runner call is synchronous; run it in a worker thread so the
                # event loop stays responsive.  The lock is still held, so sessions
                # remain serialized exactly as before.
                cid = await asyncio.to_thread(
                    self._docker.ensure_workspace_container,
                    workspace_subdir=workspace_subdir_key(workspace_id),
                    workspace_host_path=Path(ws_path_str),
                    shared_host_path=self._shared_root,
                )
            except WorkspaceDockerError as e:
                logger.exception("Workspace Docker 失败 workspace_id=%s", workspace_id)
                if self._docker_required:
                    raise
                logger.warning("Docker 未强制要求,继续无沙箱容器:%s", e)
            else:
                if cid:
                    meta = self._load_meta(workspace_id)
                    meta["workspace_container_id"] = cid
                    self._save_meta(workspace_id, meta)
            return ws_path_str

    # ------------------------------------------------------- trace references

    async def add_trace_ref(self, workspace_id: str, trace_id: str) -> None:
        """Register ``trace_id`` as a user of ``workspace_id`` (creating the workspace)."""
        async with self._lock:
            await self._create_workspace_unlocked(workspace_id)
            refs = self._refs.setdefault(workspace_id, set())
            refs.add(trace_id)
            self._trace_to_workspace[trace_id] = workspace_id
            meta = self._load_meta(workspace_id)
            meta["workspace_id"] = workspace_id
            meta["trace_refs"] = sorted(refs)
            self._save_meta(workspace_id, meta)

    async def remove_trace_ref(self, workspace_id: str, trace_id: str) -> None:
        """Drop ``trace_id`` from the references of ``workspace_id``.

        Persisted references are merged into memory first so that removing one
        trace never erases other references that exist only in ``meta.json``
        (for example after a process restart).  Nothing is written when the
        workspace directory does not exist.
        """
        async with self._lock:
            meta = self._load_meta(workspace_id)
            self._hydrate_refs(workspace_id, meta)
            refs = self._refs.setdefault(workspace_id, set())
            refs.discard(trace_id)
            self._trace_to_workspace.pop(trace_id, None)
            if self._workspace_dir(workspace_id).is_dir():
                meta["trace_refs"] = sorted(refs)
                self._save_meta(workspace_id, meta)

    async def cleanup_workspace(self, workspace_id: str, *, force: bool = False) -> None:
        """Clear a workspace's references and, with ``force``, delete its directory.

        Args:
            workspace_id: Workspace to clean up.
            force: When true, clear references even if some are active and
                remove the workspace directory tree (errors ignored).

        Raises:
            LifecycleError: If references are still active and ``force`` is false.
        """
        async with self._lock:
            in_memory = self._refs.get(workspace_id) or set()
            persisted = self._meta_trace_refs(self._load_meta(workspace_id))
            active = in_memory | persisted
            if active and not force:
                raise LifecycleError(f"Workspace 仍有 {len(active)} 个 Trace 引用,拒绝清理")
            self._refs.pop(workspace_id, None)
            # Unmap every active trace, not just the persisted ones, so no stale
            # trace -> workspace entries survive the cleanup.
            for tid in active:
                self._trace_to_workspace.pop(tid, None)
            ws_dir = self._workspace_dir(workspace_id)
            if not ws_dir.is_dir():
                # Nothing on disk; do not create a directory just to reset its meta.
                return
            meta = self._load_meta(workspace_id)
            meta["trace_refs"] = []
            self._save_meta(workspace_id, meta)
            if force:
                import shutil

                shutil.rmtree(ws_dir, ignore_errors=True)

    # ---------------------------------------------------------------- queries

    async def list_workspaces(self) -> list[dict[str, Any]]:
        """List every workspace directory that has a readable ``meta.json``.

        Each entry has ``workspace_id``, ``path``, ``ref_count`` and
        ``workspace_container_id``.  ``ref_count`` counts the union of persisted
        and in-memory references, matching what ``cleanup_workspace`` treats as
        active.  Hidden directories and directories without usable metadata are
        skipped.
        """
        async with self._lock:
            out: list[dict[str, Any]] = []
            if not self._workspaces_root.is_dir():
                return out
            for child in self._workspaces_root.iterdir():
                if not child.is_dir() or child.name.startswith("."):
                    continue
                meta = self._read_meta_file(child / ".gateway" / "meta.json")
                if meta is None:
                    continue
                wid = str(meta.get("workspace_id") or child.name)
                active = self._meta_trace_refs(meta) | self._refs.get(wid, set())
                out.append(
                    {
                        "workspace_id": wid,
                        "path": str(child.resolve()),
                        "ref_count": len(active),
                        "workspace_container_id": meta.get("workspace_container_id"),
                    }
                )
            return out

    def get_workspace_id_for_trace(self, trace_id: str) -> str | None:
        """Return the workspace currently mapped to ``trace_id`` in memory, if any."""
        return self._trace_to_workspace.get(trace_id)

    async def stop_workspace_sandbox(self, workspace_id: str) -> None:
        """Stop the workspace's sandbox container.

        Does not delete the directory and does not change trace references.
        """
        subdir = workspace_subdir_key(workspace_id)
        await asyncio.to_thread(self._docker.stop_workspace_container, subdir)
|