log_service.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import asyncio
  2. import traceback
  3. import time
  4. import json
  5. import datetime
  6. from aliyun.log import LogClient, PutLogsRequest, LogItem
  7. class LogService:
  8. def __init__(self, endpoint, access_key_id, access_key_secret, project, logstore):
  9. self.client = LogClient(endpoint, access_key_id, access_key_secret)
  10. self.project = project
  11. self.logstore = logstore
  12. self.queue = asyncio.Queue()
  13. self.running = False
  14. async def start(self):
  15. self.running = True
  16. asyncio.create_task(self._worker())
  17. async def stop(self):
  18. self.running = False
  19. async def log(self, contents: dict):
  20. """外部调用日志接口"""
  21. await self.queue.put(contents)
  22. async def _worker(self):
  23. while self.running:
  24. contents = await self.queue.get()
  25. try:
  26. await asyncio.to_thread(self._put_log, contents)
  27. except Exception as e:
  28. print(f"[Log Error] {e}")
  29. print(traceback.format_exc())
  30. def _put_log(self, contents: dict):
  31. timestamp = int(time.time())
  32. contents["datetime"] = datetime.datetime.now().__str__()
  33. safe_items = [
  34. (str(k), json.dumps(v) if isinstance(v, (dict, list)) else str(v))
  35. for k, v in contents.items()
  36. ]
  37. log_item = LogItem(timestamp=timestamp, contents=safe_items)
  38. print(type(log_item))
  39. print(log_item)
  40. req = PutLogsRequest(
  41. self.project, self.logstore, topic="", source="", logitems=[log_item]
  42. )
  43. self.client.put_logs(req)