config_watcher.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. """
  2. 监听 Workspace 下技能/配置目录变化并触发热重载回调(不影响已在跑的 Trace 执行线程,
  3. 仅通知上层重新加载配置)。
  4. """
  5. from __future__ import annotations
  6. import asyncio
  7. import logging
  8. import os
  9. from collections.abc import Callable, Coroutine
  10. from pathlib import Path
  11. from typing import Any
  12. logger = logging.getLogger(__name__)
  13. Callback = Callable[[str, list[str]], Coroutine[Any, Any, None]]
  14. class ConfigWatcher:
  15. def __init__(self, debounce_seconds: float = 0.5) -> None:
  16. self._debounce = debounce_seconds
  17. self._watchers: dict[str, Any] = {}
  18. self._tasks: dict[str, asyncio.Task[None]] = {}
  19. self._lock = asyncio.Lock()
  20. @classmethod
  21. def from_env(cls) -> ConfigWatcher:
  22. debounce = float(os.getenv("GATEWAY_CONFIG_WATCH_DEBOUNCE", "0.5"))
  23. return cls(debounce_seconds=debounce)
  24. async def watch(self, workspace_id: str, workspace_path: str, callback: Callback) -> None:
  25. """监听 ``workspace_path`` 下常见技能目录;``callback(workspace_id, changed_paths)``。"""
  26. async with self._lock:
  27. await self.stop_watch(workspace_id)
  28. root = Path(workspace_path)
  29. watch_roots = [root, root / "skills", root / "skills-config"]
  30. existing = [p for p in watch_roots if p.is_dir()]
  31. if not existing:
  32. existing = [root]
  33. root.mkdir(parents=True, exist_ok=True)
  34. try:
  35. from watchdog.events import FileSystemEventHandler
  36. from watchdog.observers import Observer
  37. except ImportError:
  38. logger.warning("未安装 watchdog,ConfigWatcher 使用轮询降级模式")
  39. task = asyncio.create_task(
  40. self._poll_loop(workspace_id, workspace_path, callback),
  41. name=f"config-watch-poll-{workspace_id}",
  42. )
  43. self._tasks[workspace_id] = task
  44. return
  45. loop = asyncio.get_running_loop()
  46. debounce = self._debounce
  47. active: list[asyncio.Task[None] | None] = [None]
  48. class _Handler(FileSystemEventHandler):
  49. def on_any_event(self, event): # type: ignore[no-untyped-def]
  50. if event.is_directory:
  51. return
  52. src = getattr(event, "src_path", None)
  53. if not src:
  54. return
  55. src_s = str(src)
  56. async def _fire() -> None:
  57. await asyncio.sleep(debounce)
  58. try:
  59. await callback(workspace_id, [src_s])
  60. except Exception:
  61. logger.exception("ConfigWatcher 回调失败 workspace_id=%s", workspace_id)
  62. cur = active[0]
  63. if cur is not None and not cur.done():
  64. cur.cancel()
  65. active[0] = loop.create_task(_fire())
  66. handler = _Handler()
  67. observer = Observer()
  68. for p in existing:
  69. try:
  70. observer.schedule(handler, str(p), recursive=True)
  71. except Exception as e:
  72. logger.warning("ConfigWatcher 无法监听 %s: %s", p, e)
  73. observer.start()
  74. self._watchers[workspace_id] = observer
  75. logger.info("ConfigWatcher 已启动 workspace_id=%s paths=%s", workspace_id, existing)
  76. async def _poll_loop(self, workspace_id: str, workspace_path: str, callback: Callback) -> None:
  77. root = Path(workspace_path)
  78. known: dict[str, float] = {}
  79. def scan(*, initial: bool) -> list[str]:
  80. changed: list[str] = []
  81. patterns = ("*.yaml", "*.yml", "*.json", "*.toml")
  82. for sub in [root, root / "skills", root / ".cursor"]:
  83. if not sub.is_dir():
  84. continue
  85. for pattern in patterns:
  86. for f in sub.rglob(pattern):
  87. if not f.is_file():
  88. continue
  89. try:
  90. m = f.stat().st_mtime
  91. except OSError:
  92. continue
  93. key = str(f)
  94. if initial:
  95. known[key] = m
  96. elif known.get(key) != m:
  97. known[key] = m
  98. changed.append(key)
  99. return changed
  100. scan(initial=True)
  101. while True:
  102. await asyncio.sleep(max(self._debounce, 2.0))
  103. try:
  104. ch = scan(initial=False)
  105. if ch:
  106. await callback(workspace_id, ch)
  107. except asyncio.CancelledError:
  108. raise
  109. except Exception:
  110. logger.exception("ConfigWatcher 轮询失败 workspace_id=%s", workspace_id)
  111. async def stop_watch(self, workspace_id: str) -> None:
  112. async with self._lock:
  113. obs = self._watchers.pop(workspace_id, None)
  114. if obs is not None:
  115. try:
  116. obs.stop()
  117. obs.join(timeout=5.0)
  118. except Exception:
  119. logger.exception("ConfigWatcher 停止 observer 异常 workspace_id=%s", workspace_id)
  120. task = self._tasks.pop(workspace_id, None)
  121. if task is not None:
  122. task.cancel()
  123. try:
  124. await task
  125. except asyncio.CancelledError:
  126. pass