alert_service.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import asyncio
  2. import contextlib
  3. import logging
  4. import time
  5. from collections import deque
  6. from typing import Any, Dict, Optional
  # Module-level logger named after this module; used by AlertService for all log output.
  logger: logging.Logger = logging.getLogger(name=__name__)
  class AlertService:
      """Asynchronous alert service that keeps alert delivery off the main task path.

      ``send_alert`` only places the alert on an in-memory queue; a single
      background worker task launched by ``start`` takes alerts off that queue
      one at a time and forwards them to ``feishu_client.bot(...)``. A shared
      instance can be created with ``initialize`` and read back with
      ``get_instance``.
      """

      # Shared instance created by initialize(); None until then.
      _instance: Optional["AlertService"] = None

      def __init__(
          self,
          feishu_client,
          max_queue_size: int = 1000,
          dedup_window: float = 60.0,
      ):
          """Create the service in a stopped state; call ``start`` before use.

          Args:
              feishu_client: Client whose awaitable ``bot(...)`` method delivers an
                  alert (called with title, detail, mention, table, env and
                  mention_users keyword arguments).
              max_queue_size: Capacity of the pending-alert queue. Alerts that
                  arrive while it is full are dropped and counted.
              dedup_window: Seconds during which a repeated ``dedup_key`` passed
                  to ``send_alert`` is suppressed.
          """
          self.feishu = feishu_client
          self.queue: Optional[asyncio.Queue] = None
          self._worker_task: Optional[asyncio.Task] = None
          # True while new alerts are accepted; cleared at the start of stop().
          self._running = False
          self._max_queue_size = max_queue_size
          self._dedup_window = dedup_window
          # (dedup_key, time.monotonic() timestamp) pairs in insertion order.
          # The bound caps memory; a key evicted by it is no longer deduplicated.
          self._recent_alerts = deque(maxlen=200)
          self._dropped_count = 0

      @classmethod
      def initialize(
          cls,
          feishu_client,
          max_queue_size: int = 1000,
          dedup_window: float = 60.0,
      ) -> "AlertService":
          """Create the shared instance on the first call and return it.

          Later calls return the existing instance; their arguments are ignored.
          """
          if cls._instance is None:
              cls._instance = cls(
                  feishu_client,
                  max_queue_size=max_queue_size,
                  dedup_window=dedup_window,
              )
          return cls._instance

      @classmethod
      def get_instance(cls) -> Optional["AlertService"]:
          """Return the shared instance, or None if ``initialize`` was never called."""
          return cls._instance

      async def start(self):
          """Create a fresh queue and launch the worker task; no-op if already running."""
          if self._running:
              return
          self.queue = asyncio.Queue(maxsize=self._max_queue_size)
          self._running = True
          self._worker_task = asyncio.create_task(self._worker(), name="alert_service")
          logger.info("AlertService started")

      async def stop(self, drain_timeout: float = 5.0):
          """Stop accepting alerts, deliver what is pending, then shut the worker down.

          Args:
              drain_timeout: Maximum seconds to wait for queued and in-flight
                  alerts to be delivered before the worker task is cancelled.
          """
          if not self._running:
              return
          # Reject new alerts first; the worker keeps consuming until it is
          # cancelled below, so the backlog can still be delivered.
          self._running = False
          queue = self.queue
          if queue is not None:
              pending = queue.qsize()
              if pending > 0:
                  logger.info(f"AlertService draining {pending} alerts...")
              # Always join: join() also waits for an alert the worker has already
              # dequeued but not finished sending, which qsize() does not count.
              try:
                  await asyncio.wait_for(queue.join(), timeout=drain_timeout)
              except asyncio.TimeoutError:
                  logger.warning(
                      f"AlertService drain timeout, {queue.qsize()} alerts remaining"
                  )
          if self._worker_task:
              self._worker_task.cancel()
              with contextlib.suppress(asyncio.CancelledError):
                  await self._worker_task
          if self._dropped_count > 0:
              logger.warning(f"AlertService stopped, dropped alerts: {self._dropped_count}")
          self._worker_task = None
          self.queue = None

      async def send_alert(
          self,
          title: str,
          detail: Dict[str, Any],
          mention: bool = True,
          table: bool = False,
          env: str = "long_articles_task",
          mention_users=None,
          dedup_key: Optional[str] = None,
      ):
          """Queue an alert for background delivery without waiting for it to be sent.

          The call does nothing when the service is not running, or when
          ``dedup_key`` was already queued within the dedup window. If the queue
          is full, the alert is dropped, counted and logged.

          Args:
              title, detail, mention, table, env, mention_users: Forwarded
                  unchanged to ``feishu_client.bot``.
              dedup_key: Optional key; alerts sharing a key within the dedup
                  window are sent only once.
          """
          if not self._running or self.queue is None:
              return
          # Only check here. The key is recorded after a successful enqueue, so a
          # dropped alert does not suppress the next attempt with the same key.
          if dedup_key and self._is_duplicate(dedup_key, record=False):
              return
          item = {
              "title": title,
              "detail": detail,
              "mention": mention,
              "table": table,
              "env": env,
              "mention_users": mention_users,
          }
          try:
              self.queue.put_nowait(item)
          except asyncio.QueueFull:
              self._dropped_count += 1
              logger.warning(f"Alert queue full, dropped alert: {title}")
              return
          if dedup_key:
              self._recent_alerts.append((dedup_key, time.monotonic()))

      def _is_duplicate(self, dedup_key: str, record: bool = True) -> bool:
          """Return True if ``dedup_key`` was recorded within the dedup window.

          Expired entries are pruned first. If the key is not a duplicate and
          ``record`` is true, it is recorded with the current monotonic time.
          """
          now = time.monotonic()
          # Entries are appended in time order, so expired ones sit at the left end.
          while self._recent_alerts and now - self._recent_alerts[0][1] > self._dedup_window:
              self._recent_alerts.popleft()
          if any(key == dedup_key for key, _ in self._recent_alerts):
              logger.debug(f"Alert deduplicated: {dedup_key}")
              return True
          if record:
              self._recent_alerts.append((dedup_key, now))
          return False

      async def _worker(self):
          """Deliver queued alerts one at a time until this task is cancelled.

          The loop deliberately does not watch ``_running``: ``stop()`` clears
          that flag first and then waits on ``queue.join()``, so the worker must
          keep consuming to drain the backlog. Delivery failures are logged and
          do not stop the worker.
          """
          # Local reference: stop() resets self.queue to None after cancelling us.
          queue = self.queue
          while True:
              item = await queue.get()
              try:
                  await self.feishu.bot(
                      title=item["title"],
                      detail=item["detail"],
                      mention=item["mention"],
                      table=item["table"],
                      env=item["env"],
                      mention_users=item["mention_users"],
                  )
              except Exception as e:
                  logger.error(f"Failed to send alert: {e}")
              finally:
                  queue.task_done()
  110. logger.exception(f"AlertService worker error: {e}")