| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
import asyncio
import datetime
import json
import time
import traceback

from aliyun.log import LogClient, PutLogsRequest, LogItem

from app.core.config.settings import AliyunLogConfig
class LogService:
    """Queue-backed asynchronous writer for an Aliyun log store.

    Callers enqueue records with :meth:`log`. A single background task,
    started by :meth:`start`, takes records off the queue one at a time and
    uploads each with ``LogClient.put_logs`` inside ``asyncio.to_thread`` so
    the blocking SDK call does not stall the event loop.
    """

    def __init__(self, log_config: "AliyunLogConfig"):
        """Create the SDK client and an empty queue.

        Args:
            log_config: Settings object providing ``endpoint``,
                ``access_key_id``, ``access_key_secret``, ``project`` and
                ``logstore``.
        """
        self.client = LogClient(
            log_config.endpoint,
            log_config.access_key_id,
            log_config.access_key_secret,
        )
        self.project = log_config.project
        self.logstore = log_config.logstore
        self.queue = asyncio.Queue()
        self.running = False
        # Strong reference to the worker task: the event loop itself only
        # keeps weak references, so an unreferenced task may be collected.
        self._worker_task = None

    async def start(self):
        """Start the background worker; a no-op if it is already running.

        Must be awaited from inside a running event loop.
        """
        if self._worker_task is not None and not self._worker_task.done():
            return
        self.running = True
        self._worker_task = asyncio.create_task(self._worker())

    async def stop(self):
        """Stop the background worker and wait until it has exited.

        The worker is cancelled so that it also terminates while it is idle
        in ``queue.get()``. Records that are still queued stay in
        ``self.queue`` and are picked up if :meth:`start` is called again.
        """
        self.running = False
        task, self._worker_task = self._worker_task, None
        if task is None:
            return
        task.cancel()
        # return_exceptions=True collects the worker's CancelledError as a
        # result instead of raising it, while a cancellation aimed at stop()
        # itself still propagates to the caller.
        await asyncio.gather(task, return_exceptions=True)

    async def log(self, contents: dict):
        """Public entry point: enqueue one record for delivery.

        Args:
            contents: Mapping of field name to value. It is not modified.
        """
        await self.queue.put(contents)

    async def _worker(self):
        """Consume queued records and upload them until stopped."""
        while self.running:
            contents = await self.queue.get()
            try:
                await asyncio.to_thread(self._put_log, contents)
            except Exception as e:
                # A failed upload must not kill the worker; report and go on.
                print(f"[Log Error] {e}")
                print(traceback.format_exc())
            finally:
                # Acknowledge the item so that ``queue.join()`` can complete.
                self.queue.task_done()

    @staticmethod
    def _serialize_contents(contents: dict) -> list[tuple[str, str]]:
        """Convert a record into the ``(key, value)`` string pairs to upload.

        Dict and list values are JSON-encoded; everything else goes through
        ``str``. ``default=str`` keeps a nested non-JSON value (for example a
        ``datetime``) from causing the whole record to be dropped.
        """
        return [
            (
                str(key),
                json.dumps(value, default=str)
                if isinstance(value, (dict, list))
                else str(value),
            )
            for key, value in contents.items()
        ]

    def _put_log(self, contents: dict):
        """Upload one record synchronously (run in a worker thread).

        A ``datetime`` field holding the current local time is added to a
        copy of ``contents``; the caller's dict is left untouched.
        """
        timestamp = int(time.time())
        record = {**contents, "datetime": str(datetime.datetime.now())}
        log_item = LogItem(
            timestamp=timestamp,
            contents=self._serialize_contents(record),
        )
        req = PutLogsRequest(
            self.project, self.logstore, topic="", source="", logitems=[log_item]
        )
        self.client.put_logs(req)
|