| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936 |
- """
- Docker 内执行工具(模块 ``agent.tools.docker_runner``;与 ``agent.core.runner.AgentRunner`` 无关)。
- 解析顺序:**ContextVar(Runner 注入)** → **Trace.context['gateway_exec']** → **环境变量默认容器**
- (``AGENT_DEFAULT_DOCKER_CONTAINER``,可选 ``AGENT_DEFAULT_DOCKER_WORKDIR`` /
- ``AGENT_DEFAULT_DOCKER_USER``)。有有效 ``docker_container`` 时,``bash_command``、
- ``read_file`` / ``write_file`` / ``edit_file`` / ``glob_files`` / ``grep_content`` 走容器内
- ``docker exec``;否则仍走原有 builtin(本机)。
- - ``GatewayExecResolver`` / ``active_gateway_exec``:``AgentRunner`` 在 ``tools.execute`` 前后 set/reset ContextVar。
- - ``BashGatewayDispatcher`` / ``WorkspaceFileToolsDispatcher``:在 ``import agent.tools``(builtin 注册之后)时向 ``ToolRegistry`` 注册包装函数。
- 需要 API 进程能访问 Docker(例如挂载 ``/var/run/docker.sock``)。
- """
- from __future__ import annotations
- import asyncio
- import base64
- import io
- import json
- import logging
- import mimetypes
- import os
- import posixpath
- import tarfile
- from contextvars import ContextVar
- from pathlib import Path
- from typing import TYPE_CHECKING, Any, Callable, ClassVar, Coroutine, Dict, List, Optional, Tuple
- from urllib.parse import urlparse
- from agent.tools.builtin.file.edit import replace as edit_replace
- from agent.tools.builtin.file.grep import LIMIT as GREP_LIMIT
- from agent.tools.builtin.file.read import DEFAULT_READ_LIMIT, MAX_BYTES, MAX_LINE_LENGTH
- from agent.tools.builtin.file.write import _create_diff
- from agent.tools.builtin.glob_tool import LIMIT as GLOB_LIMIT
- from agent.tools.models import ToolContext, ToolResult
- if TYPE_CHECKING:
- from agent.tools.registry import ToolRegistry
- logger = logging.getLogger(__name__)
- # ---------------------------------------------------------------------------
- # Trace.gateway_exec:ContextVar + 路径解析
- # ---------------------------------------------------------------------------
- class GatewayExecResolver:
- """从工具 context / ContextVar 解析 ``gateway_exec``,并把用户路径映射到容器内路径。"""
- ACTIVE: ClassVar[ContextVar[Optional[dict[str, Any]]]] = ContextVar(
- "active_gateway_exec", default=None
- )
- @classmethod
- def from_tool_context(cls, context: Any) -> dict[str, Any] | None:
- if not isinstance(context, dict):
- return None
- tc = context.get("trace_context")
- if not isinstance(tc, dict):
- return None
- ge = tc.get("gateway_exec")
- return ge if isinstance(ge, dict) else None
- @classmethod
- def default_gateway_exec_from_env(cls) -> dict[str, Any] | None:
- """无 Trace.gateway_exec 时,用环境变量指定默认 Workspace 容器(直连 API / 本地调试)。"""
- container = os.getenv("AGENT_DEFAULT_DOCKER_CONTAINER", "").strip()
- if not container:
- return None
- out: dict[str, Any] = {"docker_container": container}
- wd = os.getenv("AGENT_DEFAULT_DOCKER_WORKDIR", "").strip()
- if wd:
- out["container_workdir"] = wd
- user = os.getenv("AGENT_DEFAULT_DOCKER_USER", "").strip()
- if user:
- out["container_user"] = user
- return out
- @classmethod
- def for_trace_context(cls, trace_context: dict[str, Any] | None) -> dict[str, Any] | None:
- """Trace.context 中的 gateway_exec 优先,否则环境变量默认容器。"""
- tc = trace_context or {}
- ge = tc.get("gateway_exec")
- if isinstance(ge, dict) and str(ge.get("docker_container") or "").strip():
- return ge
- return cls.default_gateway_exec_from_env()
- @classmethod
- def effective(cls, context: Any) -> dict[str, Any] | None:
- ge = cls.ACTIVE.get()
- if isinstance(ge, dict) and str(ge.get("docker_container") or "").strip():
- return ge
- if isinstance(context, dict):
- tc = context.get("trace_context")
- if isinstance(tc, dict):
- return cls.for_trace_context(tc)
- return cls.default_gateway_exec_from_env()
- @staticmethod
- def workdir(ge: dict[str, Any]) -> str:
- w = str(ge.get("container_workdir") or "/home/agent/workspace").strip()
- return w.rstrip("/") or "/home/agent/workspace"
- @staticmethod
- def user(ge: dict[str, Any]) -> str:
- u = str(ge.get("container_user") or "agent").strip()
- return u or "agent"
- @staticmethod
- def _host_mapping_root() -> str | None:
- raw = os.getenv("AGENT_WORKSPACE_HOST_PROJECT_ROOT", "").strip()
- if raw:
- return str(Path(raw).resolve())
- try:
- return str(Path.cwd().resolve())
- except Exception:
- return None
- @classmethod
- def resolve_path(cls, ge: dict[str, Any], user_path: str | None, *, is_dir: bool) -> str | None:
- wd = cls.workdir(ge)
- if not user_path or not str(user_path).strip():
- return wd if is_dir else None
- raw = str(user_path).strip().replace("\\", "/")
- host_root = cls._host_mapping_root()
- if posixpath.isabs(raw):
- norm = posixpath.normpath(raw)
- if norm == wd or norm.startswith(wd + "/"):
- return norm
- if host_root:
- hr = host_root.replace("\\", "/").rstrip("/")
- if norm == hr or norm.startswith(hr + "/"):
- rel = posixpath.relpath(norm, hr)
- if rel.startswith("../"):
- return None
- candidate = posixpath.normpath(posixpath.join(wd, rel))
- if candidate == wd or candidate.startswith(wd + "/"):
- return candidate
- return None
- return None
- for seg in raw.split("/"):
- if seg == "..":
- return None
- candidate = posixpath.normpath(posixpath.join(wd, raw))
- if candidate == wd or candidate.startswith(wd + "/"):
- return candidate
- return None
- # 兼容旧导入:runner 使用 ``active_gateway_exec.set`` / ``reset``
- active_gateway_exec = GatewayExecResolver.ACTIVE
- gateway_exec_from_tool_context = GatewayExecResolver.from_tool_context
- effective_gateway_exec = GatewayExecResolver.effective
- container_workdir = GatewayExecResolver.workdir
- container_user = GatewayExecResolver.user
- resolve_container_path = GatewayExecResolver.resolve_path
- # ---------------------------------------------------------------------------
- # 单会话:Docker 容器内 exec / 读写 / 工具级 read/write/glob/grep/bash
- # ---------------------------------------------------------------------------
- class DockerWorkspaceClient:
- """绑定一份 ``gateway_exec`` 字典,封装对该 Workspace 容器的所有 I/O。"""
- __slots__ = ("_ge",)
- _BINARY_EXTS = frozenset({
- ".zip", ".tar", ".gz", ".exe", ".dll", ".so", ".class",
- ".jar", ".war", ".7z", ".doc", ".docx", ".xls", ".xlsx",
- ".ppt", ".pptx", ".odt", ".ods", ".odp", ".bin", ".dat",
- ".obj", ".o", ".a", ".lib", ".wasm", ".pyc", ".pyo",
- })
- def __init__(self, ge: dict[str, Any]) -> None:
- self._ge = ge
- @property
- def ge(self) -> dict[str, Any]:
- return self._ge
- def container_id(self) -> str | None:
- c = self._ge.get("docker_container")
- if c is None:
- return None
- s = str(c).strip()
- return s or None
- def _docker_container(self):
- import docker
- cid = self.container_id()
- if not cid:
- raise ValueError("gateway_exec 缺少 docker_container")
- return docker.from_env().containers.get(cid)
- def sync_exec_argv(
- self,
- argv: List[str],
- *,
- workdir: str,
- environment: Optional[Dict[str, str]] = None,
- ) -> Tuple[int, bytes, bytes]:
- ct = self._docker_container()
- user = GatewayExecResolver.user(self._ge)
- exit_code, output = ct.exec_run(
- argv,
- user=user,
- workdir=workdir,
- environment=environment,
- demux=True,
- )
- if isinstance(output, tuple) and len(output) == 2:
- stdout_b, stderr_b = output
- else:
- stdout_b = output if isinstance(output, (bytes, bytearray)) else b""
- stderr_b = b""
- if stdout_b is None:
- stdout_b = b""
- if stderr_b is None:
- stderr_b = b""
- code = int(exit_code) if exit_code is not None else -1
- return code, bytes(stdout_b), bytes(stderr_b)
- async def async_exec_argv(
- self,
- argv: List[str],
- *,
- workdir: str,
- environment: Optional[Dict[str, str]] = None,
- ) -> Tuple[int, bytes, bytes]:
- loop = asyncio.get_running_loop()
- return await loop.run_in_executor(
- None,
- lambda: self.sync_exec_argv(argv, workdir=workdir, environment=environment),
- )
- def sync_read_file_bytes(self, container_path: str) -> bytes:
- ct = self._docker_container()
- try:
- _stat, stream = ct.get_archive(container_path)
- except Exception as e:
- logger.debug("get_archive failed path=%s: %s", container_path, e)
- raise FileNotFoundError(container_path) from e
- chunks = b"".join(stream)
- bio = io.BytesIO(chunks)
- with tarfile.open(fileobj=bio, mode="r") as tar:
- member = tar.next()
- if member is None:
- return b""
- if member.isdir():
- raise IsADirectoryError(container_path)
- ef = tar.extractfile(member)
- if ef is None:
- return b""
- return ef.read()
- @staticmethod
- def _posixpath_dir(p: str) -> str:
- return os.path.dirname(p.replace("\\", "/"))
- @staticmethod
- def _posixpath_basename(p: str) -> str:
- return os.path.basename(p.replace("\\", "/"))
- def sync_write_file_bytes(self, container_path: str, data: bytes) -> None:
- ct = self._docker_container()
- parent = self._posixpath_dir(container_path) or "/"
- base = self._posixpath_basename(container_path)
- if not base:
- raise ValueError("invalid container_path")
- code, _out, err = self.sync_exec_argv(
- ["mkdir", "-p", parent],
- workdir="/",
- )
- if code != 0:
- raise RuntimeError(
- f"mkdir -p failed: {parent!r} code={code} stderr={err.decode('utf-8', errors='replace')}"
- )
- tar_stream = io.BytesIO()
- with tarfile.open(fileobj=tar_stream, mode="w") as tar:
- ti = tarfile.TarInfo(name=base)
- ti.size = len(data)
- ti.mode = 0o644
- tar.addfile(ti, io.BytesIO(data))
- tar_stream.seek(0)
- ok = ct.put_archive(parent, tar_stream)
- if not ok:
- raise RuntimeError(f"put_archive failed: {container_path!r}")
- async def async_read_file_bytes(self, container_path: str) -> bytes:
- loop = asyncio.get_running_loop()
- return await loop.run_in_executor(None, lambda: self.sync_read_file_bytes(container_path))
- async def async_write_file_bytes(self, container_path: str, data: bytes) -> None:
- loop = asyncio.get_running_loop()
- await loop.run_in_executor(None, lambda: self.sync_write_file_bytes(container_path, data))
- def sync_path_exists(self, container_path: str, *, is_dir: bool) -> bool:
- flag = "d" if is_dir else "f"
- code, _, _ = self.sync_exec_argv(
- ["test", "-" + flag, container_path],
- workdir="/",
- )
- return code == 0
- @classmethod
- def _is_binary_buffer(cls, data: bytes, suffix: str) -> bool:
- if suffix.lower() in cls._BINARY_EXTS:
- return True
- if not data:
- return False
- buf = data[:4096]
- if b"\x00" in buf:
- return True
- non_printable = sum(1 for b in buf if b < 9 or (13 < b < 32))
- return (non_printable / len(buf)) > 0.3 if buf else False
- MAX_BASH_OUT: ClassVar[int] = 50_000
- async def run_bash_tool(
- self,
- command: str,
- *,
- timeout: Optional[int],
- workdir: Optional[str],
- env: Optional[Dict[str, str]],
- description: str,
- ) -> ToolResult:
- _ = description
- timeout_sec = timeout if timeout is not None and timeout > 0 else 120
- container = self.container_id()
- if not container:
- return ToolResult(title="配置错误", output="gateway_exec 缺少 docker_container", error="missing_container")
- default_wd = GatewayExecResolver.workdir(self._ge)
- inner_wd = str(workdir).strip() if workdir else default_wd
- loop = asyncio.get_running_loop()
- try:
- code, stdout, stderr = await asyncio.wait_for(
- loop.run_in_executor(
- None,
- lambda: self.sync_exec_argv(
- ["bash", "-lc", command],
- workdir=inner_wd,
- environment=env,
- ),
- ),
- timeout=timeout_sec,
- )
- except asyncio.TimeoutError:
- return ToolResult(
- title="命令超时",
- output=f"docker exec 超时(>{timeout_sec}s): {command[:100]}",
- error="Timeout",
- metadata={"command": command, "timeout": timeout_sec},
- )
- except Exception as e:
- logger.exception("docker exec 失败 container=%s", container)
- return ToolResult(
- title="Docker 执行失败",
- output=(
- f"{e}\n\n请确认 API 容器已挂载 /var/run/docker.sock,"
- "且已安装 docker Python 包;容器名在 gateway_exec.docker_container。"
- ),
- error="docker_error",
- )
- stdout_text = stdout.decode("utf-8", errors="replace") if stdout else ""
- stderr_text = stderr.decode("utf-8", errors="replace") if stderr else ""
- truncated = False
- if len(stdout_text) > self.MAX_BASH_OUT:
- stdout_text = stdout_text[: self.MAX_BASH_OUT] + f"\n\n(输出被截断,总长度: {len(stdout_text)} 字符)"
- truncated = True
- parts: list[str] = []
- if stdout_text:
- parts.append(stdout_text)
- if stderr_text:
- parts.append("\n\n--- stderr ---\n" + stderr_text)
- output = "\n".join(parts) if parts else "(命令无输出)"
- ok = code == 0
- meta: dict[str, Any] = {"exit_code": code, "docker_container": container, "truncated": truncated}
- return ToolResult(
- title=f"docker bash (exit {code})",
- output=output,
- error=None if ok else f"exit code {code}",
- metadata=meta,
- )
- async def tool_read_file(self, file_path: str, offset: int, limit: int) -> ToolResult:
- cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False)
- if not cpath:
- return ToolResult(
- title="路径无效",
- output="在 Workspace 模式下路径须相对于工作区根,或为工作区/映射项目根下的绝对路径。",
- error="invalid_path",
- )
- name = Path(cpath.replace("\\", "/")).name
- try:
- raw = await self.async_read_file_bytes(cpath)
- except FileNotFoundError:
- return ToolResult(
- title="文件未找到",
- output=f"文件不存在: {file_path}",
- error="File not found",
- )
- except IsADirectoryError:
- return ToolResult(
- title="路径错误",
- output=f"路径是目录: {file_path}",
- error="Is a directory",
- )
- except Exception as e:
- logger.exception("workspace read_file")
- return ToolResult(title="读取失败", output=str(e), error=str(e))
- mime_type, _ = mimetypes.guess_type(name)
- mime_type = mime_type or ""
- if mime_type.startswith("image/") and mime_type not in ("image/svg+xml", "image/vnd.fastbidsheet"):
- b64_data = base64.b64encode(raw).decode("ascii")
- return ToolResult(
- title=name,
- output=f"图片文件: {name} (MIME: {mime_type}, {len(raw)} bytes)",
- metadata={"mime_type": mime_type, "truncated": False, "workspace_container": True},
- images=[{"type": "base64", "media_type": mime_type, "data": b64_data}],
- )
- if mime_type == "application/pdf":
- return ToolResult(
- title=name,
- output=f"PDF 文件: {name}",
- metadata={"mime_type": mime_type, "truncated": False, "workspace_container": True},
- )
- if self._is_binary_buffer(raw, Path(name).suffix):
- return ToolResult(
- title="二进制文件",
- output=f"无法读取二进制文件: {name}",
- error="Binary file",
- )
- try:
- text = raw.decode("utf-8")
- except UnicodeDecodeError:
- return ToolResult(
- title="编码错误",
- output=f"无法解码文件(非 UTF-8): {name}",
- error="Encoding error",
- )
- lines_no_keep = text.splitlines()
- total_lines = len(lines_no_keep)
- end_line = min(offset + limit, total_lines)
- output_lines: list[str] = []
- total_bytes = 0
- truncated_by_bytes = False
- for i in range(offset, end_line):
- line = lines_no_keep[i]
- if len(line) > MAX_LINE_LENGTH:
- line = line[:MAX_LINE_LENGTH] + "..."
- line_bytes = len(line.encode("utf-8")) + (1 if output_lines else 0)
- if total_bytes + line_bytes > MAX_BYTES:
- truncated_by_bytes = True
- break
- output_lines.append(line)
- total_bytes += line_bytes
- formatted = [f"{offset + idx + 1:5d}| {ln}" for idx, ln in enumerate(output_lines)]
- output = "<file>\n" + "\n".join(formatted)
- last_read_line = offset + len(output_lines)
- has_more = total_lines > last_read_line
- truncated = has_more or truncated_by_bytes
- if truncated_by_bytes:
- output += f"\n\n(输出在 {MAX_BYTES} 字节处被截断。使用 offset 读取第 {last_read_line} 行之后)"
- elif has_more:
- output += f"\n\n(还有更多内容。使用 offset 读取第 {last_read_line} 行之后)"
- else:
- output += f"\n\n(文件结束 - 共 {total_lines} 行)"
- output += "\n</file>"
- preview = "\n".join(output_lines[:20])
- return ToolResult(
- title=name,
- output=output,
- metadata={
- "preview": preview,
- "truncated": truncated,
- "total_lines": total_lines,
- "read_lines": len(output_lines),
- "workspace_container": True,
- },
- )
- async def tool_write_file(self, file_path: str, content: str, append: bool) -> ToolResult:
- cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False)
- if not cpath:
- return ToolResult(title="路径无效", output="路径不在工作区内。", error="invalid_path")
- name = Path(cpath.replace("\\", "/")).name
- if self.sync_path_exists(cpath, is_dir=True):
- return ToolResult(title="路径错误", output=f"路径是目录: {file_path}", error="Path is a directory")
- existed = self.sync_path_exists(cpath, is_dir=False)
- old_content = ""
- if existed:
- try:
- old_content = (await self.async_read_file_bytes(cpath)).decode("utf-8", errors="replace")
- except Exception:
- old_content = ""
- if append and existed:
- new_content = old_content + content
- else:
- new_content = content
- if existed and old_content:
- diff = _create_diff(str(file_path), old_content, new_content)
- else:
- diff = f"(新建文件: {name})"
- try:
- await self.async_write_file_bytes(cpath, new_content.encode("utf-8"))
- except Exception as e:
- logger.exception("workspace write_file")
- return ToolResult(title="写入失败", output=str(e), error=str(e))
- lines = new_content.count("\n")
- if append and existed:
- operation = "追加内容到"
- elif existed:
- operation = "覆盖"
- else:
- operation = "创建"
- return ToolResult(
- title=name,
- output=f"文件写入成功 ({operation})\n\n{diff}",
- metadata={"existed": existed, "append": append, "lines": lines, "diff": diff, "workspace_container": True},
- long_term_memory=f"{operation}文件 {name}",
- )
- async def tool_edit_file(
- self,
- file_path: str,
- old_string: str,
- new_string: str,
- replace_all: bool,
- ) -> ToolResult:
- cpath = GatewayExecResolver.resolve_path(self._ge, file_path, is_dir=False)
- if not cpath:
- return ToolResult(title="路径无效", output="路径不在工作区内。", error="invalid_path")
- name = Path(cpath.replace("\\", "/")).name
- if not self.sync_path_exists(cpath, is_dir=False):
- return ToolResult(title="文件未找到", output=f"文件不存在: {file_path}", error="File not found")
- if self.sync_path_exists(cpath, is_dir=True):
- return ToolResult(title="路径错误", output=f"路径是目录: {file_path}", error="Path is a directory")
- try:
- content_old = (await self.async_read_file_bytes(cpath)).decode("utf-8")
- except Exception as e:
- return ToolResult(title="读取失败", output=str(e), error=str(e))
- try:
- content_new = edit_replace(content_old, old_string, new_string, replace_all)
- except ValueError as e:
- return ToolResult(title="替换失败", output=str(e), error=str(e))
- diff = _create_diff(file_path, content_old, content_new)
- try:
- await self.async_write_file_bytes(cpath, content_new.encode("utf-8"))
- except Exception as e:
- return ToolResult(title="写入失败", output=str(e), error=str(e))
- return ToolResult(
- title=name,
- output=f"编辑成功\n\n{diff}",
- metadata={
- "replace_all": replace_all,
- "workspace_container": True,
- "old_lines": content_old.count("\n"),
- "new_lines": content_new.count("\n"),
- },
- long_term_memory=f"编辑文件 {name}",
- )
- async def tool_glob(self, pattern: str, path: Optional[str]) -> ToolResult:
- wd = GatewayExecResolver.workdir(self._ge)
- sp = GatewayExecResolver.resolve_path(self._ge, path, is_dir=True) if path else wd
- if not sp:
- return ToolResult(title="路径无效", output="搜索目录无效。", error="invalid_path")
- if not self.sync_path_exists(sp, is_dir=True):
- return ToolResult(title="目录不存在", output=f"搜索目录不存在: {path}", error="Directory not found")
- cfg = json.dumps({"pattern": pattern, "root": sp, "fetch": GLOB_LIMIT + 1}, ensure_ascii=False)
- script = (
- "import glob,json,os;"
- "from pathlib import Path;"
- "c=json.loads(__import__('os').environ['GW_GLOB_CFG']);"
- "os.chdir(c['root']);pat=c['pattern'];n=int(c['fetch']);"
- "paths=[str(p) for p in Path('.').glob(pat) if p.is_file()] if '**' in pat "
- "else [p for p in glob.glob(pat) if os.path.isfile(p)];"
- "mt=sorted([(p,os.path.getmtime(p)) for p in paths],key=lambda x:-x[1]);"
- "print(json.dumps([p for p,_ in mt[:n]]))"
- )
- code, out, err = await self.async_exec_argv(
- ["python3", "-c", script],
- workdir=sp,
- environment={"GW_GLOB_CFG": cfg},
- )
- if code != 0:
- return ToolResult(
- title="glob 失败",
- output=err.decode("utf-8", errors="replace") or out.decode("utf-8", errors="replace"),
- error="glob_failed",
- )
- try:
- file_paths: List[str] = json.loads(out.decode("utf-8") or "[]")
- except json.JSONDecodeError:
- return ToolResult(title="glob 解析失败", output=out.decode("utf-8", errors="replace"), error="bad_json")
- truncated = len(file_paths) > GLOB_LIMIT
- file_paths = file_paths[:GLOB_LIMIT]
- if not file_paths:
- output = "未找到匹配的文件"
- else:
- output = "\n".join(file_paths)
- if truncated:
- output += "\n\n(结果已截断。考虑使用更具体的路径或模式。)"
- return ToolResult(
- title=f"Glob: {pattern}",
- output=output,
- metadata={"count": len(file_paths), "truncated": truncated, "workspace_container": True},
- )
- async def tool_grep(
- self,
- pattern: str,
- path: Optional[str],
- include: Optional[str],
- ) -> ToolResult:
- wd = GatewayExecResolver.workdir(self._ge)
- search_path = GatewayExecResolver.resolve_path(self._ge, path, is_dir=True) if path else wd
- if not search_path:
- return ToolResult(title="路径无效", output="搜索目录无效。", error="invalid_path")
- if not self.sync_path_exists(search_path, is_dir=True):
- return ToolResult(title="目录不存在", output=f"搜索目录不存在: {path}", error="Directory not found")
- args: List[str] = [
- "rg", "-nH", "--hidden", "--follow", "--no-messages",
- "--field-match-separator=|", "--regexp", pattern,
- ]
- if include:
- args.extend(["--glob", include])
- args.append(search_path)
- code, stdout_b, stderr_b = await self.async_exec_argv(args, workdir=search_path)
- if code == 1:
- matches: List[Tuple[str, int, str]] = []
- elif code != 0 and code != 2:
- return ToolResult(
- title="ripgrep 失败",
- output=stderr_b.decode("utf-8", errors="replace"),
- error="rg_failed",
- )
- else:
- matches = []
- for line in stdout_b.decode("utf-8", errors="replace").strip().split("\n"):
- if not line:
- continue
- parts = line.split("|", 2)
- if len(parts) < 3:
- continue
- file_path_str, line_num_str, line_text = parts
- try:
- matches.append((file_path_str, int(line_num_str), line_text))
- except ValueError:
- continue
- matches.sort(key=lambda x: x[0], reverse=True)
- truncated = len(matches) > GREP_LIMIT
- matches = matches[:GREP_LIMIT]
- if not matches:
- output = "未找到匹配"
- else:
- output = f"找到 {len(matches)} 个匹配\n"
- current_file = None
- for file_path_str, line_num, line_text in matches:
- if current_file != file_path_str:
- if current_file is not None:
- output += "\n"
- current_file = file_path_str
- output += f"\n{file_path_str}:\n"
- if len(line_text) > 2000:
- line_text = line_text[:2000] + "..."
- output += f" Line {line_num}: {line_text}\n"
- if truncated:
- output += "\n(结果已截断。考虑使用更具体的路径或模式。)"
- return ToolResult(
- title=f"搜索: {pattern}",
- output=output,
- metadata={"matches": len(matches), "truncated": truncated, "pattern": pattern, "workspace_container": True},
- )
- # ---------------------------------------------------------------------------
- # 注册表:bash / 文件工具分发
- # ---------------------------------------------------------------------------
- class BashGatewayDispatcher:
- """将 ``bash_command`` 覆盖为:有 ``gateway_exec`` 时走 ``DockerWorkspaceClient.run_bash_tool``。"""
- _builtin: ClassVar[Callable[..., Coroutine[Any, Any, ToolResult]] | None] = None
- _installed: ClassVar[bool] = False
- @classmethod
- def install(cls, registry: ToolRegistry) -> None:
- if cls._installed:
- return
- entry = registry._tools.get("bash_command")
- if not entry:
- logger.warning("docker_runner: bash_command 未注册,跳过覆盖")
- return
- cls._builtin = entry["func"]
- schema = entry["schema"]
- hidden = list(entry.get("hidden_params") or ["context"])
- dispatch = cls._make_dispatch()
- registry.register(
- dispatch,
- schema=schema,
- hidden_params=hidden,
- inject_params=dict(entry.get("inject_params") or {}),
- requires_confirmation=entry["ui_metadata"].get("requires_confirmation", False),
- editable_params=list(entry["ui_metadata"].get("editable_params") or []),
- display=dict(entry["ui_metadata"].get("display") or {}),
- url_patterns=entry.get("url_patterns"),
- )
- cls._installed = True
- logger.info("bash_command 已启用 gateway_exec → docker exec 分发")
- @classmethod
- def _make_dispatch(cls) -> Callable[..., Coroutine[Any, Any, ToolResult]]:
- async def bash_command(
- command: str,
- timeout: Optional[int] = None,
- workdir: Optional[str] = None,
- env: Optional[Dict[str, str]] = None,
- description: str = "",
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- ge = GatewayExecResolver.effective(context)
- if ge:
- ws = DockerWorkspaceClient(ge)
- if ws.container_id():
- return await ws.run_bash_tool(
- command,
- timeout=timeout,
- workdir=workdir,
- env=env,
- description=description,
- )
- if cls._builtin is None:
- return ToolResult(title="内部错误", output="builtin bash_command 未初始化", error="no_builtin")
- return await cls._builtin(
- command=command,
- timeout=timeout,
- workdir=workdir,
- env=env,
- description=description,
- context=context,
- )
- bash_command.__name__ = "bash_command"
- bash_command.__doc__ = (
- "执行 bash 命令(Trace.gateway_exec 或 AGENT_DEFAULT_DOCKER_CONTAINER 时在容器内 docker exec)"
- )
- return bash_command
- class WorkspaceFileToolsDispatcher:
- """将 read/write/edit/glob/grep 在有 ``gateway_exec`` 时转发到 ``DockerWorkspaceClient``。"""
- _orig: ClassVar[dict[str, Callable[..., Coroutine[Any, Any, ToolResult]]]] = {}
- _installed: ClassVar[bool] = False
- @classmethod
- def install(cls, registry: ToolRegistry) -> None:
- if cls._installed:
- return
- async def read_file(
- file_path: str,
- offset: int = 0,
- limit: int = DEFAULT_READ_LIMIT,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- ge = GatewayExecResolver.effective(context)
- parsed = urlparse(file_path)
- if parsed.scheme in ("http", "https"):
- return await cls._orig["read_file"](file_path=file_path, offset=offset, limit=limit, context=context)
- if ge and ge.get("docker_container"):
- return await DockerWorkspaceClient(ge).tool_read_file(file_path, offset, limit)
- return await cls._orig["read_file"](file_path=file_path, offset=offset, limit=limit, context=context)
- async def write_file(
- file_path: str,
- content: str,
- append: bool = False,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- ge = GatewayExecResolver.effective(context)
- if ge and ge.get("docker_container"):
- return await DockerWorkspaceClient(ge).tool_write_file(file_path, content, append)
- return await cls._orig["write_file"](file_path=file_path, content=content, append=append, context=context)
- async def edit_file(
- file_path: str,
- old_string: str,
- new_string: str,
- replace_all: bool = False,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- ge = GatewayExecResolver.effective(context)
- if ge and ge.get("docker_container"):
- return await DockerWorkspaceClient(ge).tool_edit_file(
- file_path, old_string, new_string, replace_all
- )
- return await cls._orig["edit_file"](
- file_path=file_path,
- old_string=old_string,
- new_string=new_string,
- replace_all=replace_all,
- context=context,
- )
- async def glob_files(
- pattern: str,
- path: Optional[str] = None,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- ge = GatewayExecResolver.effective(context)
- if ge and ge.get("docker_container"):
- return await DockerWorkspaceClient(ge).tool_glob(pattern, path)
- return await cls._orig["glob_files"](pattern=pattern, path=path, context=context)
- async def grep_content(
- pattern: str,
- path: Optional[str] = None,
- include: Optional[str] = None,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- ge = GatewayExecResolver.effective(context)
- if ge and ge.get("docker_container"):
- return await DockerWorkspaceClient(ge).tool_grep(pattern, path, include)
- orig = cls._orig["grep_content"]
- return await orig(pattern=pattern, path=path, include=include, context=context)
- read_file.__name__ = "read_file"
- write_file.__name__ = "write_file"
- edit_file.__name__ = "edit_file"
- glob_files.__name__ = "glob_files"
- grep_content.__name__ = "grep_content"
- for name, fn in [
- ("read_file", read_file),
- ("write_file", write_file),
- ("edit_file", edit_file),
- ("glob_files", glob_files),
- ("grep_content", grep_content),
- ]:
- cls._register_override(registry, name, fn)
- cls._installed = True
- logger.info("read/write/edit/glob/grep 已启用 gateway_exec → Workspace 容器分发")
- @classmethod
- def _register_override(
- cls,
- registry: ToolRegistry,
- name: str,
- dispatch: Callable[..., Coroutine[Any, Any, ToolResult]],
- ) -> None:
- entry = registry._tools.get(name)
- if not entry:
- logger.warning("docker_runner: 工具 %s 未注册,跳过覆盖", name)
- return
- cls._orig[name] = entry["func"]
- registry.register(
- dispatch,
- schema=entry["schema"],
- hidden_params=list(entry.get("hidden_params") or []),
- inject_params=dict(entry.get("inject_params") or {}),
- requires_confirmation=entry["ui_metadata"].get("requires_confirmation", False),
- editable_params=list(entry["ui_metadata"].get("editable_params") or []),
- display=dict(entry["ui_metadata"].get("display") or {}),
- url_patterns=entry.get("url_patterns"),
- )
- def install_bash_gateway_dispatch(registry: ToolRegistry) -> None:
- BashGatewayDispatcher.install(registry)
- def install_workspace_file_tools_dispatch(registry: ToolRegistry) -> None:
- WorkspaceFileToolsDispatcher.install(registry)
- __all__ = [
- "GatewayExecResolver",
- "DockerWorkspaceClient",
- "BashGatewayDispatcher",
- "WorkspaceFileToolsDispatcher",
- "active_gateway_exec",
- "gateway_exec_from_tool_context",
- "effective_gateway_exec",
- "container_workdir",
- "container_user",
- "resolve_container_path",
- "install_bash_gateway_dispatch",
- "install_workspace_file_tools_dispatch",
- ]
|