logs_websocket.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. """
  2. Logs WebSocket - 实时推送后端日志到前端
  3. """
  4. import asyncio
  5. import logging
  6. from typing import Set
  7. from fastapi import APIRouter, WebSocket, WebSocketDisconnect
  8. from datetime import datetime
  9. router = APIRouter(prefix="/api/logs", tags=["logs"])
  10. # 存储所有连接的WebSocket客户端
  11. _clients: Set[WebSocket] = set()
  12. class WebSocketLogHandler(logging.Handler):
  13. """自定义日志处理器,将日志推送到WebSocket客户端"""
  14. def emit(self, record: logging.LogRecord):
  15. """发送日志记录到所有连接的客户端"""
  16. try:
  17. log_entry = self.format(record)
  18. # 构造日志消息
  19. message = {
  20. "timestamp": datetime.now().isoformat(),
  21. "level": record.levelname,
  22. "name": record.name,
  23. "message": log_entry,
  24. }
  25. # uvicorn 热重载线程等非主线程打印日志时,可能没有 running loop
  26. try:
  27. loop = asyncio.get_running_loop()
  28. # 如果能在当前循环找到,说明在协程中,安全地抛出一个 task
  29. loop.create_task(broadcast_log(message))
  30. except RuntimeError:
  31. # 找不到 running loop 说明不是在 async 上下文下,我们需要新开一个临时事件循环发消息
  32. # 或者更简单的方式是:在没循环时就暂时丢弃/降级,因为没人监听的时候发了也没意义。
  33. # 由于这只是实时看日志的前端需求,如果在纯同步上下文中可以采用 run_coroutine_threadsafe 交给一个已知的 loop(但这里拿不到全局 loop)。
  34. # 我们这里使用兼容写法:短暂新开 loop 去触发一次网络写入(或者最稳妥的是:当前线程没有客户端也就没必要强求广播了)。
  35. if _clients:
  36. asyncio.run(broadcast_log(message))
  37. except Exception:
  38. self.handleError(record)
  39. async def broadcast_log(message: dict):
  40. """广播日志消息到所有连接的客户端"""
  41. disconnected = set()
  42. for client in _clients:
  43. try:
  44. await client.send_json(message)
  45. except Exception:
  46. disconnected.add(client)
  47. # 移除断开连接的客户端
  48. for client in disconnected:
  49. _clients.discard(client)
  50. @router.websocket("/watch")
  51. async def logs_websocket(websocket: WebSocket):
  52. """
  53. 日志WebSocket端点
  54. 客户端连接后,实时接收后端日志
  55. """
  56. await websocket.accept()
  57. _clients.add(websocket)
  58. try:
  59. # 发送欢迎消息
  60. await websocket.send_json({
  61. "timestamp": datetime.now().isoformat(),
  62. "level": "INFO",
  63. "name": "logs_websocket",
  64. "message": "Connected to logs stream",
  65. })
  66. # 保持连接,等待客户端断开
  67. while True:
  68. # 接收客户端消息(用于保持连接)
  69. await websocket.receive_text()
  70. except WebSocketDisconnect:
  71. pass
  72. finally:
  73. _clients.discard(websocket)
  74. def setup_websocket_logging(level=logging.INFO):
  75. """
  76. 设置WebSocket日志处理器
  77. 将根日志器的日志推送到WebSocket客户端
  78. """
  79. handler = WebSocketLogHandler()
  80. handler.setLevel(level)
  81. # 设置日志格式
  82. formatter = logging.Formatter(
  83. "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
  84. datefmt="%Y-%m-%d %H:%M:%S"
  85. )
  86. handler.setFormatter(formatter)
  87. # 添加到根日志器
  88. root_logger = logging.getLogger()
  89. root_logger.addHandler(handler)
  90. return handler