log_service.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import asyncio
  2. import traceback
  3. import time, json
  4. import datetime
  5. import contextlib
  6. from typing import Optional
  7. from aliyun.log import LogClient, PutLogsRequest, LogItem
  8. from app.core.config.settings import AliyunLogConfig
  9. class LogService:
  10. def __init__(self, log_config: AliyunLogConfig):
  11. self.config = log_config
  12. self.client: Optional[LogClient] = None
  13. self.queue: Optional[asyncio.Queue] = None
  14. self._worker_task: Optional[asyncio.Task] = None
  15. self._running = False
  16. async def start(self):
  17. if self._running:
  18. return
  19. self.client = LogClient(
  20. self.config.endpoint,
  21. self.config.access_key_id,
  22. self.config.access_key_secret,
  23. )
  24. self.queue = asyncio.Queue(maxsize=10000)
  25. self._running = True
  26. self._worker_task = asyncio.create_task(self._worker())
  27. async def stop(self):
  28. if not self._running:
  29. return
  30. self._running = False
  31. if self._worker_task:
  32. self._worker_task.cancel()
  33. with contextlib.suppress(asyncio.CancelledError):
  34. await self._worker_task
  35. self._worker_task = None
  36. self.queue = None
  37. self.client = None
  38. async def log(self, contents: dict):
  39. if not self._running or self.queue is None:
  40. return
  41. try:
  42. self.queue.put_nowait(contents)
  43. except asyncio.QueueFull:
  44. # 可以打 stderr / 统计丢日志数量
  45. pass
  46. async def _worker(self):
  47. try:
  48. while self._running:
  49. contents = await self.queue.get()
  50. try:
  51. await asyncio.to_thread(self._put_log, contents)
  52. except Exception as e:
  53. print(f"[Log Error] {e}")
  54. print(traceback.format_exc())
  55. except asyncio.CancelledError:
  56. pass
  57. def _put_log(self, contents: dict):
  58. timestamp = int(time.time())
  59. contents["datetime"] = datetime.datetime.now().isoformat()
  60. safe_items = [
  61. (
  62. str(k),
  63. json.dumps(v, ensure_ascii=False)
  64. if isinstance(v, (dict, list))
  65. else str(v),
  66. )
  67. for k, v in contents.items()
  68. ]
  69. log_item = LogItem(timestamp=timestamp, contents=safe_items)
  70. req = PutLogsRequest(
  71. self.config.project,
  72. self.config.logstore,
  73. topic="",
  74. source="",
  75. logitems=[log_item],
  76. )
  77. self.client.put_logs(req)