kevin.yang 1 day ago
parent
commit
d0c0b44b21
36 changed files with 2417 additions and 375 deletions
  1. 31 21
      agent/core/runner.py
  2. 23 0
      agent/tools/__init__.py
  3. 55 40
      agent/tools/builtin/skill.py
  4. 936 0
      agent/tools/docker_runner.py
  5. 21 0
      agent/trace/run_api.py
  6. 1 3
      api_server.py
  7. 7 0
      docker-compose.yml
  8. 6 4
      docker/Dockerfile.workspace
  9. 3 3
      gateway/README.md
  10. 3 4
      gateway/core/channels/__init__.py
  11. 0 5
      gateway/core/channels/backends/__init__.py
  12. 0 32
      gateway/core/channels/backends/memory_trace.py
  13. 24 5
      gateway/core/channels/feishu/__init__.py
  14. 2 2
      gateway/core/channels/feishu/api.py
  15. 216 133
      gateway/core/channels/feishu/bridge.py
  16. 37 26
      gateway/core/channels/feishu/manager.py
  17. 35 0
      gateway/core/channels/feishu/protocols.py
  18. 65 52
      gateway/core/channels/feishu/router.py
  19. 1 19
      gateway/core/channels/manager.py
  20. 35 3
      gateway/core/channels/protocols.py
  21. 9 10
      gateway/core/channels/router.py
  22. 16 0
      gateway/core/lifecycle/__init__.py
  23. 144 0
      gateway/core/lifecycle/config_watcher.py
  24. 6 0
      gateway/core/lifecycle/errors.py
  25. 9 0
      gateway/core/lifecycle/trace/__init__.py
  26. 92 0
      gateway/core/lifecycle/trace/backend.py
  27. 130 0
      gateway/core/lifecycle/trace/manager.py
  28. 14 0
      gateway/core/lifecycle/workspace/__init__.py
  29. 199 0
      gateway/core/lifecycle/workspace/docker_runner.py
  30. 221 0
      gateway/core/lifecycle/workspace/manager.py
  31. 6 5
      gateway/docs/architecture.md
  32. 6 2
      gateway/docs/core/executor.md
  33. 11 5
      gateway/docs/core/lifecycle.md
  34. 7 1
      requirements.txt
  35. 5 0
      utils/__init__.py
  36. 41 0
      utils/env_parse.py

+ 31 - 21
agent/core/runner.py

@@ -37,6 +37,7 @@ from agent.skill.models import Skill
 from agent.skill.skill_loader import load_skills_from_dir
 from agent.tools import ToolRegistry, get_tool_registry
 from agent.tools.builtin.knowledge import KnowledgeConfig
