log_capture.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. """
  2. Tee 日志捕获工具
  3. 支持多 Agent 并发执行:
  4. - 每个 Agent 通过 build_log(build_id) 注册自己的日志 buffer
  5. - log() 函数根据 contextvars 自动路由到当前 Agent 的 buffer
  6. - 同时输出到真实 stdout,不劫持 sys.stdout
  7. """
  8. import io
  9. import sys
  10. import contextvars
  11. import threading
  12. from contextlib import contextmanager
  13. # 当前 Agent 执行绑定的 build_id(通过 contextvars 跨 asyncio.to_thread 传播)
  14. _current_build_id: contextvars.ContextVar[int | None] = contextvars.ContextVar(
  15. 'log_build_id', default=None
  16. )
  17. # build_id → StringIO buffer 的全局注册表(线程安全)
  18. _buffers: dict[int, io.StringIO] = {}
  19. _buffers_lock = threading.Lock()
  20. # 保存真实 stdout(进程启动时的值,不会被覆盖)
  21. _real_stdout = sys.stdout
  22. def log(*args, **kwargs):
  23. """并发安全的日志函数,替代 print()。
  24. 同时输出到 stdout 和当前 Agent 的日志 buffer。
  25. 如果不在 Agent 上下文中,等同于普通 print()。
  26. """
  27. # 1. 始终输出到真实 stdout
  28. print(*args, file=_real_stdout, **kwargs)
  29. # 2. 如果在 Agent 上下文中,额外写入 buffer
  30. build_id = _current_build_id.get()
  31. if build_id is not None:
  32. buf = _buffers.get(build_id)
  33. if buf is not None:
  34. print(*args, file=buf, **kwargs)
  35. @contextmanager
  36. def build_log(build_id: int):
  37. """Agent 执行的日志上下文管理器。
  38. 使用方式:
  39. with build_log(build_id):
  40. log("这条会写入 buffer")
  41. ...
  42. # with 结束后仅清理内存缓冲区
  43. """
  44. buf = io.StringIO()
  45. token = _current_build_id.set(build_id)
  46. with _buffers_lock:
  47. _buffers[build_id] = buf
  48. try:
  49. yield buf
  50. finally:
  51. # 清理
  52. with _buffers_lock:
  53. _buffers.pop(build_id, None)
  54. _current_build_id.reset(token)
  55. buf.close()
  56. @contextmanager
  57. def log_fold(label: str):
  58. """可折叠日志块的上下文管理器"""
  59. log(f"[FOLD:{label}]")
  60. try:
  61. yield
  62. finally:
  63. log("[/FOLD]")
  64. def get_log_content(build_id: int) -> str | None:
  65. """获取指定 build 当前已收集的日志内容(用于实时查看)"""
  66. buf = _buffers.get(build_id)
  67. return buf.getvalue() if buf else None
  68. def _save_to_db(build_id: int, content: str) -> bool:
  69. """兼容旧接口:已禁用 DB 落库。"""
  70. return False
  71. # ============================================================================
  72. # 兼容旧接口 — TeeStream(仅供单线程场景使用,如 run_build_topic_agent.py)
  73. # ============================================================================
  74. class TeeStream(io.TextIOBase):
  75. """Tee 模式的输出流:同时写入原始 stdout 和内部缓冲区
  76. ⚠️ 仅供单进程单 Agent 使用(如命令行运行),并发场景请使用 build_log()。
  77. """
  78. def __init__(self, original_stdout):
  79. super().__init__()
  80. self.original_stdout = original_stdout
  81. self._buffer = io.StringIO()
  82. def write(self, s):
  83. if s:
  84. self.original_stdout.write(s)
  85. self._buffer.write(s)
  86. return len(s) if s else 0
  87. def flush(self):
  88. self.original_stdout.flush()
  89. self._buffer.flush()
  90. def get_log(self) -> str:
  91. return self._buffer.getvalue()
  92. def save_to_db(self, build_id: int) -> bool:
  93. return False
  94. @property
  95. def encoding(self):
  96. return self.original_stdout.encoding
  97. def isatty(self):
  98. return False
  99. def readable(self):
  100. return False
  101. def writable(self):
  102. return True