| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- """
- 并发日志捕获工具
- 支持多个并发任务独立记录日志,避免日志混乱
- 参考 content_finder 的日志捕获实现
- """
- import logging
- import contextvars
- from io import StringIO
- from typing import Optional
- # 上下文变量:每个任务独立的日志缓冲区
- log_buffer_var: contextvars.ContextVar[Optional[StringIO]] = contextvars.ContextVar(
- "log_buffer", default=None
- )
- class ContextBufferHandler(logging.Handler):
- """将日志写入上下文变量的缓冲区"""
- def emit(self, record: logging.LogRecord):
- buffer = log_buffer_var.get()
- if buffer is not None:
- msg = self.format(record)
- buffer.write(msg + "\n")
- def setup_concurrent_logging():
- """设置并发日志系统"""
- root_logger = logging.getLogger()
- # 添加上下文缓冲处理器
- buffer_handler = ContextBufferHandler()
- buffer_handler.setFormatter(
- logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
- )
- root_logger.addHandler(buffer_handler)
- class LogCapture:
- """日志捕获上下文管理器"""
- def __init__(self):
- self.buffer = StringIO()
- self.token = None
- def __enter__(self):
- self.token = log_buffer_var.set(self.buffer)
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- log_buffer_var.reset(self.token)
- def get_logs(self) -> str:
- """获取捕获的日志"""
- return self.buffer.getvalue()
- # 使用示例
- # with LogCapture() as capture:
- # logger.info("这条日志会被捕获")
- # logs = capture.get_logs()
|