| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import asyncio
- import contextlib
- import logging
- import time
- from collections import deque
- from typing import Any, Dict, Optional
# Module-level logger named after this module; used by AlertService below.
logger = logging.getLogger(__name__)
class AlertService:
    """Asynchronous alert service that keeps alert delivery off the main task path.

    Alerts are enqueued by :meth:`send_alert` and delivered one at a time by a
    background worker task that awaits ``feishu_client.bot(...)``. The queue is
    bounded: when it is full, new alerts are dropped and counted instead of
    blocking the caller. Alerts carrying a ``dedup_key`` are suppressed if an
    alert with the same key was accepted within the last ``dedup_window``
    seconds.

    A shared instance can be managed through :meth:`initialize` and
    :meth:`get_instance`.
    """

    _instance: Optional["AlertService"] = None

    def __init__(
        self,
        feishu_client,
        max_queue_size: int = 1000,
        dedup_window: float = 60.0,
    ):
        """Create a stopped service; call :meth:`start` before sending alerts.

        Args:
            feishu_client: Object with an awaitable ``bot(title=..., detail=...,
                mention=..., table=..., env=..., mention_users=...)`` method.
            max_queue_size: Maximum number of pending alerts; further alerts
                are dropped while the queue is full.
            dedup_window: Seconds during which a repeated ``dedup_key`` is
                suppressed.
        """
        self.feishu = feishu_client
        # Created in start(), i.e. from inside the running event loop.
        self.queue: Optional[asyncio.Queue] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._running = False
        self._max_queue_size = max_queue_size
        self._dedup_window = dedup_window
        # (dedup_key, monotonic timestamp) pairs, appended in time order so
        # expired entries can be pruned from the left.
        self._recent_alerts = deque(maxlen=200)
        self._dropped_count = 0

    @classmethod
    def initialize(
        cls,
        feishu_client,
        max_queue_size: int = 1000,
        dedup_window: float = 60.0,
    ) -> "AlertService":
        """Create the shared instance on the first call and return it.

        Later calls return the existing instance and ignore their arguments.
        """
        if cls._instance is None:
            cls._instance = cls(
                feishu_client,
                max_queue_size=max_queue_size,
                dedup_window=dedup_window,
            )
        return cls._instance

    @classmethod
    def get_instance(cls) -> Optional["AlertService"]:
        """Return the shared instance, or None if initialize() was never called."""
        return cls._instance

    async def start(self):
        """Create the queue and spawn the background worker; no-op if running."""
        if self._running:
            return
        self.queue = asyncio.Queue(maxsize=self._max_queue_size)
        self._running = True
        self._worker_task = asyncio.create_task(self._worker(), name="alert_service")
        logger.info("AlertService started")

    async def stop(self, drain_timeout: float = 5.0):
        """Stop accepting alerts, deliver pending ones, then cancel the worker.

        Args:
            drain_timeout: Maximum seconds to wait for queued and in-flight
                alerts to be delivered before the worker is cancelled.
        """
        if not self._running:
            return
        # New alerts are rejected from now on; the worker keeps consuming
        # until it is cancelled below, so already-queued alerts still go out.
        self._running = False
        if self.queue is not None:
            if self.queue.qsize() > 0:
                logger.info(f"AlertService draining {self.queue.qsize()} alerts...")
            # join() also waits for an alert the worker has already dequeued
            # but not yet delivered (qsize() reports 0 for it). It returns
            # immediately when nothing is outstanding.
            try:
                await asyncio.wait_for(self.queue.join(), timeout=drain_timeout)
            except asyncio.TimeoutError:
                logger.warning(
                    f"AlertService drain timeout, {self.queue.qsize()} alerts remaining"
                )
        if self._worker_task:
            self._worker_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._worker_task
        if self._dropped_count > 0:
            logger.warning(f"AlertService stopped, dropped alerts: {self._dropped_count}")
        self._worker_task = None
        self.queue = None

    async def send_alert(
        self,
        title: str,
        detail: Dict[str, Any],
        mention: bool = True,
        table: bool = False,
        env: str = "long_articles_task",
        mention_users=None,
        dedup_key: Optional[str] = None,
    ):
        """Enqueue an alert for background delivery without waiting for it.

        The alert is ignored when the service is not running. It is suppressed
        when ``dedup_key`` matches an alert accepted within the dedup window.
        When the queue is full, the alert is dropped, counted and logged. A
        dropped alert is not recorded for dedup, so a retry can still get
        through.

        ``title``, ``detail``, ``mention``, ``table``, ``env`` and
        ``mention_users`` are passed unchanged to ``feishu_client.bot``.
        ``dedup_key`` is only used locally.
        """
        if not self._running or self.queue is None:
            return
        if dedup_key and self._is_duplicate(dedup_key, record=False):
            return
        item = {
            "title": title,
            "detail": detail,
            "mention": mention,
            "table": table,
            "env": env,
            "mention_users": mention_users,
        }
        try:
            self.queue.put_nowait(item)
        except asyncio.QueueFull:
            self._dropped_count += 1
            logger.warning(f"Alert queue full, dropped alert: {title}")
            return
        # Record only alerts that were actually accepted for delivery.
        if dedup_key:
            self._remember(dedup_key)

    def _is_duplicate(self, dedup_key: str, record: bool = True) -> bool:
        """Return True if ``dedup_key`` was recorded within the dedup window.

        Expired entries are pruned first. When the key is new and ``record``
        is true, it is recorded now so that later calls treat it as a duplicate.
        """
        # Monotonic time: wall-clock jumps must not break the window or the
        # time ordering the left-side pruning relies on.
        now = time.monotonic()
        while self._recent_alerts and now - self._recent_alerts[0][1] > self._dedup_window:
            self._recent_alerts.popleft()
        if any(key == dedup_key for key, _ in self._recent_alerts):
            logger.debug(f"Alert deduplicated: {dedup_key}")
            return True
        if record:
            self._recent_alerts.append((dedup_key, now))
        return False

    def _remember(self, dedup_key: str) -> None:
        """Record ``dedup_key`` as accepted at the current monotonic time."""
        self._recent_alerts.append((dedup_key, time.monotonic()))

    async def _worker(self):
        """Deliver queued alerts one at a time until stop() cancels this task.

        The loop deliberately does not check ``_running``. stop() clears that
        flag *before* draining, so exiting on it would leave queued alerts
        undelivered and make the drain always time out. Delivery failures are
        logged and do not stop the worker.
        """
        # Bind once: stop() sets self.queue to None only after this task ends.
        queue = self.queue
        while True:
            try:
                item = await queue.get()
                try:
                    await self.feishu.bot(
                        title=item["title"],
                        detail=item["detail"],
                        mention=item["mention"],
                        table=item["table"],
                        env=item["env"],
                        mention_users=item["mention_users"],
                    )
                except Exception as e:
                    logger.error(f"Failed to send alert: {e}")
                finally:
                    # Always mark the item done so queue.join() in stop() can finish.
                    queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"AlertService worker error: {e}")
|