log_service.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. import asyncio
  2. import traceback
  3. import time
  4. import json
  5. import datetime
  6. from aliyun.log import LogClient, PutLogsRequest, LogItem
  7. from app.core.config.settings import AliyunLogConfig
  8. class LogService:
  9. def __init__(self, log_config: AliyunLogConfig):
  10. endpoint = log_config.endpoint
  11. access_key_id = log_config.access_key_id
  12. access_key_secret = log_config.access_key_secret
  13. self.client = LogClient(endpoint, access_key_id, access_key_secret)
  14. self.project = log_config.project
  15. self.logstore = log_config.logstore
  16. self.queue = asyncio.Queue()
  17. self.running = False
  18. async def start(self):
  19. self.running = True
  20. asyncio.create_task(self._worker())
  21. async def stop(self):
  22. self.running = False
  23. async def log(self, contents: dict):
  24. """外部调用日志接口"""
  25. await self.queue.put(contents)
  26. async def _worker(self):
  27. while self.running:
  28. contents = await self.queue.get()
  29. try:
  30. await asyncio.to_thread(self._put_log, contents)
  31. except Exception as e:
  32. print(f"[Log Error] {e}")
  33. print(traceback.format_exc())
  34. def _put_log(self, contents: dict):
  35. timestamp = int(time.time())
  36. contents["datetime"] = datetime.datetime.now().__str__()
  37. safe_items = [
  38. (str(k), json.dumps(v) if isinstance(v, (dict, list)) else str(v))
  39. for k, v in contents.items()
  40. ]
  41. log_item = LogItem(timestamp=timestamp, contents=safe_items)
  42. req = PutLogsRequest(
  43. self.project, self.logstore, topic="", source="", logitems=[log_item]
  44. )
  45. self.client.put_logs(req)