log_capture.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. """
  2. 并发日志捕获工具
  3. 支持多个并发任务独立记录日志,避免日志混乱
  4. 参考 content_finder 的日志捕获实现
  5. """
  6. import logging
  7. import contextvars
  8. from io import StringIO
  9. from typing import Optional
  10. # 上下文变量:每个任务独立的日志缓冲区
  11. log_buffer_var: contextvars.ContextVar[Optional[StringIO]] = contextvars.ContextVar(
  12. "log_buffer", default=None
  13. )
  14. class ContextBufferHandler(logging.Handler):
  15. """将日志写入上下文变量的缓冲区"""
  16. def emit(self, record: logging.LogRecord):
  17. buffer = log_buffer_var.get()
  18. if buffer is not None:
  19. msg = self.format(record)
  20. buffer.write(msg + "\n")
  21. def setup_concurrent_logging():
  22. """设置并发日志系统"""
  23. root_logger = logging.getLogger()
  24. # 添加上下文缓冲处理器
  25. buffer_handler = ContextBufferHandler()
  26. buffer_handler.setFormatter(
  27. logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
  28. )
  29. root_logger.addHandler(buffer_handler)
  30. class LogCapture:
  31. """日志捕获上下文管理器"""
  32. def __init__(self):
  33. self.buffer = StringIO()
  34. self.token = None
  35. def __enter__(self):
  36. self.token = log_buffer_var.set(self.buffer)
  37. return self
  38. def __exit__(self, exc_type, exc_val, exc_tb):
  39. log_buffer_var.reset(self.token)
  40. def get_logs(self) -> str:
  41. """获取捕获的日志"""
  42. return self.buffer.getvalue()
  43. # 使用示例
  44. # with LogCapture() as capture:
  45. # logger.info("这条日志会被捕获")
  46. # logs = capture.get_logs()