import asyncio
import contextlib
import logging
import time
from collections import deque
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


class AlertService:
    """Asynchronous alert service.

    Alerts are placed on a bounded in-memory queue and delivered by a single
    background task, so that sending an alert never blocks the caller's main
    task pipeline.
    """

    _instance: Optional["AlertService"] = None

    # Seconds during which a repeated ``dedup_key`` is suppressed.
    _DEDUP_WINDOW_SECONDS = 60

    def __init__(self, feishu_client, max_queue_size: int = 1000):
        """Create a stopped service.

        Args:
            feishu_client: Object exposing an awaitable ``bot(...)`` method
                that actually delivers the alert.
            max_queue_size: Maximum number of pending alerts; further alerts
                are dropped (and counted) while the queue is full.
        """
        self.feishu = feishu_client
        self.queue: Optional[asyncio.Queue] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._running = False
        self._max_queue_size = max_queue_size
        # (dedup_key, monotonic timestamp) pairs, oldest first.
        self._recent_alerts = deque(maxlen=200)
        self._dropped_count = 0

    @classmethod
    def initialize(cls, feishu_client, max_queue_size: int = 1000) -> "AlertService":
        """Create the process-wide instance on first call and return it.

        Later calls return the existing instance and ignore their arguments.
        """
        if cls._instance is None:
            cls._instance = cls(feishu_client, max_queue_size=max_queue_size)
        return cls._instance

    @classmethod
    def get_instance(cls) -> Optional["AlertService"]:
        """Return the instance created by ``initialize``, or ``None``."""
        return cls._instance

    async def start(self):
        """Create the queue and launch the background worker (idempotent)."""
        if self._running:
            return
        self.queue = asyncio.Queue(maxsize=self._max_queue_size)
        self._running = True
        self._worker_task = asyncio.create_task(self._worker(), name="alert_service")
        logger.info("AlertService started")

    async def stop(self, drain_timeout: float = 5.0):
        """Stop accepting alerts, drain the queue, then stop the worker.

        Args:
            drain_timeout: Maximum seconds to wait for queued and in-flight
                alerts to be delivered before the worker is cancelled.
        """
        if not self._running:
            return
        # From here on send_alert() rejects new alerts; the worker keeps
        # consuming until the queue is empty (see _worker).
        self._running = False

        queue = self.queue
        if queue is not None:
            pending = queue.qsize()
            if pending > 0:
                logger.info("AlertService draining %d alerts...", pending)
            try:
                # join() also covers an alert that was already dequeued and is
                # mid-send; it returns immediately when nothing is outstanding.
                await asyncio.wait_for(queue.join(), timeout=drain_timeout)
            except asyncio.TimeoutError:
                logger.warning(
                    "AlertService drain timeout, %d alerts remaining",
                    queue.qsize(),
                )

        if self._worker_task:
            # The worker may be idle in queue.get() or stuck in a slow send.
            self._worker_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._worker_task

        if self._dropped_count > 0:
            logger.warning(
                "AlertService stopped, dropped alerts: %s", self._dropped_count
            )
        self._worker_task = None
        self.queue = None

    async def send_alert(
        self,
        title: str,
        detail: Dict[str, Any],
        mention: bool = True,
        table: bool = False,
        env: str = "long_articles_task",
        mention_users=None,
        dedup_key: Optional[str] = None,
    ):
        """Queue an alert for background delivery without blocking.

        The alert is silently ignored when the service is not running or when
        ``dedup_key`` was already seen within the dedup window. When the queue
        is full the alert is dropped, counted and a warning is logged.
        """
        if not self._running or self.queue is None:
            return
        if dedup_key and self._is_duplicate(dedup_key):
            return

        item = {
            "title": title,
            "detail": detail,
            "mention": mention,
            "table": table,
            "env": env,
            "mention_users": mention_users,
        }
        try:
            self.queue.put_nowait(item)
        except asyncio.QueueFull:
            self._dropped_count += 1
            logger.warning("Alert queue full, dropped alert: %s", title)

    def _is_duplicate(self, dedup_key: str) -> bool:
        """Return True if ``dedup_key`` was recorded within the dedup window.

        Expired entries are evicted first; an unseen key is recorded with the
        current time before returning False.
        """
        # Monotonic clock: the window must not be affected by wall-clock jumps.
        now = time.monotonic()
        window = self._DEDUP_WINDOW_SECONDS
        while self._recent_alerts and now - self._recent_alerts[0][1] > window:
            self._recent_alerts.popleft()

        if any(key == dedup_key for key, _ in self._recent_alerts):
            logger.debug("Alert deduplicated: %s", dedup_key)
            return True

        self._recent_alerts.append((dedup_key, now))
        return False

    async def _worker(self):
        """Deliver queued alerts one at a time until stopped and drained."""
        queue = self.queue
        if queue is None:
            return
        # Keep consuming after stop() clears _running so the drain phase can
        # actually empty the queue; exit once it is empty.
        while self._running or not queue.empty():
            try:
                item = await queue.get()
                try:
                    await self.feishu.bot(
                        title=item["title"],
                        detail=item["detail"],
                        mention=item["mention"],
                        table=item["table"],
                        env=item["env"],
                        mention_users=item["mention_users"],
                    )
                except Exception as e:
                    # A failed delivery must not kill the worker.
                    logger.error("Failed to send alert: %s", e)
                finally:
                    queue.task_done()
            except asyncio.CancelledError:
                # Propagate cancellation; stop() suppresses it.
                raise
            except Exception as e:
                logger.exception("AlertService worker error: %s", e)