""" 监听 Workspace 下技能/配置目录变化并触发热重载回调(不影响已在跑的 Trace 执行线程, 仅通知上层重新加载配置)。 """ from __future__ import annotations import asyncio import logging import os from collections.abc import Callable, Coroutine from pathlib import Path from typing import Any logger = logging.getLogger(__name__) Callback = Callable[[str, list[str]], Coroutine[Any, Any, None]] class ConfigWatcher: def __init__(self, debounce_seconds: float = 0.5) -> None: self._debounce = debounce_seconds self._watchers: dict[str, Any] = {} self._tasks: dict[str, asyncio.Task[None]] = {} self._lock = asyncio.Lock() @classmethod def from_env(cls) -> ConfigWatcher: debounce = float(os.getenv("GATEWAY_CONFIG_WATCH_DEBOUNCE", "0.5")) return cls(debounce_seconds=debounce) async def watch(self, workspace_id: str, workspace_path: str, callback: Callback) -> None: """监听 ``workspace_path`` 下常见技能目录;``callback(workspace_id, changed_paths)``。""" async with self._lock: await self.stop_watch(workspace_id) root = Path(workspace_path) watch_roots = [root, root / "skills", root / "skills-config"] existing = [p for p in watch_roots if p.is_dir()] if not existing: existing = [root] root.mkdir(parents=True, exist_ok=True) try: from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer except ImportError: logger.warning("未安装 watchdog,ConfigWatcher 使用轮询降级模式") task = asyncio.create_task( self._poll_loop(workspace_id, workspace_path, callback), name=f"config-watch-poll-{workspace_id}", ) self._tasks[workspace_id] = task return loop = asyncio.get_running_loop() debounce = self._debounce active: list[asyncio.Task[None] | None] = [None] class _Handler(FileSystemEventHandler): def on_any_event(self, event): # type: ignore[no-untyped-def] if event.is_directory: return src = getattr(event, "src_path", None) if not src: return src_s = str(src) async def _fire() -> None: await asyncio.sleep(debounce) try: await callback(workspace_id, [src_s]) except Exception: logger.exception("ConfigWatcher 回调失败 workspace_id=%s", workspace_id) cur = active[0] if cur is not None and not cur.done(): cur.cancel() active[0] = loop.create_task(_fire()) handler = _Handler() observer = Observer() for p in existing: try: observer.schedule(handler, str(p), recursive=True) except Exception as e: logger.warning("ConfigWatcher 无法监听 %s: %s", p, e) observer.start() self._watchers[workspace_id] = observer logger.info("ConfigWatcher 已启动 workspace_id=%s paths=%s", workspace_id, existing) async def _poll_loop(self, workspace_id: str, workspace_path: str, callback: Callback) -> None: root = Path(workspace_path) known: dict[str, float] = {} def scan(*, initial: bool) -> list[str]: changed: list[str] = [] patterns = ("*.yaml", "*.yml", "*.json", "*.toml") for sub in [root, root / "skills", root / ".cursor"]: if not sub.is_dir(): continue for pattern in patterns: for f in sub.rglob(pattern): if not f.is_file(): continue try: m = f.stat().st_mtime except OSError: continue key = str(f) if initial: known[key] = m elif known.get(key) != m: known[key] = m changed.append(key) return changed scan(initial=True) while True: await asyncio.sleep(max(self._debounce, 2.0)) try: ch = scan(initial=False) if ch: await callback(workspace_id, ch) except asyncio.CancelledError: raise except Exception: logger.exception("ConfigWatcher 轮询失败 workspace_id=%s", workspace_id) async def stop_watch(self, workspace_id: str) -> None: async with self._lock: obs = self._watchers.pop(workspace_id, None) if obs is not None: try: obs.stop() obs.join(timeout=5.0) except Exception: logger.exception("ConfigWatcher 停止 observer 异常 workspace_id=%s", workspace_id) task = self._tasks.pop(workspace_id, None) if task is not None: task.cancel() try: await task except asyncio.CancelledError: pass