+from agent.tools.docker_runner import GatewayExecResolver, active_gateway_exec
 from agent.core.prompts import (
     DEFAULT_SYSTEM_PREFIX,
     TRUNCATION_HINT,
@@ -1455,27 +1456,36 @@ class AgentRunner:
                     args_display = args_str[:100] + "..." if len(args_str) > 100 else args_str
                     logger.info(f"[Tool Call] {tool_name}({args_display})")
 
-                    tool_result = await self.tools.execute(
-                        tool_name,
-                        tool_args,
-                        uid=config.uid or "",
-                        context={
-                            "uid": config.uid or "",
-                            "store": self.trace_store,
-                            "trace_id": trace_id,
-                            "goal_id": current_goal_id,
-                            "runner": self,
-                            "goal_tree": goal_tree,
-                            "knowledge_config": config.knowledge,
-                            # 新增:侧分支信息
-                            "side_branch": {
-                                "type": side_branch_ctx.type,
-                                "branch_id": side_branch_ctx.branch_id,
-                                "is_side_branch": True,
-                                "max_turns": side_branch_ctx.max_turns,
-                            } if side_branch_ctx else None,
-                        },
-                    )
+                    _gw_tok = None
+                    _gw = GatewayExecResolver.for_trace_context(trace.context or {})
+                    if _gw:
+                        _gw_tok = active_gateway_exec.set(_gw)
+                    try:
+                        tool_result = await self.tools.execute(
+                            tool_name,
+                            tool_args,
+                            uid=config.uid or "",
+                            context={
+                                "uid": config.uid or "",
+                                "store": self.trace_store,
+                                "trace_id": trace_id,
+                                "goal_id": current_goal_id,
+                                "runner": self,
+                                "goal_tree": goal_tree,
+                                "knowledge_config": config.knowledge,
+                                "trace_context": trace.context or {},
+                                # 新增:侧分支信息
+                                "side_branch": {
+                                    "type": side_branch_ctx.type,
+                                    "branch_id": side_branch_ctx.branch_id,
+                                    "is_side_branch": True,
+                                    "max_turns": side_branch_ctx.max_turns,
+                                } if side_branch_ctx else None,
+                            },
+                        )
+                    finally:
+                        if _gw_tok is not None:
+                            active_gateway_exec.reset(_gw_tok)
 
                     # 如果是 goal 工具,记录执行后的状态
                     if tool_name == "goal" and goal_tree:

+ 23 - 0
agent/tools/__init__.py

@@ -2,6 +2,8 @@
 Tools 包 - 工具注册和 Schema 生成
 """
 
+import os
+
 from agent.tools.registry import ToolRegistry, tool, get_tool_registry
 from agent.tools.schema import SchemaGenerator
 from agent.tools.models import ToolResult, ToolContext, ToolContextImpl
@@ -10,6 +12,27 @@ from agent.tools.models import ToolResult, ToolContext, ToolContextImpl
 # noqa: F401 表示这是故意的副作用导入
 import agent.tools.builtin  # noqa: F401
 
+# 默认:bash / 文件类工具在存在 gateway_exec 或 AGENT_DEFAULT_DOCKER_CONTAINER 时走 docker exec
+# (见 agent.tools.docker_runner)。本机执行可设 AGENT_DISABLE_GATEWAY_WORKSPACE_DISPATCH /
+# AGENT_DISABLE_BASH_GATEWAY_DISPATCH=true。
+_reg = get_tool_registry()
+if os.getenv("AGENT_DISABLE_GATEWAY_WORKSPACE_DISPATCH", "").strip().lower() not in (
+    "1",
+    "true",
+    "yes",
+):
+    from agent.tools.docker_runner import install_workspace_file_tools_dispatch
+
+    install_workspace_file_tools_dispatch(_reg)
+if os.getenv("AGENT_DISABLE_BASH_GATEWAY_DISPATCH", "").strip().lower() not in (
+    "1",
+    "true",
+    "yes",
+):
+    from agent.tools.docker_runner import install_bash_gateway_dispatch
+
+    install_bash_gateway_dispatch(_reg)
+
 __all__ = [
 	"ToolRegistry",
 	"tool",

+ 55 - 40
agent/tools/builtin/skill.py

@@ -4,7 +4,9 @@ Skill 工具 - 按需加载 Skill 文件
 Agent 可以调用此工具来加载特定的 skill 文档
 """
 
+import importlib.util
 import os
+import shutil
 import subprocess
 from pathlib import Path
 from typing import Optional
@@ -12,25 +14,54 @@ from typing import Optional
 from agent.tools import tool, ToolResult
 from agent.skill.skill_loader import SkillLoader
 
-# 默认 skills 目录(优先级:项目 skills > 框架 skills)
+# 飞书 openclaw-lark 子模块 skills 根目录(整体搬迁时只改此处或环境变量)
+_FEISHU_OPENCLAW_SKILLS_ROOT = os.path.join(
+    os.getenv("FEISHU_OPENCLAW_ROOT", "./gateway/core/channels/feishu/openclaw-lark"),
+    "skills",
+)
+_FEISHU_OPENCLAW_SKILL_NAMES = (
+    "feishu-bitable",
+    "feishu-calendar",
+    "feishu-channel-rules",
+    "feishu-create-doc",
+    "feishu-fetch-doc",
+    "feishu-im-read",
+    "feishu-task",
+    "feishu-troubleshoot",
+    "feishu-update-doc",
+)
+
+# 默认 skills 目录(优先级:项目 skills > 框架 skills > 飞书 openclaw skills)
 DEFAULT_SKILLS_DIRS = [
-    os.getenv("SKILLS_DIR", "./skills"),      # 项目特定 skills(优先)
-    "./agent/skill/skills",                    # 框架内置 skills
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-bitable",
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-calendar",
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-channel-rules",
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-create-doc",
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-fetch-doc",
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-im-read",
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-task",
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-troubleshoot",
-    "./gateway/core/channels/feishu/openclaw-lark/skills/feishu-update-doc",
+    os.getenv("SKILLS_DIR", "./skills"),
+    "./agent/skill/skills",
+    *(os.path.join(_FEISHU_OPENCLAW_SKILLS_ROOT, name) for name in _FEISHU_OPENCLAW_SKILL_NAMES),
 ]
 
 # 默认单一目录(用于 list_skills)
 DEFAULT_SKILLS_DIR = DEFAULT_SKILLS_DIRS[0]
 
 
+def _browser_use_python_package_installed() -> bool:
+    return importlib.util.find_spec("browser_use") is not None
+
+
+def _browser_use_runtime_likely_ready() -> bool:
+    """启发式:常见系统浏览器、Playwright 缓存或 browser-use 配置目录是否存在。"""
+    try:
+        for name in ("chromium", "chromium-browser", "google-chrome", "chrome"):
+            if shutil.which(name):
+                return True
+        pw = Path.home() / ".cache" / "ms-playwright"
+        if pw.is_dir() and any(pw.glob("chromium-*")):
+            return True
+        if (Path.home() / ".browser_use").is_dir():
+            return True
+    except OSError:
+        pass
+    return False
+
+
 def _check_skill_setup(skill_name: str) -> Optional[str]:
     """
     检查 skill 的环境配置,返回缺失依赖的警告信息
@@ -41,35 +72,19 @@ def _check_skill_setup(skill_name: str) -> Optional[str]:
     Returns:
         警告信息(如果有缺失的依赖),否则返回 None
     """
-    # 特殊处理:browser-use skill
+    # browser-use:仓库内从未存在 agent.skill.skills.browser_use.setup,旧逻辑会恒 ImportError 并被吞掉
     if skill_name in ["browser-use", "browser_use"]:
-        try:
-            # 动态导入 browser-use skill 的 setup 模块
-            from agent.skill.skills.browser_use.setup import (
-                _check_browser_use_cli,
-                _check_chromium_installed
-            )
-
-            cli_installed = _check_browser_use_cli()
-            chromium_installed = _check_chromium_installed()
-
-            if not cli_installed or not chromium_installed:
-                warning = "\n⚠️ **Setup Required**\n\n"
-                warning += "The following dependencies are missing:\n\n"
-
-                if not cli_installed:
-                    warning += "- `pip install browser-use`\n"
-                if not chromium_installed:
-                    warning += "- `uvx browser-use install`\n"
-
-                warning += "\nYou can also use the setup tools:\n"
-                warning += "- `check_browser_use()` - Check dependency status\n"
-                warning += "- `install_browser_use_chromium()` - Auto-install Chromium\n\n"
-
-                return warning
-        except ImportError:
-            # Setup 模块不存在,跳过检查
-            pass
+        pkg_ok = _browser_use_python_package_installed()
+        runtime_ok = _browser_use_runtime_likely_ready()
+        if not pkg_ok or not runtime_ok:
+            warning = "\n⚠️ **Setup Required**\n\n"
+            warning += "The following dependencies may be missing:\n\n"
+            if not pkg_ok:
+                warning += "- Python 包:`pip install browser-use`\n"
+            if not runtime_ok:
+                warning += "- 浏览器运行时:安装 Chromium(例如 `uvx browser-use install` 或 `playwright install chromium`)\n"
+            warning += "\n若已安装仍提示缺失,可忽略本段(检测为启发式)。\n\n"
+            return warning
 
     return None
 

+ 936 - 0
agent/tools/docker_runner.py

@@ -0,0 +1,936 @@
+"""
+Docker 内执行工具(模块 ``agent.tools.docker_runner``;与 ``agent.core.runner.AgentRunner`` 无关)。
+
+解析顺序:**ContextVar(Runner 注入)** → **Trace.context['gateway_exec']** → **环境变量默认容器**
+(``AGENT_DEFAULT_DOCKER_CONTAINER``,可选 ``AGENT_DEFAULT_DOCKER_WORKDIR`` /
+``AGENT_DEFAULT_DOCKER_USER``)。有有效 ``docker_container`` 时,``bash_command``、
+``read_file`` / ``write_file`` / ``edit_file`` / ``glob_files`` / ``grep_content`` 走容器内
+``docker exec``;否则仍走原有 builtin(本机)。
+
+- ``GatewayExecResolver`` / ``active_gateway_exec``:``AgentRunner`` 在 ``tools.execute`` 前后 set/reset ContextVar。
+- ``BashGatewayDispatcher`` / ``WorkspaceFileToolsDispatcher``:在 ``import agent.tools``(builtin 注册之后)时向 ``ToolRegistry`` 注册包装函数。
+
+需要 API 进程能访问 Docker(例如挂载 ``/var/run/docker.sock``)。
+"""
+
+from __future__ import annotations
+
+import asyncio
+import base64
+import io
+import json
+import logging
+import mimetypes
+import os
+import posixpath
+import tarfile
+from contextvars import ContextVar
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Callable, ClassVar, Coroutine, Dict, List, Optional, Tuple
+from urllib.parse import urlparse
+
+from agent.tools.builtin.file.edit import replace as edit_replace
+from agent.tools.builtin.file.grep import LIMIT as GREP_LIMIT
+from agent.tools.builtin.file.read import DEFAULT_READ_LIMIT, MAX_BYTES, MAX_LINE_LENGTH
+from agent.tools.builtin.file.write import _create_diff
+from agent.tools.builtin.glob_tool import LIMIT as GLOB_LIMIT
+from agent.tools.models import ToolContext, ToolResult
+
+if TYPE_CHECKING:
+    from agent.tools.registry import ToolRegistry
+
+logger = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# Trace.gateway_exec:ContextVar + 路径解析
+# ---------------------------------------------------------------------------
+
+
+class GatewayExecResolver:
+    """从工具 context / ContextVar 解析 ``gateway_exec``,并把用户路径映射到容器内路径。"""
+
+    ACTIVE: ClassVar[ContextVar[Optional[dict[str, Any]]]] = ContextVar(
+        "active_gateway_exec", default=None
+    )
+
+    @classmethod
+    def from_tool_context(cls, context: Any) -> dict[str, Any] | None:
+        if not isinstance(context, dict):
+            return None
+        tc = context.get("trace_context")
+        if not isinstance(tc, dict):
+            return None
+        ge = tc.get("gateway_exec")
+        return ge if isinstance(ge, dict) else None
+
+    @classmethod
+    def default_gateway_exec_from_env(cls) -> dict[str, Any] | None:
+        """无 Trace.gateway_exec 时,用环境变量指定默认 Workspace 容器(直连 API / 本地调试)。"""
+        container = os.getenv("AGENT_DEFAULT_DOCKER_CONTAINER", "").strip()
+        if not container:
+            return None
+        out: dict[str, Any] = {"docker_container": container}
+        wd = os.getenv("AGENT_DEFAULT_DOCKER_WORKDIR", "").strip()
+        if wd:
+            out["container_workdir"] = wd
+        user = os.getenv("AGENT_DEFAULT_DOCKER_USER", "").strip()
+        if user:
+            out["container_user"] = user
+        return out
+
+    @classmethod
+    def for_trace_context(cls, trace_context: dict[str, Any] | None) -> dict[str, Any] | None:
+        """Trace.context 中的 gateway_exec 优先,否则环境变量默认容器。"""
+        tc = trace_context or {}
+        ge = tc.get("gateway_exec")
+        if isinstance(ge, dict) and str(ge.get("docker_container") or "").strip():
+            return ge
+        return cls.default_gateway_exec_from_env()
+
+    @classmethod
+    def effective(cls, context: Any) -> dict[str, Any] | None:
+        ge = cls.ACTIVE.get()
+        if isinstance(ge, dict) and str(ge.get("docker_container") or "").strip():
+            return ge
+        if isinstance(context, dict):
+            tc = context.get("trace_context")
+            if isinstance(tc, dict):
+                return cls.for_trace_context(tc)
+        return cls.default_gateway_exec_from_env()
+
+    @staticmethod
+    def workdir(ge: dict[str, Any]) -> str:
+        w = str(ge.get("container_workdir") or "/home/agent/workspace").strip()
+        return w.rstrip("/") or "/home/agent/workspace"
+
+    @staticmethod
+    def user(ge: dict[str, Any]) -> str:
+        u = str(ge.get("container_user") or "agent").strip()
+        return u or "agent"
+
+    @staticmethod
+    def _host_mapping_root() -> str | None:
+        raw = os.getenv("AGENT_WORKSPACE_HOST_PROJECT_ROOT", "").strip()
+        if raw:
+            return str(Path(raw).resolve())
+        try:
+            return str(Path.cwd().resolve())
+        except Exception:
+            return None
+
+    @classmethod
+    def resolve_path(cls, ge: dict[str, Any], user_path: str | None, *, is_dir: bool) -> str | None:
+        wd = cls.workdir(ge)
+        if not user_path or not str(user_path).strip():
+            return wd if is_dir else None
+
+        raw = str(user_path).strip().replace("\\", "/")
+        host_root = cls._host_mapping_root()
+
+        if posixpath.isabs(raw):
+            norm = posixpath.normpath(raw)
+            if norm == wd or norm.startswith(wd + "/"):
+                return norm
+            if host_root:
+                hr = host_root.replace("\\", "/").rstrip("/")
+                if norm == hr or norm.startswith(hr + "/"):
+                    rel = posixpath.relpath(norm, hr)
+                    if rel.startswith("../"):
+                        return None
+                    candidate = posixpath.normpath(posixpath.join(wd, rel))
+                    if candidate == wd or candidate.startswith(wd + "/"):
+                        return candidate
+                    return None
+            return None
+
+        for seg in raw.split("/"):
+            if seg == "..":
+                return None
+        candidate = posixpath.normpath(posixpath.join(wd, raw))
+        if candidate == wd or candidate.startswith(wd + "/"):
+            return candidate
+        return None
+
+
+# 兼容旧导入:runner 使用 ``active_gateway_exec.set`` / ``reset``
+active_gateway_exec = GatewayExecResolver.ACTIVE
+
+gateway_exec_from_tool_context = GatewayExecResolver.from_tool_context
+effective_gateway_exec = GatewayExecResolver.effective
+container_workdir = GatewayExecResolver.workdir
+container_user = GatewayExecResolver.user
+resolve_container_path = GatewayExecResolver.resolve_path
+
+
+# ---------------------------------------------------------------------------
+# 单会话:Docker 容器内 exec / 读写 / 工具级 read/write/glob/grep/bash
+# ---------------------------------------------------------------------------
+
+
+class DockerWorkspaceClient:
+    """绑定一份 ``gateway_exec`` 字典,封装对该 Workspace 容器的所有 I/O。"""
+
+    __slots__ = ("_ge",)
+
+    _BINARY_EXTS = frozenset({
+        ".zip", ".tar", ".gz", ".exe", ".dll", ".so", ".class",
+        ".jar", ".war", ".7z", ".doc", ".docx", ".xls", ".xlsx",
+        ".ppt", ".pptx", ".odt", ".ods", ".odp", ".bin", ".dat",
+        ".obj", ".o", ".a", ".lib", ".wasm", ".pyc", ".pyo",
+    })
+
+    def __init__(self, ge: dict[str, Any]) -> None:
+        self._ge = ge
+
+    @property
+    def ge(self) -> dict[str, Any]:
+        return self._ge
+
+    def container_id(self) -> str | None:
+        c = self._ge.get("docker_container")
+        if c is None:
+            return None
+        s = str(c).strip()
+        return s or None
+
+    def _docker_container(self):
+        import docker
+
+        cid = self.container_id()
+        if not cid:
+            raise ValueError("gateway_exec 缺少 docker_container")
+        return docker.from_env().containers.get(cid)
+
+    def sync_exec_argv(
+        self,
+        argv: List[str],
+        *,
+        workdir: str,
+        environment: Optional[Dict[str, str]] = None,
+    ) -> Tuple[int, bytes, bytes]:
+        ct = self._docker_container()
+        user = GatewayExecResolver.user(self._ge)
+        exit_code, output = ct.exec_run(
+            argv,
+            user=user,
+            workdir=workdir,
+            environment=environment,
+            demux=True,
+        )
+        if isinstance(output, tuple) and len(output) == 2:
+            stdout_b, stderr_b = output
+        else:
+            stdout_b = output if isinstance(output, (bytes, bytearray)) else b""
+            stderr_b = b""
+        if stdout_b is None:
+            stdout_b = b""
+        if stderr_b is None:
+            stderr_b = b""
+        code = int(exit_code) if exit_code is not None else -1
+        return code, bytes(stdout_b), bytes(stderr_b)
+
+    async def async_exec_argv(
+        self,
+        argv: List[str],
+        *,
+        workdir: str,
+        environment: Optional[Dict[str, str]] = None,
+    ) -> Tuple[int, bytes, bytes]:
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(
+            None,
+            lambda: self.sync_exec_argv(argv, workdir=workdir, environment=environment),
+        )
+
+    def sync_read_file_bytes(self, container_path: str) -> bytes:
+        ct = self._docker_container()
+        try:
+            _stat, stream = ct.get_archive(container_path)
+        except Exception as e:
+            logger.debug("get_archive failed path=%s: %s", container_path, e)
+            raise FileNotFoundError(container_path) from e
+        chunks = b"".join(stream)
+        bio = io.BytesIO(chunks)
+        with tarfile.open(fileobj=bio, mode="r") as tar:
+            member = tar.next()
+            if member is None:
+                return b""
+            if member.isdir():
+                raise IsADirectoryError(container_path)
+            ef = tar.extractfile(member)
+            if ef is None:
+                return b""
+            return ef.read()
+
+    @staticmethod
+    def _posixpath_dir(p: str) -> str:
+        return os.path.dirname(p.replace("\\", "/"))
+
+    @staticmethod
+    def _posixpath_basename(p: str) -> str:
+        return os.path.basename(p.replace("\\", "/"))
+
+    def sync_write_file_bytes(self, container_path: str, data: bytes) -> None:
+        ct = self._docker_container()
+        parent = self._posixpath_dir(container_path) or "/"
+        base = self._posixpath_basename(container_path)
+        if not base:
+            raise ValueError("invalid container_path")
+
+        code, _out, err = self.sync_exec_argv(
+            ["mkdir", "-p", parent],
+            workdir="/",
+        )
+        if code != 0:
+            raise RuntimeError(
+                f"mkdir -p failed: {parent!r} code={code} stderr={err.decode('utf-8', errors='replace')}"
+            )
+
+        tar_stream = io.BytesIO()
+        with tarfile.open(fileobj=tar_stream, mode="w") as tar:
+            ti = tarfile.TarInfo(name=base)
+            ti.size = len(data)
+            ti.mode = 0o644
+            tar.addfile(ti, io.BytesIO(data))
+        tar_stream.seek(0)
+        ok = ct.put_archive(parent, tar_stream)
+        if not ok:
+            raise RuntimeError(f"put_archive failed: {container_path!r}")
+
+    async def async_read_file_bytes(self, container_path: str) -> bytes:
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(None, lambda: self.sync_read_file_bytes(container_path))
+
+    async def async_write_file_bytes(self, container_path: str, data: bytes) -> None:
+        loop = asyncio.get_running_loop()
+        await loop.run_in_executor(None, lambda: self.sync_write_file_bytes(container_path, data))
+
+    def sync_path_exists(self, container_path: str, *, is_dir: bool) -> bool:
+        flag = "d" if is_dir else "f"
+        code, _, _ = self.sync_exec_argv(
+            ["test", "-" + flag, container_path],
+            workdir="/",
+        )
+        return code == 0
+
+    @classmethod
+    def _is_binary_buffer(cls, data: bytes, suffix: str) -> bool:
+        if suffix.lower() in cls._BINARY_EXTS:
+            return True
+        if not data:
+            return False
+        buf = data[:4096]
+        if b"\x00" in buf:
+            return True
+        non_printable = sum(1 for b in buf if b < 9 or (13 < b < 32))
+        return (non_printable / len(buf)) > 0.3 if buf else False
+
+    MAX_BASH_OUT: ClassVar[int] = 50_000
+
+    async def run_bash_tool(
+        self,
+        command: str,
+        *,
+        timeout: Optional[int],
+        workdir: Optional[str],
+        env: Optional[Dict[str, str]],
+        description: str,
+    ) -> ToolResult:
+        _ = description
+        timeout_sec = timeout if timeout is not None and timeout > 0 else 120
+        container = self.container_id()
+        if not container:
+            return ToolResult(title="配置错误", output="gateway_exec 缺少 docker_container", error="missing_container")
+
+        default_wd = GatewayExecResolver.workdir(self._ge)
+        inner_wd = str(workdir).strip() if workdir else default_wd
+
+        loop = asyncio.get_running_loop()
+        try:
+            code, stdout, stderr = await asyncio.wait_for(
+                loop.run_in_executor(
+                    None,
+                    lambda: self.sync_exec_argv(
+                        ["bash", "-lc", command],
+                        workdir=inner_wd,
+                        environment=env,
+                    ),
+                ),
+                timeout=timeout_sec,
+            )
+        except asyncio.TimeoutError:
+            return ToolResult(
+                title="命令超时",
+                output=f"docker exec 超时(>{timeout_sec}s): {command[:100]}",
+                error="Timeout",
+                metadata={"command": command, "timeout": timeout_sec},
+            )
+        except Exception as e:
+            logger.exception("docker exec 失败 container=%s", container)
+            return ToolResult(
+                title="Docker 执行失败",
+                output=(
+                    f"{e}\n\n请确认 API 容器已挂载 /var/run/docker.sock,"
+                    "且已安装 docker Python 包;容器名在 gateway_exec.docker_container。"
+                ),
+                error="docker_error",
+            )
+
+        stdout_text = stdout.decode("utf-8", errors="replace") if stdout else ""
+        stderr_text = stderr.decode("utf-8", errors="replace") if stderr else ""
+        truncated = False
+        if len(stdout_text) > self.MAX_BASH_OUT:
+            stdout_text = stdout_text[: self.MAX_BASH_OUT] + f"\n\n(输出被截断,总长度: {len(stdout_text)} 字符)"
+            truncated = True
+
+        parts: list[str] = []
+        if stdout_text:
+            parts.append(stdout_text)
+        if stderr_text:
+            parts.append("\n\n--- stderr ---\n" + stderr_text)
+        output = "\n".join(parts) if parts else "(命令无输出)"
+        ok = code == 0
+        meta: dict[str, Any] = {"exit_code": code, "docker_container": container, "truncated": truncated}
+        return ToolResult(
+            title=f"docker bash (exit {code})",
+            output=output,
+            error=None if ok else f"exit code {code}",
+            metadata=meta,
+        )
+
+    async def tool_read_file(self, file_path: str, offset: int, limit: int) -> ToolResult:
+        cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False)
+        if not cpath:
+            return ToolResult(
+                title="路径无效",
+                output="在 Workspace 模式下路径须相对于工作区根,或为工作区/映射项目根下的绝对路径。",
+                error="invalid_path",
+            )
+        name = Path(cpath.replace("\\", "/")).name
+
+        try:
+            raw = await self.async_read_file_bytes(cpath)
+        except FileNotFoundError:
+            return ToolResult(
+                title="文件未找到",
+                output=f"文件不存在: {file_path}",
+                error="File not found",
+            )
+        except IsADirectoryError:
+            return ToolResult(
+                title="路径错误",
+                output=f"路径是目录: {file_path}",
+                error="Is a directory",
+            )
+        except Exception as e:
+            logger.exception("workspace read_file")
+            return ToolResult(title="读取失败", output=str(e), error=str(e))
+
+        mime_type, _ = mimetypes.guess_type(name)
+        mime_type = mime_type or ""
+
+        if mime_type.startswith("image/") and mime_type not in ("image/svg+xml", "image/vnd.fastbidsheet"):
+            b64_data = base64.b64encode(raw).decode("ascii")
+            return ToolResult(
+                title=name,
+                output=f"图片文件: {name} (MIME: {mime_type}, {len(raw)} bytes)",
+                metadata={"mime_type": mime_type, "truncated": False, "workspace_container": True},
+                images=[{"type": "base64", "media_type": mime_type, "data": b64_data}],
+            )
+
+        if mime_type == "application/pdf":
+            return ToolResult(
+                title=name,
+                output=f"PDF 文件: {name}",
+                metadata={"mime_type": mime_type, "truncated": False, "workspace_container": True},
+            )
+
+        if self._is_binary_buffer(raw, Path(name).suffix):
+            return ToolResult(
+                title="二进制文件",
+                output=f"无法读取二进制文件: {name}",
+                error="Binary file",
+            )
+
+        try:
+            text = raw.decode("utf-8")
+        except UnicodeDecodeError:
+            return ToolResult(
+                title="编码错误",
+                output=f"无法解码文件(非 UTF-8): {name}",
+                error="Encoding error",
+            )
+
+        lines_no_keep = text.splitlines()
+        total_lines = len(lines_no_keep)
+        end_line = min(offset + limit, total_lines)
+        output_lines: list[str] = []
+        total_bytes = 0
+        truncated_by_bytes = False
+        for i in range(offset, end_line):
+            line = lines_no_keep[i]
+            if len(line) > MAX_LINE_LENGTH:
+                line = line[:MAX_LINE_LENGTH] + "..."
+            line_bytes = len(line.encode("utf-8")) + (1 if output_lines else 0)
+            if total_bytes + line_bytes > MAX_BYTES:
+                truncated_by_bytes = True
+                break
+            output_lines.append(line)
+            total_bytes += line_bytes
+
+        formatted = [f"{offset + idx + 1:5d}| {ln}" for idx, ln in enumerate(output_lines)]
+        output = "<file>\n" + "\n".join(formatted)
+        last_read_line = offset + len(output_lines)
+        has_more = total_lines > last_read_line
+        truncated = has_more or truncated_by_bytes
+        if truncated_by_bytes:
+            output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 offset 读取第 {last_read_line} 行之后)"
+        elif has_more:
+            output += f"\n\n(还有更多内容。使用 offset 读取第 {last_read_line} 行之后)"
+        else:
+            output += f"\n\n(文件结束 - 共 {total_lines} 行)"
+        output += "\n</file>"
+        preview = "\n".join(output_lines[:20])
+        return ToolResult(
+            title=name,
+            output=output,
+            metadata={
+                "preview": preview,
+                "truncated": truncated,
+                "total_lines": total_lines,
+                "read_lines": len(output_lines),
+                "workspace_container": True,
+            },
+        )
+
+    async def tool_write_file(self, file_path: str, content: str, append: bool) -> ToolResult:
+        cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False)
+        if not cpath:
+            return ToolResult(title="路径无效", output="路径不在工作区内。", error="invalid_path")
+        name = Path(cpath.replace("\\", "/")).name
+
+        if self.sync_path_exists(cpath, is_dir=True):
+            return ToolResult(title="路径错误", output=f"路径是目录: {file_path}", error="Path is a directory")
+
+        existed = self.sync_path_exists(cpath, is_dir=False)
+        old_content = ""
+        if existed:
+            try:
+                old_content = (await self.async_read_file_bytes(cpath)).decode("utf-8", errors="replace")
+            except Exception:
+                old_content = ""
+
+        if append and existed:
+            new_content = old_content + content
+        else:
+            new_content = content
+
+        if existed and old_content:
+            diff = _create_diff(str(file_path), old_content, new_content)
+        else:
+            diff = f"(新建文件: {name})"
+
+        try:
+            await self.async_write_file_bytes(cpath, new_content.encode("utf-8"))
+        except Exception as e:
+            logger.exception("workspace write_file")
+            return ToolResult(title="写入失败", output=str(e), error=str(e))
+
+        lines = new_content.count("\n")
+        if append and existed:
+            operation = "追加内容到"
+        elif existed:
+            operation = "覆盖"
+        else:
+            operation = "创建"
+        return ToolResult(
+            title=name,
+            output=f"文件写入成功 ({operation})\n\n{diff}",
+            metadata={"existed": existed, "append": append, "lines": lines, "diff": diff, "workspace_container": True},
+            long_term_memory=f"{operation}文件 {name}",
+        )
+
+    async def tool_edit_file(
+        self,
+        file_path: str,
+        old_string: str,
+        new_string: str,
+        replace_all: bool,
+    ) -> ToolResult:
+        cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False)
+        if not cpath:
+            return ToolResult(title="路径无效", output="路径不在工作区内。", error="invalid_path")
+        name = Path(cpath.replace("\\", "/")).name
+
+        if not self.sync_path_exists(cpath, is_dir=False):
+            return ToolResult(title="文件未找到", output=f"文件不存在: {file_path}", error="File not found")
+        if self.sync_path_exists(cpath, is_dir=True):
+            return ToolResult(title="路径错误", output=f"路径是目录: {file_path}", error="Path is a directory")
+
+        try:
+            content_old = (await self.async_read_file_bytes(cpath)).decode("utf-8")
+        except Exception as e:
+            return ToolResult(title="读取失败", output=str(e), error=str(e))
+
+        try:
+            content_new = edit_replace(content_old, old_string, new_string, replace_all)
+        except ValueError as e:
+            return ToolResult(title="替换失败", output=str(e), error=str(e))
+
+        diff = _create_diff(file_path, content_old, content_new)
+        try:
+            await self.async_write_file_bytes(cpath, content_new.encode("utf-8"))
+        except Exception as e:
+            return ToolResult(title="写入失败", output=str(e), error=str(e))
+
+        return ToolResult(
+            title=name,
+            output=f"编辑成功\n\n{diff}",
+            metadata={
+                "replace_all": replace_all,
+                "workspace_container": True,
+                "old_lines": content_old.count("\n"),
+                "new_lines": content_new.count("\n"),
+            },
+            long_term_memory=f"编辑文件 {name}",
+        )
+
+    async def tool_glob(self, pattern: str, path: Optional[str]) -> ToolResult:
+        wd = GatewayExecResolver.workdir(self._ge)
+        sp = GatewayExecResolver.resolve_path(self._ge, path, is_dir=True) if path else wd
+        if not sp:
+            return ToolResult(title="路径无效", output="搜索目录无效。", error="invalid_path")
+        if not self.sync_path_exists(sp, is_dir=True):
+            return ToolResult(title="目录不存在", output=f"搜索目录不存在: {path}", error="Directory not found")
+
+        cfg = json.dumps({"pattern": pattern, "root": sp, "fetch": GLOB_LIMIT + 1}, ensure_ascii=False)
+        script = (
+            "import glob,json,os;"
+            "from pathlib import Path;"
+            "c=json.loads(__import__('os').environ['GW_GLOB_CFG']);"
+            "os.chdir(c['root']);pat=c['pattern'];n=int(c['fetch']);"
+            "paths=[str(p) for p in Path('.').glob(pat) if p.is_file()] if '**' in pat "
+            "else [p for p in glob.glob(pat) if os.path.isfile(p)];"
+            "mt=sorted([(p,os.path.getmtime(p)) for p in paths],key=lambda x:-x[1]);"
+            "print(json.dumps([p for p,_ in mt[:n]]))"
+        )
+        code, out, err = await self.async_exec_argv(
+            ["python3", "-c", script],
+            workdir=sp,
+            environment={"GW_GLOB_CFG": cfg},
+        )
+        if code != 0:
+            return ToolResult(
+                title="glob 失败",
+                output=err.decode("utf-8", errors="replace") or out.decode("utf-8", errors="replace"),
+                error="glob_failed",
+            )
+        try:
+            file_paths: List[str] = json.loads(out.decode("utf-8") or "[]")
+        except json.JSONDecodeError:
+            return ToolResult(title="glob 解析失败", output=out.decode("utf-8", errors="replace"), error="bad_json")
+
+        truncated = len(file_paths) > GLOB_LIMIT
+        file_paths = file_paths[:GLOB_LIMIT]
+        if not file_paths:
+            output = "未找到匹配的文件"
+        else:
+            output = "\n".join(file_paths)
+            if truncated:
+                output += "\n\n(结果已截断。考虑使用更具体的路径或模式。)"
+        return ToolResult(
+            title=f"Glob: {pattern}",
+            output=output,
+            metadata={"count": len(file_paths), "truncated": truncated, "workspace_container": True},
+        )
+
+    async def tool_grep(
+        self,
+        pattern: str,
+        path: Optional[str],
+        include: Optional[str],
+    ) -> ToolResult:
+        wd = GatewayExecResolver.workdir(self._ge)
+        search_path = GatewayExecResolver.resolve_path(self._ge, path, is_dir=True) if path else wd
+        if not search_path:
+            return ToolResult(title="路径无效", output="搜索目录无效。", error="invalid_path")
+        if not self.sync_path_exists(search_path, is_dir=True):
+            return ToolResult(title="目录不存在", output=f"搜索目录不存在: {path}", error="Directory not found")
+
+        args: List[str] = [
+            "rg", "-nH", "--hidden", "--follow", "--no-messages",
+            "--field-match-separator=|", "--regexp", pattern,
+        ]
+        if include:
+            args.extend(["--glob", include])
+        args.append(search_path)
+
+        code, stdout_b, stderr_b = await self.async_exec_argv(args, workdir=search_path)
+        if code == 1:
+            matches: List[Tuple[str, int, str]] = []
+        elif code != 0 and code != 2:
+            return ToolResult(
+                title="ripgrep 失败",
+                output=stderr_b.decode("utf-8", errors="replace"),
+                error="rg_failed",
+            )
+        else:
+            matches = []
+            for line in stdout_b.decode("utf-8", errors="replace").strip().split("\n"):
+                if not line:
+                    continue
+                parts = line.split("|", 2)
+                if len(parts) < 3:
+                    continue
+                file_path_str, line_num_str, line_text = parts
+                try:
+                    matches.append((file_path_str, int(line_num_str), line_text))
+                except ValueError:
+                    continue
+
+        matches.sort(key=lambda x: x[0], reverse=True)
+        truncated = len(matches) > GREP_LIMIT
+        matches = matches[:GREP_LIMIT]
+
+        if not matches:
+            output = "未找到匹配"
+        else:
+            output = f"找到 {len(matches)} 个匹配\n"
+            current_file = None
+            for file_path_str, line_num, line_text in matches:
+                if current_file != file_path_str:
+                    if current_file is not None:
+                        output += "\n"
+                    current_file = file_path_str
+                    output += f"\n{file_path_str}:\n"
+                if len(line_text) > 2000:
+                    line_text = line_text[:2000] + "..."
+                output += f"  Line {line_num}: {line_text}\n"
+            if truncated:
+                output += "\n(结果已截断。考虑使用更具体的路径或模式。)"
+
+        return ToolResult(
+            title=f"搜索: {pattern}",
+            output=output,
+            metadata={"matches": len(matches), "truncated": truncated, "pattern": pattern, "workspace_container": True},
+        )
+
+
+# ---------------------------------------------------------------------------
+# 注册表:bash / 文件工具分发
+# ---------------------------------------------------------------------------
+
+
+class BashGatewayDispatcher:
+    """将 ``bash_command`` 覆盖为:有 ``gateway_exec`` 时走 ``DockerWorkspaceClient.run_bash_tool``。"""
+
+    _builtin: ClassVar[Callable[..., Coroutine[Any, Any, ToolResult]] | None] = None
+    _installed: ClassVar[bool] = False
+
+    @classmethod
+    def install(cls, registry: ToolRegistry) -> None:
+        if cls._installed:
+            return
+        entry = registry._tools.get("bash_command")
+        if not entry:
+            logger.warning("docker_runner: bash_command 未注册,跳过覆盖")
+            return
+        cls._builtin = entry["func"]
+        schema = entry["schema"]
+        hidden = list(entry.get("hidden_params") or ["context"])
+        dispatch = cls._make_dispatch()
+        registry.register(
+            dispatch,
+            schema=schema,
+            hidden_params=hidden,
+            inject_params=dict(entry.get("inject_params") or {}),
+            requires_confirmation=entry["ui_metadata"].get("requires_confirmation", False),
+            editable_params=list(entry["ui_metadata"].get("editable_params") or []),
+            display=dict(entry["ui_metadata"].get("display") or {}),
+            url_patterns=entry.get("url_patterns"),
+        )
+        cls._installed = True
+        logger.info("bash_command 已启用 gateway_exec → docker exec 分发")
+
+    @classmethod
+    def _make_dispatch(cls) -> Callable[..., Coroutine[Any, Any, ToolResult]]:
+        async def bash_command(
+            command: str,
+            timeout: Optional[int] = None,
+            workdir: Optional[str] = None,
+            env: Optional[Dict[str, str]] = None,
+            description: str = "",
+            context: Optional[ToolContext] = None,
+        ) -> ToolResult:
+            ge = GatewayExecResolver.effective(context)
+            if ge:
+                ws = DockerWorkspaceClient(ge)
+                if ws.container_id():
+                    return await ws.run_bash_tool(
+                        command,
+                        timeout=timeout,
+                        workdir=workdir,
+                        env=env,
+                        description=description,
+                    )
+            if cls._builtin is None:
+                return ToolResult(title="内部错误", output="builtin bash_command 未初始化", error="no_builtin")
+            return await cls._builtin(
+                command=command,
+                timeout=timeout,
+                workdir=workdir,
+                env=env,
+                description=description,
+                context=context,
+            )
+
+        bash_command.__name__ = "bash_command"
+        bash_command.__doc__ = (
+            "执行 bash 命令(Trace.gateway_exec 或 AGENT_DEFAULT_DOCKER_CONTAINER 时在容器内 docker exec)"
+        )
+        return bash_command
+
+
+class WorkspaceFileToolsDispatcher:
+    """将 read/write/edit/glob/grep 在有 ``gateway_exec`` 时转发到 ``DockerWorkspaceClient``。"""
+
+    _orig: ClassVar[dict[str, Callable[..., Coroutine[Any, Any, ToolResult]]]] = {}
+    _installed: ClassVar[bool] = False
+
+    @classmethod
+    def install(cls, registry: ToolRegistry) -> None:
+        if cls._installed:
+            return
+
+        async def read_file(
+            file_path: str,
+            offset: int = 0,
+            limit: int = DEFAULT_READ_LIMIT,
+            context: Optional[ToolContext] = None,
+        ) -> ToolResult:
+            ge = GatewayExecResolver.effective(context)
+            parsed = urlparse(file_path)
+            if parsed.scheme in ("http", "https"):
+                return await cls._orig["read_file"](file_path=file_path, offset=offset, limit=limit, context=context)
+            if ge and ge.get("docker_container"):
+                return await DockerWorkspaceClient(ge).tool_read_file(file_path, offset, limit)
+            return await cls._orig["read_file"](file_path=file_path, offset=offset, limit=limit, context=context)
+
+        async def write_file(
+            file_path: str,
+            content: str,
+            append: bool = False,
+            context: Optional[ToolContext] = None,
+        ) -> ToolResult:
+            ge = GatewayExecResolver.effective(context)
+            if ge and ge.get("docker_container"):
+                return await DockerWorkspaceClient(ge).tool_write_file(file_path, content, append)
+            return await cls._orig["write_file"](file_path=file_path, content=content, append=append, context=context)
+
+        async def edit_file(
+            file_path: str,
+            old_string: str,
+            new_string: str,
+            replace_all: bool = False,
+            context: Optional[ToolContext] = None,
+        ) -> ToolResult:
+            ge = GatewayExecResolver.effective(context)
+            if ge and ge.get("docker_container"):
+                return await DockerWorkspaceClient(ge).tool_edit_file(
+                    file_path, old_string, new_string, replace_all
+                )
+            return await cls._orig["edit_file"](
+                file_path=file_path,
+                old_string=old_string,
+                new_string=new_string,
+                replace_all=replace_all,
+                context=context,
+            )
+
+        async def glob_files(
+            pattern: str,
+            path: Optional[str] = None,
+            context: Optional[ToolContext] = None,
+        ) -> ToolResult:
+            ge = GatewayExecResolver.effective(context)
+            if ge and ge.get("docker_container"):
+                return await DockerWorkspaceClient(ge).tool_glob(pattern, path)
+            return await cls._orig["glob_files"](pattern=pattern, path=path, context=context)
+
+        async def grep_content(
+            pattern: str,
+            path: Optional[str] = None,
+            include: Optional[str] = None,
+            context: Optional[ToolContext] = None,
+        ) -> ToolResult:
+            ge = GatewayExecResolver.effective(context)
+            if ge and ge.get("docker_container"):
+                return await DockerWorkspaceClient(ge).tool_grep(pattern, path, include)
+            orig = cls._orig["grep_content"]
+            return await orig(pattern=pattern, path=path, include=include, context=context)
+
+        read_file.__name__ = "read_file"
+        write_file.__name__ = "write_file"
+        edit_file.__name__ = "edit_file"
+        glob_files.__name__ = "glob_files"
+        grep_content.__name__ = "grep_content"
+
+        for name, fn in [
+            ("read_file", read_file),
+            ("write_file", write_file),
+            ("edit_file", edit_file),
+            ("glob_files", glob_files),
+            ("grep_content", grep_content),
+        ]:
+            cls._register_override(registry, name, fn)
+
+        cls._installed = True
+        logger.info("read/write/edit/glob/grep 已启用 gateway_exec → Workspace 容器分发")
+
+    @classmethod
+    def _register_override(
+        cls,
+        registry: ToolRegistry,
+        name: str,
+        dispatch: Callable[..., Coroutine[Any, Any, ToolResult]],
+    ) -> None:
+        entry = registry._tools.get(name)
+        if not entry:
+            logger.warning("docker_runner: 工具 %s 未注册,跳过覆盖", name)
+            return
+        cls._orig[name] = entry["func"]
+        registry.register(
+            dispatch,
+            schema=entry["schema"],
+            hidden_params=list(entry.get("hidden_params") or []),
+            inject_params=dict(entry.get("inject_params") or {}),
+            requires_confirmation=entry["ui_metadata"].get("requires_confirmation", False),
+            editable_params=list(entry["ui_metadata"].get("editable_params") or []),
+            display=dict(entry["ui_metadata"].get("display") or {}),
+            url_patterns=entry.get("url_patterns"),
+        )
+
+
+def install_bash_gateway_dispatch(registry: ToolRegistry) -> None:
+    BashGatewayDispatcher.install(registry)
+
+
+def install_workspace_file_tools_dispatch(registry: ToolRegistry) -> None:
+    WorkspaceFileToolsDispatcher.install(registry)
+
+
+__all__ = [
+    "GatewayExecResolver",
+    "DockerWorkspaceClient",
+    "BashGatewayDispatcher",
+    "WorkspaceFileToolsDispatcher",
+    "active_gateway_exec",
+    "gateway_exec_from_tool_context",
+    "effective_gateway_exec",
+    "container_workdir",
+    "container_user",
+    "resolve_container_path",
+    "install_bash_gateway_dispatch",
+    "install_workspace_file_tools_dispatch",
+]

