# docker_runner.py
  1. """Docker 容器管理"""
  2. from __future__ import annotations
  3. import base64
  4. import json
  5. import logging
  6. import os
  7. import socket
  8. import threading
  9. import time
  10. import uuid
  11. from datetime import datetime, timezone
  12. from pathlib import Path
  13. from typing import Any, Callable
  14. import docker
  15. import docker.errors
  16. import docker.types
  17. import httpx
  18. from tool_agent.config import settings
  19. from tool_agent.models import ContainerInfo, ContainerStatus, ToolMeta
  20. logger = logging.getLogger(__name__)
  21. class ContainerStore:
  22. """容器状态 JSON 持久化层 — 替代 sandbox_repository 的 MySQL"""
  23. def __init__(self, path: Path | None = None) -> None:
  24. self._path = path or (settings.data_dir / "containers.json")
  25. self._lock = threading.Lock()
  26. def _load(self) -> list[dict[str, Any]]:
  27. if not self._path.exists():
  28. return []
  29. data = json.loads(self._path.read_text(encoding="utf-8"))
  30. return data.get("containers", [])
  31. def _save(self, containers: list[dict[str, Any]]) -> None:
  32. self._path.parent.mkdir(parents=True, exist_ok=True)
  33. self._path.write_text(
  34. json.dumps({"containers": containers}, indent=2, ensure_ascii=False, default=str),
  35. encoding="utf-8",
  36. )
  37. def create(self, info: ContainerInfo) -> None:
  38. with self._lock:
  39. containers = self._load()
  40. containers.append(info.model_dump(mode="json"))
  41. self._save(containers)
  42. def get(self, container_id: str) -> ContainerInfo | None:
  43. for item in self._load():
  44. if item["container_id"] == container_id and item["status"] == ContainerStatus.RUNNING:
  45. return ContainerInfo(**item)
  46. return None
  47. def get_all_active(self) -> list[ContainerInfo]:
  48. return [
  49. ContainerInfo(**item)
  50. for item in self._load()
  51. if item["status"] == ContainerStatus.RUNNING
  52. ]
  53. def update_last_accessed(self, container_id: str) -> None:
  54. with self._lock:
  55. containers = self._load()
  56. for item in containers:
  57. if item["container_id"] == container_id and item["status"] == ContainerStatus.RUNNING:
  58. item["last_accessed"] = datetime.now(timezone.utc).isoformat()
  59. break
  60. self._save(containers)
  61. def mark_destroyed(self, container_id: str) -> None:
  62. with self._lock:
  63. containers = self._load()
  64. for item in containers:
  65. if item["container_id"] == container_id and item["status"] == ContainerStatus.RUNNING:
  66. item["status"] = ContainerStatus.DESTROYED
  67. item["destroyed_at"] = datetime.now(timezone.utc).isoformat()
  68. break
  69. self._save(containers)
  70. def exists(self, container_id: str) -> bool:
  71. return any(
  72. item["container_id"] == container_id and item["status"] == ContainerStatus.RUNNING
  73. for item in self._load()
  74. )
  75. def get_expired(self, ttl_seconds: int) -> list[ContainerInfo]:
  76. now = datetime.now(timezone.utc)
  77. expired = []
  78. for item in self._load():
  79. if item["status"] != ContainerStatus.RUNNING:
  80. continue
  81. last = item.get("last_accessed") or item.get("created_at")
  82. if last:
  83. last_dt = datetime.fromisoformat(last) if isinstance(last, str) else last
  84. if last_dt.tzinfo is None:
  85. last_dt = last_dt.replace(tzinfo=timezone.utc)
  86. if (now - last_dt).total_seconds() > ttl_seconds:
  87. expired.append(ContainerInfo(**item))
  88. return expired
  89. def count_active(self) -> int:
  90. return sum(1 for item in self._load() if item["status"] == ContainerStatus.RUNNING)
