| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- """
- 监听 Workspace 下技能/配置目录变化并触发热重载回调(不影响已在跑的 Trace 执行线程,
- 仅通知上层重新加载配置)。
- """
- from __future__ import annotations
- import asyncio
- import logging
- import os
- from collections.abc import Callable, Coroutine
- from pathlib import Path
- from typing import Any
# Module-level logger used by every ConfigWatcher instance.
logger = logging.getLogger(__name__)

# Signature of the hot-reload callback: ``await callback(workspace_id, changed_paths)``.
Callback = Callable[[str, list[str]], Coroutine[Any, Any, None]]


class ConfigWatcher:
    """Watch workspace skill/config directories and fire an async reload callback.

    ``watchdog`` observers are used when the package is importable; otherwise
    an mtime-polling task is started instead. At most one watcher (observer or
    polling task) is kept per ``workspace_id``.
    """

    def __init__(self, debounce_seconds: float = 0.5) -> None:
        """Create a watcher.

        Args:
            debounce_seconds: Quiet period awaited after the last file event
                before the callback runs (watchdog mode). Polling mode sleeps
                ``max(debounce_seconds, 2.0)`` between scans.
        """
        self._debounce = debounce_seconds
        # workspace_id -> watchdog Observer (typed Any because watchdog is optional).
        self._watchers: dict[str, Any] = {}
        # workspace_id -> polling task used when watchdog is not installed.
        self._tasks: dict[str, asyncio.Task[None]] = {}
        # Serialises watch()/stop_watch(). asyncio.Lock is NOT re-entrant, so
        # code that already holds it must call _stop_locked(), not stop_watch().
        self._lock = asyncio.Lock()

    @classmethod
    def from_env(cls) -> ConfigWatcher:
        """Build a watcher from ``GATEWAY_CONFIG_WATCH_DEBOUNCE`` (default ``0.5``).

        Raises:
            ValueError: If the environment variable is not a valid float.
        """
        debounce = float(os.getenv("GATEWAY_CONFIG_WATCH_DEBOUNCE", "0.5"))
        return cls(debounce_seconds=debounce)

    async def watch(self, workspace_id: str, workspace_path: str, callback: Callback) -> None:
        """Start watching ``workspace_path``, replacing any watcher for the same id.

        The workspace root plus its ``skills`` and ``skills-config``
        sub-directories are watched recursively when they exist; if none of
        them exists the root is created and watched. After a quiet period of
        ``debounce_seconds`` the callback is awaited as
        ``callback(workspace_id, changed_paths)`` with every file path that
        produced an event during that period. Exceptions raised by the callback
        are logged and swallowed.

        Args:
            workspace_id: Key under which the watcher is registered.
            workspace_path: Filesystem root of the workspace.
            callback: Coroutine function invoked with the changed paths.
        """
        async with self._lock:
            # The lock is already held here; awaiting stop_watch() would wait
            # on the same non-re-entrant lock forever, so use the lock-free helper.
            await self._stop_locked(workspace_id)

            root = Path(workspace_path)
            candidates = [root, root / "skills", root / "skills-config"]
            existing = [p for p in candidates if p.is_dir()]
            if not existing:
                existing = [root]
                root.mkdir(parents=True, exist_ok=True)

            try:
                from watchdog.events import FileSystemEventHandler
                from watchdog.observers import Observer
            except ImportError:
                logger.warning("未安装 watchdog,ConfigWatcher 使用轮询降级模式")
                self._tasks[workspace_id] = asyncio.create_task(
                    self._poll_loop(workspace_id, workspace_path, callback),
                    name=f"config-watch-poll-{workspace_id}",
                )
                return

            loop = asyncio.get_running_loop()
            debounce = self._debounce
            # Single-slot holder for the current debounce task (mutated by closures).
            active: list[asyncio.Task[None] | None] = [None]
            # Insertion-ordered set of paths seen since the last delivery.
            pending: dict[str, None] = {}

            async def _fire() -> None:
                await asyncio.sleep(debounce)
                paths = list(pending)
                pending.clear()
                try:
                    await callback(workspace_id, paths)
                except Exception:
                    logger.exception("ConfigWatcher 回调失败 workspace_id=%s", workspace_id)

            def _schedule(path: str) -> None:
                # Runs on the event-loop thread: record the path and restart
                # the debounce timer by replacing the pending task.
                pending[path] = None
                current = active[0]
                if current is not None and not current.done():
                    current.cancel()
                active[0] = loop.create_task(_fire())

            class _Handler(FileSystemEventHandler):
                def on_any_event(self, event):  # type: ignore[no-untyped-def]
                    if event.is_directory:
                        return
                    src = getattr(event, "src_path", None)
                    if not src:
                        return
                    # watchdog calls this on its observer thread; asyncio tasks
                    # must only be created/cancelled from the loop thread.
                    loop.call_soon_threadsafe(_schedule, str(src))

            handler = _Handler()
            observer = Observer()
            for path in existing:
                try:
                    observer.schedule(handler, str(path), recursive=True)
                except Exception as exc:
                    logger.warning("ConfigWatcher 无法监听 %s: %s", path, exc)
            observer.start()
            self._watchers[workspace_id] = observer
            logger.info("ConfigWatcher 已启动 workspace_id=%s paths=%s", workspace_id, existing)

    async def _poll_loop(self, workspace_id: str, workspace_path: str, callback: Callback) -> None:
        """Fallback watcher: poll file mtimes and report new or modified files.

        Only ``*.yaml``, ``*.yml``, ``*.json`` and ``*.toml`` files are tracked.
        Deleted files are not reported. The loop runs until the task is
        cancelled; any other exception from a scan or from the callback is
        logged and the loop continues.
        """
        root = Path(workspace_path)
        # path -> last observed mtime
        known: dict[str, float] = {}

        def scan(*, initial: bool) -> list[str]:
            """Refresh ``known``; return paths whose mtime changed (empty on the initial pass)."""
            changed: list[str] = []
            patterns = ("*.yaml", "*.yml", "*.json", "*.toml")
            # NOTE(review): this list differs from the watchdog path, which
            # watches "skills-config" rather than ".cursor" — confirm intent.
            for sub in [root, root / "skills", root / ".cursor"]:
                if not sub.is_dir():
                    continue
                for pattern in patterns:
                    for f in sub.rglob(pattern):
                        if not f.is_file():
                            continue
                        try:
                            m = f.stat().st_mtime
                        except OSError:
                            # File vanished or is unreadable between glob and stat.
                            continue
                        key = str(f)
                        if initial:
                            known[key] = m
                        elif known.get(key) != m:
                            known[key] = m
                            changed.append(key)
            return changed

        scan(initial=True)
        while True:
            await asyncio.sleep(max(self._debounce, 2.0))
            try:
                ch = scan(initial=False)
                if ch:
                    await callback(workspace_id, ch)
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("ConfigWatcher 轮询失败 workspace_id=%s", workspace_id)

    async def stop_watch(self, workspace_id: str) -> None:
        """Stop and forget the watcher for ``workspace_id``; a no-op if none exists."""
        async with self._lock:
            await self._stop_locked(workspace_id)

    async def _stop_locked(self, workspace_id: str) -> None:
        """Tear down the observer and/or polling task; caller must hold ``self._lock``."""
        obs = self._watchers.pop(workspace_id, None)
        if obs is not None:
            try:
                obs.stop()
                # join() blocks for up to 5 s; run it in a worker thread so the
                # event loop keeps serving other tasks meanwhile.
                await asyncio.to_thread(obs.join, 5.0)
            except Exception:
                logger.exception("ConfigWatcher 停止 observer 异常 workspace_id=%s", workspace_id)
        task = self._tasks.pop(workspace_id, None)
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
|