+ 21 - 0
agent/trace/run_api.py

@@ -72,6 +72,10 @@ class CreateRequest(BaseModel):
         None,
         description="写入 Trace.context['feishu_adapter'],供 feishu_adapter_tool_call 合并到 Node /tool-call 的 context",
     )
+    gateway_exec: Optional[Dict[str, Any]] = Field(
+        None,
+        description="写入 Trace.context['gateway_exec'](如 docker_container),供 bash 等在 Workspace 容器内执行",
+    )
 
 
 class TraceRunRequest(BaseModel):
@@ -88,6 +92,10 @@ class TraceRunRequest(BaseModel):
         None,
         description="合并到 Trace.context['feishu_adapter'](覆盖同名字段),用于更新当前消息的 message_id 等",
     )
+    gateway_exec: Optional[Dict[str, Any]] = Field(
+        None,
+        description="合并到 Trace.context['gateway_exec'](续跑时更新 Workspace 容器等信息)",
+    )
 
 
 class ReflectRequest(BaseModel):
@@ -194,6 +202,8 @@ async def create_and_run(req: CreateRequest):
                 ex_ctx["project_name"] = req.project_name
                 if req.feishu_adapter:
                     ex_ctx["feishu_adapter"] = dict(req.feishu_adapter)
+                if req.gateway_exec:
+                    ex_ctx["gateway_exec"] = dict(req.gateway_exec)
                 config = RunConfig(
                     model=req.model or default_config.model,
                     temperature=req.temperature if req.temperature is not None else default_config.temperature,
@@ -221,6 +231,8 @@ async def create_and_run(req: CreateRequest):
             ctx["project_name"] = req.project_name
         if req.feishu_adapter:
             ctx["feishu_adapter"] = dict(req.feishu_adapter)
+        if req.gateway_exec:
+            ctx["gateway_exec"] = dict(req.gateway_exec)
         config = RunConfig(
             model=req.model,
             temperature=req.temperature,
@@ -384,6 +396,15 @@ async def run_trace(trace_id: str, req: TraceRunRequest):
             merged_ctx["feishu_adapter"] = prev_d
             await runner.trace_store.update_trace(trace_id, context=merged_ctx)
 
+        if req.gateway_exec:
+            tr_ge = await runner.trace_store.get_trace(trace_id)
+            if tr_ge:
+                merged_ctx = dict(tr_ge.context or {})
+                prev_ge = dict(merged_ctx.get("gateway_exec") or {})
+                prev_ge.update(dict(req.gateway_exec))
+                merged_ctx["gateway_exec"] = prev_ge
+                await runner.trace_store.update_trace(trace_id, context=merged_ctx)
+
         # 自动检查并清理不完整的工具调用
         if after_sequence is not None and req.messages:
             adjusted_seq = await _cleanup_incomplete_tool_calls(

+ 1 - 3
api_server.py

@@ -9,9 +9,7 @@ API Server - FastAPI 应用入口
 """
 
 import logging
-import json
-import os
-from fastapi import FastAPI, Request, WebSocket
+from fastapi import FastAPI, WebSocket
 from fastapi.middleware.cors import CORSMiddleware
 import uvicorn
 

+ 7 - 0
docker-compose.yml

@@ -34,6 +34,8 @@ services:
       - .env
     environment:
       - FEISHU_HTTP_BASE_URL=http://feishu:4380
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock:ro
     ports:
       - "8001:8000"
     entrypoint: "python /app/api_server.py"
@@ -51,6 +53,11 @@ services:
       - GATEWAY_AGENT_API_BASE_URL=http://api:8000
       - CHANNELS_DISPATCH_REACTIONS=false
       - CHANNELS_DISPATCH_CARD_ACTIONS=true
+      # Workspace 沙箱:与下方命名卷一致(Compose 项目名为 agent 时卷名为 agent_workspace_root)
+      - GATEWAY_WORKSPACE_DOCKER_NETWORK=agent
+      - GATEWAY_WORKSPACE_MOUNT_MODE=volume_subpath
+      - GATEWAY_WORKSPACE_DOCKER_VOLUME=agent_workspace_root
+      - GATEWAY_SHARED_DOCKER_VOLUME=agent_workspace_shared
     volumes:
       # workspace 与 shared 数据卷
       - workspace_root:/root/.gateway/workspaces

+ 6 - 4
docker/Dockerfile.workspace

@@ -15,11 +15,13 @@ ENV PYTHONUNBUFFERED=1 \
 # 1、安装必要的软件包
 RUN sed -i 's/deb.debian.org/mirrors.ustc.edu.cn/g' /etc/apt/sources.list.d/debian.sources \
     && apt-get update && apt-get install -y --no-install-recommends \
-    sudo git curl ca-certificates zip unzip tar build-essential pkg-config jq \
+    sudo git curl ca-certificates zip unzip tar build-essential pkg-config jq ripgrep \
     && rm -rf /var/lib/apt/lists/*
 
-# 2、创建 agent 用户
-RUN useradd -m agent && echo "agent ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
+# 2、创建 agent 用户与共享目录
+RUN useradd -m agent && echo "agent ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers \
+    && mkdir -p /home/agent/workspace /home/agent/shared \
+    && chown -R agent:agent /home/agent/workspace /home/agent/shared
 USER agent
 WORKDIR /home/agent/workspace
 
@@ -41,6 +43,6 @@ RUN brew install fnm \
     && fnm use 24 \
     && npm config set registry https://registry.npmmirror.com
 
-VOLUME [ "/home/agent/workspace" ]
+VOLUME [ "/home/agent/workspace", "/home/agent/shared" ]
 
 ENTRYPOINT [ "sleep", "infinity" ]

+ 3 - 3
gateway/README.md

@@ -80,9 +80,9 @@ gateway/
 │   │   └── channel_manager.py     # 渠道管理
 │   │
 │   ├── lifecycle/                 # Agent 生命周期管理
-│   │   ├── trace_manager.py      # Trace 注册和查询
-│   │   ├── workspace_manager.py  # Workspace 管理
-│   │   └── config_watcher.py     # 配置热重载
+│   │   ├── workspace/             # Workspace 目录与 Docker 沙箱
+│   │   ├── trace/                 # Trace 代理与 TraceBackend
+│   │   └── config_watcher.py      # 配置热重载
 │   │
 │   └── executor/                  # 任务执行调度
 │       ├── task_manager.py        # 任务管理

+ 3 - 4
gateway/core/channels/__init__.py

@@ -6,15 +6,14 @@ HTTP 路由由各渠道 Api 类(如 ``FeishuChannelApi.build_router``)构建
 ``/api/channels/feishu/inbound/webhook``)。
 """
 
-from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
 from gateway.core.channels.feishu.api import FeishuChannelApi
 from gateway.core.channels.feishu.connector import FeishuConnector
-from gateway.core.channels.feishu.http_run_executor import FeishuHttpRunApiExecutor
+from gateway.core.channels.feishu.bridge import FeishuHttpRunApiExecutor
 from gateway.core.channels.feishu.manager import FeishuChannelConfig, FeishuChannelManager
 from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
 from gateway.core.channels.manager import ChannelRegistry
-from gateway.core.channels.protocols import ExecutorBackend, UserIdentityResolver
+from gateway.core.channels.protocols import ExecutorBackend, TraceBackend, UserIdentityResolver
 from gateway.core.channels.router import ChannelTraceRouter
 from gateway.core.channels.types import CHANNEL_FEISHU, CHANNEL_WECHAT, RouteResult
 
@@ -28,6 +27,7 @@ __all__ = [
     "ChannelRegistry",
     "ChannelTraceRouter",
     "ExecutorBackend",
+    "TraceBackend",
     "FeishuHttpRunApiExecutor",
     "FeishuChannelConfig",
     "FeishuChannelManager",
@@ -36,7 +36,6 @@ __all__ = [
     "FeishuMessageRouter",
     "FeishuReplyContext",
     "IncomingFeishuEvent",
-    "MemoryTraceBackend",
     "MessageRouter",
     "RouteResult",
     "UserIdentityResolver",

+ 0 - 5
gateway/core/channels/backends/__init__.py

@@ -1,5 +0,0 @@
-from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
-
-__all__ = [
-    "MemoryTraceBackend",
-]

+ 0 - 32
gateway/core/channels/backends/memory_trace.py

@@ -1,32 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-import uuid
-from typing import Any
-
-
-class MemoryTraceBackend:
-    """进程内 (channel, user_id) → trace_id;接入 Lifecycle 后可替换为 TraceManager。"""
-
-    def __init__(self) -> None:
-        self._map: dict[tuple[str, str], str] = {}
-        self._lock = asyncio.Lock()
-
-    async def get_or_create_trace(
-        self,
-        *,
-        channel: str,
-        user_id: str,
-        workspace_id: str,
-        agent_type: str,
-        metadata: dict[str, Any],
-    ) -> str:
-        _ = workspace_id, agent_type, metadata
-        key = (channel, user_id)
-        async with self._lock:
-            if key not in self._map:
-                self._map[key] = str(uuid.uuid4())
-            return self._map[key]
-
-    def clear(self) -> None:
-        self._map.clear()

+ 24 - 5
gateway/core/channels/feishu/__init__.py

@@ -1,12 +1,30 @@
+"""
+飞书渠道(Gateway Python 包)
+
+**Python 模块**
+- ``types``:规范化入站事件 ``IncomingFeishuEvent``、回复上下文 ``FeishuReplyContext``
+- ``connector``:请求飞书 HTTP 适配层(通常即本目录旁的 Node 服务)
+- ``identity``:``open_id`` 等 → 渠道 ``user_id``
+- ``protocols``:收窄的 ``FeishuExecutorBackend`` / ``FeishuUserIdentityResolver``
+- ``router``:``FeishuMessageRouter``(Trace 会话准备 → 调执行器 → 绑定 Agent ``trace_id``)
+- ``bridge``:``FeishuHttpRunApiExecutor``(Agent Trace HTTP 建链/续跑、WS 跟单、assistant 回推飞书)
+- ``manager``:``FeishuChannelManager`` 组装 Connector / Lifecycle / bridge / router
+- ``api``:FastAPI 挂载 ``/api/channels/feishu/...``
+
+**同目录非 Python 包(勿当模块 import)**
+- ``openclaw-lark/``:git 子模块,飞书适配上游;Docker 见 ``docker/Dockerfile.feishu``
+- ``openclaw-lark-patch/``:构建时覆盖补丁;compose 可挂 ``config.yml`` 等
+
+类名 ``FeishuHttpRunApiExecutor`` 保留历史含义(调用 Agent Trace HTTP);实现位于 ``bridge``。
+"""
+
+from gateway.core.channels.feishu.bridge import FeishuHttpRunApiExecutor
 from gateway.core.channels.feishu.api import FeishuChannelApi
 from gateway.core.channels.feishu.connector import FeishuConnector, TYPING_REACTION_EMOJI
 from gateway.core.channels.feishu.identity import DefaultUserIdentityResolver
 from gateway.core.channels.feishu.manager import FeishuChannelConfig, FeishuChannelManager
-from gateway.core.channels.feishu.router import (
-    FeishuExecutorBackend,
-    FeishuMessageRouter,
-    FeishuUserIdentityResolver,
-)
+from gateway.core.channels.feishu.protocols import FeishuExecutorBackend, FeishuUserIdentityResolver
+from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.feishu.types import (
     FeishuReplyContext,
     IncomingFeishuEvent,
@@ -16,6 +34,7 @@ from gateway.core.channels.feishu.types import (
 
 __all__ = [
     "FeishuChannelApi",
+    "FeishuHttpRunApiExecutor",
     "DefaultUserIdentityResolver",
     "FeishuChannelConfig",
     "FeishuChannelManager",

+ 2 - 2
gateway/core/channels/feishu/api.py

@@ -20,7 +20,7 @@ logger = logging.getLogger(__name__)
 class FeishuChannelApi:
     """飞书渠道 HTTP 路由:持有 manager 引用,以方法作为路由处理器。
 
-    实现 ``ChannelPlugin`` Protocol,可通过 ``ChannelLoader`` 自动注册
+    实现 ``ChannelPlugin`` Protocol,由 ``load_enabled_channels`` 自动挂载路由
     """
 
     def __init__(self, channel_manager: FeishuChannelManager) -> None:
@@ -28,7 +28,7 @@ class FeishuChannelApi:
 
     @classmethod
     def from_env(cls) -> FeishuChannelApi:
-        """从环境变量构造实例,供 ``ChannelLoader`` 自动调用。"""
+        """从环境变量构造实例,供 ``load_enabled_channels`` 调用。"""
         return cls(FeishuChannelManager.from_env())
 
     async def inbound_webhook(self, request: Request) -> dict[str, Any]:

+ 216 - 133
gateway/core/channels/feishu/http_run_executor.py → gateway/core/channels/feishu/bridge.py

@@ -1,11 +1,19 @@
 """
-飞书执行器:HTTP 调用 Agent ``run_api``,WebSocket 订阅 ``/api/traces/{id}/watch``,
-将 assistant 消息转发到飞书(不轮询 messages)。
+飞书 ↔ Agent Trace 桥接(模块 ``gateway.core.channels.feishu.bridge``)。
 
-转发规则:
-- 不转发 ``branch_type=reflection``(完成后知识提取侧分支)
-- 不转发仍含 ``tool_calls`` 的中间轮,只推工具执行后的最终回复
-- 提取正文时避免 ``description`` 与 ``text`` 重复拼接
+职责概览:用户从飞书发来消息后,经 ``FeishuHttpRunApiExecutor`` 调用 Agent 的 Trace HTTP API
+(``POST /api/traces`` 建链或 ``POST /api/traces/{id}/run`` 续跑),再经 WebSocket 订阅
+``/api/traces/{id}/watch`` 跟单,把 **assistant 最终回复** 推回飞书;可选挂载 Workspace /
+``gateway_exec``(Docker 容器)生命周期,与 Trace 终态联动。
+
+文件内分区(单模块,避免过度拆包):
+
+1. **Agent 请求体 / 飞书上下文** — ``append_feishu_context_block``、``feishu_adapter_payload`` 等
+2. **Trace / WS 消息解析** — ``TERMINAL_STATUSES``、assistant 正文提取、``trace_watch_ws_url``
+3. **跟单与 Typing** — ``poll_assistants_to_feishu``、``schedule_trace_followup``
+4. **``FeishuHttpRunApiExecutor``** — 飞书入站入口
+
+转发规则:不推 ``branch_type=reflection``;不推仍含 ``tool_calls`` 的中间轮;避免 ``description`` 与 ``text`` 重复拼接。
 """
 
 from __future__ import annotations
@@ -15,31 +23,41 @@ import json
 import logging
 import time
 import uuid
+from collections.abc import Awaitable, Callable
 from copy import copy
 from typing import Any
 
 import httpx
 
 from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
+from gateway.core.lifecycle.trace.backend import LifecycleTraceBackend
+from gateway.core.lifecycle.workspace import WorkspaceManager
+
+__all__ = [
+    "FeishuHttpRunApiExecutor",
+    "FollowupFinishedCallback",
+    "TERMINAL_STATUSES",
+    "append_feishu_context_block",
+    "feishu_adapter_payload",
+    "format_api_error",
+    "normalized_agent_trace_id",
+    "schedule_trace_followup",
+]
 
 logger = logging.getLogger(__name__)
 
-_TERMINAL_STATUSES = frozenset({"completed", "failed", "stopped"})
 
-# 同一 trace 仅一个跟单任务,避免并发重复推送
-_poll_tasks: dict[str, asyncio.Task[None]] = {}
-_poll_tasks_lock = asyncio.Lock()
-# trace_id → 已成功推送到飞书的 assistant sequence(跨多次 run,避免重复发送)
-_assistant_sent_sequences: dict[str, set[int]] = {}
-# trace_id → 待任务结束时移除 Typing 表情的用户消息
-_typing_cleanup_lock = asyncio.Lock()
-_pending_typing_by_trace: dict[str, list[tuple[str, str | None]]] = {}
+# =============================================================================
+# 1. Agent 请求体与飞书上下文
+# =============================================================================
 
 
-# ----- HTTP / Agent API -----
+def normalized_agent_trace_id(raw: str) -> str | None:
+    t = (raw or "").strip()
+    return t if t else None
 
 
-def _format_api_error(status_code: int, body_text: str) -> str:
+def format_api_error(status_code: int, body_text: str) -> str:
     try:
         data = json.loads(body_text)
         detail = data.get("detail")
@@ -54,15 +72,11 @@ def _format_api_error(status_code: int, body_text: str) -> str:
     return (body_text or "")[:800] or f"HTTP {status_code}"
 
 
-# ----- 飞书上下文(用户消息 / Trace.context)-----
-
-
-def _append_feishu_context_block(
+def append_feishu_context_block(
     text: str,
     event: IncomingFeishuEvent,
     reply_context: FeishuReplyContext,
 ) -> str:
-    """在用户文本后附加结构化上下文,便于后续工具(Feishu HTTP)读取。"""
     core = text.strip() if text else ""
     if not core:
         core = "(空消息)"
@@ -80,11 +94,10 @@ def _append_feishu_context_block(
     return "\n".join(lines)
 
 
-def _feishu_adapter_payload(
+def feishu_adapter_payload(
     event: IncomingFeishuEvent,
     reply_context: FeishuReplyContext,
 ) -> dict[str, str]:
-    """写入 Trace.context['feishu_adapter'],供 feishu_adapter_tool_call 对齐 Node /tool-call。"""
     return {
         "account_id": reply_context.account_id or "",
         "app_id": reply_context.app_id,
@@ -95,11 +108,14 @@ def _feishu_adapter_payload(
     }
 
 
-# ----- Trace assistant → 飞书正文 -----
+# =============================================================================
+# 2. Trace / WebSocket 消息解析
+# =============================================================================
 
+TERMINAL_STATUSES = frozenset({"completed", "failed", "stopped"})
 
-def _assistant_content_has_tool_calls(msg: dict[str, Any]) -> bool:
-    """assistant 是否仍带有待执行的 tool_calls(中间轮,不当最终回复推给用户)。"""
+
+def assistant_content_has_tool_calls(msg: dict[str, Any]) -> bool:
     if msg.get("role") != "assistant":
         return False
     c = msg.get("content")
@@ -113,8 +129,7 @@ def _assistant_content_has_tool_calls(msg: dict[str, Any]) -> bool:
     return bool(tc)
 
 
-def _assistant_wire_to_feishu_text(msg: dict[str, Any]) -> str | None:
-    """从 Trace 消息 dict 提取可发给用户的文本。"""
+def assistant_wire_to_feishu_text(msg: dict[str, Any]) -> str | None:
     if msg.get("role") != "assistant":
         return None
     content = msg.get("content")
@@ -143,14 +158,13 @@ def _assistant_wire_to_feishu_text(msg: dict[str, Any]) -> str | None:
     return "\n".join(parts)
 
 
-def _truncate_for_im(text: str, max_chars: int) -> str:
+def truncate_for_im(text: str, max_chars: int) -> str:
     if len(text) <= max_chars:
         return text
     return text[: max_chars - 80] + "\n\n…(内容过长已截断)"
 
 
-def _trace_watch_ws_url(http_base: str, trace_id: str) -> str:
-    """Agent HTTP 根地址 → ``/api/traces/{id}/watch`` 的 WebSocket URL。"""
+def trace_watch_ws_url(http_base: str, trace_id: str) -> str:
     b = http_base.strip().rstrip("/")
     if b.startswith("https://"):
         origin = "wss://" + b[8:]
@@ -161,7 +175,7 @@ def _trace_watch_ws_url(http_base: str, trace_id: str) -> str:
     return f"{origin}/api/traces/{trace_id}/watch"
 
 
-def _message_sequence(msg: dict[str, Any]) -> int | None:
+def message_sequence(msg: dict[str, Any]) -> int | None:
     s = msg.get("sequence")
     if s is None:
         return None
@@ -180,7 +194,7 @@ def _message_sequence(msg: dict[str, Any]) -> int | None:
         return None
 
 
-def _watch_ws_payload_to_dict(raw: Any) -> dict[str, Any] | None:
+def watch_ws_payload_to_dict(raw: Any) -> dict[str, Any] | None:
     if isinstance(raw, (bytes, bytearray)):
         raw = raw.decode("utf-8", errors="replace")
     if not isinstance(raw, str):
@@ -192,10 +206,21 @@ def _watch_ws_payload_to_dict(raw: Any) -> dict[str, Any] | None:
     return data if isinstance(data, dict) else None
 
 
-# ----- Typing 表情 -----
+# =============================================================================
+# 3. Trace 跟单、Typing、HTTP 兜底
+# =============================================================================
 
+FollowupFinishedCallback = Callable[[str, str], Awaitable[None]]
+"""``(trace_id, reason)`` · ``terminal`` | ``timeout`` | ``not_found``"""
 
-async def _remove_typing_reaction_safe(
+_poll_tasks: dict[str, asyncio.Task[None]] = {}
+_poll_tasks_lock = asyncio.Lock()
+_assistant_sent_sequences: dict[str, set[int]] = {}
+_typing_cleanup_lock = asyncio.Lock()
+_pending_typing_by_trace: dict[str, list[tuple[str, str | None]]] = {}
+
+
+async def remove_typing_reaction_safe(
     connector: Any,
     message_id: str,
     account_id: str | None,
@@ -220,7 +245,7 @@ async def _remove_typing_reaction_safe(
         logger.exception("%s: remove reaction exception mid=%s", log_label, message_id)
 
 
-async def _register_pending_typing_cleanup(
+async def register_pending_typing_cleanup(
     trace_id: str,
     message_id: str,
     account_id: str | None,
@@ -229,7 +254,7 @@ async def _register_pending_typing_cleanup(
         _pending_typing_by_trace.setdefault(trace_id, []).append((message_id, account_id))
 
 
-async def _remove_typing_immediate(
+async def remove_typing_immediate(
     connector: Any,
     message_id: str | None,
     account_id: str | None,
@@ -237,7 +262,7 @@ async def _remove_typing_immediate(
 ) -> None:
     if not message_id:
         return
-    await _remove_typing_reaction_safe(
+    await remove_typing_reaction_safe(
         connector,
         message_id,
         account_id,
@@ -246,7 +271,7 @@ async def _remove_typing_immediate(
     )
 
 
-async def _flush_pending_typing_cleanups(
+async def flush_pending_typing_cleanups(
     connector: Any,
     trace_id: str,
     emoji: str,
@@ -254,7 +279,7 @@ async def _flush_pending_typing_cleanups(
     async with _typing_cleanup_lock:
         pairs = _pending_typing_by_trace.pop(trace_id, [])
     for mid, acc in pairs:
-        await _remove_typing_reaction_safe(
+        await remove_typing_reaction_safe(
             connector,
             mid,
             acc,
@@ -263,10 +288,25 @@ async def _flush_pending_typing_cleanups(
         )
 
 
-# ----- 跟单:WS 转发 assistant -----
+async def inbound_fail_reply(
+    connector: Any,
+    reply_context: FeishuReplyContext,
+    *,
+    typing_placed: bool,
+    typing_emoji: str,
+    message: str,
+) -> None:
+    if typing_placed:
+        await remove_typing_immediate(
+            connector,
+            reply_context.message_id,
+            reply_context.account_id,
+            typing_emoji,
+        )
+    await connector.send_text(reply_context, message)
 
 
-async def _forward_one_assistant_to_feishu(
+async def forward_one_assistant_to_feishu(
     m: dict[str, Any],
     *,
     sent_sequences: set[int],
@@ -274,7 +314,7 @@ async def _forward_one_assistant_to_feishu(
     connector: Any,
     max_text_chars: int,
 ) -> None:
-    seq = _message_sequence(m)
+    seq = message_sequence(m)
     if seq is None or m.get("role") != "assistant":
         return
     if seq in sent_sequences:
@@ -282,14 +322,14 @@ async def _forward_one_assistant_to_feishu(
     if m.get("branch_type") == "reflection":
         sent_sequences.add(seq)
         return
-    if _assistant_content_has_tool_calls(m):
+    if assistant_content_has_tool_calls(m):
         sent_sequences.add(seq)
         return
-    body = _assistant_wire_to_feishu_text(m)
+    body = assistant_wire_to_feishu_text(m)
     if body is None:
         sent_sequences.add(seq)
         return
-    body = _truncate_for_im(body, max_text_chars)
+    body = truncate_for_im(body, max_text_chars)
     try:
         result = await connector.send_text(reply_ctx, body)
         if result.get("ok"):
@@ -300,7 +340,7 @@ async def _forward_one_assistant_to_feishu(
         logger.exception("feishu forward: send_text exception seq=%s", seq)
 
 
-async def _poll_assistants_to_feishu(
+async def poll_assistants_to_feishu(
     *,
     agent_base_url: str,
     trace_id: str,
@@ -313,11 +353,8 @@ async def _poll_assistants_to_feishu(
     max_text_chars: int,
     forward_assistants: bool = True,
     typing_emoji_for_cleanup: str = "Typing",
+    on_finished: FollowupFinishedCallback | None = None,
 ) -> None:
-    """
-    WebSocket 订阅直至终态;转发 ``message_added`` 中的 assistant。
-    WS 不可用时仅 ``GET /api/traces/{id}`` 轮询状态(结束跟单 + 清理 Typing),不拉 messages。
-    """
     if trace_id not in _assistant_sent_sequences:
         _assistant_sent_sequences[trace_id] = set()
     sent_sequences = _assistant_sent_sequences[trace_id]
@@ -330,7 +367,7 @@ async def _poll_assistants_to_feishu(
         import websockets
 
         ws = await websockets.connect(
-            _trace_watch_ws_url(base, trace_id),
+            trace_watch_ws_url(base, trace_id),
             max_size=10_000_000,
             ping_interval=20,
             ping_timeout=60,
@@ -341,13 +378,14 @@ async def _poll_assistants_to_feishu(
         ws = None
 
     forward_warned = False
+    exit_reason: str | None = None
 
     async def _dispatch_watch_event(data: dict[str, Any]) -> str:
         ev = data.get("event")
         if ev == "message_added" and forward_assistants:
             msg = data.get("message")
             if isinstance(msg, dict):
-                await _forward_one_assistant_to_feishu(
+                await forward_one_assistant_to_feishu(
                     msg,
                     sent_sequences=sent_sequences,
                     reply_ctx=reply_ctx,
@@ -356,7 +394,7 @@ async def _poll_assistants_to_feishu(
                 )
         if ev == "trace_status_changed":
             st = data.get("status")
-            if isinstance(st, str) and st in _TERMINAL_STATUSES:
+            if isinstance(st, str) and st in TERMINAL_STATUSES:
                 return st
         if ev == "trace_completed":
             return "completed"
@@ -370,32 +408,34 @@ async def _poll_assistants_to_feishu(
                     trace_id,
                     poll_max_seconds,
                 )
+                exit_reason = "timeout"
                 break
 
             status_hint = "running"
 
             if ws is not None:
+                stream = ws
                 try:
-                    raw = await asyncio.wait_for(ws.recv(), timeout=poll_interval)
+                    raw = await asyncio.wait_for(stream.recv(), timeout=poll_interval)
                 except asyncio.TimeoutError:
                     raw = None
                 except Exception as e:
                     logger.warning("feishu watch WS error, HTTP status fallback: %s", e)
                     try:
-                        await ws.close()
+                        await stream.close()
                     except Exception:
                         pass
                     ws = None
                     raw = None
 
                 while raw is not None:
-                    data = _watch_ws_payload_to_dict(raw)
+                    data = watch_ws_payload_to_dict(raw)
                     if data is not None:
                         st = await _dispatch_watch_event(data)
-                        if st in _TERMINAL_STATUSES:
+                        if st in TERMINAL_STATUSES:
                             status_hint = st
                     try:
-                        raw = await asyncio.wait_for(ws.recv(), timeout=0.001)
+                        raw = await asyncio.wait_for(stream.recv(), timeout=0.001)
                     except asyncio.TimeoutError:
                         raw = None
                     except Exception:
@@ -416,6 +456,7 @@ async def _poll_assistants_to_feishu(
                         tr = await client.get(f"{base}/api/traces/{trace_id}")
                         if tr.status_code == 404:
                             logger.warning("feishu watch: trace %s not found, stop", trace_id)
+                            exit_reason = "not_found"
                             break
                         if tr.status_code >= 400:
                             logger.warning(
@@ -427,14 +468,15 @@ async def _poll_assistants_to_feishu(
                             body = tr.json()
                             trace_obj = body.get("trace") or {}
                             st = str(trace_obj.get("status") or "running")
-                            if st in _TERMINAL_STATUSES:
+                            if st in TERMINAL_STATUSES:
                                 effective = st
                 except httpx.RequestError as exc:
                     logger.warning("feishu watch: HTTP status check error trace_id=%s err=%s", trace_id, exc)
 
-            if effective in _TERMINAL_STATUSES:
+            if effective in TERMINAL_STATUSES:
                 grace += 1
                 if grace >= terminal_grace_rounds:
+                    exit_reason = "terminal"
                     break
             else:
                 grace = 0
@@ -444,14 +486,23 @@ async def _poll_assistants_to_feishu(
                 await ws.close()
             except Exception:
                 pass
-        await _flush_pending_typing_cleanups(connector, trace_id, typing_emoji_for_cleanup)
+        await flush_pending_typing_cleanups(connector, trace_id, typing_emoji_for_cleanup)
         cur = asyncio.current_task()
         async with _poll_tasks_lock:
             if _poll_tasks.get(trace_id) is cur:
                 _ = _poll_tasks.pop(trace_id, None)
+        if on_finished is not None and exit_reason is not None:
+            try:
+                await on_finished(trace_id, exit_reason)
+            except Exception:
+                logger.exception(
+                    "feishu watch: on_finished failed trace_id=%s reason=%s",
+                    trace_id,
+                    exit_reason,
+                )
 
 
-def _schedule_trace_followup(
+def schedule_trace_followup(
     *,
     agent_base_url: str,
     trace_id: str,
@@ -464,11 +515,10 @@ def _schedule_trace_followup(
     max_text_chars: int,
     forward_assistants: bool,
     typing_emoji: str,
+    on_finished: FollowupFinishedCallback | None = None,
 ) -> None:
-    """同一 trace 仅保留一个活跃跟单任务。"""
-
     async def _runner() -> None:
-        await _poll_assistants_to_feishu(
+        await poll_assistants_to_feishu(
             agent_base_url=agent_base_url,
             trace_id=trace_id,
             reply_ctx=reply_context,
@@ -480,6 +530,7 @@ def _schedule_trace_followup(
             max_text_chars=max_text_chars,
             forward_assistants=forward_assistants,
             typing_emoji_for_cleanup=typing_emoji,
+            on_finished=on_finished,
         )
 
     async def _spawn() -> None:
@@ -496,30 +547,13 @@ def _schedule_trace_followup(
     _ = loop.create_task(_spawn())
 
 
-# ----- 入站:提交 Agent -----
-
-
-async def _inbound_fail_reply(
-    connector: Any,
-    reply_context: FeishuReplyContext,
-    *,
-    typing_placed: bool,
-    typing_emoji: str,
-    message: str,
-) -> None:
-    """错误路径:先摘 Typing(若曾加上),再向用户发送说明。"""
-    if typing_placed:
-        await _remove_typing_immediate(
-            connector,
-            reply_context.message_id,
-            reply_context.account_id,
-            typing_emoji,
-        )
-    await connector.send_text(reply_context, message)
+# =============================================================================
+# 4. FeishuHttpRunApiExecutor
+# =============================================================================
 
 
 class FeishuHttpRunApiExecutor:
-    """调用 Agent Trace HTTP APIWebSocket 将 assistant 转发到飞书。"""
+    """调用 Agent Trace HTTP API;WebSocket 将 assistant 转发到飞书。"""
 
     def __init__(
         self,
@@ -539,6 +573,13 @@ class FeishuHttpRunApiExecutor:
         assistant_max_text_chars: int = 8000,
         typing_reaction_enabled: bool = True,
         typing_reaction_emoji: str = "Typing",
+        workspace_manager: WorkspaceManager | None = None,
+        workspace_prefix: str = "feishu",
+        channel_id: str = "feishu",
+        lifecycle_trace_backend: LifecycleTraceBackend | None = None,
+        stop_container_on_trace_terminal: bool = True,
+        stop_container_on_trace_not_found: bool = True,
+        release_ref_on_trace_terminal: bool = False,
     ) -> None:
         self._base = base_url.rstrip("/")
         self._timeout = timeout
@@ -555,25 +596,42 @@ class FeishuHttpRunApiExecutor:
         self._assistant_max_chars = assistant_max_text_chars
         self._typing_reaction_enabled = typing_reaction_enabled
         self._typing_emoji = typing_reaction_emoji
-        self._map_lock = asyncio.Lock()
-        self._api_trace_by_user: dict[str, str] = {}
+        self._workspace_manager = workspace_manager
+        self._workspace_prefix = workspace_prefix
+        self._channel_id = channel_id
+        self._lifecycle_trace_backend = lifecycle_trace_backend
+        self._stop_container_on_trace_terminal = stop_container_on_trace_terminal
+        self._stop_container_on_trace_not_found = stop_container_on_trace_not_found
+        self._release_ref_on_trace_terminal = release_ref_on_trace_terminal
+
+    def _gateway_exec_for_user(self, user_id: str) -> dict[str, Any] | None:
+        wm = self._workspace_manager
+        if wm is None:
+            return None
+        wid = f"{self._workspace_prefix}:{user_id}"
+        cid = wm.get_workspace_container_id(wid)
+        if not cid:
+            return None
+        return {
+            "docker_container": cid,
+            "container_user": "agent",
+            "container_workdir": "/home/agent/workspace",
+        }
 
     async def handle_inbound_message(
         self,
-        trace_id: str,
+        existing_agent_trace_id: str,
         text: str,
         reply_context: FeishuReplyContext,
         connector: Any,
         *,
         event: IncomingFeishuEvent,
-    ) -> str:
-        _ = trace_id
+    ) -> tuple[str, str]:
         user_id = self._identity.resolve_user_id(event)
-        content = _append_feishu_context_block(text, event, reply_context)
+        content = append_feishu_context_block(text, event, reply_context)
         task_id = f"task-{uuid.uuid4()}"
 
         typing_placed = False
-        # 仅对用户发来的 IM 消息打「输入中」表情;卡片交互 / 表情续跑等事件的 message_id 常为机器人消息,避免对其加 reaction。
         if (
             self._typing_reaction_enabled
             and reply_context.message_id
@@ -598,99 +656,97 @@ class FeishuHttpRunApiExecutor:
                     reply_context.message_id,
                 )
 
-        async with self._map_lock:
-            api_trace_id = self._api_trace_by_user.get(user_id)
-
-        feishu_adapter = _feishu_adapter_payload(event, reply_context)
+        api_trace_id = normalized_agent_trace_id(existing_agent_trace_id)
+        adapter = feishu_adapter_payload(event, reply_context)
+        gateway_exec = self._gateway_exec_for_user(user_id)
 
         try:
             async with httpx.AsyncClient(timeout=self._timeout) as client:
                 if api_trace_id is None:
-                    resp = await client.post(
-                        f"{self._base}/api/traces",
-                        json={
-                            "messages": [{"role": "user", "content": content}],
-                            "model": self._model,
-                            "temperature": self._temperature,
-                            "max_iterations": self._max_iterations,
-                            "uid": user_id,
-                            "name": f"feishu-{user_id}",
-                            "feishu_adapter": feishu_adapter,
-                        },
-                    )
+                    body: dict[str, Any] = {
+                        "messages": [{"role": "user", "content": content}],
+                        "model": self._model,
+                        "temperature": self._temperature,
+                        "max_iterations": self._max_iterations,
+                        "uid": user_id,
+                        "name": f"feishu-{user_id}",
+                        "feishu_adapter": adapter,
+                    }
+                    if gateway_exec:
+                        body["gateway_exec"] = gateway_exec
+                    resp = await client.post(f"{self._base}/api/traces", json=body)
                 else:
+                    body = {
+                        "messages": [{"role": "user", "content": content}],
+                        "feishu_adapter": adapter,
+                    }
+                    if gateway_exec:
+                        body["gateway_exec"] = gateway_exec
                     resp = await client.post(
                         f"{self._base}/api/traces/{api_trace_id}/run",
-                        json={
-                            "messages": [{"role": "user", "content": content}],
-                            "feishu_adapter": feishu_adapter,
-                        },
+                        json=body,
                     )
         except httpx.RequestError as exc:
             logger.exception("FeishuHttpRunApiExecutor: Agent API 请求失败 user_id=%s", user_id)
-            await _inbound_fail_reply(
+            await inbound_fail_reply(
                 connector,
                 reply_context,
                 typing_placed=typing_placed,
                 typing_emoji=self._typing_emoji,
                 message=f"[Gateway] 无法连接 Agent API({self._base}):{exc}",
             )
-            return task_id
+            return task_id, ""
 
         body_text = resp.text
         if resp.status_code == 409:
-            await _inbound_fail_reply(
+            await inbound_fail_reply(
                 connector,
                 reply_context,
                 typing_placed=typing_placed,
                 typing_emoji=self._typing_emoji,
                 message="[Gateway] 当前会话在 Agent 侧仍在运行,请稍后再发消息。",
             )
-            return task_id
+            return task_id, ""
 
         if resp.status_code >= 400:
-            err = _format_api_error(resp.status_code, body_text)
+            err = format_api_error(resp.status_code, body_text)
             logger.warning(
                 "FeishuHttpRunApiExecutor: API 错误 status=%s user_id=%s detail=%s",
                 resp.status_code,
                 user_id,
                 err,
             )
-            await _inbound_fail_reply(
+            await inbound_fail_reply(
                 connector,
                 reply_context,
                 typing_placed=typing_placed,
                 typing_emoji=self._typing_emoji,
                 message=f"[Gateway] Agent 启动失败({resp.status_code}):{err}",
             )
-            return task_id
+            return task_id, ""
 
         try:
             data = resp.json()
         except Exception:
-            await _inbound_fail_reply(
+            await inbound_fail_reply(
                 connector,
                 reply_context,
                 typing_placed=typing_placed,
                 typing_emoji=self._typing_emoji,
                 message="[Gateway] Agent API 返回非 JSON,已放弃解析。",
             )
-            return task_id
+            return task_id, ""
 
         resolved_id = data.get("trace_id")
         if not isinstance(resolved_id, str) or not resolved_id:
-            await _inbound_fail_reply(
+            await inbound_fail_reply(
                 connector,
                 reply_context,
                 typing_placed=typing_placed,
                 typing_emoji=self._typing_emoji,
                 message="[Gateway] Agent API 响应缺少 trace_id。",
             )
-            return task_id
-
-        async with self._map_lock:
-            if user_id not in self._api_trace_by_user:
-                self._api_trace_by_user[user_id] = resolved_id
+            return task_id, ""
 
         if self._notify:
             await connector.send_text(
@@ -701,14 +757,40 @@ class FeishuHttpRunApiExecutor:
         if typing_placed:
             user_mid = reply_context.message_id
             if user_mid:
-                await _register_pending_typing_cleanup(
+                await register_pending_typing_cleanup(
                     resolved_id,
                     user_mid,
                     reply_context.account_id,
                 )
 
         if self._poll_assistants or typing_placed:
-            _schedule_trace_followup(
+            wid = f"{self._workspace_prefix}:{user_id}"
+
+            async def _on_followup_finished(tid: str, reason: str) -> None:
+                if tid != resolved_id:
+                    return
+                wm = self._workspace_manager
+                if wm is None:
+                    return
+                stop = False
+                if reason == "terminal" and self._stop_container_on_trace_terminal:
+                    stop = True
+                elif reason == "not_found" and self._stop_container_on_trace_not_found:
+                    stop = True
+                if stop:
+                    await wm.stop_workspace_sandbox(wid)
+                if (
+                    reason == "terminal"
+                    and self._release_ref_on_trace_terminal
+                    and self._lifecycle_trace_backend is not None
+                ):
+                    await self._lifecycle_trace_backend.forget_trace_binding(
+                        self._channel_id,
+                        user_id,
+                        workspace_id=wid,
+                    )
+
+            schedule_trace_followup(
                 agent_base_url=self._base,
                 trace_id=resolved_id,
                 reply_context=copy(reply_context),
@@ -720,6 +802,7 @@ class FeishuHttpRunApiExecutor:
                 max_text_chars=self._assistant_max_chars,
                 forward_assistants=self._poll_assistants,
                 typing_emoji=self._typing_emoji,
+                on_finished=_on_followup_finished,
             )
 
-        return task_id
+        return task_id, resolved_id

+ 37 - 26
gateway/core/channels/feishu/manager.py

@@ -1,17 +1,17 @@
 from __future__ import annotations
 
-import os
 from collections.abc import Mapping
 from dataclasses import dataclass
 from typing import Any
 
-from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
+from gateway.core.channels.feishu.bridge import FeishuHttpRunApiExecutor
 from gateway.core.channels.feishu.connector import FeishuConnector, WebhookParseError
-from gateway.core.channels.feishu.http_run_executor import FeishuHttpRunApiExecutor
 from gateway.core.channels.feishu.identity import DefaultUserIdentityResolver
 from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.manager import ChannelRegistry
 from gateway.core.channels.types import RouteResult
+from gateway.core.lifecycle import LifecycleTraceBackend, TraceManager, WorkspaceManager
+from utils.env_parse import env_bool, env_float, env_int, env_str
 
 
 @dataclass
@@ -39,6 +39,10 @@ class FeishuChannelConfig:
     assistant_max_text_chars: int = 8000
     typing_reaction_enabled: bool = True
     typing_reaction_emoji: str = "Typing"
+    # Trace 跟单结束后的生命周期(Workspace 沙箱 / 渠道绑定)
+    stop_container_on_trace_terminal: bool = True
+    stop_container_on_trace_not_found: bool = True
+    release_ref_on_trace_terminal: bool = False
 
 
 class FeishuChannelManager(ChannelRegistry):
@@ -53,7 +57,9 @@ class FeishuChannelManager(ChannelRegistry):
             feishu_http_base_url=self._config.feishu_http_base_url,
             timeout=self._config.http_timeout,
         )
-        self._trace_backend = MemoryTraceBackend()
+        self._workspace_manager = WorkspaceManager.from_env()
+        self._trace_manager = TraceManager.from_env(self._workspace_manager)
+        self._trace_backend = LifecycleTraceBackend(self._trace_manager)
         self._identity = DefaultUserIdentityResolver()
         self._executor = FeishuHttpRunApiExecutor(
             base_url=self._config.agent_api_base_url,
@@ -71,6 +77,13 @@ class FeishuChannelManager(ChannelRegistry):
             assistant_max_text_chars=self._config.assistant_max_text_chars,
             typing_reaction_enabled=self._config.typing_reaction_enabled,
             typing_reaction_emoji=self._config.typing_reaction_emoji,
+            workspace_manager=self._workspace_manager,
+            workspace_prefix=self._config.workspace_prefix,
+            channel_id=self._config.channel_id,
+            lifecycle_trace_backend=self._trace_backend,
+            stop_container_on_trace_terminal=self._config.stop_container_on_trace_terminal,
+            stop_container_on_trace_not_found=self._config.stop_container_on_trace_not_found,
+            release_ref_on_trace_terminal=self._config.release_ref_on_trace_terminal,
         )
         self._router = FeishuMessageRouter(
             connector=self._connector,
@@ -101,28 +114,26 @@ class FeishuChannelManager(ChannelRegistry):
         """从环境变量构造实例(与 docker-compose / .env 配合)。"""
         return cls(
             FeishuChannelConfig(
-                feishu_http_base_url=os.getenv("FEISHU_HTTP_BASE_URL", "http://127.0.0.1:4380").strip(),
-                http_timeout=float(os.getenv("FEISHU_HTTP_TIMEOUT", "120")),
-                dispatch_reactions=os.getenv("CHANNELS_DISPATCH_REACTIONS", "false").lower() in ("1", "true", "yes"),
-                dispatch_card_actions=os.getenv("CHANNELS_DISPATCH_CARD_ACTIONS", "true").lower()
-                in ("1", "true", "yes"),
-                agent_api_base_url=os.getenv("GATEWAY_AGENT_API_BASE_URL", "http://127.0.0.1:8000").strip(),
-                agent_run_model=os.getenv("FEISHU_AGENT_RUN_MODEL", "qwen3.5-flash").strip(),
-                agent_run_max_iterations=int(os.getenv("FEISHU_AGENT_RUN_MAX_ITERATIONS", "200")),
-                agent_run_temperature=float(os.getenv("FEISHU_AGENT_RUN_TEMPERATURE", "0.3")),
-                feishu_run_notify_on_submit=os.getenv("CHANNELS_FEISHU_RUN_NOTIFY", "true").lower()
-                in ("1", "true", "yes"),
-                poll_assistant_messages=os.getenv("FEISHU_AGENT_POLL_ASSISTANTS", "true").lower()
-                in ("1", "true", "yes"),
-                poll_interval_seconds=float(os.getenv("FEISHU_AGENT_POLL_INTERVAL", "1.0")),
-                poll_request_timeout=float(os.getenv("FEISHU_AGENT_POLL_REQUEST_TIMEOUT", "30")),
-                poll_terminal_grace_rounds=int(os.getenv("FEISHU_AGENT_POLL_GRACE_ROUNDS", "2")),
-                poll_max_seconds=float(os.getenv("FEISHU_AGENT_POLL_MAX_SECONDS", "0")),
-                assistant_max_text_chars=int(os.getenv("FEISHU_AGENT_ASSISTANT_MAX_CHARS", "8000")),
-                typing_reaction_enabled=os.getenv("FEISHU_TYPING_REACTION", "true").lower()
-                in ("1", "true", "yes"),
-                typing_reaction_emoji=os.getenv("FEISHU_TYPING_REACTION_EMOJI", "Typing").strip()
-                or "Typing",
+                feishu_http_base_url=env_str("FEISHU_HTTP_BASE_URL", "http://127.0.0.1:4380"),
+                http_timeout=env_float("FEISHU_HTTP_TIMEOUT", 120.0),
+                dispatch_reactions=env_bool("CHANNELS_DISPATCH_REACTIONS", False),
+                dispatch_card_actions=env_bool("CHANNELS_DISPATCH_CARD_ACTIONS", True),
+                agent_api_base_url=env_str("GATEWAY_AGENT_API_BASE_URL", "http://127.0.0.1:8000"),
+                agent_run_model=env_str("FEISHU_AGENT_RUN_MODEL", "qwen3.5-flash"),
+                agent_run_max_iterations=env_int("FEISHU_AGENT_RUN_MAX_ITERATIONS", 200),
+                agent_run_temperature=env_float("FEISHU_AGENT_RUN_TEMPERATURE", 0.3),
+                feishu_run_notify_on_submit=env_bool("CHANNELS_FEISHU_RUN_NOTIFY", True),
+                poll_assistant_messages=env_bool("FEISHU_AGENT_POLL_ASSISTANTS", True),
+                poll_interval_seconds=env_float("FEISHU_AGENT_POLL_INTERVAL", 1.0),
+                poll_request_timeout=env_float("FEISHU_AGENT_POLL_REQUEST_TIMEOUT", 30.0),
+                poll_terminal_grace_rounds=env_int("FEISHU_AGENT_POLL_GRACE_ROUNDS", 2),
+                poll_max_seconds=env_float("FEISHU_AGENT_POLL_MAX_SECONDS", 0.0),
+                assistant_max_text_chars=env_int("FEISHU_AGENT_ASSISTANT_MAX_CHARS", 8000),
+                typing_reaction_enabled=env_bool("FEISHU_TYPING_REACTION", True),
+                typing_reaction_emoji=env_str("FEISHU_TYPING_REACTION_EMOJI", "Typing") or "Typing",
+                stop_container_on_trace_terminal=env_bool("GATEWAY_WORKSPACE_STOP_ON_TRACE_TERMINAL", True),
+                stop_container_on_trace_not_found=env_bool("GATEWAY_WORKSPACE_STOP_ON_TRACE_NOT_FOUND", True),
+                release_ref_on_trace_terminal=env_bool("GATEWAY_LIFECYCLE_RELEASE_REF_ON_TRACE_TERMINAL", False),
             )
         )
 

+ 35 - 0
gateway/core/channels/feishu/protocols.py

@@ -0,0 +1,35 @@
+"""飞书渠道在通用 ``ExecutorBackend`` / ``UserIdentityResolver`` 上的类型收窄。"""
+
+from __future__ import annotations
+
+from typing import Any, Protocol, runtime_checkable
+
+from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
+from gateway.core.channels.protocols import ExecutorBackend, UserIdentityResolver
+
+__all__ = ["FeishuExecutorBackend", "FeishuUserIdentityResolver"]
+
+
+@runtime_checkable
+class FeishuExecutorBackend(ExecutorBackend, Protocol):
+    """飞书执行器——窄化 ``ExecutorBackend`` 的参数类型为飞书专属结构。"""
+
+    async def handle_inbound_message(
+        self,
+        existing_agent_trace_id: str,
+        text: str,
+        reply_context: FeishuReplyContext,
+        connector: Any,
+        *,
+        event: IncomingFeishuEvent,
+    ) -> tuple[str, str]:
+        """返回 ``(task_id, agent_trace_id)``;失败时 ``agent_trace_id`` 为空。"""
+        ...
+
+
+@runtime_checkable
+class FeishuUserIdentityResolver(UserIdentityResolver, Protocol):
+    """飞书用户身份解析器——窄化 ``UserIdentityResolver`` 的事件类型为 ``IncomingFeishuEvent``。"""
+
+    def resolve_user_id(self, event: IncomingFeishuEvent) -> str:
+        ...

+ 65 - 52
gateway/core/channels/feishu/router.py

@@ -1,24 +1,29 @@
+"""飞书入站事件路由:Trace 会话准备、Executor 提交、绑定 Agent ``trace_id``。"""
+
 from __future__ import annotations
 
 import logging
 from collections.abc import Mapping
-from typing import Any, Protocol, runtime_checkable
+from typing import Any
 
 from gateway.core.channels.feishu.connector import FeishuConnector
+from gateway.core.channels.feishu.protocols import (
+    FeishuExecutorBackend,
+    FeishuUserIdentityResolver,
+)
 from gateway.core.channels.feishu.types import (
     FeishuReplyContext,
     IncomingFeishuEvent,
     feishu_event_to_mapping,
 )
-from gateway.core.channels.manager import TraceBackend
-from gateway.core.channels.protocols import ExecutorBackend, UserIdentityResolver
+from gateway.core.channels.protocols import TraceBackend
 from gateway.core.channels.router import ChannelTraceRouter
 from gateway.core.channels.types import CHANNEL_FEISHU, RouteResult
 
 logger = logging.getLogger(__name__)
 
 
-def _routing_from_card_action_raw(raw: dict[str, Any]) -> tuple[str | None, str | None, str | None]:
+def routing_from_card_action_raw(raw: dict[str, Any]) -> tuple[str | None, str | None, str | None]:
     """
     当规范化 JSON 未带 chat_id 时,从飞书 card.action.trigger 原始体兜底解析。
     常见路径:event.context.open_chat_id / open_message_id(或顶层 open_chat_id)。
@@ -44,37 +49,19 @@ def _routing_from_card_action_raw(raw: dict[str, Any]) -> tuple[str | None, str
     return chat_id, message_id, chat_type
 
 
-@runtime_checkable
-class FeishuExecutorBackend(ExecutorBackend, Protocol):
-    """飞书执行器——窄化 ``ExecutorBackend`` 的参数类型为飞书专属结构。"""
-
-    async def handle_inbound_message(
-        self,
-        trace_id: str,
-        text: str,
-        reply_context: FeishuReplyContext,
-        connector: Any,
-        *,
-        event: IncomingFeishuEvent,
-    ) -> str:
-        """返回 task_id 或占位 id。"""
-        ...
-
-
-@runtime_checkable
-class FeishuUserIdentityResolver(UserIdentityResolver, Protocol):
-    """飞书用户身份解析器——窄化 ``UserIdentityResolver`` 的事件类型为 ``IncomingFeishuEvent``。"""
-
-    def resolve_user_id(self, event: IncomingFeishuEvent) -> str:
-        ...
def as_opt_str(v: Any) -> str | None:
    """Coerce ``v`` to ``str``; ``None`` and empty-string results map to ``None``."""
    if v is None:
        return None
    return str(v) or None
 
 
 class FeishuMessageRouter(ChannelTraceRouter):
     """
-    飞书消息路由:用户 → trace_id → Executor;与 channels.md 中 MessageRouter 一致
+    飞书消息路由:prepare_session → Executor → bind_agent_trace_id。
 
-    非 message 事件:reaction / card_action 由 ``dispatch_*`` 控制是否续跑 Agent。
-    card_action 常用于 OAuth / 权限卡片点击后触发继续流程(须开启 ``dispatch_card_actions``)
+    ``reaction`` / ``card_action`` 是否触发续跑由 ``dispatch_*`` 控制;
+    ``card_action`` 多用于 OAuth 等卡片交互后让 Agent 继续执行。
     """
 
     def __init__(
@@ -106,7 +93,7 @@ class FeishuMessageRouter(ChannelTraceRouter):
         chat_id = event.chat_id
         message_id = event.message_id
         if not chat_id and event.event_type == "card_action":
-            c, m, _ = _routing_from_card_action_raw(event.raw)
+            c, m, _ = routing_from_card_action_raw(event.raw)
             chat_id = chat_id or c
             message_id = message_id or m
         if not chat_id:
@@ -159,34 +146,45 @@ class FeishuMessageRouter(ChannelTraceRouter):
         if not self._auto_create:
             return RouteResult(ok=False, error="auto_create_trace_disabled", user_id=user_id)
 
-        trace_id = await self._trace.get_or_create_trace(
+        meta = feishu_event_to_mapping(event)
+        await self._trace.prepare_session(
             channel=CHANNEL_FEISHU,
             user_id=user_id,
             workspace_id=workspace_id,
             agent_type=self._agent_type,
-            metadata=feishu_event_to_mapping(event),
+            metadata=meta,
         )
+        existing_agent_trace_id = await self._trace.get_existing_trace_id(CHANNEL_FEISHU, user_id)
 
         ctx = self._reply_context_from_event(event)
         if ctx is None:
             return RouteResult(
                 ok=False,
                 error="missing_chat_id_for_reply",
-                trace_id=trace_id,
+                trace_id=existing_agent_trace_id or "",
                 user_id=user_id,
                 workspace_id=workspace_id,
             )
 
-        task_id = await self._executor.handle_inbound_message(
-            trace_id,
+        task_id, agent_trace_id = await self._executor.handle_inbound_message(
+            existing_agent_trace_id or "",
             text or "",
             ctx,
             self._connector,
             event=event,
         )
+        if agent_trace_id:
+            await self._trace.bind_agent_trace_id(
+                channel=CHANNEL_FEISHU,
+                user_id=user_id,
+                workspace_id=workspace_id,
+                agent_trace_id=agent_trace_id,
+                agent_type=self._agent_type,
+                metadata=meta,
+            )
         return RouteResult(
             ok=True,
-            trace_id=trace_id,
+            trace_id=agent_trace_id or existing_agent_trace_id or "",
             task_id=task_id,
             user_id=user_id,
             workspace_id=workspace_id,
@@ -198,13 +196,22 @@ class FeishuMessageRouter(ChannelTraceRouter):
         (account_id, app_id, chat_id, message_id, open_id)。
         """
         text = str(message.get("text") or message.get("content") or "")
-        trace_id = await self.get_trace_id(channel, user_id)
+        workspace_id = self._workspace_id_for_user(user_id)
+        meta = dict(message) if isinstance(message, dict) else {}
+        await self._trace.prepare_session(
+            channel=channel,
+            user_id=user_id,
+            workspace_id=workspace_id,
+            agent_type=self._agent_type,
+            metadata=meta,
+        )
+        existing = await self._trace.get_existing_trace_id(channel, user_id)
         ctx = FeishuReplyContext(
-            account_id=_as_opt_str(message.get("account_id")),
+            account_id=as_opt_str(message.get("account_id")),
             app_id=str(message.get("app_id") or ""),
             chat_id=str(message.get("chat_id") or ""),
-            message_id=_as_opt_str(message.get("message_id")),
-            open_id=_as_opt_str(message.get("open_id")),
+            message_id=as_opt_str(message.get("message_id")),
+            open_id=as_opt_str(message.get("open_id")),
         )
         if not ctx.app_id or not ctx.chat_id:
             raise ValueError("route_message requires app_id and chat_id in message for Feishu reply")
@@ -213,15 +220,25 @@ class FeishuMessageRouter(ChannelTraceRouter):
             app_id=ctx.app_id,
             account_id=ctx.account_id,
             open_id=ctx.open_id,
-            chat_type=_as_opt_str(message.get("chat_type")),
+            chat_type=as_opt_str(message.get("chat_type")),
             chat_id=ctx.chat_id,
             message_id=ctx.message_id,
             content=text,
             raw=dict(message) if isinstance(message, dict) else {},
         )
-        return await self._executor.handle_inbound_message(
-            trace_id, text, ctx, self._connector, event=synthetic
+        task_id, agent_trace_id = await self._executor.handle_inbound_message(
+            existing or "", text, ctx, self._connector, event=synthetic
         )
+        if agent_trace_id:
+            await self._trace.bind_agent_trace_id(
+                channel=channel,
+                user_id=user_id,
+                workspace_id=workspace_id,
+                agent_trace_id=agent_trace_id,
+                agent_type=self._agent_type,
+                metadata=meta,
+            )
+        return task_id
 
     async def send_agent_reply(
         self,
@@ -236,11 +253,11 @@ class FeishuMessageRouter(ChannelTraceRouter):
         """
         meta = dict(metadata or {})
         ctx = FeishuReplyContext(
-            account_id=_as_opt_str(meta.get("account_id")),
+            account_id=as_opt_str(meta.get("account_id")),
             app_id=str(meta.get("app_id") or ""),
             chat_id=str(meta.get("chat_id") or ""),
-            message_id=_as_opt_str(meta.get("message_id")),
-            open_id=_as_opt_str(meta.get("open_id")),
+            message_id=as_opt_str(meta.get("message_id")),
+            open_id=as_opt_str(meta.get("open_id")),
         )
         if not ctx.chat_id:
             return {"ok": False, "error": "metadata missing chat_id", "trace_id": trace_id}
@@ -248,8 +265,4 @@ class FeishuMessageRouter(ChannelTraceRouter):
         return await self._connector.send_text(ctx, content)
 
 
-def _as_opt_str(v: Any) -> str | None:
-    if v is None:
-        return None
-    s = str(v)
-    return s if s else None
+__all__ = ["FeishuMessageRouter", "routing_from_card_action_raw", "as_opt_str"]

+ 1 - 19
gateway/core/channels/manager.py

@@ -3,22 +3,6 @@ from __future__ import annotations
 from typing import Protocol, runtime_checkable
 
 
-@runtime_checkable
-class TraceBackend(Protocol):
-    """与 Lifecycle.TraceManager 对接前的抽象:按渠道用户解析 trace_id。"""
-
-    async def get_or_create_trace(
-        self,
-        *,
-        channel: str,
-        user_id: str,
-        workspace_id: str,
-        agent_type: str,
-        metadata: dict[str, object],
-    ) -> str:
-        ...
-
-
 @runtime_checkable
 class ChannelRegistration(Protocol):
     """注册到 ``ChannelRegistry`` 的渠道配置需至少提供 ``enabled``。"""
@@ -27,9 +11,7 @@ class ChannelRegistration(Protocol):
 
 
 class ChannelRegistry:
-    """
-    与具体 IM 无关:渠道注册、启停、状态查询。
-    """
+    """渠道注册、启停、状态查询(与具体 IM 无关)。"""
 
     def __init__(self) -> None:
         self._registry: dict[str, ChannelRegistration] = {}

+ 35 - 3
gateway/core/channels/protocols.py

@@ -2,7 +2,7 @@
 渠道层通用 Protocol——所有 IM 渠道(飞书、微信等)共用的最小接口约定。
 
 各渠道可在自己的模块中声明更严格的子 Protocol(窄化参数类型),
-但 backends/ 下的通用实现只需满足此处的宽松签名即可跨渠道复用。
+实现 TraceBackend / ExecutorBackend 时只需满足此处的宽松签名即可跨渠道复用。
 """
 
 from __future__ import annotations
@@ -13,6 +13,37 @@ if TYPE_CHECKING:
     from fastapi import APIRouter
 
 
@runtime_checkable
class TraceBackend(Protocol):
    """Channel session plus Agent ``trace_id``: prepare the workspace first,
    then bind the trace_id after the Agent API has returned it."""

    async def prepare_session(
        self,
        *,
        channel: str,
        user_id: str,
        workspace_id: str,
        agent_type: str,
        metadata: dict[str, object],
    ) -> None:
        """Ensure session resources (e.g. the workspace) exist before dispatching."""
        ...

    async def get_existing_trace_id(self, channel: str, user_id: str) -> str | None:
        """Return the Agent trace_id already bound for (channel, user_id), or None."""
        ...

    async def bind_agent_trace_id(
        self,
        *,
        channel: str,
        user_id: str,
        workspace_id: str,
        agent_trace_id: str,
        agent_type: str,
        metadata: dict[str, object],
    ) -> None:
        """Bind the Agent-returned trace_id to (channel, user_id)."""
        ...
+
+
 @runtime_checkable
 class UserIdentityResolver(Protocol):
     """将渠道入站事件映射为网关内统一 user_id。
@@ -35,13 +66,14 @@ class ExecutorBackend(Protocol):
 
     async def handle_inbound_message(
         self,
-        trace_id: str,
+        existing_agent_trace_id: str,
         text: str,
         reply_context: Any,
         connector: Any,
         *,
         event: Any,
-    ) -> str:
+    ) -> tuple[str, str]:
+        """返回 ``(task_id, agent_trace_id)``;无已绑定 trace 时 ``existing_agent_trace_id`` 传空串。"""
         ...
 
 

+ 9 - 10
gateway/core/channels/router.py

@@ -1,11 +1,11 @@
 from __future__ import annotations
 
-from gateway.core.channels.manager import TraceBackend
+from gateway.core.channels.protocols import TraceBackend
 
 
 class ChannelTraceRouter:
     """
-    与具体 IM 无关:按渠道 user_id 解析 workspace_id,并委托 TraceBackend 获取/创建 trace
+    与具体 IM 无关:按渠道 user_id 解析 workspace_id,并委托 TraceBackend 与 Agent ``trace_id`` 对齐
 
     飞书等渠道的入站消息路由见 ``gateway.core.channels.feishu.router.FeishuMessageRouter``。
     """
@@ -25,15 +25,14 @@ class ChannelTraceRouter:
         return f"{self._workspace_prefix}:{user_id}"
 
     async def get_trace_id(self, channel: str, user_id: str, *, create_if_missing: bool = True) -> str:
-        """获取或创建 Trace ID(对应 channels.md get_trace_id / create_trace_for_user 语义)。"""
+        """返回已绑定的 Agent trace_id;不存在时除非 ``create_if_missing=False`` 否则抛错(不再预分配 UUID)。"""
+        tid = await self._trace.get_existing_trace_id(channel, user_id)
+        if tid:
+            return tid
         if not create_if_missing:
-            raise NotImplementedError("仅内存后端支持 create_if_missing=False 时需扩展 TraceBackend")
-        return await self._trace.get_or_create_trace(
-            channel=channel,
-            user_id=user_id,
-            workspace_id=self._workspace_id_for_user(user_id),
-            agent_type=self._agent_type,
-            metadata={"source": "channels.channel_trace_router"},
+            raise NotImplementedError("无已绑定 trace_id 且 create_if_missing=False")
+        raise RuntimeError(
+            "尚无已绑定的 Agent trace_id:请先完成一次渠道入站(executor 成功返回后再 bind)。"
         )
 
     async def create_trace_for_user(self, channel: str, user_id: str) -> str:

+ 16 - 0
gateway/core/lifecycle/__init__.py

@@ -0,0 +1,16 @@
+"""
+Gateway Agent 生命周期:Workspace(含 Docker 沙箱容器)、Trace 元数据代理、配置热重载。
+
+子包:``workspace``(目录与容器)、``trace``(Agent trace 代理与 ``TraceBackend``)。
+"""
+
+from gateway.core.lifecycle.config_watcher import ConfigWatcher
+from gateway.core.lifecycle.trace import LifecycleTraceBackend, TraceManager
+from gateway.core.lifecycle.workspace import WorkspaceManager
+
+__all__ = [
+    "ConfigWatcher",
+    "LifecycleTraceBackend",
+    "TraceManager",
+    "WorkspaceManager",
+]

+ 144 - 0
gateway/core/lifecycle/config_watcher.py

@@ -0,0 +1,144 @@
+"""
+监听 Workspace 下技能/配置目录变化并触发热重载回调(不影响已在跑的 Trace 执行线程,
+仅通知上层重新加载配置)。
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import os
+from collections.abc import Callable, Coroutine
+from pathlib import Path
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+Callback = Callable[[str, list[str]], Coroutine[Any, Any, None]]
+
+
+class ConfigWatcher:
+    def __init__(self, debounce_seconds: float = 0.5) -> None:
+        self._debounce = debounce_seconds
+        self._watchers: dict[str, Any] = {}
+        self._tasks: dict[str, asyncio.Task[None]] = {}
+        self._lock = asyncio.Lock()
+
+    @classmethod
+    def from_env(cls) -> ConfigWatcher:
+        debounce = float(os.getenv("GATEWAY_CONFIG_WATCH_DEBOUNCE", "0.5"))
+        return cls(debounce_seconds=debounce)
+
+    async def watch(self, workspace_id: str, workspace_path: str, callback: Callback) -> None:
+        """监听 ``workspace_path`` 下常见技能目录;``callback(workspace_id, changed_paths)``。"""
+        async with self._lock:
+            await self.stop_watch(workspace_id)
+            root = Path(workspace_path)
+            watch_roots = [root, root / "skills", root / "skills-config"]
+            existing = [p for p in watch_roots if p.is_dir()]
+            if not existing:
+                existing = [root]
+                root.mkdir(parents=True, exist_ok=True)
+
+            try:
+                from watchdog.events import FileSystemEventHandler
+                from watchdog.observers import Observer
+            except ImportError:
+                logger.warning("未安装 watchdog,ConfigWatcher 使用轮询降级模式")
+                task = asyncio.create_task(
+                    self._poll_loop(workspace_id, workspace_path, callback),
+                    name=f"config-watch-poll-{workspace_id}",
+                )
+                self._tasks[workspace_id] = task
+                return
+
+            loop = asyncio.get_running_loop()
+            debounce = self._debounce
+            active: list[asyncio.Task[None] | None] = [None]
+
+            class _Handler(FileSystemEventHandler):
+                def on_any_event(self, event):  # type: ignore[no-untyped-def]
+                    if event.is_directory:
+                        return
+                    src = getattr(event, "src_path", None)
+                    if not src:
+                        return
+                    src_s = str(src)
+
+                    async def _fire() -> None:
+                        await asyncio.sleep(debounce)
+                        try:
+                            await callback(workspace_id, [src_s])
+                        except Exception:
+                            logger.exception("ConfigWatcher 回调失败 workspace_id=%s", workspace_id)
+
+                    cur = active[0]
+                    if cur is not None and not cur.done():
+                        cur.cancel()
+                    active[0] = loop.create_task(_fire())
+
+            handler = _Handler()
+            observer = Observer()
+            for p in existing:
+                try:
+                    observer.schedule(handler, str(p), recursive=True)
+                except Exception as e:
+                    logger.warning("ConfigWatcher 无法监听 %s: %s", p, e)
+            observer.start()
+            self._watchers[workspace_id] = observer
+            logger.info("ConfigWatcher 已启动 workspace_id=%s paths=%s", workspace_id, existing)
+
+    async def _poll_loop(self, workspace_id: str, workspace_path: str, callback: Callback) -> None:
+        root = Path(workspace_path)
+        known: dict[str, float] = {}
+
+        def scan(*, initial: bool) -> list[str]:
+            changed: list[str] = []
+            patterns = ("*.yaml", "*.yml", "*.json", "*.toml")
+            for sub in [root, root / "skills", root / ".cursor"]:
+                if not sub.is_dir():
+                    continue
+                for pattern in patterns:
+                    for f in sub.rglob(pattern):
+                        if not f.is_file():
+                            continue
+                        try:
+                            m = f.stat().st_mtime
+                        except OSError:
+                            continue
+                        key = str(f)
+                        if initial:
+                            known[key] = m
+                        elif known.get(key) != m:
+                            known[key] = m
+                            changed.append(key)
+            return changed
+
+        scan(initial=True)
+        while True:
+            await asyncio.sleep(max(self._debounce, 2.0))
+            try:
+                ch = scan(initial=False)
+                if ch:
+                    await callback(workspace_id, ch)
+            except asyncio.CancelledError:
+                raise
+            except Exception:
+                logger.exception("ConfigWatcher 轮询失败 workspace_id=%s", workspace_id)
+
+    async def stop_watch(self, workspace_id: str) -> None:
+        async with self._lock:
+            obs = self._watchers.pop(workspace_id, None)
+            if obs is not None:
+                try:
+                    obs.stop()
+                    obs.join(timeout=5.0)
+                except Exception:
+                    logger.exception("ConfigWatcher 停止 observer 异常 workspace_id=%s", workspace_id)
+            task = self._tasks.pop(workspace_id, None)
+            if task is not None:
+                task.cancel()
+                try:
+                    await task
+                except asyncio.CancelledError:
+                    pass

+ 6 - 0
gateway/core/lifecycle/errors.py

@@ -0,0 +1,6 @@
class LifecycleError(Exception):
    """Base error for lifecycle concerns (workspace, Docker, trace-proxy failures)."""
+
+
class WorkspaceDockerError(LifecycleError):
    """Raised when creating or starting a workspace container fails."""

+ 9 - 0
gateway/core/lifecycle/trace/__init__.py

@@ -0,0 +1,9 @@
+"""Trace 代理与渠道 TraceBackend 实现。"""
+
+from gateway.core.lifecycle.trace.backend import LifecycleTraceBackend
+from gateway.core.lifecycle.trace.manager import TraceManager
+
+__all__ = [
+    "LifecycleTraceBackend",
+    "TraceManager",
+]

+ 92 - 0
gateway/core/lifecycle/trace/backend.py

@@ -0,0 +1,92 @@
+"""
+实现 ``gateway.core.channels.protocols.TraceBackend``:
+prepare_workspace → bind_agent_trace(与 Agent API 返回的 trace_id 对齐)。
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import Any
+
+from gateway.core.lifecycle.trace.manager import TraceManager
+
+logger = logging.getLogger(__name__)
+
+
class LifecycleTraceBackend:
    """Implements ``gateway.core.channels.protocols.TraceBackend``:
    prepare the workspace first, then bind the trace_id the Agent API returned.

    Holds the in-memory (channel, user_id) -> agent trace_id mapping.
    """

    def __init__(self, trace_manager: TraceManager) -> None:
        self._tm = trace_manager
        self._lock = asyncio.Lock()
        # (channel, user_id) -> Agent trace_id bound after the API returned it
        self._channel_user_trace: dict[tuple[str, str], str] = {}

    async def prepare_session(
        self,
        *,
        channel: str,
        user_id: str,
        workspace_id: str,
        agent_type: str,
        metadata: dict[str, object],
    ) -> None:
        """Ensure the workspace session exists; only ``workspace_id`` is used here."""
        _ = user_id, agent_type, metadata
        await self._tm.prepare_workspace_session(workspace_id)

    async def get_existing_trace_id(self, channel: str, user_id: str) -> str | None:
        """Return the trace_id previously bound for (channel, user_id), if any."""
        async with self._lock:
            return self._channel_user_trace.get((channel, user_id))

    async def bind_agent_trace_id(
        self,
        *,
        channel: str,
        user_id: str,
        workspace_id: str,
        agent_trace_id: str,
        agent_type: str,
        metadata: dict[str, object],
    ) -> None:
        """Bind ``agent_trace_id`` to (channel, user_id), releasing any previous binding."""
        key = (channel, user_id)
        async with self._lock:
            prev_tid = self._channel_user_trace.get(key)
            # Already bound to this trace: nothing to do.
            if prev_tid == agent_trace_id:
                return
        # NOTE(review): the release/bind below runs outside the lock; concurrent
        # binds for the same key could interleave — confirm single-writer usage.
        if prev_tid:
            await self._tm.release_agent_trace(workspace_id, prev_tid)
            logger.info(
                "Lifecycle: 已解除旧 trace_id=%s workspace_id=%s(将绑定新 trace)",
                prev_tid,
                workspace_id,
            )
        # Copy metadata and stamp the channel routing identity onto it.
        meta_any: dict[str, Any] = {k: v for k, v in metadata.items()}
        meta_any["channel"] = channel
        meta_any["user_id"] = user_id
        await self._tm.bind_agent_trace(
            workspace_id,
            agent_trace_id,
            agent_type,
            metadata=meta_any,
        )
        async with self._lock:
            self._channel_user_trace[key] = agent_trace_id
        logger.info(
            "Lifecycle: 已绑定 Agent trace_id=%s workspace_id=%s channel=%s user=%s",
            agent_trace_id,
            workspace_id,
            channel,
            user_id,
        )

    async def forget_trace_binding(self, channel: str, user_id: str, *, workspace_id: str) -> None:
        """Drop the (channel, user) -> trace mapping and release the trace ref
        (used to abandon continuing the same trace after it reached a terminal state)."""
        async with self._lock:
            tid = self._channel_user_trace.pop((channel, user_id), None)
        if tid:
            await self._tm.release_agent_trace(workspace_id, tid)
            logger.info(
                "Lifecycle: 已 forget 绑定 trace_id=%s workspace_id=%s channel=%s user=%s",
                tid,
                workspace_id,
                channel,
                user_id,
            )

+ 130 - 0
gateway/core/lifecycle/trace/manager.py

@@ -0,0 +1,130 @@
+"""
+Trace 元数据以 Agent API(HTTP)为准;Gateway 在 Agent 返回 trace_id 后通过 ``bind_agent_trace`` 登记引用。
+
+``get_trace`` / ``list_traces``:优先请求 Agent API,失败时返回本地登记信息。
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+import httpx
+
+from utils.env_parse import env_float, env_str
+
+from gateway.core.lifecycle.errors import LifecycleError
+from gateway.core.lifecycle.workspace import WorkspaceManager
+
+logger = logging.getLogger(__name__)
+
+
class TraceManager:
    """Agent trace metadata proxy.

    The Agent API (HTTP) is the source of truth; the gateway registers a local
    reference via ``bind_agent_trace`` once the Agent returns a trace_id.
    ``get_trace`` / ``list_traces`` query the Agent API first and fall back to
    the local registry when the HTTP call fails.
    """

    def __init__(
        self,
        *,
        workspace_manager: WorkspaceManager,
        agent_api_base_url: str,
        http_timeout: float,
    ) -> None:
        self._wm = workspace_manager
        self._base = agent_api_base_url.rstrip("/")
        self._timeout = http_timeout
        # trace_id -> locally registered metadata (fallback when the API is unreachable)
        self._local_meta: dict[str, dict[str, Any]] = {}

    @classmethod
    def from_env(cls, workspace_manager: WorkspaceManager) -> TraceManager:
        """Build from GATEWAY_AGENT_API_BASE_URL / GATEWAY_AGENT_API_TIMEOUT env vars."""
        return cls(
            workspace_manager=workspace_manager,
            agent_api_base_url=env_str("GATEWAY_AGENT_API_BASE_URL", "http://127.0.0.1:8000"),
            http_timeout=env_float("GATEWAY_AGENT_API_TIMEOUT", 60.0),
        )

    async def prepare_workspace_session(self, workspace_id: str) -> None:
        """Ensure the workspace session (directory/container) exists."""
        await self._wm.ensure_session(workspace_id)

    async def bind_agent_trace(
        self,
        workspace_id: str,
        agent_trace_id: str,
        agent_type: str,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Register a workspace reference and local metadata for the trace."""
        await self._wm.add_trace_ref(workspace_id, agent_trace_id)
        self._local_meta[agent_trace_id] = {
            "trace_id": agent_trace_id,
            "workspace_id": workspace_id,
            "agent_type": agent_type,
            "metadata": dict(metadata or {}),
            "agent_api_status": "bound",
        }

    async def release_agent_trace(self, workspace_id: str, agent_trace_id: str) -> None:
        """Unbind the trace from the workspace (local registration + meta reference)."""
        await self._wm.remove_trace_ref(workspace_id, agent_trace_id)
        self._local_meta.pop(agent_trace_id, None)

    async def get_trace(self, trace_id: str) -> dict[str, Any]:
        """Fetch trace info: Agent API first, local registry as fallback.

        Raises:
            LifecycleError: when the trace is unknown to both sources.
        """
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            try:
                r = await client.get(f"{self._base}/api/traces/{trace_id}")
                if r.status_code == 200:
                    body = r.json()
                    trace = body.get("trace")
                    if isinstance(trace, dict):
                        return {"source": "agent_api", **trace}
            except httpx.RequestError as e:
                logger.warning("TraceManager.get_trace HTTP 失败 trace_id=%s err=%s", trace_id, e)

        # HTTP failed or returned an unexpected shape: fall back to local data.
        local = self._local_meta.get(trace_id)
        if local:
            return {"source": "gateway_local", **local}
        wid = self._wm.get_workspace_id_for_trace(trace_id)
        if wid:
            return {
                "source": "gateway_local",
                "trace_id": trace_id,
                "workspace_id": wid,
                "agent_api_status": "unknown",
            }
        raise LifecycleError(f"Trace 不存在: {trace_id}")

    async def list_traces(
        self,
        workspace_id: str | None = None,
        agent_type: str | None = None,
        *,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """List traces via the Agent API; on failure, filter the local registry."""
        params: dict[str, str | int] = {"limit": min(limit, 100)}
        if agent_type:
            params["agent_type"] = agent_type
        # The Agent API filters by uid; Feishu-side uids are usually bare user_ids
        # and workspace_ids look like "feishu:<uid>".
        if workspace_id and ":" in workspace_id:
            prefix, _, rest = workspace_id.partition(":")
            if prefix == "feishu" and rest:
                params["uid"] = rest

        async with httpx.AsyncClient(timeout=self._timeout) as client:
            try:
                r = await client.get(f"{self._base}/api/traces", params=params)
                if r.status_code == 200:
                    data = r.json()
                    traces = data.get("traces")
                    if isinstance(traces, list):
                        return traces
            except httpx.RequestError as e:
                logger.warning("TraceManager.list_traces HTTP 失败 err=%s", e)

        if workspace_id:
            return [m for m in self._local_meta.values() if m.get("workspace_id") == workspace_id]
        return list(self._local_meta.values())

    def get_workspace_id(self, trace_id: str) -> str:
        """Resolve the workspace for a trace; raises LifecycleError when unknown."""
        wid = self._wm.get_workspace_id_for_trace(trace_id)
        if wid:
            return wid
        m = self._local_meta.get(trace_id)
        if m and m.get("workspace_id"):
            return str(m["workspace_id"])
        raise LifecycleError(f"无法解析 Trace 的 workspace_id: {trace_id}")

+ 14 - 0
gateway/core/lifecycle/workspace/__init__.py

@@ -0,0 +1,14 @@
+"""Workspace 目录、Docker 沙箱容器。"""
+
+from gateway.core.lifecycle.workspace.docker_runner import (
+    WorkspaceDockerRunner,
+    container_name_for_subdir,
+)
+from gateway.core.lifecycle.workspace.manager import WorkspaceManager, workspace_subdir_key
+
+__all__ = [
+    "WorkspaceDockerRunner",
+    "WorkspaceManager",
+    "container_name_for_subdir",
+    "workspace_subdir_key",
+]

+ 199 - 0
gateway/core/lifecycle/workspace/docker_runner.py

@@ -0,0 +1,199 @@
+"""
+为每个 Workspace 启动 ``agent/workspace:latest`` 类沙箱容器,挂载:
+
+- 该 Workspace 目录 → 容器 ``/home/agent/workspace``
+- 共享目录 → 容器 ``/home/agent/shared``
+
+挂载策略(``GATEWAY_WORKSPACE_MOUNT_MODE``):
+
+- ``bind``(默认):使用宿主机/当前命名空间下的目录路径做 bind mount(Gateway 在本机直连 Docker 时可用)。
+- ``volume_subpath``:使用命名卷 + ``VolumeOptions.Subpath``(Gateway 在 Compose 内且与数据卷在同一 Docker 守护进程时推荐;需较新 Docker Engine)。
+"""
+
+from __future__ import annotations
+
+import logging
+import re
+from pathlib import Path
+from typing import Any
+
+from utils.env_parse import env_bool, env_str
+
+from gateway.core.lifecycle.errors import WorkspaceDockerError
+
+logger = logging.getLogger(__name__)
+
# Runs of characters outside [a-z0-9._-] (case-insensitive) collapse into one dash.
_SAFE_NAME_RE = re.compile(r"[^a-z0-9._-]+", re.IGNORECASE)


def container_name_for_subdir(workspace_subdir: str) -> str:
    """Derive a Docker container name (max 63 chars) for a workspace subdir.

    The subdir is a 64-char hex digest, so a truncated prefix remains unique enough.
    """
    sanitized = _SAFE_NAME_RE.sub("-", workspace_subdir.lower()).strip("-") or "ws"
    return f"gws-{sanitized[:50]}"[:63]
+
+
class WorkspaceDockerRunner:
    """Starts/stops one sandbox container per workspace.

    Mounts the workspace dir at ``/home/agent/workspace`` and the shared dir at
    ``/home/agent/shared``, via host bind mounts or a named volume with
    ``VolumeOptions.Subpath`` (see the module docstring for the mode switch).
    """

    def __init__(
        self,
        *,
        image: str,
        network: str | None,
        mount_mode: str,
        workspace_volume: str | None,
        shared_volume: str | None,
        docker_enabled: bool,
    ) -> None:
        self._image = image
        self._network = network
        self._mount_mode = mount_mode  # "bind" or "volume_subpath"
        self._workspace_volume = workspace_volume
        self._shared_volume = shared_volume
        self._enabled = docker_enabled
        self._client: Any = None  # lazily created docker client (see _get_client)

    @classmethod
    def from_env(cls) -> WorkspaceDockerRunner:
        """Build a runner from the GATEWAY_WORKSPACE_* / GATEWAY_SHARED_* env vars."""
        net = env_str("GATEWAY_WORKSPACE_DOCKER_NETWORK", "")
        wvol = env_str("GATEWAY_WORKSPACE_DOCKER_VOLUME", "")
        svol = env_str("GATEWAY_SHARED_DOCKER_VOLUME", "")
        return cls(
            image=env_str("GATEWAY_WORKSPACE_IMAGE", "agent/workspace:latest"),
            network=net or None,
            mount_mode=env_str("GATEWAY_WORKSPACE_MOUNT_MODE", "bind").lower(),
            workspace_volume=wvol or None,
            shared_volume=svol or None,
            docker_enabled=env_bool("GATEWAY_WORKSPACE_DOCKER_ENABLED", True),
        )

    def _get_client(self) -> Any:
        """Lazily create and cache the docker client (``docker`` imported on demand)."""
        if self._client is not None:
            return self._client
        import docker

        try:
            self._client = docker.from_env()
        except Exception as e:
            raise WorkspaceDockerError(f"无法连接 Docker:{e}") from e
        return self._client

    def _build_mounts(
        self,
        *,
        workspace_host_path: Path,
        shared_host_path: Path,
        workspace_subdir: str,
    ) -> list[dict[str, Any]]:
        """Build Docker API mount specs for the configured mount mode.

        Raises:
            WorkspaceDockerError: volume_subpath mode without both volume names set.
        """
        if self._mount_mode == "volume_subpath":
            if not self._workspace_volume or not self._shared_volume:
                raise WorkspaceDockerError(
                    "volume_subpath 模式需设置 GATEWAY_WORKSPACE_DOCKER_VOLUME 与 GATEWAY_SHARED_DOCKER_VOLUME"
                )
            m_ws: dict[str, Any] = {
                "Type": "volume",
                "Source": self._workspace_volume,
                "Target": "/home/agent/workspace",
                # Mount only this workspace's subdir out of the shared named volume.
                "VolumeOptions": {"Subpath": workspace_subdir},
            }
            m_sh: dict[str, Any] = {
                "Type": "volume",
                "Source": self._shared_volume,
                "Target": "/home/agent/shared",
            }
            return [m_ws, m_sh]

        # Default "bind" mode: absolute host paths.
        ws_abs = str(workspace_host_path.resolve())
        sh_abs = str(shared_host_path.resolve())
        return [
            {"Type": "bind", "Source": ws_abs, "Target": "/home/agent/workspace"},
            {"Type": "bind", "Source": sh_abs, "Target": "/home/agent/shared"},
        ]

    def ensure_workspace_container(
        self,
        *,
        workspace_subdir: str,
        workspace_host_path: Path,
        shared_host_path: Path,
    ) -> str | None:
        """
        Ensure a running workspace container exists. Returns its container id,
        or None when Docker is disabled.

        Raises:
            WorkspaceDockerError: on lookup or start failure.
        """
        if not self._enabled:
            return None

        name = container_name_for_subdir(workspace_subdir)
        client = self._get_client()

        from docker.errors import APIError, NotFound

        # Reuse an existing container (restart it if stopped).
        try:
            existing = client.containers.get(name)
            if existing.status != "running":
                existing.start()
            return existing.id
        except NotFound:
            pass
        except APIError as e:
            # 404 means "not found" too; anything else is a real failure.
            if getattr(e, "status_code", None) != 404:
                raise WorkspaceDockerError(f"查询容器 {name} 失败:{e}") from e

        try:
            run_kw: dict[str, Any] = {
                "image": self._image,
                "name": name,
                "detach": True,
                "remove": False,
                # Keep the container alive; work is injected later (e.g. docker exec).
                "entrypoint": ["sleep", "infinity"],
            }
            if self._network:
                run_kw["network"] = self._network

            if self._mount_mode == "volume_subpath":
                run_kw["mounts"] = self._build_mounts(
                    workspace_host_path=workspace_host_path,
                    shared_host_path=shared_host_path,
                    workspace_subdir=workspace_subdir,
                )
            else:
                ws_abs = str(workspace_host_path.resolve())
                sh_abs = str(shared_host_path.resolve())
                run_kw["volumes"] = {
                    ws_abs: {"bind": "/home/agent/workspace", "mode": "rw"},
                    sh_abs: {"bind": "/home/agent/shared", "mode": "rw"},
                }

            container = client.containers.run(**run_kw)
            # NOTE(review): containers.run(detach=True) returns a Container with .id;
            # the .get("Id") branch only applies if a dict is returned — confirm.
            cid = getattr(container, "id", None) or container.get("Id")
            logger.info(
                "Workspace 容器已启动 name=%s id=%s image=%s mode=%s",
                name,
                cid,
                self._image,
                self._mount_mode,
            )
            return str(cid) if cid else name
        except Exception as e:
            raise WorkspaceDockerError(f"启动 Workspace 容器失败({name}):{e}") from e

    def stop_workspace_container(self, workspace_subdir: str) -> None:
        """Stop this workspace's sandbox container; missing/stopped containers are
        ignored (never raises WorkspaceDockerError)."""
        if not self._enabled:
            return
        name = container_name_for_subdir(workspace_subdir)
        try:
            client = self._get_client()
            from docker.errors import NotFound

            try:
                c = client.containers.get(name)
            except NotFound:
                return
            st = getattr(c, "status", None) or ""
            if st == "running":
                c.stop(timeout=15)
                logger.info("Workspace 容器已停止 name=%s", name)
        except Exception as e:
            logger.warning("停止 Workspace 容器失败 name=%s: %s", name, e)

+ 221 - 0
gateway/core/lifecycle/workspace/manager.py

@@ -0,0 +1,221 @@
+"""
+Workspace 目录、引用计数、Docker 沙箱容器编排。
+
+目录布局(与 docker-compose 卷一致)::
+
+    {workspaces_root}/          # 默认 /root/.gateway/workspaces
+      <sha256(workspace_id)>/   # 实际数据目录
+        .gateway/meta.json
+    {shared_root}/              # 默认 /root/.gateway/shared
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import hashlib
+from pathlib import Path
+from typing import Any
+
+from utils.env_parse import env_bool, env_str
+
+from gateway.core.lifecycle.errors import LifecycleError, WorkspaceDockerError
+from gateway.core.lifecycle.workspace.docker_runner import WorkspaceDockerRunner
+
+logger = logging.getLogger(__name__)
+
+
def workspace_subdir_key(workspace_id: str) -> str:
    """Return the stable on-disk directory name for *workspace_id* (SHA-256 hex digest)."""
    digest = hashlib.sha256(workspace_id.encode("utf-8"))
    return digest.hexdigest()
+
+
class WorkspaceManager:
    """Orchestrates Workspace directories, Trace reference counting, and the
    Docker sandbox container attached to each workspace.

    Directory layout (matches the docker-compose volumes)::

        {workspaces_root}/<sha256(workspace_id)>/.gateway/meta.json
        {shared_root}/

    Every mutating operation is serialized behind a single asyncio lock, and
    in-memory reference counts are mirrored into ``meta.json`` so they can be
    rebuilt after a process restart.  Blocking Docker / heavy filesystem work
    is offloaded with ``asyncio.to_thread`` so the event loop is never stalled.
    """

    def __init__(
        self,
        *,
        workspaces_root: Path,
        shared_root: Path,
        docker_runner: WorkspaceDockerRunner,
        docker_required: bool,
    ) -> None:
        self._workspaces_root = workspaces_root
        self._shared_root = shared_root
        self._docker = docker_runner
        # When True, a Docker failure in ensure_session() is fatal instead of
        # degrading to "no sandbox container".
        self._docker_required = docker_required
        # Serializes all directory / meta / ref-count mutations.
        self._lock = asyncio.Lock()
        # workspace_id -> set of trace_ids currently holding a reference.
        self._refs: dict[str, set[str]] = {}
        # trace_id -> workspace_id reverse index.
        self._trace_to_workspace: dict[str, str] = {}

    @classmethod
    def from_env(cls, docker_runner: WorkspaceDockerRunner | None = None) -> WorkspaceManager:
        """Build a manager from ``GATEWAY_*`` environment variables."""
        ws = Path(env_str("GATEWAY_WORKSPACES_ROOT", "/root/.gateway/workspaces")).expanduser()
        sh = Path(env_str("GATEWAY_SHARED_ROOT", "/root/.gateway/shared")).expanduser()
        runner = docker_runner or WorkspaceDockerRunner.from_env()
        required = env_bool("GATEWAY_WORKSPACE_DOCKER_REQUIRED", False)
        return cls(
            workspaces_root=ws,
            shared_root=sh,
            docker_runner=runner,
            docker_required=required,
        )

    def _workspace_dir(self, workspace_id: str) -> Path:
        """Data directory for *workspace_id* (hashed so any id is path-safe)."""
        return self._workspaces_root / workspace_subdir_key(workspace_id)

    def _meta_path(self, workspace_id: str) -> Path:
        return self._workspace_dir(workspace_id) / ".gateway" / "meta.json"

    def _load_meta(self, workspace_id: str) -> dict[str, Any]:
        """Read meta.json; a missing or corrupt file yields an empty dict."""
        p = self._meta_path(workspace_id)
        if not p.is_file():
            return {}
        try:
            return json.loads(p.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return {}

    def get_workspace_container_id(self, workspace_id: str) -> str | None:
        """Synchronously read the sandbox container ID recorded in meta.json
        (used by the Gateway when injecting it into Agent API calls).

        Returns None when unset or blank.
        """
        cid = self._load_meta(workspace_id).get("workspace_container_id")
        if cid is None:
            return None
        s = str(cid).strip()
        return s or None

    def _save_meta(self, workspace_id: str, data: dict[str, Any]) -> None:
        """Persist meta.json, creating the ``.gateway`` directory on demand."""
        d = self._workspace_dir(workspace_id) / ".gateway"
        d.mkdir(parents=True, exist_ok=True)
        p = d / "meta.json"
        p.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")

    async def create_workspace(self, workspace_id: str) -> str:
        """Create the Workspace directory and return its absolute path (idempotent)."""
        async with self._lock:
            return await self._create_workspace_unlocked(workspace_id)

    async def _create_workspace_unlocked(self, workspace_id: str) -> str:
        """Create directories/meta and rebuild in-memory refs from meta.json.

        Caller must already hold ``self._lock``.
        """
        path = self._workspace_dir(workspace_id)
        path.mkdir(parents=True, exist_ok=True)
        self._shared_root.mkdir(parents=True, exist_ok=True)

        meta = self._load_meta(workspace_id)
        meta.setdefault("workspace_id", workspace_id)
        meta.setdefault("trace_refs", [])
        self._save_meta(workspace_id, meta)

        # Re-hydrate the in-memory ref index from persisted trace refs.
        for tid in meta.get("trace_refs") or []:
            if isinstance(tid, str) and tid:
                self._refs.setdefault(workspace_id, set()).add(tid)
                self._trace_to_workspace[tid] = workspace_id
        return str(path.resolve())

    async def get_workspace_path(self, workspace_id: str) -> str:
        """Return the absolute path of an existing Workspace.

        Raises:
            LifecycleError: if the Workspace directory does not exist.
        """
        path = self._workspace_dir(workspace_id)
        if not path.is_dir():
            raise LifecycleError(f"Workspace 不存在: {workspace_id}")
        return str(path.resolve())

    async def ensure_session(self, workspace_id: str) -> str:
        """Session start: ensure the workspace and shared directories exist and
        start the sandbox container per policy.

        The Docker SDK call is synchronous, so it is offloaded to a worker
        thread to avoid blocking the event loop while the lock is held.

        Returns the Workspace directory's absolute path.

        Raises:
            WorkspaceDockerError: only when ``docker_required`` is set.
        """
        async with self._lock:
            ws_path_str = await self._create_workspace_unlocked(workspace_id)
            ws_path = Path(ws_path_str)
            subdir = workspace_subdir_key(workspace_id)
            try:
                cid = await asyncio.to_thread(
                    self._docker.ensure_workspace_container,
                    workspace_subdir=subdir,
                    workspace_host_path=ws_path,
                    shared_host_path=self._shared_root,
                )
                meta = self._load_meta(workspace_id)
                if cid:
                    meta["workspace_container_id"] = cid
                self._save_meta(workspace_id, meta)
            except WorkspaceDockerError as e:
                logger.exception("Workspace Docker 失败 workspace_id=%s", workspace_id)
                if self._docker_required:
                    raise
                logger.warning("Docker 未强制要求,继续无沙箱容器:%s", e)
            return ws_path_str

    async def add_trace_ref(self, workspace_id: str, trace_id: str) -> None:
        """Register *trace_id* as a reference on *workspace_id* (persisted)."""
        async with self._lock:
            await self._create_workspace_unlocked(workspace_id)
            s = self._refs.setdefault(workspace_id, set())
            s.add(trace_id)
            self._trace_to_workspace[trace_id] = workspace_id
            meta = self._load_meta(workspace_id)
            meta["workspace_id"] = workspace_id
            meta["trace_refs"] = sorted(s)
            self._save_meta(workspace_id, meta)

    async def remove_trace_ref(self, workspace_id: str, trace_id: str) -> None:
        """Drop *trace_id*'s reference on *workspace_id* (no-op if absent)."""
        async with self._lock:
            s = self._refs.setdefault(workspace_id, set())
            s.discard(trace_id)
            self._trace_to_workspace.pop(trace_id, None)
            meta = self._load_meta(workspace_id)
            meta["trace_refs"] = sorted(s)
            self._save_meta(workspace_id, meta)

    async def cleanup_workspace(self, workspace_id: str, *, force: bool = False) -> None:
        """Clear all trace refs; with ``force=True`` also delete the directory.

        Raises:
            LifecycleError: if live refs remain (in memory or meta.json) and
                *force* is not set.
        """
        async with self._lock:
            refs = self._refs.get(workspace_id) or set()
            meta_refs = set(self._load_meta(workspace_id).get("trace_refs") or [])
            # Union of in-memory and persisted refs: either alone may be stale.
            active = refs | meta_refs
            if active and not force:
                raise LifecycleError(f"Workspace 仍有 {len(active)} 个 Trace 引用,拒绝清理")
            self._refs.pop(workspace_id, None)
            for tid in list(meta_refs):
                self._trace_to_workspace.pop(tid, None)
            meta = self._load_meta(workspace_id)
            meta["trace_refs"] = []
            self._save_meta(workspace_id, meta)
            if force:
                import shutil

                p = self._workspace_dir(workspace_id)
                if p.is_dir():
                    # rmtree can be slow on large workspaces; keep it off the loop.
                    await asyncio.to_thread(shutil.rmtree, p, ignore_errors=True)

    async def list_workspaces(self) -> list[dict[str, Any]]:
        """List all workspaces that carry a readable meta.json.

        ``ref_count`` is the max of persisted and in-memory counts, since
        either side may lag the other.
        """
        async with self._lock:
            out: list[dict[str, Any]] = []
            if not self._workspaces_root.is_dir():
                return out
            for child in self._workspaces_root.iterdir():
                if not child.is_dir() or child.name.startswith("."):
                    continue
                meta_path = child / ".gateway" / "meta.json"
                if not meta_path.is_file():
                    continue
                try:
                    meta = json.loads(meta_path.read_text(encoding="utf-8"))
                except (OSError, json.JSONDecodeError):
                    continue
                wid = str(meta.get("workspace_id") or child.name)
                ref_count = len(meta.get("trace_refs") or [])
                container_id = meta.get("workspace_container_id")
                mem_refs = len(self._refs.get(wid, ()))
                ref_count = max(ref_count, mem_refs)
                out.append(
                    {
                        "workspace_id": wid,
                        "path": str(child.resolve()),
                        "ref_count": ref_count,
                        "workspace_container_id": container_id,
                    }
                )
            return out

    def get_workspace_id_for_trace(self, trace_id: str) -> str | None:
        """Reverse lookup (in-memory only): trace_id -> workspace_id."""
        return self._trace_to_workspace.get(trace_id)

    async def stop_workspace_sandbox(self, workspace_id: str) -> None:
        """Stop this workspace's sandbox container (keeps the directory and
        trace refs untouched)."""
        subdir = workspace_subdir_key(workspace_id)
        await asyncio.to_thread(self._docker.stop_workspace_container, subdir)

+ 6 - 5
gateway/docs/architecture.md

@@ -94,9 +94,10 @@ gateway/
 │   │   └── channel_manager.py     # 渠道管理
 │   │
 │   ├── lifecycle/                 # Agent 生命周期管理
-│   │   ├── trace_manager.py      # Trace 注册和查询
-│   │   ├── workspace_manager.py  # Workspace 管理
-│   │   └── config_watcher.py     # 配置热重载
+│   │   ├── workspace/             # Workspace 目录与 Docker 沙箱
+│   │   ├── trace/                 # Trace 代理与 TraceBackend
+│   │   ├── errors.py
+│   │   └── config_watcher.py      # 配置热重载
 │   │
 │   └── executor/                  # 任务执行调度
 │       ├── task_manager.py        # 任务管理
@@ -140,8 +141,8 @@ gateway/
 - 监听配置变化并热重载
 
 **实现位置:**
-- `gateway/core/lifecycle/trace_manager.py`
-- `gateway/core/lifecycle/workspace_manager.py`
+- `gateway/core/lifecycle/trace/manager.py`、`trace/backend.py`
+- `gateway/core/lifecycle/workspace/manager.py`、`workspace/docker_runner.py`
 - `gateway/core/lifecycle/config_watcher.py`
 
 **详细文档:** [lifecycle.md](./core/lifecycle.md)

+ 6 - 2
gateway/docs/core/executor.md

@@ -230,12 +230,16 @@ Executor 依赖 Lifecycle 模块:
 
 1. **获取 Trace 信息**:
    ```python
-   trace_info = lifecycle.trace_manager.get_trace(trace_id)
+   from gateway.core.lifecycle import TraceManager
+
+   trace_info = await trace_manager.get_trace(trace_id)
    ```
 
 2. **获取 Workspace 路径**:
    ```python
-   workspace_path = lifecycle.workspace_manager.get_workspace_path(workspace_id)
+   from gateway.core.lifecycle import WorkspaceManager
+
+   workspace_path = await workspace_manager.get_workspace_path(workspace_id)
    ```
 
 3. **检查 Trace 状态**:

+ 11 - 5
gateway/docs/core/lifecycle.md

@@ -46,9 +46,15 @@ Agent 生命周期管理,包括:
 
 ```
 gateway/core/lifecycle/
-├── trace_manager.py      # Trace 注册和元数据管理
-├── workspace_manager.py  # Workspace 管理
-└── config_watcher.py     # 配置热重载
+├── __init__.py           # 聚合导出(TraceManager、WorkspaceManager 等)
+├── errors.py             # LifecycleError、WorkspaceDockerError
+├── config_watcher.py     # 配置热重载
+├── workspace/
+│   ├── manager.py        # WorkspaceManager(目录、引用计数)
+│   └── docker_runner.py  # WorkspaceDockerRunner(沙箱容器)
+└── trace/
+    ├── manager.py        # TraceManager(Agent API 代理与本地登记)
+    └── backend.py        # LifecycleTraceBackend(channels.TraceBackend)
 ```
 
 ---
@@ -57,7 +63,7 @@ gateway/core/lifecycle/
 
 ### TraceManager
 
-**实现位置:** `gateway/core/lifecycle/trace_manager.py`
+**实现位置:** `gateway/core/lifecycle/trace/manager.py`
 
 **职责:**
 - 调用 Agent 框架创建 Trace
@@ -96,7 +102,7 @@ class TraceManager:
 
 ### WorkspaceManager
 
-**实现位置:** `gateway/core/lifecycle/workspace_manager.py`
+**实现位置:** `gateway/core/lifecycle/workspace/manager.py`(Docker 编排见同目录 `docker_runner.py`)
 
 **职责:**
 - 创建和初始化 Workspace 目录

+ 7 - 1
requirements.txt

@@ -17,4 +17,10 @@ websockets>=13.0
 pydantic
 
 # 飞书
-lark-oapi==1.5.3
+lark-oapi==1.5.3
+
+# Workspace Docker Runner
+docker==7.1.0
+
+# Gateway ConfigWatcher
+watchdog==6.0.0

+ 5 - 0
utils/__init__.py

@@ -0,0 +1,5 @@
+"""仓库根级通用工具(与 ``agent.utils`` 区分)。"""
+
+from .env_parse import env_bool, env_float, env_int, env_str
+
+__all__ = ["env_bool", "env_float", "env_int", "env_str"]

+ 41 - 0
utils/env_parse.py

@@ -0,0 +1,41 @@
+"""环境变量读取(bool / int / float / str),避免各模块重复 ``.lower() in (...)`` 等逻辑。"""
+
+from __future__ import annotations
+
+import os
+
+__all__ = ["env_bool", "env_float", "env_int", "env_str"]
+
+
def env_str(name: str, default: str) -> str:
    """Read env var *name* as a stripped string; *default* when unset or blank."""
    raw = os.getenv(name)
    if raw is None:
        return default
    stripped = str(raw).strip()
    return stripped if stripped else default
+
+
def env_bool(name: str, default: bool) -> bool:
    """Read env var *name* as a boolean.

    "1" / "true" / "yes" (case-insensitive, surrounding whitespace ignored)
    count as True; any other non-blank value is False.  Unset or blank values
    fall back to *default*, matching env_str / env_int / env_float — the
    original neither stripped nor treated "" as unset, so " TRUE " and an
    empty value both came back False regardless of *default*.
    """
    v = os.getenv(name)
    if v is None or not v.strip():
        return default
    return v.strip().lower() in ("1", "true", "yes")
+
+
def env_int(name: str, default: int) -> int:
    """Read env var *name* as an int; *default* when unset, blank, or unparsable."""
    raw = os.getenv(name)
    if raw is None:
        return default
    text = str(raw).strip()
    if not text:
        return default
    try:
        return int(text)
    except ValueError:
        return default
+
+
def env_float(name: str, default: float) -> float:
    """Read env var *name* as a float; *default* when unset, blank, or unparsable."""
    raw = os.getenv(name)
    if raw is None:
        return default
    text = str(raw).strip()
    if not text:
        return default
    try:
        return float(text)
    except ValueError:
        return default