| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import asyncio
- import traceback
- import time, json
- import datetime
- import contextlib
- from typing import Optional
- from aliyun.log import LogClient, PutLogsRequest, LogItem
- from app.core.config.settings import AliyunLogConfig
class LogService:
    """Asynchronous, best-effort shipper of structured log entries to Aliyun.

    Entries passed to :meth:`log` are placed on a bounded in-memory queue and
    sent one at a time by a background task, which runs the blocking
    ``LogClient.put_logs`` call in a worker thread so the event loop is never
    blocked. Logging never raises into the caller: entries are discarded when
    the service is stopped or the queue is full.
    """

    # Upper bound on buffered entries; beyond this, new entries are dropped.
    QUEUE_MAXSIZE = 10000

    def __init__(self, log_config: "AliyunLogConfig"):
        """Store the configuration; no client or task is created until start().

        Args:
            log_config: Settings object providing ``endpoint``,
                ``access_key_id``, ``access_key_secret``, ``project`` and
                ``logstore``.
        """
        self.config = log_config
        self.client: Optional[LogClient] = None
        self.queue: Optional[asyncio.Queue] = None
        self._worker_task: Optional[asyncio.Task] = None
        self._running = False
        # Number of entries discarded by log() because the queue was full.
        self.dropped_count = 0

    async def start(self):
        """Create the client and queue and launch the background worker.

        Calling this while the service is already running is a no-op.
        """
        if self._running:
            return
        self.client = LogClient(
            self.config.endpoint,
            self.config.access_key_id,
            self.config.access_key_secret,
        )
        self.queue = asyncio.Queue(maxsize=self.QUEUE_MAXSIZE)
        self._running = True
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self, drain_timeout: float = 5.0):
        """Stop accepting entries, flush what is queued, then stop the worker.

        Args:
            drain_timeout: Maximum number of seconds to wait for entries that
                are already queued to be sent. Pass ``0`` to cancel the worker
                immediately and discard anything still queued.

        Calling this while the service is not running is a no-op.
        """
        if not self._running:
            return
        # Flip the flag first so log() rejects new entries while we drain.
        self._running = False

        worker, queue = self._worker_task, self.queue
        if worker is not None:
            try:
                # Only wait on the queue if a live worker can still empty it.
                if queue is not None and drain_timeout > 0 and not worker.done():
                    with contextlib.suppress(asyncio.TimeoutError):
                        await asyncio.wait_for(queue.join(), timeout=drain_timeout)
            finally:
                # Always cancel, even if stop() itself is cancelled mid-drain.
                worker.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await worker

        self._worker_task = None
        self.queue = None
        self.client = None

    async def log(self, contents: dict):
        """Queue one log entry for delivery without blocking or raising.

        A shallow snapshot of ``contents`` is queued, so the caller's dict is
        never modified and changes made to it afterwards are not shipped.

        Args:
            contents: Mapping of field names to values for this entry.
        """
        queue = self.queue
        if not self._running or queue is None:
            return
        try:
            entry = dict(contents)
        except (TypeError, ValueError) as e:
            # Keep logging best-effort: report bad input instead of raising.
            print(f"[Log Error] {e}")
            return
        try:
            queue.put_nowait(entry)
        except asyncio.QueueFull:
            # Back-pressure policy is to drop the entry; keep a count so the
            # loss is observable via ``dropped_count``.
            self.dropped_count += 1

    async def _worker(self):
        """Send queued entries one by one until the task is cancelled.

        The loop deliberately does not test ``_running``: stop() clears that
        flag before draining, and the worker must keep consuming until it is
        cancelled.
        """
        queue = self.queue
        while True:
            contents = await queue.get()
            try:
                # put_logs is blocking network I/O; keep it off the event loop.
                await asyncio.to_thread(self._put_log, contents)
            except Exception as e:
                print(f"[Log Error] {e}")
                print(traceback.format_exc())
            finally:
                # Lets queue.join() in stop() know this entry is finished.
                queue.task_done()

    @staticmethod
    def _to_log_contents(contents: dict, now: datetime.datetime) -> list:
        """Return ``(key, value)`` string pairs for ``contents`` plus a timestamp.

        A ``"datetime"`` field holding ``now.isoformat()`` is added (replacing
        any existing field of that name) without modifying ``contents``.
        dict/list values are JSON-encoded; every other value goes through
        ``str()``.
        """
        stamped = {**contents, "datetime": now.isoformat()}
        return [
            (
                str(key),
                # default=str: a nested value JSON cannot encode is stringified
                # rather than causing the whole entry to be lost.
                json.dumps(value, ensure_ascii=False, default=str)
                if isinstance(value, (dict, list))
                else str(value),
            )
            for key, value in stamped.items()
        ]

    def _put_log(self, contents: dict):
        """Send a single entry with a blocking ``put_logs`` call.

        Intended to run in a worker thread. ``contents`` is not modified.

        Raises:
            RuntimeError: If the service has no client (not started/stopped).
        """
        # Bind the client up front so a concurrent stop() clearing
        # ``self.client`` cannot turn an in-flight send into a None access.
        client = self.client
        if client is None:
            raise RuntimeError("LogService client is not initialised")

        timestamp = int(time.time())
        items = self._to_log_contents(contents, datetime.datetime.now())
        log_item = LogItem(timestamp=timestamp, contents=items)
        req = PutLogsRequest(
            self.config.project,
            self.config.logstore,
            topic="",
            source="",
            logitems=[log_item],
        )
        client.put_logs(req)
|