| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- import asyncio
- import contextlib
- import json
- import logging
- import sys
- import time
- import traceback
- from typing import Optional
- from aliyun.log import LogClient, PutLogsRequest, LogItem
- from app.core.config.settings import AliyunLogConfig
# Module-level logger for LogService's own diagnostics (drain/drop/send errors).
# Records shipped to Aliyun SLS go through LogService.log(), not this logger.
logger = logging.getLogger(__name__)
class LogService:
    """Asynchronously ship structured log records to Aliyun Log Service (SLS).

    Records passed to :meth:`log` are buffered in a bounded ``asyncio.Queue``
    and sent one at a time by a background worker task. Each synchronous
    ``LogClient.put_logs`` call runs in a thread via ``asyncio.to_thread`` so
    the event loop is not blocked by it. When the queue is full, records are
    dropped (and counted) rather than applying back-pressure to callers.
    """

    # Maximum number of buffered records; beyond this log() drops records.
    QUEUE_MAXSIZE = 10000
    # Minimum number of seconds between two "queue full" warnings.
    DROP_WARN_INTERVAL = 60.0
    # Event types echoed to stderr when they have to be dropped.
    CRITICAL_EVENT_TYPES = ("task_failed", "task_error", "task_cancelled")

    def __init__(self, log_config: AliyunLogConfig):
        self.config = log_config
        self.client: Optional[LogClient] = None
        # Holds (created_at, contents) pairs produced by log().
        self.queue: Optional[asyncio.Queue] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._running = False
        # metrics
        self._dropped_count = 0
        # time.monotonic() of the last "queue full" warning; -inf means "never",
        # so the first drop always warns regardless of the monotonic origin.
        self._last_drop_warn_time = float("-inf")

    async def start(self):
        """Create the SLS client and queue, then launch the background worker.

        Calling ``start()`` on a service that is already running is a no-op.
        """
        if self._running:
            return
        self.client = LogClient(
            self.config.endpoint,
            self.config.access_key_id,
            self.config.access_key_secret,
        )
        self.queue = asyncio.Queue(maxsize=self.QUEUE_MAXSIZE)
        self._running = True
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self, drain_timeout: float = 10.0):
        """Stop accepting records, flush the backlog, then tear everything down.

        Args:
            drain_timeout: Maximum seconds to wait for buffered records to be
                sent. Records still queued after this are discarded and the
                number lost is reported via a warning.
        """
        if not self._running:
            return
        # 1. Stop accepting new logs: log() returns early from now on.
        self._running = False
        queue = self.queue
        worker = self._worker_task

        # 2. Drain the queue (bounded by drain_timeout).
        if queue is not None:
            pending = queue.qsize()
            if pending > 0:
                logger.info("LogService draining %d pending logs...", pending)
            try:
                if worker is not None and not worker.done():
                    # The worker keeps consuming after _running is cleared, so
                    # join() waits for every queued record -- including one the
                    # worker may be sending right now -- through a single consumer.
                    await asyncio.wait_for(queue.join(), timeout=drain_timeout)
                elif pending > 0:
                    # No live worker to consume the backlog: send it inline.
                    await asyncio.wait_for(
                        self._drain_remaining(), timeout=drain_timeout
                    )
            except asyncio.TimeoutError:
                logger.warning(
                    "LogService drain timeout, %d logs lost", queue.qsize()
                )

        # 3. Stop the worker. After a successful drain it is idle in queue.get().
        if worker is not None:
            worker.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await worker

        if self._dropped_count > 0:
            logger.warning(
                "LogService stopped, total dropped: %d", self._dropped_count
            )
        self._worker_task = None
        self.queue = None
        self.client = None

    async def _drain_remaining(self):
        """Send every record still in the queue from the calling task.

        stop() uses this only when there is no live worker to flush the queue.
        Per-record send failures are logged and skipped.
        """
        queue = self.queue
        if queue is None:
            return
        while True:
            try:
                created_at, contents = queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            try:
                await asyncio.to_thread(self._put_log, contents, created_at)
            except Exception as e:
                logger.error("LogService drain error: %s", e)
            finally:
                queue.task_done()

    async def log(self, contents: dict):
        """Enqueue one record for asynchronous delivery without waiting.

        The record is shallow-copied and stamped with the current time, so
        later changes by the caller do not alter what is sent, and the SLS
        timestamp reflects when the event was logged rather than when the
        backlog reached it. Records are ignored when the service is not
        running and dropped (counted in ``dropped_count``) when the queue
        is full.

        Args:
            contents: Mapping of field name to value for the record.
        """
        if not self._running or self.queue is None:
            return
        created_at = time.time()
        try:
            self.queue.put_nowait((created_at, dict(contents)))
        except asyncio.QueueFull:
            self._dropped_count += 1
            # Warn at most once per DROP_WARN_INTERVAL seconds to avoid log storms.
            now = time.monotonic()
            if now - self._last_drop_warn_time > self.DROP_WARN_INTERVAL:
                self._last_drop_warn_time = now
                logger.warning(
                    "LogService queue full, dropped %d logs total",
                    self._dropped_count,
                )
            # Critical events fall back to stderr so they are not lost silently.
            event_type = contents.get("event_type", "")
            if event_type in self.CRITICAL_EVENT_TYPES:
                print(
                    f"[CRITICAL LOG DROPPED] {json.dumps(contents, ensure_ascii=False, default=str)}",
                    file=sys.stderr,
                )

    def get_metrics(self) -> dict:
        """Return a snapshot of the service's runtime metrics.

        Keys: ``queue_size`` and ``queue_maxsize`` (both 0 when not started),
        ``dropped_count`` (cumulative since construction) and ``is_running``.
        """
        queue = self.queue
        return {
            "queue_size": queue.qsize() if queue else 0,
            "queue_maxsize": queue.maxsize if queue else 0,
            "dropped_count": self._dropped_count,
            "is_running": self._running,
        }

    async def _worker(self):
        """Consume the queue until cancelled, sending each record via _put_log.

        The loop intentionally does not exit when ``_running`` is cleared:
        stop() relies on it to flush the backlog (``queue.join()``) and then
        cancels it. Per-record send failures are logged and skipped.
        """
        # Bind locally: stop() resets self.queue to None during teardown.
        queue = self.queue
        try:
            while True:
                created_at, contents = await queue.get()
                try:
                    await asyncio.to_thread(self._put_log, contents, created_at)
                except Exception as e:
                    logger.error("LogService put_log failed: %s", e)
                finally:
                    queue.task_done()
        except asyncio.CancelledError:
            pass

    def _put_log(self, contents: dict, created_at: Optional[float] = None):
        """Synchronously send one record to SLS (called via asyncio.to_thread).

        Args:
            contents: Record fields. Not mutated. The outgoing copy gets a
                "datetime" field (naive local ISO-8601 time of ``created_at``)
                that overrides any caller-supplied "datetime" key.
            created_at: Unix time at which the record was logged. Defaults to
                the current time.

        Raises:
            Propagates any exception raised by ``self.client.put_logs``.
        """
        import datetime

        if created_at is None:
            created_at = time.time()
        # Build a new dict instead of writing into the caller's one: this runs
        # in a worker thread while the caller may still hold a reference.
        fields = {
            **contents,
            "datetime": datetime.datetime.fromtimestamp(created_at).isoformat(),
        }
        # Every field is flattened to a (str, str) pair. Dicts and lists are
        # JSON-encoded, and default=str keeps one non-JSON nested value
        # (datetime, UUID, ...) from failing the whole record.
        safe_items = [
            (
                str(k),
                json.dumps(v, ensure_ascii=False, default=str)
                if isinstance(v, (dict, list))
                else str(v),
            )
            for k, v in fields.items()
        ]
        log_item = LogItem(timestamp=int(created_at), contents=safe_items)
        req = PutLogsRequest(
            self.config.project,
            self.config.logstore,
            topic="",
            source="",
            logitems=[log_item],
        )
        self.client.put_logs(req)
|