""" Tee 日志捕获工具 支持多 Agent 并发执行: - 每个 Agent 通过 build_log(build_id) 注册自己的日志 buffer - log() 函数根据 contextvars 自动路由到当前 Agent 的 buffer - 同时输出到真实 stdout,不劫持 sys.stdout """ import io import sys import contextvars import threading from contextlib import contextmanager # 当前 Agent 执行绑定的 build_id(通过 contextvars 跨 asyncio.to_thread 传播) _current_build_id: contextvars.ContextVar[int | None] = contextvars.ContextVar( 'log_build_id', default=None ) # build_id → StringIO buffer 的全局注册表(线程安全) _buffers: dict[int, io.StringIO] = {} _buffers_lock = threading.Lock() # 保存真实 stdout(进程启动时的值,不会被覆盖) _real_stdout = sys.stdout def log(*args, **kwargs): """并发安全的日志函数,替代 print()。 同时输出到 stdout 和当前 Agent 的日志 buffer。 如果不在 Agent 上下文中,等同于普通 print()。 """ # 1. 始终输出到真实 stdout print(*args, file=_real_stdout, **kwargs) # 2. 如果在 Agent 上下文中,额外写入 buffer build_id = _current_build_id.get() if build_id is not None: buf = _buffers.get(build_id) if buf is not None: print(*args, file=buf, **kwargs) @contextmanager def build_log(build_id: int): """Agent 执行的日志上下文管理器。 使用方式: with build_log(build_id): log("这条会写入 buffer") ... # with 结束后仅清理内存缓冲区 """ buf = io.StringIO() token = _current_build_id.set(build_id) with _buffers_lock: _buffers[build_id] = buf try: yield buf finally: # 清理 with _buffers_lock: _buffers.pop(build_id, None) _current_build_id.reset(token) buf.close() @contextmanager def log_fold(label: str): """可折叠日志块的上下文管理器""" log(f"[FOLD:{label}]") try: yield finally: log("[/FOLD]") def get_log_content(build_id: int) -> str | None: """获取指定 build 当前已收集的日志内容(用于实时查看)""" buf = _buffers.get(build_id) return buf.getvalue() if buf else None def _save_to_db(build_id: int, content: str) -> bool: """兼容旧接口:已禁用 DB 落库。""" return False # ============================================================================ # 兼容旧接口 — TeeStream(仅供单线程场景使用,如 run_build_topic_agent.py) # ============================================================================ class TeeStream(io.TextIOBase): """Tee 模式的输出流:同时写入原始 stdout 和内部缓冲区 ⚠️ 仅供单进程单 Agent 使用(如命令行运行),并发场景请使用 build_log()。 """ def __init__(self, original_stdout): super().__init__() self.original_stdout = original_stdout self._buffer = io.StringIO() def write(self, s): if s: self.original_stdout.write(s) self._buffer.write(s) return len(s) if s else 0 def flush(self): self.original_stdout.flush() self._buffer.flush() def get_log(self) -> str: return self._buffer.getvalue() def save_to_db(self, build_id: int) -> bool: return False @property def encoding(self): return self.original_stdout.encoding def isatty(self): return False def readable(self): return False def writable(self): return True