# log_service.py (1.5 KB)
import asyncio
import datetime
import json
import time
import traceback

from aliyun.log import LogClient, LogItem, PutLogsRequest


  7. class LogService:
  8. def __init__(self, endpoint, access_key_id, access_key_secret, project, logstore):
  9. self.client = LogClient(endpoint, access_key_id, access_key_secret)
  10. self.project = project
  11. self.logstore = logstore
  12. self.queue = asyncio.Queue()
  13. self.running = False
  14. async def start(self):
  15. self.running = True
  16. asyncio.create_task(self._worker())
  17. async def stop(self):
  18. self.running = False
  19. async def log(self, contents: dict):
  20. """外部调用日志接口"""
  21. await self.queue.put(contents)
  22. async def _worker(self):
  23. while self.running:
  24. contents = await self.queue.get()
  25. try:
  26. await asyncio.to_thread(self._put_log, contents)
  27. except Exception as e:
  28. print(f"[Log Error] {e}")
  29. print(traceback.format_exc())
  30. def _put_log(self, contents: dict):
  31. timestamp = int(time.time())
  32. contents["datetime"] = datetime.datetime.now().__str__()
  33. safe_items = [
  34. (str(k), json.dumps(v) if isinstance(v, (dict, list)) else str(v))
  35. for k, v in contents.items()
  36. ]
  37. log_item = LogItem(timestamp=timestamp, contents=safe_items)
  38. req = PutLogsRequest(
  39. self.project, self.logstore, topic="", source="", logitems=[log_item]
  40. )
  41. self.client.put_logs(req)