log_service.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import asyncio
  2. import contextlib
  3. import json
  4. import logging
  5. import sys
  6. import time
  7. import traceback
  8. from typing import Optional
  9. from aliyun.log import LogClient, PutLogsRequest, LogItem
  10. from app.core.config.settings import AliyunLogConfig
  11. logger = logging.getLogger(__name__)
  12. class LogService:
  13. def __init__(self, log_config: AliyunLogConfig):
  14. self.config = log_config
  15. self.client: Optional[LogClient] = None
  16. self.queue: Optional[asyncio.Queue] = None
  17. self._worker_task: Optional[asyncio.Task] = None
  18. self._running = False
  19. # metrics
  20. self._dropped_count = 0
  21. self._last_drop_warn_time = 0
  22. async def start(self):
  23. if self._running:
  24. return
  25. self.client = LogClient(
  26. self.config.endpoint,
  27. self.config.access_key_id,
  28. self.config.access_key_secret,
  29. )
  30. self.queue = asyncio.Queue(maxsize=10000)
  31. self._running = True
  32. self._worker_task = asyncio.create_task(self._worker())
  33. async def stop(self, drain_timeout: float = 10.0):
  34. if not self._running:
  35. return
  36. # 1. 停止接收新日志
  37. self._running = False
  38. # 2. drain 队列(带超时)
  39. if self.queue and self.queue.qsize() > 0:
  40. remaining = self.queue.qsize()
  41. logger.info(f"LogService draining {remaining} pending logs...")
  42. try:
  43. await asyncio.wait_for(self._drain_remaining(), timeout=drain_timeout)
  44. except asyncio.TimeoutError:
  45. logger.warning(
  46. f"LogService drain timeout, {self.queue.qsize()} logs lost"
  47. )
  48. # 3. 停止 worker
  49. if self._worker_task:
  50. self._worker_task.cancel()
  51. with contextlib.suppress(asyncio.CancelledError):
  52. await self._worker_task
  53. if self._dropped_count > 0:
  54. logger.warning(f"LogService stopped, total dropped: {self._dropped_count}")
  55. self._worker_task = None
  56. self.queue = None
  57. self.client = None
  58. async def _drain_remaining(self):
  59. """把队列里剩余的日志全部发出去"""
  60. while not self.queue.empty():
  61. try:
  62. contents = self.queue.get_nowait()
  63. await asyncio.to_thread(self._put_log, contents)
  64. self.queue.task_done()
  65. except asyncio.QueueEmpty:
  66. break
  67. except Exception as e:
  68. logger.error(f"LogService drain error: {e}")
  69. self.queue.task_done()
  70. async def log(self, contents: dict):
  71. if not self._running or self.queue is None:
  72. return
  73. try:
  74. self.queue.put_nowait(contents)
  75. except asyncio.QueueFull:
  76. self._dropped_count += 1
  77. # 每 60 秒最多告警一次
  78. now = time.time()
  79. if now - self._last_drop_warn_time > 60:
  80. self._last_drop_warn_time = now
  81. logger.warning(
  82. f"LogService queue full, dropped {self._dropped_count} logs total"
  83. )
  84. # 关键事件降级到 stderr
  85. event_type = contents.get("event_type", "")
  86. if event_type in ("task_failed", "task_error", "task_cancelled"):
  87. print(
  88. f"[CRITICAL LOG DROPPED] {json.dumps(contents, ensure_ascii=False, default=str)}",
  89. file=sys.stderr,
  90. )
  91. def get_metrics(self) -> dict:
  92. """获取日志服务运行指标"""
  93. return {
  94. "queue_size": self.queue.qsize() if self.queue else 0,
  95. "queue_maxsize": self.queue.maxsize if self.queue else 0,
  96. "dropped_count": self._dropped_count,
  97. "is_running": self._running,
  98. }
  99. async def _worker(self):
  100. try:
  101. while self._running:
  102. contents = await self.queue.get()
  103. try:
  104. await asyncio.to_thread(self._put_log, contents)
  105. except Exception as e:
  106. logger.error(f"LogService put_log failed: {e}")
  107. finally:
  108. self.queue.task_done()
  109. except asyncio.CancelledError:
  110. pass
  111. def _put_log(self, contents: dict):
  112. import datetime
  113. timestamp = int(time.time())
  114. contents["datetime"] = datetime.datetime.now().isoformat()
  115. safe_items = [
  116. (
  117. str(k),
  118. json.dumps(v, ensure_ascii=False)
  119. if isinstance(v, (dict, list))
  120. else str(v),
  121. )
  122. for k, v in contents.items()
  123. ]
  124. log_item = LogItem(timestamp=timestamp, contents=safe_items)
  125. req = PutLogsRequest(
  126. self.config.project,
  127. self.config.logstore,
  128. topic="",
  129. source="",
  130. logitems=[log_item],
  131. )
  132. self.client.put_logs(req)