log_capture.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. """
  2. Tee 日志捕获工具
  3. 支持多 Agent 并发执行:
  4. - 每个 Agent 通过 build_log(build_id) 注册自己的日志 buffer
  5. - log() 函数根据 contextvars 自动路由到当前 Agent 的 buffer
  6. - 同时输出到真实 stdout,不劫持 sys.stdout
  7. """
  8. import io
  9. import sys
  10. import contextvars
  11. import threading
  12. from pathlib import Path
  13. from typing import TextIO, Union
  14. from contextlib import contextmanager
  15. # 当前 Agent 执行绑定的 build_id(通过 contextvars 跨 asyncio.to_thread 传播)
  16. # 兼容历史:可能是 int,也可能是 uuid 字符串
  17. BuildId = Union[int, str]
  18. _current_build_id: contextvars.ContextVar[BuildId | None] = contextvars.ContextVar(
  19. "log_build_id", default=None
  20. )
  21. # build_id → StringIO buffer 的全局注册表(线程安全)
  22. _buffers: dict[BuildId, io.StringIO] = {}
  23. _buffers_lock = threading.Lock()
  24. # build_id → 追加写入的文件句柄(线程安全)
  25. _file_handles: dict[BuildId, TextIO] = {}
  26. _files_lock = threading.Lock()
  27. # 保存真实 stdout(进程启动时的值,不会被覆盖)
  28. _real_stdout = sys.stdout
  29. def attach_log_file(build_id: BuildId, file_path: str | Path) -> None:
  30. """
  31. 绑定实时落盘文件:后续 log() 会在写 stdout/buffer 的同时 append 到该文件。
  32. - 会自动创建父目录
  33. - 会以 utf-8 追加模式打开
  34. - 重复绑定同一个 build_id 会关闭旧句柄并替换
  35. """
  36. path = Path(file_path).expanduser()
  37. path.parent.mkdir(parents=True, exist_ok=True)
  38. fh = path.open("a", encoding="utf-8")
  39. with _files_lock:
  40. old = _file_handles.pop(build_id, None)
  41. if old is not None:
  42. try:
  43. old.close()
  44. except Exception:
  45. pass
  46. _file_handles[build_id] = fh
  47. def detach_log_file(build_id: BuildId) -> None:
  48. """解除绑定并关闭文件句柄(若存在)。"""
  49. with _files_lock:
  50. fh = _file_handles.pop(build_id, None)
  51. if fh is not None:
  52. try:
  53. fh.close()
  54. except Exception:
  55. pass
  56. def log(*args, **kwargs):
  57. """并发安全的日志函数,替代 print()。
  58. 同时输出到 stdout 和当前 Agent 的日志 buffer。
  59. 如果不在 Agent 上下文中,等同于普通 print()。
  60. """
  61. # 1. 始终输出到真实 stdout
  62. print(*args, file=_real_stdout, **kwargs)
  63. # 2. 如果在 Agent 上下文中,额外写入 buffer
  64. build_id = _current_build_id.get()
  65. if build_id is not None:
  66. buf = _buffers.get(build_id)
  67. if buf is not None:
  68. print(*args, file=buf, **kwargs)
  69. fh = _file_handles.get(build_id)
  70. if fh is not None:
  71. try:
  72. print(*args, file=fh, **kwargs)
  73. fh.flush()
  74. except Exception:
  75. # 文件写入失败不应影响主流程
  76. pass
  77. @contextmanager
  78. def build_log(build_id: BuildId):
  79. """Agent 执行的日志上下文管理器。
  80. 使用方式:
  81. with build_log(build_id):
  82. log("这条会写入 buffer")
  83. ...
  84. # with 结束后仅清理内存缓冲区
  85. """
  86. buf = io.StringIO()
  87. token = _current_build_id.set(build_id)
  88. with _buffers_lock:
  89. _buffers[build_id] = buf
  90. try:
  91. yield buf
  92. finally:
  93. # 清理
  94. with _buffers_lock:
  95. _buffers.pop(build_id, None)
  96. detach_log_file(build_id)
  97. _current_build_id.reset(token)
  98. buf.close()
  99. @contextmanager
  100. def log_fold(label: str):
  101. """可折叠日志块的上下文管理器"""
  102. log(f"[FOLD:{label}]")
  103. try:
  104. yield
  105. finally:
  106. log("[/FOLD]")
  107. def get_log_content(build_id: BuildId) -> str | None:
  108. """获取指定 build 当前已收集的日志内容(用于实时查看)"""
  109. buf = _buffers.get(build_id)
  110. return buf.getvalue() if buf else None
  111. def _save_to_db(build_id: int, content: str) -> bool:
  112. """兼容旧接口:已禁用 DB 落库。"""
  113. return False
  114. # ============================================================================
  115. # 兼容旧接口 — TeeStream(仅供单线程场景使用,如 run_build_topic_agent.py)
  116. # ============================================================================
  117. class TeeStream(io.TextIOBase):
  118. """Tee 模式的输出流:同时写入原始 stdout 和内部缓冲区
  119. ⚠️ 仅供单进程单 Agent 使用(如命令行运行),并发场景请使用 build_log()。
  120. """
  121. def __init__(self, original_stdout):
  122. super().__init__()
  123. self.original_stdout = original_stdout
  124. self._buffer = io.StringIO()
  125. def write(self, s):
  126. if s:
  127. self.original_stdout.write(s)
  128. self._buffer.write(s)
  129. return len(s) if s else 0
  130. def flush(self):
  131. self.original_stdout.flush()
  132. self._buffer.flush()
  133. def get_log(self) -> str:
  134. return self._buffer.getvalue()
  135. def save_to_db(self, build_id: int) -> bool:
  136. return False
  137. @property
  138. def encoding(self):
  139. return self.original_stdout.encoding
  140. def isatty(self):
  141. return False
  142. def readable(self):
  143. return False
  144. def writable(self):
  145. return True