| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- """uv 本地环境管理 — 创建/运行/管理 uv 虚拟环境"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import shlex
import shutil
import subprocess
from pathlib import Path
from typing import Any

from tool_agent.config import settings
from tool_agent.models import ToolMeta
- logger = logging.getLogger(__name__)
class LocalRunner:
    """Run local tools through ``uv`` and manage their project environments.

    The synchronous helpers shell out to the ``uv`` CLI and report failures as
    ``{"status": "error", "error": ...}`` dictionaries instead of raising.
    ``run`` and ``health_check`` talk to a tool's ``main.py`` by writing one
    JSON request to its stdin and parsing JSON from its stdout.
    """

    # Sub-directory of a project where run_command stores its log files.
    _LOG_SUBDIR = "tests"

    def __init__(self) -> None:
        # Processes started by run_command(background=True) that are still tracked.
        self._background_procs: list[subprocess.Popen] = []
        # Serialises concurrent cleanup_background() calls.
        self._proc_lock = asyncio.Lock()

    async def cleanup_background(self) -> int:
        """Kill every tracked background process.

        Returns:
            The number of processes that were killed and reaped successfully.
        """
        async with self._proc_lock:
            # Detach the tracked processes first so anything registered while
            # we are waiting below is not silently dropped by a late clear().
            procs = list(self._background_procs)
            self._background_procs.clear()

            killed = 0
            for proc in procs:
                try:
                    proc.kill()
                    # wait() blocks, so run it in a worker thread to keep the
                    # event loop responsive.
                    await asyncio.to_thread(proc.wait, timeout=5)
                    killed += 1
                except Exception as exc:
                    # Best effort: one stubborn process must not stop the rest.
                    logger.warning(
                        "Failed to clean up background process %s: %s",
                        getattr(proc, "pid", "?"),
                        exc,
                    )
            if killed:
                logger.info("Cleaned up %d background processes", killed)
            return killed

    # ---- private helpers ----

    @staticmethod
    def _uv(args: list[str], cwd: Path, timeout: int) -> subprocess.CompletedProcess:
        """Run ``uv <args>`` inside *cwd*, capturing stdout/stderr as text."""
        return subprocess.run(
            ["uv", *args],
            cwd=str(cwd),
            capture_output=True,
            text=True,
            timeout=timeout,
        )

    @staticmethod
    def _split_command(command: str) -> list[str]:
        """Split a command line into argv, keeping quoted arguments together.

        Raises:
            ValueError: if the command contains an unterminated quote.
        """
        # POSIX rules would consume the backslashes of Windows paths, so fall
        # back to non-POSIX splitting there.
        return shlex.split(command, posix=os.name != "nt")

    @staticmethod
    def _as_text(data: str | bytes | None) -> str:
        """Normalise captured output (str, bytes or None) to str."""
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode("utf-8", errors="replace")
        return data

    @staticmethod
    def _write_run_log(
        log_path: Path,
        command: str,
        exit_code: Any,
        stdout: str,
        stderr: str,
    ) -> None:
        """Persist the outcome of one command run to *log_path* (overwrites)."""
        with open(log_path, "w", encoding="utf-8") as fh:
            fh.write(f"Command: {command}\n")
            fh.write(f"Exit Code: {exit_code}\n")
            fh.write("--- STDOUT ---\n")
            fh.write(stdout)
            fh.write("\n--- STDERR ---\n")
            fh.write(stderr)

    def _start_background(
        self,
        full_cmd: list[str],
        tests_dir: Path,
        program: str,
    ) -> dict[str, Any]:
        """Spawn *full_cmd* detached from the caller and track the process."""
        # Only the base name is used so a program such as "./bin/serve" cannot
        # point the log path at a directory that does not exist.
        label = Path(program).name or "command"
        bg_log = tests_dir / f"background_{label}.log"
        with open(bg_log, "w", encoding="utf-8") as sink:
            # The child inherits the file descriptor, so closing our handle
            # when the with-block exits does not interrupt its output.
            proc = subprocess.Popen(
                full_cmd,
                stdout=sink,
                stderr=subprocess.STDOUT,
                text=True,
            )
        self._background_procs.append(proc)
        return {
            "status": "success",
            "message": f"Command started in background (PID {proc.pid})",
            "pid": proc.pid,
            "log_file": str(bg_log),
        }

    async def _invoke_tool(
        self,
        tool: ToolMeta,
        request: str,
        timeout: int,
    ) -> subprocess.CompletedProcess:
        """Send one JSON request to the tool's ``main.py`` over stdin."""
        tool_dir = settings.tools_dir / "local" / tool.tool_id
        # subprocess.run blocks until the tool exits; run it in a worker
        # thread so other coroutines keep running meanwhile.
        return await asyncio.to_thread(
            subprocess.run,
            ["uv", "run", "--directory", str(tool_dir), "python", "main.py"],
            input=request,
            capture_output=True,
            text=True,
            timeout=timeout,
        )

    # ---- environment creation and management ----

    def create_project(
        self,
        name: str,
        path: Path | None = None,
        python_version: str = "3.12",
    ) -> dict[str, Any]:
        """Create a new uv project with ``uv init``.

        Args:
            name: Project name.
            path: Project directory; defaults to ``tools/local/{name}`` under
                the configured tools directory.
            python_version: Python version passed to ``uv init --python``.

        Returns:
            A success dict with ``project_dir`` and ``message``, or an error
            dict with ``error`` (and ``stdout`` when ``uv init`` itself failed).
        """
        project_dir = path or (settings.tools_dir / "local" / name)
        try:
            project_dir.mkdir(parents=True, exist_ok=True)
            result = self._uv(
                ["init", "--name", name, "--python", python_version],
                project_dir,
                60,
            )
            if result.returncode != 0:
                return {"status": "error", "error": result.stderr, "stdout": result.stdout}

            # Remove the git repository and .gitignore that uv init creates.
            git_dir = project_dir / ".git"
            if git_dir.exists():
                shutil.rmtree(git_dir)
            (project_dir / ".gitignore").unlink(missing_ok=True)

            logger.info("Created uv project: %s", project_dir)
            return {
                "status": "success",
                "project_dir": str(project_dir),
                "message": f"Project '{name}' created with Python {python_version}",
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def create_venv(self, project_dir: Path, python_version: str = "3.12") -> dict[str, Any]:
        """Create a virtual environment in *project_dir* with ``uv venv``.

        Args:
            project_dir: Project directory.
            python_version: Python version passed to ``uv venv --python``.
        """
        try:
            result = self._uv(["venv", "--python", python_version], project_dir, 120)
            if result.returncode != 0:
                return {"status": "error", "error": result.stderr}
            logger.info("Created venv in: %s", project_dir)
            return {
                "status": "success",
                "venv_path": str(project_dir / ".venv"),
                "message": f"Venv created with Python {python_version}",
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def add_dependency(
        self,
        project_dir: Path,
        package: str,
        dev: bool = False,
    ) -> dict[str, Any]:
        """Add a dependency with ``uv add``.

        Args:
            project_dir: Project directory.
            package: Requirement string, e.g. ``"flask"`` or ``"flask>=2.0"``.
            dev: Add the package as a development dependency (``--dev``).
        """
        args = ["add", package]
        if dev:
            args.append("--dev")
        try:
            result = self._uv(args, project_dir, 120)
            if result.returncode != 0:
                return {"status": "error", "error": result.stderr, "stdout": result.stdout}
            logger.info("Added dependency '%s' to %s", package, project_dir)
            return {
                "status": "success",
                "package": package,
                "message": f"Added {'dev ' if dev else ''}dependency: {package}",
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def remove_dependency(self, project_dir: Path, package: str) -> dict[str, Any]:
        """Remove a dependency with ``uv remove``."""
        try:
            result = self._uv(["remove", package], project_dir, 60)
            if result.returncode != 0:
                return {"status": "error", "error": result.stderr}
            return {"status": "success", "message": f"Removed dependency: {package}"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def sync_dependencies(self, project_dir: Path) -> dict[str, Any]:
        """Synchronise the environment with ``uv sync``."""
        try:
            result = self._uv(["sync"], project_dir, 300)
            if result.returncode != 0:
                return {"status": "error", "error": result.stderr}
            return {"status": "success", "message": "Dependencies synced", "output": result.stdout}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def run_command(
        self,
        project_dir: Path,
        command: str,
        timeout: int = 120,
        background: bool = False,
        log_file: str = "last_run.log"
    ) -> dict[str, Any]:
        """Run *command* through ``uv run`` in the project and log the outcome.

        Logs are written under ``<project_dir>/tests``.

        Args:
            project_dir: Project directory passed to ``uv run --directory``.
            command: Command line; quoted arguments are kept together.
            timeout: Seconds to wait for a foreground command.
            background: Start the command without waiting for it. Its output
                goes to ``background_<program>.log`` and the process is tracked
                for ``cleanup_background``.
            log_file: Log file name used for foreground runs.

        Returns:
            Foreground: ``exit_code``, ``stdout``, ``stderr`` and ``log_path``.
            Background: a success dict with ``pid`` and ``log_file``.
            Any failure (empty or malformed command, timeout, OS error): an
            error dict with ``status`` and ``error``.
        """
        try:
            argv = self._split_command(command)
        except ValueError as exc:
            return {"status": "error", "error": f"Invalid command: {exc}"}
        if not argv:
            return {"status": "error", "error": "Empty command"}

        full_cmd = ["uv", "run", "--directory", str(project_dir), *argv]
        tests_dir = project_dir / self._LOG_SUBDIR
        log_path = tests_dir / log_file
        try:
            tests_dir.mkdir(parents=True, exist_ok=True)
            if background:
                return self._start_background(full_cmd, tests_dir, argv[0])

            result = subprocess.run(
                full_cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
            self._write_run_log(log_path, command, result.returncode, result.stdout, result.stderr)
            return {
                "exit_code": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "log_path": str(log_path)
            }
        except subprocess.TimeoutExpired as e:
            # The captured output on TimeoutExpired may be bytes or None even
            # though text=True was requested, so normalise it before use.
            stdout = self._as_text(e.stdout)
            stderr = self._as_text(e.stderr)
            try:
                # Keep whatever output was captured before the timeout.
                self._write_run_log(log_path, command, "timeout", stdout, stderr)
            except OSError as log_exc:
                logger.warning("Could not write timeout log %s: %s", log_path, log_exc)
            return {"status": "error", "error": "Timeout", "stdout": stdout, "stderr": stderr}
        except Exception as e:
            # Same contract as the other helpers: report, do not raise
            # (for example when the uv executable is not installed).
            return {"status": "error", "error": str(e)}

    def run_python(
        self,
        project_dir: Path,
        script: str,
        timeout: int = 120,
    ) -> dict[str, Any]:
        """Run a Python script inside the project environment.

        Args:
            project_dir: Project directory.
            script: Script path relative to the project directory.
            timeout: Timeout in seconds.
        """
        return self.run_command(project_dir, f"python {script}", timeout=timeout)

    def run_code(
        self,
        project_dir: Path,
        code: str,
        timeout: int = 60,
    ) -> dict[str, Any]:
        """Run a Python code snippet (``python -c``) inside the project environment.

        Args:
            project_dir: Project directory.
            code: Python source to execute.
            timeout: Timeout in seconds.

        Returns:
            ``exit_code``, ``stdout`` and ``stderr`` on completion, otherwise
            an error dict.
        """
        try:
            result = subprocess.run(
                ["uv", "run", "--directory", str(project_dir), "python", "-c", code],
                capture_output=True,
                text=True,
                timeout=timeout,
            )
            return {
                "exit_code": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr,
            }
        except subprocess.TimeoutExpired:
            return {"status": "error", "error": f"Code timeout after {timeout}s"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def list_dependencies(self, project_dir: Path) -> dict[str, Any]:
        """List installed packages via ``uv pip list --format json``."""
        try:
            result = self._uv(["pip", "list", "--format", "json"], project_dir, 30)
            if result.returncode != 0:
                return {"status": "error", "error": result.stderr}
            packages = json.loads(result.stdout) if result.stdout else []
            return {"status": "success", "packages": packages}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def get_python_version(self, project_dir: Path) -> dict[str, Any]:
        """Report ``sys.version`` of the interpreter used by the project."""
        result = self.run_code(project_dir, "import sys; print(sys.version)")
        if result.get("exit_code") == 0:
            return {"status": "success", "version": result["stdout"].strip()}
        return {"status": "error", "error": result.get("stderr", result.get("error"))}

    # ---- tool invocation ----

    async def run(self, tool: ToolMeta, params: dict[str, Any], stream: bool = False) -> dict[str, Any]:
        """Invoke a local tool through the stdio JSON protocol.

        Sends ``{"action": "run", "params": ..., "stream": ...}`` to the tool
        and returns the JSON document it prints, or an error dict when the
        tool exits non-zero, times out (60s) or prints invalid JSON.
        """
        request = json.dumps({"action": "run", "params": params, "stream": stream})
        try:
            result = await self._invoke_tool(tool, request, 60)
            if result.returncode != 0:
                return {"status": "error", "error": result.stderr}
            return json.loads(result.stdout)
        except subprocess.TimeoutExpired:
            return {"status": "error", "error": "timeout"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def health_check(self, tool: ToolMeta) -> bool:
        """Return True when the tool answers ``{"action": "health"}`` as healthy.

        Any failure (timeout after 10s, invalid JSON, missing key) counts as
        unhealthy.
        """
        request = json.dumps({"action": "health"})
        try:
            result = await self._invoke_tool(tool, request, 10)
            data = json.loads(result.stdout)
            return bool(data.get("healthy", False))
        except Exception:
            return False

    def read_logs(self, project_dir: Path, log_file: str = "last_run.log") -> dict[str, Any]:
        """Read a log file that belongs to the project.

        The file is looked up in ``<project_dir>/tests`` first, which is where
        ``run_command`` writes its logs, and then directly in *project_dir*
        so callers that pass a path relative to the project keep working.

        Returns:
            ``{"status": "success", "content": ...}`` or an error dict.
        """
        candidates = (
            project_dir / self._LOG_SUBDIR / log_file,
            project_dir / log_file,
        )
        log_path = next((p for p in candidates if p.is_file()), None)
        if log_path is None:
            return {"status": "error", "error": "Log file not found"}

        try:
            # Background logs are written by the child process itself and may
            # not be valid UTF-8; replace undecodable bytes instead of failing.
            content = log_path.read_text(encoding="utf-8", errors="replace")
            return {"status": "success", "content": content}
        except Exception as e:
            return {"status": "error", "error": str(e)}
|