""" 并发日志捕获工具 支持多个并发任务独立记录日志,避免日志混乱 参考 content_finder 的日志捕获实现 """ import logging import contextvars from io import StringIO from typing import Optional # 上下文变量:每个任务独立的日志缓冲区 log_buffer_var: contextvars.ContextVar[Optional[StringIO]] = contextvars.ContextVar( "log_buffer", default=None ) class ContextBufferHandler(logging.Handler): """将日志写入上下文变量的缓冲区""" def emit(self, record: logging.LogRecord): buffer = log_buffer_var.get() if buffer is not None: msg = self.format(record) buffer.write(msg + "\n") def setup_concurrent_logging(): """设置并发日志系统""" root_logger = logging.getLogger() # 添加上下文缓冲处理器 buffer_handler = ContextBufferHandler() buffer_handler.setFormatter( logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ) root_logger.addHandler(buffer_handler) class LogCapture: """日志捕获上下文管理器""" def __init__(self): self.buffer = StringIO() self.token = None def __enter__(self): self.token = log_buffer_var.set(self.buffer) return self def __exit__(self, exc_type, exc_val, exc_tb): log_buffer_var.reset(self.token) def get_logs(self) -> str: """获取捕获的日志""" return self.buffer.getvalue() # 使用示例 # with LogCapture() as capture: # logger.info("这条日志会被捕获") # logs = capture.get_logs()