""" Logs WebSocket - 实时推送后端日志到前端 """ import asyncio import logging from typing import Set from fastapi import APIRouter, WebSocket, WebSocketDisconnect from datetime import datetime router = APIRouter(prefix="/api/logs", tags=["logs"]) # 存储所有连接的WebSocket客户端 _clients: Set[WebSocket] = set() class WebSocketLogHandler(logging.Handler): """自定义日志处理器,将日志推送到WebSocket客户端""" def emit(self, record: logging.LogRecord): """发送日志记录到所有连接的客户端""" try: log_entry = self.format(record) # 构造日志消息 message = { "timestamp": datetime.now().isoformat(), "level": record.levelname, "name": record.name, "message": log_entry, } # uvicorn 热重载线程等非主线程打印日志时,可能没有 running loop try: loop = asyncio.get_running_loop() # 如果能在当前循环找到,说明在协程中,安全地抛出一个 task loop.create_task(broadcast_log(message)) except RuntimeError: # 找不到 running loop 说明不是在 async 上下文下,我们需要新开一个临时事件循环发消息 # 或者更简单的方式是:在没循环时就暂时丢弃/降级,因为没人监听的时候发了也没意义。 # 由于这只是实时看日志的前端需求,如果在纯同步上下文中可以采用 run_coroutine_threadsafe 交给一个已知的 loop(但这里拿不到全局 loop)。 # 我们这里使用兼容写法:短暂新开 loop 去触发一次网络写入(或者最稳妥的是:当前线程没有客户端也就没必要强求广播了)。 if _clients: asyncio.run(broadcast_log(message)) except Exception: self.handleError(record) async def broadcast_log(message: dict): """广播日志消息到所有连接的客户端""" disconnected = set() for client in _clients: try: await client.send_json(message) except Exception: disconnected.add(client) # 移除断开连接的客户端 for client in disconnected: _clients.discard(client) @router.websocket("/watch") async def logs_websocket(websocket: WebSocket): """ 日志WebSocket端点 客户端连接后,实时接收后端日志 """ await websocket.accept() _clients.add(websocket) try: # 发送欢迎消息 await websocket.send_json({ "timestamp": datetime.now().isoformat(), "level": "INFO", "name": "logs_websocket", "message": "Connected to logs stream", }) # 保持连接,等待客户端断开 while True: # 接收客户端消息(用于保持连接) await websocket.receive_text() except WebSocketDisconnect: pass finally: _clients.discard(websocket) def setup_websocket_logging(level=logging.INFO): """ 设置WebSocket日志处理器 将根日志器的日志推送到WebSocket客户端 """ handler = WebSocketLogHandler() handler.setLevel(level) # 设置日志格式 formatter = logging.Formatter( "%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) handler.setFormatter(formatter) # 添加到根日志器 root_logger = logging.getLogger() root_logger.addHandler(handler) return handler