class DockerRunner:
    """Full lifecycle management for Docker containers.

    Modelled on sandbox_manager.py; provides:
    - lazy loading of the Docker client
    - container creation (port mapping, resource limits, GPU)
    - command execution inside a container (foreground with timeout, or background)
    - HTTP calls to the tool service running inside a container
    - container start / stop / destroy / rebuild
    - health checks
    - a thread-safe container cache
    - restoring state from JSON on startup
    """
  103. def __init__(self, lazy_init: bool = True) -> None:
  104. self._docker_client: docker.DockerClient | None = None
  105. self._container_cache: dict[str, docker.models.containers.Container] = {}
  106. self._lock = threading.Lock()
  107. self._on_destroy_callbacks: list[Callable[[str], None]] = []
  108. self._store = ContainerStore()
  109. if not lazy_init:
  110. self._init_docker()
    # ---- Docker client ----
  112. @property
  113. def client(self) -> docker.DockerClient:
  114. if self._docker_client is None:
  115. self._init_docker()
  116. return self._docker_client
  117. def _init_docker(self) -> None:
  118. """连接 Docker(本地或远程)"""
  119. if settings.docker_host:
  120. # 远程 Docker:通过 SSH 连接
  121. ssh_key = settings.docker_ssh_key
  122. remote_host = settings.docker_host
  123. # Docker SDK 原生支持 SSH 连接
  124. # 格式:ssh://user@host
  125. docker_url = f"ssh://root@{remote_host}"
  126. logger.info(f"Connecting to remote Docker via SSH: {docker_url}")
  127. # 设置 SSH 密钥环境变量(Docker SDK 会使用)
  128. import os
  129. os.environ["DOCKER_SSH_KEY"] = ssh_key
  130. try:
  131. self._docker_client = docker.DockerClient(
  132. base_url=docker_url,
  133. timeout=30,
  134. use_ssh_client=True, # 使用系统的 SSH 客户端
  135. )
  136. # 测试连接
  137. self._docker_client.ping()
  138. logger.info(f"Successfully connected to remote Docker at {remote_host}")
  139. except Exception as e:
  140. logger.error(f"Failed to connect to remote Docker: {e}")
  141. raise RuntimeError(f"Cannot connect to remote Docker at {remote_host}: {e}")
  142. else:
  143. # 本地 Docker
  144. self._docker_client = docker.from_env()
  145. self._ensure_base_image()
  146. self._restore_container_cache()
  147. def _ensure_base_image(self) -> None:
  148. """检查基础镜像是否存在,不存在则从 Dockerfile 构建"""
  149. image_name = settings.docker_base_image
  150. try:
  151. self.client.images.get(image_name)
  152. logger.info(f"Base image '{image_name}' found locally.")
  153. except docker.errors.ImageNotFound:
  154. logger.info(f"Base image '{image_name}' not found. Building...")
  155. dockerfile_dir = settings.tools_dir / "docker"
  156. dockerfile_path = dockerfile_dir / "Dockerfile.sandbox"
  157. if not dockerfile_path.exists():
  158. logger.warning(f"Dockerfile '{dockerfile_path}' not found, skipping build.")
  159. return
  160. try:
  161. image, build_logs = self.client.images.build(
  162. path=str(dockerfile_dir),
  163. dockerfile="Dockerfile.sandbox",
  164. tag=image_name,
  165. rm=True,
  166. )
  167. for chunk in build_logs:
  168. if "stream" in chunk:
  169. logger.debug(chunk["stream"].strip())
  170. logger.info(f"Successfully built '{image_name}'.")
  171. except Exception as e:
  172. logger.error(f"Failed to build base image: {e}")
  173. def _restore_container_cache(self) -> None:
  174. """启动时从 JSON 恢复活跃容器到内存缓存"""
  175. for info in self._store.get_all_active():
  176. try:
  177. container = self.client.containers.get(info.container_id)
  178. if container.status == "running":
  179. with self._lock:
  180. self._container_cache[info.container_id] = container
  181. logger.info(f"Restored container cache: {info.container_id[:12]}")
  182. else:
  183. self._store.mark_stopped(info.container_id)
  184. logger.info(f"Container not running, marked stopped: {info.container_id[:12]}")
  185. except docker.errors.NotFound:
  186. self._store.mark_destroyed(info.container_id)
  187. logger.warning(f"Container not found, marked destroyed: {info.container_id[:12]}")
    # ---- Callbacks ----
  189. def add_on_destroy_callback(self, callback: Callable[[str], None]) -> None:
  190. self._on_destroy_callbacks.append(callback)
  191. def _trigger_destroy_callbacks(self, container_id: str) -> None:
  192. for cb in self._on_destroy_callbacks:
  193. try:
  194. cb(container_id)
  195. except Exception as e:
  196. logger.error(f"Destroy callback failed: {e}")
    # ---- Ports ----
  198. @staticmethod
  199. def _get_free_port() -> int:
  200. """获取一个空闲的宿主机端口"""
  201. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  202. s.bind(("", 0))
  203. return s.getsockname()[1]
    # ---- Container lifecycle ----
  205. def create_container(
  206. self,
  207. tool_id: str = "",
  208. image: str | None = None,
  209. mem_limit: str | None = None,
  210. nano_cpus: int | None = None,
  211. ports: list[int] | None = None,
  212. volumes: dict[str, str] | None = None,
  213. use_gpu: bool = False,
  214. gpu_count: int = -1,
  215. ) -> dict[str, Any]:
  216. """创建新容器
  217. Args:
  218. tool_id: 关联的工具 ID
  219. image: 镜像名称,默认使用 settings.docker_base_image
  220. mem_limit: 内存限制,如 "1g"
  221. nano_cpus: CPU 限制,1_000_000_000 = 1 CPU
  222. ports: 需要映射的容器端口列表,如 [8080, 3000]
  223. volumes: 目录挂载,{宿主机路径: 容器路径},如 {"/home/user/project": "/app"}
  224. use_gpu: 是否启用 GPU
  225. gpu_count: GPU 数量,-1 表示全部
  226. """
  227. image = image or settings.docker_base_image
  228. mem_limit = mem_limit or settings.docker_mem_limit
  229. nano_cpus = nano_cpus or settings.docker_nano_cpus
  230. # 容器数量上限检查
  231. active_count = self._store.count_active()
  232. if active_count >= settings.hot_tool_max_containers:
  233. return {"error": f"Container limit reached ({settings.hot_tool_max_containers}). Destroy unused containers first."}
  234. try:
  235. # 端口映射
  236. port_bindings: dict[str, int] = {}
  237. port_mapping: dict[int, int] = {}
  238. if ports:
  239. for container_port in ports:
  240. host_port = self._get_free_port()
  241. port_bindings[f"{container_port}/tcp"] = host_port
  242. port_mapping[container_port] = host_port
  243. # 目录挂载
  244. docker_volumes = {}
  245. if volumes:
  246. for host_path, container_path in volumes.items():
  247. docker_volumes[host_path] = {"bind": container_path, "mode": "ro"}
  248. # GPU 配置
  249. device_requests = None
  250. if use_gpu:
  251. device_requests = [
  252. docker.types.DeviceRequest(count=gpu_count, capabilities=[["gpu"]])
  253. ]
  254. container = self.client.containers.run(
  255. image,
  256. command="tail -f /dev/null",
  257. detach=True,
  258. ports=port_bindings or None,
  259. volumes=docker_volumes or None,
  260. working_dir="/app",
  261. mem_limit=mem_limit,
  262. nano_cpus=nano_cpus,
  263. device_requests=device_requests,
  264. security_opt=["no-new-privileges"],
  265. )
  266. now = datetime.now(timezone.utc)
  267. info = ContainerInfo(
  268. container_id=container.id,
  269. tool_id=tool_id,
  270. image=image,
  271. port_mapping=port_mapping,
  272. volumes=volumes or {},
  273. mem_limit=mem_limit,
  274. nano_cpus=nano_cpus,
  275. use_gpu=use_gpu,
  276. gpu_count=gpu_count,
  277. created_at=now,
  278. last_accessed=now,
  279. )
  280. self._store.create(info)
  281. with self._lock:
  282. self._container_cache[container.id] = container
  283. logger.info(f"Created container {container.id[:12]} for tool '{tool_id}', ports={port_mapping}")
  284. return {
  285. "container_id": container.id,
  286. "tool_id": tool_id,
  287. "port_mapping": port_mapping,
  288. "message": "Container created.",
  289. }
  290. except Exception as e:
  291. logger.error(f"Failed to create container: {e}")
  292. return {"error": str(e)}
  293. def _get_container(self, container_id: str) -> docker.models.containers.Container | None:
  294. """从缓存或 Docker 获取容器对象"""
  295. with self._lock:
  296. container = self._container_cache.get(container_id)
  297. if container:
  298. return container
  299. try:
  300. container = self.client.containers.get(container_id)
  301. with self._lock:
  302. self._container_cache[container_id] = container
  303. return container
  304. except docker.errors.NotFound:
  305. self._store.mark_destroyed(container_id)
  306. return None
  307. def run_command(
  308. self,
  309. container_id: str,
  310. command: str,
  311. background: bool = False,
  312. timeout: int = 120,
  313. ) -> dict[str, Any]:
  314. """在容器内执行命令
  315. Args:
  316. container_id: 容器 ID
  317. command: Shell 命令
  318. background: 是否后台执行
  319. timeout: 前台命令超时秒数
  320. """
  321. if not self._store.exists(container_id):
  322. return {"error": "Container not found"}
  323. self._store.update_last_accessed(container_id)
  324. container = self._get_container(container_id)
  325. if not container:
  326. return {"error": "Container object not found"}
  327. logger.info(f"Running in {container_id[:12]}: {command} (bg={background})")
  328. try:
  329. if background:
  330. log_file = f"background_{uuid.uuid4().hex[:8]}.log"
  331. encoded_cmd = base64.b64encode(command.encode()).decode()
  332. safe_cmd = f"echo {encoded_cmd} | base64 -d | nohup sh > /app/{log_file} 2>&1 &"
  333. container.exec_run(["sh", "-c", safe_cmd], detach=True)
  334. return {
  335. "status": "success",
  336. "message": "Command started in background",
  337. "log_file": f"/app/{log_file}",
  338. }
  339. else:
  340. result_box: dict[str, Any] = {}
  341. def _exec():
  342. try:
  343. result_box["exec"] = container.exec_run(
  344. ["sh", "-c", command], demux=True,
  345. )
  346. except Exception as e:
  347. result_box["error"] = str(e)
  348. thread = threading.Thread(target=_exec, daemon=True)
  349. thread.start()
  350. thread.join(timeout=timeout)
  351. if thread.is_alive():
  352. return {"error": f"Command timeout after {timeout}s"}
  353. if "error" in result_box:
  354. return {"error": result_box["error"]}
  355. exec_result = result_box["exec"]
  356. stdout, stderr = exec_result.output
  357. return {
  358. "exit_code": exec_result.exit_code,
  359. "stdout": stdout.decode("utf-8", errors="replace") if stdout else "",
  360. "stderr": stderr.decode("utf-8", errors="replace") if stderr else "",
  361. }
  362. except Exception as e:
  363. return {"error": str(e)}
  364. def destroy_container(self, container_id: str) -> dict[str, Any]:
  365. """销毁容器并释放资源"""
  366. if not self._store.exists(container_id):
  367. return {"error": "Container not found"}
  368. with self._lock:
  369. container = self._container_cache.pop(container_id, None)
  370. if not container:
  371. try:
  372. container = self.client.containers.get(container_id)
  373. except docker.errors.NotFound:
  374. self._store.mark_destroyed(container_id)
  375. return {"error": "Container not found in Docker"}
  376. self._store.mark_destroyed(container_id)
  377. self._trigger_destroy_callbacks(container_id)
  378. try:
  379. container.remove(force=True)
  380. logger.info(f"Destroyed container {container_id[:12]}")
  381. return {"status": "success", "message": f"Container {container_id[:12]} destroyed"}
  382. except Exception as e:
  383. return {"error": str(e)}
  384. def rebuild_with_ports(
  385. self,
  386. container_id: str,
  387. ports: list[int],
  388. mem_limit: str | None = None,
  389. nano_cpus: int | None = None,
  390. use_gpu: bool = False,
  391. gpu_count: int = -1,
  392. ) -> dict[str, Any]:
  393. """重建容器并应用新端口映射,保留文件系统状态
  394. 通过 commit → 重建 → 清理临时镜像实现。
  395. """
  396. container = self._get_container(container_id)
  397. if not container:
  398. return {"error": "Container not found"}
  399. info = self._store.get(container_id)
  400. mem_limit = mem_limit or (info.mem_limit if info else settings.docker_mem_limit)
  401. nano_cpus = nano_cpus or (info.nano_cpus if info else settings.docker_nano_cpus)
  402. tool_id = info.tool_id if info else ""
  403. old_volumes = info.volumes if info else {}
  404. try:
  405. # 1. commit 当前容器为临时镜像
  406. temp_tag = f"sandbox-temp-{uuid.uuid4().hex[:8]}"
  407. logger.info(f"Committing {container_id[:12]} → {temp_tag}")
  408. container.commit(repository=temp_tag)
  409. # 2. 新端口映射
  410. port_bindings: dict[str, int] = {}
  411. port_mapping: dict[int, int] = {}
  412. for p in ports:
  413. hp = self._get_free_port()
  414. port_bindings[f"{p}/tcp"] = hp
  415. port_mapping[p] = hp
  416. # 恢复目录挂载
  417. docker_volumes = {}
  418. if old_volumes:
  419. for host_path, container_path in old_volumes.items():
  420. docker_volumes[host_path] = {"bind": container_path, "mode": "ro"}
  421. device_requests = None
  422. if use_gpu:
  423. device_requests = [
  424. docker.types.DeviceRequest(count=gpu_count, capabilities=[["gpu"]])
  425. ]
  426. # 3. 创建新容器
  427. new_container = self.client.containers.run(
  428. temp_tag,
  429. command="tail -f /dev/null",
  430. detach=True,
  431. ports=port_bindings or None,
  432. volumes=docker_volumes or None,
  433. working_dir="/app",
  434. mem_limit=mem_limit,
  435. nano_cpus=nano_cpus,
  436. device_requests=device_requests,
  437. security_opt=["no-new-privileges"],
  438. )
  439. # 4. 清理旧容器
  440. self._store.mark_destroyed(container_id)
  441. with self._lock:
  442. self._container_cache.pop(container_id, None)
  443. container.remove(force=True)
  444. # 5. 清理临时镜像
  445. try:
  446. self.client.images.remove(temp_tag, force=True)
  447. except Exception as e:
  448. logger.warning(f"Failed to remove temp image: {e}")
  449. # 6. 保存新容器
  450. now = datetime.now(timezone.utc)
  451. new_info = ContainerInfo(
  452. container_id=new_container.id,
  453. tool_id=tool_id,
  454. image=info.image if info else settings.docker_base_image,
  455. port_mapping=port_mapping,
  456. volumes=old_volumes,
  457. mem_limit=mem_limit,
  458. nano_cpus=nano_cpus,
  459. use_gpu=use_gpu,
  460. gpu_count=gpu_count,
  461. created_at=now,
  462. last_accessed=now,
  463. )
  464. self._store.create(new_info)
  465. with self._lock:
  466. self._container_cache[new_container.id] = new_container
  467. logger.info(f"Rebuilt {container_id[:12]} → {new_container.id[:12]}, ports={port_mapping}")
  468. return {
  469. "old_container_id": container_id,
  470. "new_container_id": new_container.id,
  471. "port_mapping": port_mapping,
  472. "message": "Container rebuilt with new port mappings. All files preserved.",
  473. }
  474. except Exception as e:
  475. logger.error(f"Failed to rebuild container: {e}")
  476. return {"error": str(e)}
  477. def start_container(self, container_id: str) -> bool:
  478. """启动已停止的容器"""
  479. container = self._get_container(container_id)
  480. if not container:
  481. return False
  482. try:
  483. container.start()
  484. logger.info(f"Started container {container_id[:12]}")
  485. return True
  486. except Exception as e:
  487. logger.error(f"Failed to start container: {e}")
  488. return False
  489. def stop_container(self, container_id: str) -> bool:
  490. """停止运行中的容器"""
  491. container = self._get_container(container_id)
  492. if not container:
  493. return False
  494. try:
  495. container.stop(timeout=10)
  496. logger.info(f"Stopped container {container_id[:12]}")
  497. return True
  498. except Exception as e:
  499. logger.error(f"Failed to stop container: {e}")
  500. return False
    # ---- Tool invocation (HTTP) ----
  502. async def run(self, tool: ToolMeta, params: dict[str, Any], stream: bool = False) -> dict[str, Any]:
  503. """通过 HTTP 调用容器内工具服务"""
  504. # 从注册表中找到该工具对应的容器端口
  505. for info in self._store.get_all_active():
  506. if info.tool_id == tool.tool_id and info.port_mapping:
  507. # 取第一个映射端口作为服务端口
  508. host_port = next(iter(info.port_mapping.values()))
  509. self._store.update_last_accessed(info.container_id)
  510. url = f"http://localhost:{host_port}/run"
  511. payload = {"params": params, "stream": stream}
  512. try:
  513. async with httpx.AsyncClient(timeout=60) as client:
  514. resp = await client.post(url, json=payload)
  515. return resp.json()
  516. except Exception as e:
  517. return {"status": "error", "error": str(e)}
  518. return {"status": "error", "error": f"No running container for tool '{tool.tool_id}'"}
  519. async def health_check(self, container_id: str) -> bool:
  520. """HTTP 健康检查"""
  521. info = self._store.get(container_id)
  522. if not info or not info.port_mapping:
  523. return False
  524. host_port = next(iter(info.port_mapping.values()))
  525. try:
  526. async with httpx.AsyncClient(timeout=5) as client:
  527. resp = await client.get(f"http://localhost:{host_port}/health")
  528. return resp.status_code == 200
  529. except Exception:
  530. return False
    # ---- Automatic cleanup ----
  532. def cleanup_expired(self) -> list[str]:
  533. """清理超过 TTL 的容器,返回被清理的 container_id 列表"""
  534. expired = self._store.get_expired(settings.docker_ttl_seconds)
  535. cleaned = []
  536. for info in expired:
  537. logger.info(f"Auto-cleaning expired container {info.container_id[:12]}")
  538. result = self.destroy_container(info.container_id)
  539. if "error" not in result:
  540. cleaned.append(info.container_id)
  541. return cleaned
    # ---- Queries ----
  543. def list_active(self) -> list[ContainerInfo]:
  544. return self._store.get_all_active()
  545. def get_container_info(self, container_id: str) -> ContainerInfo | None:
  546. return self._store.get(container_id)