import asyncio
import contextlib
import datetime
import json
import time
import traceback
from typing import Optional

from aliyun.log import LogClient, PutLogsRequest, LogItem

from app.core.config.settings import AliyunLogConfig


class LogService:
    """Best-effort, queue-backed shipper of log records to Aliyun Log Service.

    Records handed to :meth:`log` are placed on a bounded in-memory queue and
    sent one at a time by a background task, so callers never wait on the
    SDK call. Delivery is best-effort: records are dropped when the queue is
    full, and records still queued when :meth:`stop` runs are discarded.
    """

    def __init__(self, log_config: AliyunLogConfig):
        """Store the configuration; no client or task exists until ``start()``.

        Args:
            log_config: Supplies ``endpoint``, ``access_key_id``,
                ``access_key_secret``, ``project`` and ``logstore``.
        """
        self.config = log_config
        self.client: Optional[LogClient] = None
        self.queue: Optional[asyncio.Queue] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._running = False
        # Number of records discarded because the queue was full.
        self.dropped_count = 0

    async def start(self):
        """Create the client and queue and launch the background worker.

        Calling this while the service is already running is a no-op.
        """
        if self._running:
            return

        self.client = LogClient(
            self.config.endpoint,
            self.config.access_key_id,
            self.config.access_key_secret,
        )
        # Bounded so a stalled backend cannot grow memory without limit.
        self.queue = asyncio.Queue(maxsize=10000)
        self._running = True
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self):
        """Cancel the worker and release the client and queue.

        Records still waiting in the queue are discarded. Calling this while
        the service is not running is a no-op.
        """
        if not self._running:
            return

        self._running = False

        if self._worker_task:
            self._worker_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._worker_task

        self._worker_task = None
        self.queue = None
        self.client = None

    async def log(self, contents: dict):
        """Enqueue one record for delivery without waiting.

        The record is silently ignored when the service is not running. When
        the queue is full the record is dropped and ``dropped_count`` is
        incremented.

        Args:
            contents: Field name to value mapping for a single log entry.
        """
        if not self._running or self.queue is None:
            return

        try:
            self.queue.put_nowait(contents)
        except asyncio.QueueFull:
            # Dropping is deliberate (logging must never block the caller);
            # keep a count so the loss is observable instead of invisible.
            self.dropped_count += 1

    async def _worker(self):
        """Consume the queue until stopped, sending each record in a thread."""
        try:
            while self._running:
                contents = await self.queue.get()
                try:
                    # The SDK call is synchronous; run it off the event loop.
                    await asyncio.to_thread(self._put_log, contents)
                except Exception as e:
                    # One failed record must not terminate the worker.
                    print(f"[Log Error] {e}")
                    print(traceback.format_exc())
        except asyncio.CancelledError:
            # Cancellation is how stop() ends the worker; exit quietly.
            pass

    def _put_log(self, contents: dict):
        """Send a single record synchronously; runs in a worker thread.

        A ``datetime`` field holding the current local time in ISO format is
        added (replacing any existing ``datetime`` key). Dict and list values
        are JSON-encoded; every other value is converted with ``str``.

        Args:
            contents: The record to send. It is not modified.
        """
        # Read the client once: stop() may set self.client to None while this
        # thread is still running, because a started thread cannot be
        # cancelled along with the awaiting task.
        client = self.client
        if client is None:
            return

        timestamp = int(time.time())

        # Work on a copy so the caller's dict is never mutated from this
        # thread.
        record = {**contents, "datetime": datetime.datetime.now().isoformat()}

        safe_items = [
            (
                str(key),
                # default=str keeps a nested value that is not JSON-native
                # (e.g. a datetime) from causing the whole record to be lost.
                json.dumps(value, ensure_ascii=False, default=str)
                if isinstance(value, (dict, list))
                else str(value),
            )
            for key, value in record.items()
        ]

        log_item = LogItem(timestamp=timestamp, contents=safe_items)

        req = PutLogsRequest(
            self.config.project,
            self.config.logstore,
            topic="",
            source="",
            logitems=[log_item],
        )

        client.put_logs(req)